YARP
Yet Another Robot Platform
McastCarrier.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
8
9#include <yarp/conf/system.h>
10#include <yarp/conf/string.h>
11#include <yarp/conf/numeric.h>
12
14#include <yarp/os/Network.h>
15#include <yarp/os/Route.h>
17
18#include <cstdlib>
19
20using namespace yarp::os::impl;
21using namespace yarp::os;
22
23namespace {
24YARP_OS_LOG_COMPONENT(MCASTCARRIER, "yarp.os.impl.McastCarrier")
25} // namespace
26
27ElectionOf<PeerRecord<McastCarrier>>* McastCarrier::caster = nullptr;
28
29ElectionOf<PeerRecord<McastCarrier>>& McastCarrier::getCaster()
30{
32 if (caster == nullptr) {
35 if (caster == nullptr) {
36 yCError(MCASTCARRIER, "No memory for McastCarrier::caster");
37 std::exit(1);
38 }
39 } else {
41 }
42 return *caster;
43}
44
45
47{
48 stream = nullptr;
49 key = "";
50}
51
53{
54 if (!key.empty()) {
55 bool elect = isElect();
56 removeSender(key);
57 if (elect) {
58 McastCarrier* peer = getCaster().getElect(key);
59 if (peer == nullptr) {
60 // time to remove registration
62 } else {
63 if (!peer->takeElection()) {
64 yCError(MCASTCARRIER, "Something went wrong during the shift of the election...");
65 }
66 }
67 }
68 }
69}
70
72{
73 return new McastCarrier();
74}
75
77{
78 return "mcast";
79}
80
82{
83 return 1;
84}
85
86
88{
89 // need to do more than the default
90 bool ok = defaultSendHeader(proto);
91 if (!ok) {
92 return false;
93 }
94
95 yCDebug(MCASTCARRIER, "Adding extra mcast header");
96
97 Contact addr;
98
99 Contact alt = proto.getStreams().getLocalAddress();
100 std::string altKey = proto.getRoute().getFromName() + "/net=" + alt.getHost();
101 McastCarrier* elect = getCaster().getElect(altKey);
102 if (elect != nullptr) {
103 yCDebug(MCASTCARRIER, "picking up peer mcast name");
104 addr = elect->mcastAddress;
105 mcastName = elect->mcastName;
106 } else {
107
108 // fetch an mcast address
109 Contact target("...", "mcast", "...", 0);
110 addr = NetworkBase::registerContact(target);
111 mcastName = addr.getRegName();
112 if (addr.isValid()) {
113 // mark owner of mcast address
115 "owns",
116 Value(mcastName));
117 }
118 }
119
120 constexpr size_t ipv4_size = 4;
121 int ip[ipv4_size] = {224, 3, 1, 1};
122 constexpr int default_port = 11000;
123 int port = default_port;
124 if (addr.isValid()) {
125 auto ss = yarp::conf::string::split(addr.getHost(), '.');
126 if (ss.size() != ipv4_size) {
127 addr = Contact();
128 } else {
129 for (size_t i = 0; i < ipv4_size; ++i) {
130 ip[i] = yarp::conf::numeric::from_string<int>(ss[i]);
131 }
132 port = addr.getPort();
133 }
134 }
135
136 if (!addr.isValid()) {
137 yCError(MCASTCARRIER, "Name server not responding helpfully, setting mcast name arbitrarily.");
138 yCError(MCASTCARRIER, "Only a single mcast address supported in this mode.");
139 addr = Contact("/tmp/mcast", "mcast", "224.3.1.1", 11000);
140 }
141
142 ManagedBytes block(6);
143 for (size_t i = 0; i < ipv4_size; i++) {
144 ((unsigned char*)block.get())[i] = (unsigned char)ip[i];
145 }
146 block.get()[5] = (char)(port % 256);
147 block.get()[4] = (char)(port / 256);
148 proto.os().write(block.bytes());
149 mcastAddress = addr;
150 return true;
151}
152
154{
155 yCDebug(MCASTCARRIER, "Expecting extra mcast header");
156 ManagedBytes block(6);
157 yarp::conf::ssize_t len = proto.is().readFull(block.bytes());
158 if ((size_t)len != block.length()) {
159 yCError(MCASTCARRIER, "problem with MCAST header");
160 return false;
161 }
162
163 constexpr size_t ipv4_size = 4;
164 int ip[] = {0, 0, 0, 0};
165 int port = -1;
166
167 auto* base = (unsigned char*)block.get();
168 std::string add;
169 for (size_t i = 0; i < ipv4_size; i++) {
170 ip[i] = base[i];
171 if (i != 0) {
172 add += ".";
173 }
174 char buf[100];
175 sprintf(buf, "%d", ip[i]);
176 add += buf;
177 }
178 port = 256 * base[4] + base[5];
179 Contact addr("mcast", add, port);
180 yCDebug(MCASTCARRIER, "got mcast header %s", addr.toURI().c_str());
181 mcastAddress = addr;
182
183 return true;
184}
185
186
188{
189 stream = new DgramTwoWayStream();
190 yCAssert(MCASTCARRIER, stream != nullptr);
191 Contact remote = proto.getStreams().getRemoteAddress();
192 local = proto.getStreams().getLocalAddress();
193 //(yarp::NameConfig::getEnv("YARP_MCAST_TEST")!="");
194 proto.takeStreams(nullptr); // free up port from tcp
195
196 if (sender) {
197 /*
198 Multicast behavior seems a bit variable.
199 We assume here that if packages need to be broadcast
200 to targets via different network interfaces, that
201 we'll need to send independently on those two
202 interfaces. This may or may not always be the case,
203 the author doesn't know, so is being cautious.
204 */
205 key = proto.getRoute().getFromName();
206 key += "/net=";
207 key += local.getHost();
208
209 yCDebug(MCASTCARRIER, "multicast key: %s", key.c_str());
210 addSender(key);
211 }
212
213 bool ok = true;
214 if (isElect() || !sender) {
215 ok = stream->join(mcastAddress, sender, local);
216 }
217
218 if (!ok) {
219 delete stream;
220 return false;
221 }
222 proto.takeStreams(stream);
223 return true;
224}
225
227{
228 return becomeMcast(proto, false);
229}
230
231
233{
234 return becomeMcast(proto, true);
235}
236
237void yarp::os::impl::McastCarrier::addSender(const std::string& key)
238{
239 getCaster().add(key, this);
240}
241
243{
244 getCaster().remove(key, this);
245}
246
248{
249 void* elect = getCaster().getElect(key);
250 //void *elect = caster.getElect(mcastAddress.toString());
251 return elect == this || elect == nullptr;
252}
253
255{
256 if (stream != nullptr) {
257 return stream->join(mcastAddress, true, local);
258 }
259 return false;
260}
261
262
264{
265 return isElect();
266}
267
269{
270 return true;
271}
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:44
The basic state of a connection - route, streams in use, etc.
OutputStream & os()
Shorthand for getOutputStream()
virtual const Route & getRoute() const =0
Get the route associated with this connection.
InputStream & is()
Shorthand for getInputStream()
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
virtual TwoWayStream & getStreams()=0
Access the streams associated with the connection.
Represents how to reach a part of a YARP network.
Definition: Contact.h:33
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:298
std::string getRegName() const
Get the name associated with this Contact.
Definition: Contact.cpp:217
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition: Contact.cpp:313
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:239
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:228
Pick one of a set of peers to be "active".
Definition: Election.h:60
yarp::conf::ssize_t readFull(Bytes &b)
Keep reading until buffer is full.
Definition: InputStream.cpp:96
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
Definition: ManagedBytes.h:21
const Bytes & bytes() const
const char * get() const
static void unlock()
Call post() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1464
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
Definition: Network.cpp:1023
static Contact registerContact(const Contact &contact)
Register contact information with the name server.
Definition: Network.cpp:1017
static bool setProperty(const char *name, const char *key, const Value &value)
Names registered with the nameserver can have arbitrary key->value properties associated with them.
Definition: Network.cpp:1035
static void lock()
Call wait() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1459
virtual void write(char ch)
Write a single byte to the stream.
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:93
virtual const Contact & getRemoteAddress() const =0
Get the address of the remote side of the stream.
virtual const Contact & getLocalAddress() const =0
Get the address of the local side of the stream.
A single value (typically within a Bottle).
Definition: Value.h:43
A stream abstraction for datagram communication.
Communicating between two ports via MCAST.
Definition: McastCarrier.h:24
bool respondToHeader(ConnectionState &proto) override
Respond to the header.
bool expectExtraHeader(ConnectionState &proto) override
Receive any carrier-specific header.
bool expectReplyToHeader(ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
bool takeElection()
takeElection, this function is called when the elect mcast carrier dies and pass the write buffers to...
void addSender(const std::string &key)
std::string getName() const override
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
void removeSender(const std::string &key)
Carrier * create() const override
Factory method.
static ElectionOf< PeerRecord< McastCarrier > > * caster
Definition: McastCarrier.h:32
bool becomeMcast(ConnectionState &proto, bool sender)
int getSpecifierCode() const override
bool isActive() const override
Check if carrier is alive and error free.
bool isBroadcast() const override
Check if this carrier uses a broadcast mechanism.
bool sendHeader(ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
#define yCError(component,...)
Definition: LogComponent.h:213
#define yCAssert(component, x)
Definition: LogComponent.h:240
#define yCDebug(component,...)
Definition: LogComponent.h:128
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:29
ContainerT split(const typename ContainerT::value_type &s, std::basic_regex< typename ContainerT::value_type::value_type > regex)
Utility to split a string by a separator, into a vector of strings.
Definition: string.h:26
::ssize_t ssize_t
Definition: numeric.h:86
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.