YARP
Yet Another Robot Platform
McastCarrier.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
8 
9 #include <yarp/conf/system.h>
10 #include <yarp/conf/string.h>
11 #include <yarp/conf/numeric.h>
12 
14 #include <yarp/os/Network.h>
15 #include <yarp/os/Route.h>
17 
18 #include <cstdlib>
19 
20 using namespace yarp::os::impl;
21 using namespace yarp::os;
22 
23 namespace {
24 YARP_OS_LOG_COMPONENT(MCASTCARRIER, "yarp.os.impl.McastCarrier")
25 } // namespace
26 
28 
30 {
32  if (caster == nullptr) {
35  if (caster == nullptr) {
36  yCError(MCASTCARRIER, "No memory for McastCarrier::caster");
37  std::exit(1);
38  }
39  } else {
41  }
42  return *caster;
43 }
44 
45 
47 {
48  stream = nullptr;
49  key = "";
50 }
51 
53 {
54  if (!key.empty()) {
55  bool elect = isElect();
56  removeSender(key);
57  if (elect) {
58  McastCarrier* peer = getCaster().getElect(key);
59  if (peer == nullptr) {
60  // time to remove registration
61  NetworkBase::unregisterName(mcastName);
62  } else {
63  if (!peer->takeElection()) {
64  yCError(MCASTCARRIER, "Something went wrong during the shift of the election...");
65  }
66  }
67  }
68  }
69 }
70 
72 {
73  return new McastCarrier();
74 }
75 
77 {
78  return "mcast";
79 }
80 
82 {
83  return 1;
84 }
85 
86 
88 {
89  // need to do more than the default
90  bool ok = defaultSendHeader(proto);
91  if (!ok) {
92  return false;
93  }
94 
95  yCDebug(MCASTCARRIER, "Adding extra mcast header");
96 
97  Contact addr;
98 
99  Contact alt = proto.getStreams().getLocalAddress();
100  std::string altKey = proto.getRoute().getFromName() + "/net=" + alt.getHost();
101  McastCarrier* elect = getCaster().getElect(altKey);
102  if (elect != nullptr) {
103  yCDebug(MCASTCARRIER, "picking up peer mcast name");
104  addr = elect->mcastAddress;
105  mcastName = elect->mcastName;
106  } else {
107 
108  // fetch an mcast address
109  Contact target("...", "mcast", "...", 0);
110  addr = NetworkBase::registerContact(target);
111  mcastName = addr.getRegName();
112  if (addr.isValid()) {
113  // mark owner of mcast address
115  "owns",
116  Value(mcastName));
117  }
118  }
119 
120  constexpr size_t ipv4_size = 4;
121  int ip[ipv4_size] = {224, 3, 1, 1};
122  constexpr int default_port = 11000;
123  int port = default_port;
124  if (addr.isValid()) {
125  auto ss = yarp::conf::string::split(addr.getHost(), '.');
126  if (ss.size() != ipv4_size) {
127  addr = Contact();
128  } else {
129  for (size_t i = 0; i < ipv4_size; ++i) {
130  ip[i] = yarp::conf::numeric::from_string<int>(ss[i]);
131  }
132  port = addr.getPort();
133  }
134  }
135 
136  if (!addr.isValid()) {
137  yCError(MCASTCARRIER, "Name server not responding helpfully, setting mcast name arbitrarily.");
138  yCError(MCASTCARRIER, "Only a single mcast address supported in this mode.");
139  addr = Contact("/tmp/mcast", "mcast", "224.3.1.1", 11000);
140  }
141 
142  ManagedBytes block(6);
143  for (size_t i = 0; i < ipv4_size; i++) {
144  ((unsigned char*)block.get())[i] = (unsigned char)ip[i];
145  }
146  block.get()[5] = (char)(port % 256);
147  block.get()[4] = (char)(port / 256);
148  proto.os().write(block.bytes());
149  mcastAddress = addr;
150  return true;
151 }
152 
154 {
155  yCDebug(MCASTCARRIER, "Expecting extra mcast header");
156  ManagedBytes block(6);
157  yarp::conf::ssize_t len = proto.is().readFull(block.bytes());
158  if ((size_t)len != block.length()) {
159  yCError(MCASTCARRIER, "problem with MCAST header");
160  return false;
161  }
162 
163  constexpr size_t ipv4_size = 4;
164  int ip[] = {0, 0, 0, 0};
165  int port = -1;
166 
167  auto* base = (unsigned char*)block.get();
168  std::string add;
169  for (size_t i = 0; i < ipv4_size; i++) {
170  ip[i] = base[i];
171  if (i != 0) {
172  add += ".";
173  }
174  char buf[100];
175  sprintf(buf, "%d", ip[i]);
176  add += buf;
177  }
178  port = 256 * base[4] + base[5];
179  Contact addr("mcast", add, port);
180  yCDebug(MCASTCARRIER, "got mcast header %s", addr.toURI().c_str());
181  mcastAddress = addr;
182 
183  return true;
184 }
185 
186 
188 {
189  stream = new DgramTwoWayStream();
190  yCAssert(MCASTCARRIER, stream != nullptr);
191  Contact remote = proto.getStreams().getRemoteAddress();
192  local = proto.getStreams().getLocalAddress();
193  //(yarp::NameConfig::getEnv("YARP_MCAST_TEST")!="");
194  proto.takeStreams(nullptr); // free up port from tcp
195 
196  if (sender) {
197  /*
198  Multicast behavior seems a bit variable.
199  We assume here that if packages need to be broadcast
200  to targets via different network interfaces, that
201  we'll need to send independently on those two
202  interfaces. This may or may not always be the case,
203  the author doesn't know, so is being cautious.
204  */
205  key = proto.getRoute().getFromName();
206  key += "/net=";
207  key += local.getHost();
208 
209  yCDebug(MCASTCARRIER, "multicast key: %s", key.c_str());
210  addSender(key);
211  }
212 
213  bool ok = true;
214  if (isElect() || !sender) {
215  ok = stream->join(mcastAddress, sender, local);
216  }
217 
218  if (!ok) {
219  delete stream;
220  return false;
221  }
222  proto.takeStreams(stream);
223  return true;
224 }
225 
227 {
228  return becomeMcast(proto, false);
229 }
230 
231 
233 {
234  return becomeMcast(proto, true);
235 }
236 
237 void yarp::os::impl::McastCarrier::addSender(const std::string& key)
238 {
239  getCaster().add(key, this);
240 }
241 
242 void yarp::os::impl::McastCarrier::removeSender(const std::string& key)
243 {
244  getCaster().remove(key, this);
245 }
246 
248 {
249  void* elect = getCaster().getElect(key);
250  //void *elect = caster.getElect(mcastAddress.toString());
251  return elect == this || elect == nullptr;
252 }
253 
255 {
256  if (stream != nullptr) {
257  return stream->join(mcastAddress, true, local);
258  }
259  return false;
260 }
261 
262 
264 {
265  return isElect();
266 }
267 
269 {
270  return true;
271 }
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:45
The basic state of a connection - route, streams in use, etc.
virtual TwoWayStream & getStreams()=0
Access the streams associated with the connection.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
InputStream & is()
Shorthand for getInputStream()
OutputStream & os()
Shorthand for getOutputStream()
Represents how to reach a part of a YARP network.
Definition: Contact.h:36
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:298
std::string getRegName() const
Get the name associated with this Contact.
Definition: Contact.cpp:217
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition: Contact.cpp:313
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:239
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:228
Pick one of a set of peers to be "active".
Definition: Election.h:61
yarp::conf::ssize_t readFull(Bytes &b)
Keep reading until buffer is full.
Definition: InputStream.cpp:96
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
Definition: ManagedBytes.h:22
const Bytes & bytes() const
const char * get() const
static void unlock()
Call post() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1464
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
Definition: Network.cpp:1023
static Contact registerContact(const Contact &contact)
Register contact information with the name server.
Definition: Network.cpp:1017
static bool setProperty(const char *name, const char *key, const Value &value)
Names registered with the nameserver can have arbitrary key->value properties associated with them.
Definition: Network.cpp:1035
static void lock()
Call wait() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1459
virtual void write(char ch)
Write a single byte to the stream.
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:93
virtual const Contact & getLocalAddress() const =0
Get the address of the local side of the stream.
virtual const Contact & getRemoteAddress() const =0
Get the address of the remote side of the stream.
A single value (typically within a Bottle).
Definition: Value.h:45
A stream abstraction for datagram communication.
Communicating between two ports via MCAST.
Definition: McastCarrier.h:26
bool respondToHeader(ConnectionState &proto) override
Respond to the header.
bool expectExtraHeader(ConnectionState &proto) override
Receive any carrier-specific header.
bool expectReplyToHeader(ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
bool takeElection()
takeElection, this function is called when the elect mcast carrier dies and pass the write buffers to...
void addSender(const std::string &key)
std::string getName() const override
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
void removeSender(const std::string &key)
Carrier * create() const override
Factory method.
static ElectionOf< PeerRecord< McastCarrier > > * caster
Definition: McastCarrier.h:34
bool becomeMcast(ConnectionState &proto, bool sender)
int getSpecifierCode() const override
bool isActive() const override
Check if carrier is alive and error free.
bool isBroadcast() const override
Check if this carrier uses a broadcast mechanism.
bool sendHeader(ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
static ElectionOf< PeerRecord< McastCarrier > > & getCaster()
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCAssert(component, x)
Definition: LogComponent.h:169
#define yCDebug(component,...)
Definition: LogComponent.h:109
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:35
ContainerT split(const typename ContainerT::value_type &s, std::basic_regex< typename ContainerT::value_type::value_type > regex)
Utility to split a string by a separator, into a vector of strings.
Definition: string.h:27
::ssize_t ssize_t
Definition: numeric.h:86
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.