YARP
Yet Another Robot Platform
LocalCarrier.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
8 
10 #include <yarp/os/Portable.h>
11 #include <yarp/os/Route.h>
12 #include <yarp/os/SizedWriter.h>
13 
15 
16 using namespace yarp::os;
17 
// File-local log component. The LOCALCARRIER identifier declared here is the
// first argument of every yCDebug / yCError / yCAssert call in this file.
18 namespace {
19 YARP_OS_LOG_COMPONENT(LOCALCARRIER, "yarp.os.impl.LocalCarrier")
20 } // namespace
21 
23 
// Constructor initializer list and (empty) body. The declaration line is
// missing from this listing; presumably LocalCarrierManager's default
// constructor — confirm against LocalCarrier.h.
// The manager starts with no sender and no receiver registered, and with
// `received` constructed from 0.
25  senderMutex(),
26  receiverMutex(),
27  received(0),
28  sender(nullptr),
29  receiver(nullptr)
30 {
31 }
32 
// Declaration line missing from this listing; presumably
// LocalCarrierManager::setSender(LocalCarrier* sender) — confirm.
// Registers `sender` as the sender currently being connected.
// NOTE: senderMutex is locked here and deliberately left locked on return.
// The matching unlock lives in other bodies of this file: the one that waits
// on `received` and then clears the sender, and the one that unlocks when the
// registered sender equals the carrier passed in.
34 {
35  senderMutex.lock();
36  this->sender = sender;
37 }
38 
// Declaration line missing from this listing; presumably
// LocalCarrierManager::getReceiver() — confirm.
// Waits on `received` (posted by the body that records a receiver), then
// returns the recorded receiver. Before returning it clears the registered
// sender and unlocks senderMutex; the lock itself is taken in the body that
// registers the sender, not here.
40 {
41  received.wait();
42  LocalCarrier* result = receiver;
43  sender = nullptr;
44  senderMutex.unlock();
45  return result;
46 }
47 
// Declaration line missing from this listing; presumably
// LocalCarrierManager::getSender(LocalCarrier* receiver) — confirm.
// Under receiverMutex: records `receiver`, reads the currently registered
// sender, and posts `received` so that the waiting side can proceed.
// Returns the sender that was registered at that moment. This is nullptr when
// no sender is registered (the field starts as nullptr and is reset to nullptr
// once a receiver has been handed out), so callers must check the result.
49 {
50  receiverMutex.lock();
51  this->receiver = receiver;
52  LocalCarrier* result = sender;
53  received.post();
54  receiverMutex.unlock();
55  return result;
56 }
57 
// Declaration line missing from this listing; presumably
// LocalCarrierManager::revoke(LocalCarrier* carrier) — confirm.
// If `carrier` is the currently registered sender, releases senderMutex (which
// was locked when that sender was registered). Otherwise does nothing.
// Note that the `sender` field itself is not cleared here.
59 {
60  if (sender == carrier) {
61  senderMutex.unlock();
62  }
63 }
64 
65 
// Declaration line missing from this listing; presumably
// LocalCarrierStream::attach(LocalCarrier* owner, bool sender) — confirm.
// Remembers the owning carrier and which side of the connection this stream
// is on, and marks the stream as not done.
67 {
68  this->owner = owner;
69  this->sender = sender;
70  done = false;
71 }
72 
// Declaration line missing from this listing. This is one of the two stream
// accessors (getInputStream / getOutputStream per the symbol index); which one
// cannot be told from the body alone — confirm against the original file.
// The stream object returns itself.
74 {
75  return *this;
76 }
77 
// Declaration line missing from this listing. This is the other of the two
// stream accessors (getInputStream / getOutputStream per the symbol index);
// which one cannot be told from the body alone — confirm.
// The stream object returns itself.
79 {
80  return *this;
81 }
82 
// Declaration line missing from this listing; presumably
// LocalCarrierStream::getLocalAddress() const — confirm.
// Returns the stored `localAddress` member. Nothing in this file assigns it.
84 {
85  return localAddress;
86 }
87 
// Declaration line missing from this listing; presumably
// LocalCarrierStream::getRemoteAddress() const — confirm.
// Returns the stored `remoteAddress` member. Nothing in this file assigns it.
89 {
90  return remoteAddress;
91 }
92 
// Declaration line missing from this listing; presumably
// LocalCarrierStream::setTypeOfService(int tos) — confirm.
// The `tos` value is ignored; the call always reports success.
94 {
95  YARP_UNUSED(tos);
96  return true;
97 }
98 
// Declaration line missing from this listing; presumably the byte-block read
// of LocalCarrierStream, taking a yarp::os::Bytes `b` — confirm.
// Not meant to be called: it asserts `false` unconditionally. The return of
// b.length() only matters if the assertion does not stop execution.
100 {
101  yCAssert(LOCALCARRIER, false);
102  return b.length();
103 }
104 
// Declaration line missing from this listing; presumably
// LocalCarrierStream::write(const yarp::os::Bytes& b) — confirm.
// Not meant to be called: `b` is ignored and the body asserts `false`
// unconditionally.
106 {
107  YARP_UNUSED(b);
108  yCAssert(LOCALCARRIER, false);
109 }
110 
// Declaration line missing from this listing. First of three consecutive
// empty bodies; presumably reset(), beginPacket() and endPacket() of
// LocalCarrierStream in some order — confirm. Intentionally does nothing.
112 {
113 }
114 
// Declaration line missing from this listing. Second of three consecutive
// empty bodies; presumably reset(), beginPacket() and endPacket() of
// LocalCarrierStream in some order — confirm. Intentionally does nothing.
116 {
117 }
118 
// Declaration line missing from this listing. Third of three consecutive
// empty bodies; presumably reset(), beginPacket() and endPacket() of
// LocalCarrierStream in some order — confirm. Intentionally does nothing.
120 {
121 }
122 
// Declaration line missing from this listing; presumably
// LocalCarrierStream::interrupt() — confirm.
// Marks the stream as done without touching the owning carrier.
124 {
125  done = true;
126 }
127 
// Declaration line missing from this listing; presumably
// LocalCarrierStream::close() — confirm.
// Shuts down the owning carrier (if one is still attached) and marks the
// stream as done.
129 {
130  if (owner != nullptr) {
131  LocalCarrier* owned = owner;
// Clear `owner` before calling shutdown(), so a second pass through this
// body cannot reach the carrier again.
132  owner = nullptr;
133  owned->shutdown();
134  }
135  done = true;
136 }
137 
// Declaration line missing from this listing; presumably
// LocalCarrierStream::isOk() const — confirm.
// Returns true until the stream has been marked done.
139 {
140  return !done;
141 }
142 
143 
// Constructor initializer list and body. The declaration line is missing from
// this listing; presumably LocalCarrier's default constructor — confirm.
// Both `sent` and `received` are constructed from 0. The carrier starts with
// no pending object reference, no peer, and not doomed.
145  peerMutex(), sent(0), received(0)
146 {
147  ref = nullptr;
148  peer = nullptr;
149  doomed = false;
150 }
151 
// Declaration line missing from this listing; presumably the LocalCarrier
// destructor — confirm.
// Delegates to shutdown(), which does its work only on the first call.
153 {
154  shutdown();
155 }
156 
// Declaration line missing from this listing; presumably
// LocalCarrier::create() const — confirm.
// Factory: returns a newly heap-allocated LocalCarrier as a raw pointer.
// Nothing in this body frees it — TODO confirm who deletes the result.
158 {
159  return new LocalCarrier();
160 }
161 
// Declaration line missing from this listing; presumably
// LocalCarrier::removePeer() — confirm (the name is taken from the call made
// on the former peer during shutdown).
// Forgets the peer under peerMutex. Does nothing once this carrier is doomed,
// i.e. once its own shutdown has started.
163 {
164  if (!doomed) {
165  peerMutex.lock();
166  peer = nullptr;
167  peerMutex.unlock();
168  }
169 }
170 
// Declaration line missing from this listing; presumably
// LocalCarrier::shutdown() — confirm.
// Tears down the link to the peer. The body runs at most once: `doomed` is
// set on first entry and later calls return immediately.
172 {
173  if (!doomed) {
174  doomed = true;
175  peerMutex.lock();
176  if (peer != nullptr) {
// Hand the peer a null reference; the receive body in this file treats a
// null reference as the shutdown signal.
177  peer->accept(nullptr);
178  LocalCarrier* wasPeer = peer;
179  peer = nullptr;
// Ask the former peer to drop its pointer back to this carrier.
180  wasPeer->removePeer();
181  }
182  peerMutex.unlock();
183  }
184 }
185 
// Declaration line missing from this listing; presumably
// LocalCarrier::getName() const — confirm.
// Returns the connection type name of this carrier.
187 {
188  return "local";
189 }
190 
// Declaration line missing from this listing. First of three consecutive
// bodies that return false; presumably requireAck(), isConnectionless() and
// canEscape() of LocalCarrier in some order — confirm.
192 {
193  return false;
194 }
195 
// Declaration line missing from this listing. Second of three consecutive
// bodies that return false; presumably requireAck(), isConnectionless() and
// canEscape() of LocalCarrier in some order — confirm.
197 {
198  return false;
199 }
200 
// Declaration line missing from this listing. Third of three consecutive
// bodies that return false; presumably requireAck(), isConnectionless() and
// canEscape() of LocalCarrier in some order — confirm.
202 {
203  return false;
204 }
205 
// Declaration line missing from this listing; presumably
// LocalCarrier::isLocal() const — confirm. It is the only boolean property
// body in this group that returns true.
207 {
208  return true;
209 }
210 
// Declaration line missing from this listing; presumably
// LocalCarrier::getSpecifierName() const — confirm.
// Returns the 8-character tag that the header check and header fill bodies
// compare against / copy into the 8-byte connection header.
212 {
213  return "LOCALITY";
214 }
215 
// Declaration line missing from this listing; presumably
// LocalCarrier::checkHeader(const Bytes& header) — confirm.
// Returns true only when `header` is exactly 8 bytes long and each byte equals
// the corresponding character of getSpecifierName(). Any other length, or any
// mismatching byte, yields false.
217 {
218  if (header.length() == 8) {
219  std::string target = getSpecifierName();
220  for (int i = 0; i < 8; i++) {
221  if (!(target[i] == header.get()[i])) {
222  return false;
223  }
224  }
225  return true;
226  }
227  return false;
228 }
229 
// Declaration line missing from this listing; presumably
// LocalCarrier::getHeader(Bytes& header) const — confirm.
// Copies the first 8 characters of getSpecifierName() into `header`.
// If `header` is not exactly 8 bytes long it is left untouched, and no error
// is reported.
231 {
232  if (header.length() == 8) {
233  std::string target = getSpecifierName();
234  for (int i = 0; i < 8; i++) {
235  header.get()[i] = target[i];
236  }
237  }
238 }
239 
// Declaration line missing from this listing; presumably
// LocalCarrier::setParameters(const Bytes& header) — confirm.
// No configuration is read from the header; the argument is ignored.
241 {
242  YARP_UNUSED(header);
243 }
244 
// Declaration line missing from this listing; presumably
// LocalCarrier::sendHeader(ConnectionState& proto) — confirm.
// Sender-side handshake: registers this carrier with the shared manager,
// sends the default header, then obtains the receiving carrier from the
// manager and stores it as `peer`. Always returns true.
246 {
247  portName = proto.getRoute().getFromName();
248
// Register before the header goes out, so the sender is already in place
// when the receiving side asks the manager for it. setSender() leaves the
// manager's senderMutex locked; getReceiver() below releases it.
249  manager.setSender(this);
250
251  defaultSendHeader(proto);
252  // now switch over to some local structure to communicate
253  peerMutex.lock();
// getReceiver() waits until a receiver has been recorded with the manager.
254  peer = manager.getReceiver();
255  yCDebug(LOCALCARRIER,
256  "sender %p sees receiver %p",
257  this,
258  peer);
259  peerMutex.unlock();
260
261  return true;
262 }
263 
265 {
266  portName = proto.getRoute().getToName();
267  // switch over to some local structure to communicate
268  peerMutex.lock();
269  peer = manager.getSender(this);
270  yCDebug(LOCALCARRIER,
271  "receiver %p (%s) sees sender %p (%s)",
272  this,
273  portName.c_str(),
274  peer,
275  peer->portName.c_str());
276  Route route = proto.getRoute();
277  route.setFromName(peer->portName);
278  proto.setRoute(route);
279  peerMutex.unlock();
280 
281  return true;
282 }
283 
// Declaration line missing from this listing; presumably
// LocalCarrier::becomeLocal(ConnectionState& proto, bool sender) — confirm.
// Replaces the connection's streams with a LocalCarrierStream attached to
// this carrier; `sender` tells the stream which side it is on.
// Always returns true.
285 {
286  auto* stream = new LocalCarrierStream();
// NOTE(review): a plain new-expression reports failure by throwing rather
// than returning nullptr, so this check looks redundant.
287  if (stream != nullptr) {
288  stream->attach(this, sender);
289  }
// The raw pointer is passed to takeStreams(); presumably `proto` takes
// ownership of it — confirm against ConnectionState.
290  proto.takeStreams(stream);
291  return true;
292 }
293 
// Declaration line missing from this listing; presumably
// LocalCarrier::write(ConnectionState& proto, SizedWriter& writer) — confirm.
// Sends a message by handing the writer's object pointer straight to the peer
// carrier; no bytes are serialized here.
// NOTE(review): returns true on every path, including the two error branches
// (no object reference, no peer). Callers cannot detect a failed send from the
// return value — confirm whether that is intended.
295 {
296  YARP_UNUSED(proto);
297  yarp::os::Portable* ref = writer.getReference();
298  if (ref != nullptr) {
299  peerMutex.lock();
300  if (peer != nullptr) {
// accept() may wait until the peer has picked up the reference, and
// peerMutex stays locked while it does.
301  peer->accept(ref);
302  } else {
303  yCError(LOCALCARRIER, "local send failed - write without peer");
304  }
305  peerMutex.unlock();
306  } else {
307  yCError(LOCALCARRIER, "local send failed - no object");
308  }
309
310  return true;
311 }
312 
// Declaration line missing from this listing; presumably
// LocalCarrier::respondToHeader(ConnectionState& proto) — confirm.
// Switches the connection to the local stream, acting as the receiving side.
314 {
315  // I am the receiver
316  return becomeLocal(proto, false);
317 }
318 
319 
// Declaration line missing from this listing; presumably
// LocalCarrier::expectReplyToHeader(ConnectionState& proto) — confirm.
// Switches the connection to the local stream, acting as the sending side.
321 {
322  // I am the sender
323  return becomeLocal(proto, true);
324 }
325 
// Declaration line missing from this listing; presumably
// LocalCarrier::expectIndex(ConnectionState& proto) — confirm.
// Receive side of a local message. Waits until an object reference has been
// delivered (`sent` posted), passes it to `proto`, then posts `received` so
// that a sender waiting for receipt can continue.
// Returns true when a real object was delivered. A null reference is the
// shutdown signal: the input stream is interrupted and false is returned.
327 {
328
329  yCDebug(LOCALCARRIER, "local recv: wait send");
330  sent.wait();
331  yCDebug(LOCALCARRIER, "local recv: got send");
332  proto.setReference(ref);
// Posted in both the normal and the shutdown case.
333  received.post();
334  if (ref != nullptr) {
335  yCDebug(LOCALCARRIER, "local recv: received");
336  } else {
337  yCDebug(LOCALCARRIER, "local recv: shutdown");
338  proto.is().interrupt();
339  return false;
340  }
341
342  return true;
343 }
344 
// Declaration line missing from this listing; presumably
// LocalCarrier::accept(yarp::os::Portable* ref) — confirm.
// Called on the receiving carrier by its peer. Stores `ref` as the pending
// object and posts `sent` to wake the receive body.
// For a real object (non-null `ref`) on a carrier that is not doomed, it then
// waits on `received` until the receive body has picked the reference up.
// A null `ref` (the shutdown signal) is posted without waiting.
346 {
347  this->ref = ref;
348  yCDebug(LOCALCARRIER, "local send: send ref");
349  sent.post();
350  if (ref != nullptr && !doomed) {
351  yCDebug(LOCALCARRIER, "local send: wait receipt");
352  received.wait();
353  yCDebug(LOCALCARRIER, "local send: received");
354  }
355 }
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:45
The basic state of a connection - route, streams in use, etc.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
virtual void setReference(yarp::os::Portable *ref)=0
Give a direct pointer to an object being sent on the connection.
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
InputStream & is()
Shorthand for getInputStream()
virtual void setRoute(const Route &route)=0
Set the route associated with this connection.
Represents how to reach a part of a YARP network.
Definition: Contact.h:36
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:26
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:20
virtual void interrupt()
Interrupt the stream.
Definition: InputStream.cpp:42
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:22
This is a base class for objects that can be both read from and be written to the YARP network.
Definition: Portable.h:26
Information about a connection between two ports.
Definition: Route.h:29
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:103
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:93
void setFromName(const std::string &fromName)
Set the source of the route.
Definition: Route.cpp:98
Minimal requirements for an efficient Writer.
Definition: SizedWriter.h:33
virtual Portable * getReference()=0
Coordinate ports communicating locally within a process.
Definition: LocalCarrier.h:29
void setSender(LocalCarrier *sender)
void revoke(LocalCarrier *carrier)
LocalCarrier * getSender(LocalCarrier *receiver)
A stream for communicating locally within a process.
Definition: LocalCarrier.h:53
bool isOk() const override
Check if the stream is ok or in an error state.
void interrupt() override
Interrupt the stream.
InputStream & getInputStream() override
Get an InputStream to read from.
const Contact & getLocalAddress() const override
Get the address of the local side of the stream.
OutputStream & getOutputStream() override
Get an OutputStream to write to.
void close() override
Terminate the stream.
void beginPacket() override
Mark the beginning of a logical packet.
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
void attach(LocalCarrier *owner, bool sender)
void endPacket() override
Mark the end of a logical packet (see beginPacket).
bool setTypeOfService(int tos) override
const Contact & getRemoteAddress() const override
Get the address of the remote side of the stream.
void reset() override
Reset the stream.
A carrier for communicating locally within a process.
Definition: LocalCarrier.h:88
bool sendHeader(ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
static LocalCarrierManager manager
Definition: LocalCarrier.h:127
bool write(ConnectionState &proto, SizedWriter &writer) override
Write a message.
void accept(yarp::os::Portable *ref)
bool expectReplyToHeader(ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
bool expectExtraHeader(ConnectionState &proto) override
Receive any carrier-specific header.
bool requireAck() const override
Check if carrier has flow control, requiring sent messages to be acknowledged by recipient.
bool checkHeader(const Bytes &header) override
Given the first 8 bytes received on a connection, decide if this is the right carrier type to use for the rest of the connection.
yarp::os::Portable * ref
Definition: LocalCarrier.h:120
bool canEscape() const override
Check if carrier can encode administrative messages, as opposed to just user data.
void setParameters(const Bytes &header) override
Configure this carrier based on the first 8 bytes of the connection.
Carrier * create() const override
Factory method.
virtual bool becomeLocal(ConnectionState &proto, bool sender)
virtual std::string getSpecifierName() const
bool expectIndex(ConnectionState &proto) override
Expect a message header, if there is one for this carrier.
std::string getName() const override
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
void getHeader(Bytes &header) const override
Provide 8 bytes describing this connection sufficiently to allow the other side of a connection to select it.
bool isLocal() const override
Check if carrier operates within a single process.
bool respondToHeader(ConnectionState &proto) override
Respond to the header.
bool isConnectionless() const override
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCAssert(component, x)
Definition: LogComponent.h:169
#define yCDebug(component,...)
Definition: LogComponent.h:109
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:34
::ssize_t ssize_t
Definition: numeric.h:86
An interface to the operating system, including Port based communication.
#define YARP_UNUSED(var)
Definition: api.h:162