YARP
Yet Another Robot Platform
McastCarrier.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
11 
12 #include <yarp/conf/system.h>
13 
15 #include <yarp/os/Network.h>
16 #include <yarp/os/Route.h>
18 
19 #include <cstdlib>
20 
21 using namespace yarp::os::impl;
22 using namespace yarp::os;
23 
24 namespace {
25 YARP_OS_LOG_COMPONENT(MCASTCARRIER, "yarp.os.impl.McastCarrier")
26 } // namespace
27 
29 
31 {
// Hands back a reference to the shared `caster` election object.
// NOTE(review): the listing numbers jump over 32, 34, 35 and 41, so statements are
// missing from this extract (presumably the allocation of `caster` and the
// surrounding locking — confirm against the real McastCarrier.cpp before editing).
33  if (caster == nullptr) {
// Visible behaviour: if `caster` is still null at this point, log an error and
// terminate the whole process with exit status 1.
36  if (caster == nullptr) {
37  yCError(MCASTCARRIER, "No memory for McastCarrier::caster");
38  std::exit(1);
39  }
40  } else {
42  }
// `caster` is dereferenced unconditionally here.
43  return *caster;
44 }
45 
46 
48 {
49  stream = nullptr;
50  key = "";
51 }
52 
54 {
55  if (!key.empty()) {
56  bool elect = isElect();
57  removeSender(key);
58  if (elect) {
59  McastCarrier* peer = getCaster().getElect(key);
60  if (peer == nullptr) {
61  // time to remove registration
62  NetworkBase::unregisterName(mcastName);
63  } else {
64  if (!peer->takeElection()) {
65  yCError(MCASTCARRIER, "Something went wrong during the shift of the election...");
66  }
67  }
68  }
69  }
70 }
71 
73 {
// Hands back a freshly heap-allocated McastCarrier as a raw pointer.
// NOTE(review): ownership is not expressed in the type — presumably the caller
// is responsible for deleting the object; confirm against the callers.
74  return new McastCarrier();
75 }
76 
78 {
// The name reported by this body is the literal "mcast".
79  return "mcast";
80 }
81 
83 {
// Always yields the constant 1.
// NOTE(review): what this value identifies is not visible in this extract —
// confirm against the declaration before changing it.
84  return 1;
85 }
86 
87 
89 {
// Sends the default header, then appends a 6-byte block describing the multicast
// group: four address octets followed by the port, high byte first.
// Returns false only when defaultSendHeader() fails; otherwise true.
90  // need to do more than the default
91  bool ok = defaultSendHeader(proto);
92  if (!ok) {
93  return false;
94  }
95 
96  yCDebug(MCASTCARRIER, "Adding extra mcast header");
97 
98  Contact addr;
99 
// Election key: "<route source name>/net=<local host>". If a carrier is already
// elected for that key, reuse its multicast address and registered name.
100  Contact alt = proto.getStreams().getLocalAddress();
101  std::string altKey = proto.getRoute().getFromName() + "/net=" + alt.getHost();
102  McastCarrier* elect = getCaster().getElect(altKey);
103  if (elect != nullptr) {
104  yCDebug(MCASTCARRIER, "picking up peer mcast name");
105  addr = elect->mcastAddress;
106  mcastName = elect->mcastName;
107  } else {
108 
// No elect yet: register a placeholder contact with the name server and take the
// address and registered name it hands back.
109  // fetch an mcast address
110  Contact target("...", "mcast", "...", 0);
111  addr = NetworkBase::registerContact(target);
112  mcastName = addr.getRegName();
113  if (addr.isValid()) {
114  // mark owner of mcast address
// NOTE(review): listing line 115 is missing from this extract, so the call that
// receives the two arguments below is not visible here — restore it from the
// real source before compiling.
116  "owns",
117  Value(mcastName));
118  }
119  }
120 
// Defaults used when no usable address was obtained.
121  int ip[] = {224, 3, 1, 1};
122  int port = 11000;
123  if (addr.isValid()) {
// Split the host on '.'; anything other than exactly four parts invalidates
// `addr`, which triggers the fallback below.
124  SplitString ss(addr.getHost().c_str(), '.');
125  if (ss.size() != 4) {
126  addr = Contact();
127  } else {
128  yCAssert(MCASTCARRIER, ss.size() == 4);
// NOTE(review): the parsed values are not range-checked before being narrowed to
// unsigned char further down.
129  for (int i = 0; i < 4; i++) {
130  ip[i] = NetType::toInt(ss.get(i));
131  }
132  port = addr.getPort();
133  }
134  }
135 
136  if (!addr.isValid()) {
137  yCError(MCASTCARRIER, "Name server not responding helpfully, setting mcast name arbitrarily.");
138  yCError(MCASTCARRIER, "Only a single mcast address supported in this mode.");
139  addr = Contact("/tmp/mcast", "mcast", "224.3.1.1", 11000);
140  }
141 
// Wire format: bytes 0-3 = address octets, byte 4 = port / 256, byte 5 = port % 256.
142  ManagedBytes block(6);
143  for (int i = 0; i < 4; i++) {
144  ((unsigned char*)block.get())[i] = (unsigned char)ip[i];
145  }
146  block.get()[5] = (char)(port % 256);
147  block.get()[4] = (char)(port / 256);
148  proto.os().write(block.bytes());
// Remember the address that was advertised.
149  mcastAddress = addr;
150  return true;
151 }
152 
154 {
155  yCDebug(MCASTCARRIER, "Expecting extra mcast header");
156  ManagedBytes block(6);
157  yarp::conf::ssize_t len = proto.is().readFull(block.bytes());
158  if ((size_t)len != block.length()) {
159  yCError(MCASTCARRIER, "problem with MCAST header");
160  return false;
161  }
162 
163  int ip[] = {0, 0, 0, 0};
164  int port = -1;
165 
166  auto* base = (unsigned char*)block.get();
167  std::string add;
168  for (int i = 0; i < 4; i++) {
169  ip[i] = base[i];
170  if (i != 0) {
171  add += ".";
172  }
173  char buf[100];
174  sprintf(buf, "%d", ip[i]);
175  add += buf;
176  }
177  port = 256 * base[4] + base[5];
178  Contact addr("mcast", add, port);
179  yCDebug(MCASTCARRIER, "got mcast header %s", addr.toURI().c_str());
180  mcastAddress = addr;
181 
182  return true;
183 }
184 
185 
187 {
188  stream = new DgramTwoWayStream();
189  yCAssert(MCASTCARRIER, stream != nullptr);
190  Contact remote = proto.getStreams().getRemoteAddress();
191  local = proto.getStreams().getLocalAddress();
192  //(yarp::NameConfig::getEnv("YARP_MCAST_TEST")!="");
193  proto.takeStreams(nullptr); // free up port from tcp
194 
195  if (sender) {
196  /*
197  Multicast behavior seems a bit variable.
198  We assume here that if packages need to be broadcast
199  to targets via different network interfaces, that
200  we'll need to send independently on those two
201  interfaces. This may or may not always be the case,
202  the author doesn't know, so is being cautious.
203  */
204  key = proto.getRoute().getFromName();
205  key += "/net=";
206  key += local.getHost();
207 
208  yCDebug(MCASTCARRIER, "multicast key: %s", key.c_str());
209  addSender(key);
210  }
211 
212  bool ok = true;
213  if (isElect() || !sender) {
214  ok = stream->join(mcastAddress, sender, local);
215  }
216 
217  if (!ok) {
218  delete stream;
219  return false;
220  }
221  proto.takeStreams(stream);
222  return true;
223 }
224 
226 {
// Delegates to becomeMcast() with the `sender` flag set to false.
227  return becomeMcast(proto, false);
228 }
229 
230 
232 {
// Delegates to becomeMcast() with the `sender` flag set to true.
233  return becomeMcast(proto, true);
234 }
235 
236 void yarp::os::impl::McastCarrier::addSender(const std::string& key)
237 {
238  getCaster().add(key, this);
239 }
240 
241 void yarp::os::impl::McastCarrier::removeSender(const std::string& key)
242 {
243  getCaster().remove(key, this);
244 }
245 
247 {
248  void* elect = getCaster().getElect(key);
249  //void *elect = caster.getElect(mcastAddress.toString());
250  return elect == this || elect == nullptr;
251 }
252 
254 {
255  if (stream != nullptr) {
256  return stream->join(mcastAddress, true, local);
257  }
258  return false;
259 }
260 
261 
263 {
// The result is exactly isElect(): true when the election for `key` names this
// carrier or nobody at all.
264  return isElect();
265 }
266 
268 {
// Unconditionally reports true.
269  return true;
270 }
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:48
The basic state of a connection - route, streams in use, etc.
virtual TwoWayStream & getStreams()=0
Access the streams associated with the connection.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
InputStream & is()
Shorthand for getInputStream()
OutputStream & os()
Shorthand for getOutputStream()
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:301
std::string getRegName() const
Get the name associated with this Contact.
Definition: Contact.cpp:220
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition: Contact.cpp:316
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:242
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:231
Pick one of a set of peers to be "active".
Definition: Election.h:64
yarp::conf::ssize_t readFull(Bytes &b)
Keep reading until buffer is full.
Definition: InputStream.cpp:99
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that block.
Definition: ManagedBytes.h:25
const Bytes & bytes() const
const char * get() const
static int toInt(const std::string &x)
Definition: NetType.cpp:160
static void unlock()
Call post() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1467
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
Definition: Network.cpp:1026
static Contact registerContact(const Contact &contact)
Register contact information with the name server.
Definition: Network.cpp:1020
static bool setProperty(const char *name, const char *key, const Value &value)
Names registered with the nameserver can have arbitrary key->value properties associated with them.
Definition: Network.cpp:1038
static void lock()
Call wait() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1462
virtual void write(char ch)
Write a single byte to the stream.
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:96
virtual const Contact & getLocalAddress() const =0
Get the address of the local side of the stream.
virtual const Contact & getRemoteAddress() const =0
Get the address of the remote side of the stream.
A single value (typically within a Bottle).
Definition: Value.h:47
A stream abstraction for datagram communication.
Communicating between two ports via MCAST.
Definition: McastCarrier.h:30
bool respondToHeader(ConnectionState &proto) override
Respond to the header.
bool expectExtraHeader(ConnectionState &proto) override
Receive any carrier-specific header.
bool expectReplyToHeader(ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
bool takeElection()
takeElection: this function is called when the elect mcast carrier dies, and passes the write buffers to...
void addSender(const std::string &key)
std::string getName() const override
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
void removeSender(const std::string &key)
Carrier * create() const override
Factory method.
static ElectionOf< PeerRecord< McastCarrier > > * caster
Definition: McastCarrier.h:38
bool becomeMcast(ConnectionState &proto, bool sender)
int getSpecifierCode() const override
bool isActive() const override
Check if carrier is alive and error free.
bool isBroadcast() const override
Check if this carrier uses a broadcast mechanism.
bool sendHeader(ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
static ElectionOf< PeerRecord< McastCarrier > > & getCaster()
Split a string into pieces.
Definition: SplitString.h:27
const char * get(int idx)
Definition: SplitString.cpp:44
#define yCError(component,...)
Definition: LogComponent.h:157
#define yCAssert(component, x)
Definition: LogComponent.h:172
#define yCDebug(component,...)
Definition: LogComponent.h:112
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
::ssize_t ssize_t
Definition: numeric.h:60
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.