36 yCError(MCASTCARRIER,
"No memory for McastCarrier::caster");
55 bool elect = isElect();
59 if (peer ==
nullptr) {
64 yCError(MCASTCARRIER,
"Something went wrong during the shift of the election...");
90 bool ok = defaultSendHeader(proto);
95 yCDebug(MCASTCARRIER,
"Adding extra mcast header");
102 if (elect !=
nullptr) {
103 yCDebug(MCASTCARRIER,
"picking up peer mcast name");
109 Contact target(
"...",
"mcast",
"...", 0);
120 constexpr size_t ipv4_size = 4;
121 int ip[ipv4_size] = {224, 3, 1, 1};
122 constexpr int default_port = 11000;
123 int port = default_port;
126 if (ss.size() != ipv4_size) {
129 for (
size_t i = 0; i < ipv4_size; ++i) {
130 ip[i] = yarp::conf::numeric::from_string<int>(ss[i]);
137 yCError(MCASTCARRIER,
"Name server not responding helpfully, setting mcast name arbitrarily.");
138 yCError(MCASTCARRIER,
"Only a single mcast address supported in this mode.");
139 addr =
Contact(
"/tmp/mcast",
"mcast",
"224.3.1.1", 11000);
143 for (
size_t i = 0; i < ipv4_size; i++) {
144 ((
unsigned char*)block.
get())[i] = (
unsigned char)ip[i];
146 block.
get()[5] = (char)(port % 256);
147 block.
get()[4] = (char)(port / 256);
155 yCDebug(MCASTCARRIER,
"Expecting extra mcast header");
158 if ((
size_t)len != block.
length()) {
159 yCError(MCASTCARRIER,
"problem with MCAST header");
163 constexpr size_t ipv4_size = 4;
164 int ip[] = {0, 0, 0, 0};
167 auto* base = (
unsigned char*)block.
get();
169 for (
size_t i = 0; i < ipv4_size; i++) {
175 sprintf(buf,
"%d", ip[i]);
178 port = 256 * base[4] + base[5];
179 Contact addr(
"mcast", add, port);
180 yCDebug(MCASTCARRIER,
"got mcast header %s", addr.
toURI().c_str());
190 yCAssert(MCASTCARRIER, stream !=
nullptr);
207 key += local.getHost();
209 yCDebug(MCASTCARRIER,
"multicast key: %s", key.c_str());
214 if (isElect() || !sender) {
215 ok = stream->join(mcastAddress, sender, local);
228 return becomeMcast(proto,
false);
234 return becomeMcast(proto,
true);
239 getCaster().add(key,
this);
244 getCaster().remove(key,
this);
249 void* elect = getCaster().getElect(key);
251 return elect ==
this || elect ==
nullptr;
256 if (stream !=
nullptr) {
257 return stream->join(mcastAddress,
true, local);
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
The basic state of a connection - route, streams in use, etc.
OutputStream & os()
Shorthand for getOutputStream()
virtual const Route & getRoute() const =0
Get the route associated with this connection.
InputStream & is()
Shorthand for getInputStream()
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
virtual TwoWayStream & getStreams()=0
Access the streams associated with the connection.
Pick one of a set of peers to be "active".
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
const Bytes & bytes() const
static void unlock()
Call post() on a global mutual-exclusion semaphore allocated by YARP.
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
static Contact registerContact(const Contact &contact)
Register contact information with the name server.
static bool setProperty(const char *name, const char *key, const Value &value)
Names registered with the nameserver can have arbitrary key->value properties associated with them.
static void lock()
Call wait() on a global mutual-exclusion semaphore allocated by YARP.
virtual void write(char ch)
Write a single byte to the stream.
const std::string & getFromName() const
Get the source of the route.
virtual const Contact & getRemoteAddress() const =0
Get the address of the remote side of the stream.
virtual const Contact & getLocalAddress() const =0
Get the address of the local side of the stream.
A single value (typically within a Bottle).
A stream abstraction for datagram communication.
Communicating between two ports via MCAST.
bool respondToHeader(ConnectionState &proto) override
Respond to the header.
bool expectExtraHeader(ConnectionState &proto) override
Receive any carrier-specific header.
bool expectReplyToHeader(ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
bool takeElection()
takeElection, this function is called when the elect mcast carrier dies and pass the write buffers to...
void addSender(const std::string &key)
std::string getName() const override
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
void removeSender(const std::string &key)
Carrier * create() const override
Factory method.
static ElectionOf< PeerRecord< McastCarrier > > * caster
bool becomeMcast(ConnectionState &proto, bool sender)
int getSpecifierCode() const override
bool isActive() const override
Check if carrier is alive and error free.
bool isBroadcast() const override
Check if this carrier uses a broadcast mechanism.
bool sendHeader(ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
#define yCError(component,...)
#define yCAssert(component, x)
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
ContainerT split(const typename ContainerT::value_type &s, std::basic_regex< typename ContainerT::value_type::value_type > regex)
Utility to split a string by a separator, into a vector of strings.
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.