YARP
Yet Another Robot Platform
LocalCarrier.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
8
10#include <yarp/os/Portable.h>
11#include <yarp/os/Route.h>
12#include <yarp/os/SizedWriter.h>
13
15
16using namespace yarp::os;
17
18namespace {
19YARP_OS_LOG_COMPONENT(LOCALCARRIER, "yarp.os.impl.LocalCarrier")
20} // namespace
21
23
// NOTE(review): the signature line is absent from this listing; the member
// names suggest this is the LocalCarrierManager constructor - confirm.
// `received` (used elsewhere via wait()/post()) starts at 0, and both carrier
// pointers start as nullptr.
25 senderMutex(),
26 receiverMutex(),
27 received(0),
28 sender(nullptr),
29 receiver(nullptr)
30{
31}
32
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrierManager::setSender(LocalCarrier* sender) - confirm.
// Locks senderMutex and records the given carrier as the pending sender.
// senderMutex is still held when this returns; the matching unlock() calls
// are in the block that waits on `received` and in the block that compares
// `sender == carrier`.
34{
35 senderMutex.lock();
36 this->sender = sender;
37}
38
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrierManager::getReceiver() - confirm.
// Blocks until `received` is posted, then returns the recorded receiver.
// Before returning it clears `sender` and unlocks senderMutex.
40{
41 received.wait();
42 LocalCarrier* result = receiver;
43 sender = nullptr;
44 senderMutex.unlock();
45 return result;
46}
47
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrierManager::getSender(LocalCarrier* receiver) - confirm.
// Under receiverMutex: records the receiver, reads the currently recorded
// sender, and posts `received` to wake the block that waits on it.
// Returns the sender that was read; this is nullptr if none was recorded.
49{
50 receiverMutex.lock();
51 this->receiver = receiver;
52 LocalCarrier* result = sender;
53 received.post();
54 receiverMutex.unlock();
55 return result;
56}
57
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrierManager::revoke(LocalCarrier* carrier) - confirm.
// Unlocks senderMutex when `carrier` is the currently recorded sender;
// otherwise does nothing.
// NOTE(review): `sender` is read here without taking a lock and is not
// reset to nullptr - confirm that this is intended.
59{
60 if (sender == carrier) {
61 senderMutex.unlock();
62 }
63}
64
65
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrierStream::attach(LocalCarrier* owner, bool sender) - confirm.
// Stores the owning carrier and the sender flag, and marks the stream as
// not done.
67{
68 this->owner = owner;
69 this->sender = sender;
70 done = false;
71}
72
// NOTE(review): signature line missing from this listing; this and the next
// block are presumably getInputStream() and getOutputStream() - the order
// cannot be determined from here, confirm.
// Returns this object itself as the stream.
74{
75 return *this;
76}
77
// NOTE(review): signature line missing from this listing; this and the
// previous block are presumably getInputStream() and getOutputStream() - the
// order cannot be determined from here, confirm.
// Returns this object itself as the stream.
79{
80 return *this;
81}
82
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrierStream::getLocalAddress() - confirm.
// Returns the stored localAddress member.
84{
85 return localAddress;
86}
87
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrierStream::getRemoteAddress() - confirm.
// Returns the stored remoteAddress member.
89{
90 return remoteAddress;
91}
92
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrierStream::setTypeOfService(int tos) - confirm.
// Ignores `tos` and always returns true.
94{
95 YARP_UNUSED(tos);
96 return true;
97}
98
// NOTE(review): signature line missing from this listing; presumably the
// stream's byte-level read taking a Bytes `b` - confirm.
// Asserts unconditionally, so this path is presumably never meant to be
// reached. If the assertion does not stop execution, returns b.length().
100{
101 yCAssert(LOCALCARRIER, false);
102 return b.length();
103}
104
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrierStream::write(const yarp::os::Bytes& b) - confirm.
// Ignores `b` and asserts unconditionally, so this path is presumably never
// meant to be reached.
106{
107 YARP_UNUSED(b);
108 yCAssert(LOCALCARRIER, false);
109}
110
// NOTE(review): signature line missing from this listing; one of three
// consecutive empty bodies, presumably the stream's reset(), beginPacket()
// and endPacket() - which is which cannot be determined from here.
// Intentionally does nothing.
112{
113}
114
// NOTE(review): signature line missing from this listing; one of three
// consecutive empty bodies, presumably the stream's reset(), beginPacket()
// and endPacket() - which is which cannot be determined from here.
// Intentionally does nothing.
116{
117}
118
// NOTE(review): signature line missing from this listing; one of three
// consecutive empty bodies, presumably the stream's reset(), beginPacket()
// and endPacket() - which is which cannot be determined from here.
// Intentionally does nothing.
120{
121}
122
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrierStream::interrupt() - confirm.
// Marks the stream as done; the block returning `!done` reports false
// afterwards.
124{
125 done = true;
126}
127
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrierStream::close() - confirm.
// Calls shutdown() on the owning carrier, if one is attached, and marks the
// stream as done.
129{
130 if (owner != nullptr) {
131 LocalCarrier* owned = owner;
// Clear `owner` before calling shutdown() so a later call through this
// block finds no owner and does not call shutdown() again.
132 owner = nullptr;
133 owned->shutdown();
134 }
135 done = true;
136}
137
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrierStream::isOk() - confirm.
// Returns true as long as the stream has not been marked done.
139{
140 return !done;
141}
142
143
// NOTE(review): signature line missing from this listing; the member names
// suggest this is the LocalCarrier constructor - confirm.
// `sent` and `received` (used elsewhere via wait()/post()) start at 0.
// The carrier starts with no object reference, no peer, and not doomed.
145 peerMutex(), sent(0), received(0)
146{
147 ref = nullptr;
148 peer = nullptr;
149 doomed = false;
150}
151
// NOTE(review): signature line missing from this listing; presumably the
// LocalCarrier destructor - confirm.
// Calls shutdown(), which in this file does nothing once `doomed` is set.
153{
154 shutdown();
155}
156
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::create() - confirm.
// Returns a newly heap-allocated LocalCarrier as a raw pointer.
// NOTE(review): the caller presumably takes ownership - confirm against the
// Carrier factory contract.
158{
159 return new LocalCarrier();
160}
161
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::removePeer() - confirm.
// Forgets the peer under peerMutex. Does nothing if this carrier is already
// doomed.
163{
164 if (!doomed) {
165 peerMutex.lock();
166 peer = nullptr;
167 peerMutex.unlock();
168 }
169}
170
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::shutdown() - confirm.
// Runs its body only while `doomed` is false, and sets `doomed` first, so
// later calls do nothing. If a peer is still attached, it is notified and
// detached under peerMutex.
172{
173 if (!doomed) {
174 doomed = true;
175 peerMutex.lock();
176 if (peer != nullptr) {
// A null reference is the shutdown signal: the block that waits on `sent`
// returns false when it sees ref == nullptr.
177 peer->accept(nullptr);
178 LocalCarrier* wasPeer = peer;
179 peer = nullptr;
// Ask the former peer to forget its own peer pointer.
180 wasPeer->removePeer();
181 }
182 peerMutex.unlock();
183 }
184}
185
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::getName() - confirm.
// Returns the fixed string "local".
187{
188 return "local";
189}
190
// NOTE(review): signature line missing from this listing; one of the
// boolean property overrides (requireAck / isConnectionless / canEscape /
// isLocal) - which one cannot be determined from here.
// Always returns false.
192{
193 return false;
194}
195
// NOTE(review): signature line missing from this listing; one of the
// boolean property overrides (requireAck / isConnectionless / canEscape /
// isLocal) - which one cannot be determined from here.
// Always returns false.
197{
198 return false;
199}
200
// NOTE(review): signature line missing from this listing; one of the
// boolean property overrides (requireAck / isConnectionless / canEscape /
// isLocal) - which one cannot be determined from here.
// Always returns false.
202{
203 return false;
204}
205
// NOTE(review): signature line missing from this listing; one of the
// boolean property overrides, possibly isLocal() since it is the only one
// returning true - confirm.
// Always returns true.
207{
208 return true;
209}
210
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::getSpecifierName() - confirm.
// Returns the 8-character tag "LOCALITY". The header check and header fill
// blocks in this file compare against it and copy it byte by byte.
212{
213 return "LOCALITY";
214}
215
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::checkHeader(const Bytes& header) - confirm.
// Returns true only when `header` is exactly 8 bytes long and every byte
// equals the corresponding character of getSpecifierName().
// NOTE(review): indexes target[0..7], so this assumes getSpecifierName()
// returns at least 8 characters. It is listed as virtual, so verify any
// override.
217{
218 if (header.length() == 8) {
219 std::string target = getSpecifierName();
220 for (int i = 0; i < 8; i++) {
221 if (!(target[i] == header.get()[i])) {
222 return false;
223 }
224 }
225 return true;
226 }
227 return false;
228}
229
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::getHeader(Bytes& header) - confirm.
// Copies the first 8 characters of getSpecifierName() into `header` when it
// is exactly 8 bytes long. A header of any other length is left untouched.
// NOTE(review): assumes getSpecifierName() returns at least 8 characters.
231{
232 if (header.length() == 8) {
233 std::string target = getSpecifierName();
234 for (int i = 0; i < 8; i++) {
235 header.get()[i] = target[i];
236 }
237 }
238}
239
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::setParameters(const Bytes& header) - confirm.
// Ignores `header` and does nothing else.
241{
242 YARP_UNUSED(header);
243}
244
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::sendHeader(ConnectionState& proto), the sender side of the
// handshake - confirm.
// Takes the port name from the route's "from" name, registers this carrier
// with the manager as the pending sender, sends the default header, then
// stores the receiver returned by the manager as `peer`. Always returns true.
246{
247 portName = proto.getRoute().getFromName();
248
// In this file setSender() returns with the manager's senderMutex still
// locked; getReceiver() below unlocks it.
249 manager.setSender(this);
250
// NOTE(review): any result of defaultSendHeader() is not used here -
// confirm whether a failure should abort.
251 defaultSendHeader(proto);
252 // now switch over to some local structure to communicate
253 peerMutex.lock();
// In this file getReceiver() blocks on the manager's `received` until
// getSender() posts it.
254 peer = manager.getReceiver();
255 yCDebug(LOCALCARRIER,
256 "sender %p sees receiver %p",
257 this,
258 peer);
259 peerMutex.unlock();
260
261 return true;
262}
263
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::expectExtraHeader(ConnectionState& proto), the receiver side
// of the handshake - confirm.
// Takes the port name from the route's "to" name and stores the pending
// sender returned by the manager as `peer`. It then rewrites the route's
// "from" name to the peer's port name. Always returns true.
// NOTE(review): `peer` is dereferenced without a null check. In this file
// manager.getSender() returns nullptr when no sender is recorded - confirm
// that cannot happen here.
265{
266 portName = proto.getRoute().getToName();
267 // switch over to some local structure to communicate
268 peerMutex.lock();
269 peer = manager.getSender(this);
270 yCDebug(LOCALCARRIER,
271 "receiver %p (%s) sees sender %p (%s)",
272 this,
273 portName.c_str(),
274 peer,
275 peer->portName.c_str());
276 Route route = proto.getRoute();
277 route.setFromName(peer->portName);
278 proto.setRoute(route);
279 peerMutex.unlock();
280
281 return true;
282}
283
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::becomeLocal(ConnectionState& proto, bool sender) - confirm.
// Creates a LocalCarrierStream attached to this carrier and hands it to the
// connection through takeStreams(). Always returns true.
285{
286 auto* stream = new LocalCarrierStream();
// NOTE(review): a plain new-expression does not yield nullptr on failure,
// so this check looks redundant.
287 if (stream != nullptr) {
288 stream->attach(this, sender);
289 }
// NOTE(review): takeStreams() presumably takes ownership of `stream` -
// confirm.
290 proto.takeStreams(stream);
291 return true;
292}
293
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::write(ConnectionState& proto, SizedWriter& writer) - confirm.
// Passes the writer's object reference directly to the peer through
// accept(), under peerMutex. No bytes are serialized here.
// NOTE(review): returns true even when there is no reference or no peer;
// those cases are only logged as errors - confirm that is intended.
295{
296 YARP_UNUSED(proto);
297 yarp::os::Portable* ref = writer.getReference();
298 if (ref != nullptr) {
299 peerMutex.lock();
300 if (peer != nullptr) {
// In this file accept() waits for the peer's receipt while peerMutex is
// still held here, unless the peer is doomed.
301 peer->accept(ref);
302 } else {
303 yCError(LOCALCARRIER, "local send failed - write without peer");
304 }
305 peerMutex.unlock();
306 } else {
307 yCError(LOCALCARRIER, "local send failed - no object");
308 }
309
310 return true;
311}
312
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::respondToHeader(ConnectionState& proto) - confirm.
// Switches the connection to a local stream with the sender flag false.
314{
315 // I am the receiver
316 return becomeLocal(proto, false);
317}
318
319
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::expectReplyToHeader(ConnectionState& proto) - confirm.
// Switches the connection to a local stream with the sender flag true.
321{
322 // I am the sender
323 return becomeLocal(proto, true);
324}
325
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::expectIndex(ConnectionState& proto) - confirm.
// Receiving half of the hand-off. Waits until `sent` is posted, publishes
// the stored `ref` to the connection, then posts `received`.
// Returns true when an object reference arrived. On a null reference (the
// shutdown signal) it interrupts the input stream and returns false.
327{
328
329 yCDebug(LOCALCARRIER, "local recv: wait send");
330 sent.wait();
331 yCDebug(LOCALCARRIER, "local recv: got send");
332 proto.setReference(ref);
// Posted in both the normal and the shutdown case.
333 received.post();
334 if (ref != nullptr) {
335 yCDebug(LOCALCARRIER, "local recv: received");
336 } else {
337 yCDebug(LOCALCARRIER, "local recv: shutdown");
338 proto.is().interrupt();
339 return false;
340 }
341
342 return true;
343}
344
// NOTE(review): signature line missing from this listing; presumably
// LocalCarrier::accept(yarp::os::Portable* ref) - confirm.
// Sending half of the hand-off, called on the receiving carrier. Stores the
// reference and posts `sent` to wake the block that waits on it.
// For a non-null reference on a carrier that is not doomed, it then waits
// until `received` is posted. A null reference (shutdown) does not wait.
346{
347 this->ref = ref;
348 yCDebug(LOCALCARRIER, "local send: send ref");
349 sent.post();
350 if (ref != nullptr && !doomed) {
351 yCDebug(LOCALCARRIER, "local send: wait receipt");
352 received.wait();
353 yCDebug(LOCALCARRIER, "local send: received");
354 }
355}
A simple abstraction for a block of bytes.
Definition: Bytes.h:24
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:44
The basic state of a connection - route, streams in use, etc.
virtual void setReference(yarp::os::Portable *ref)=0
Give a direct pointer to an object being sent on the connection.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
InputStream & is()
Shorthand for getInputStream()
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
virtual void setRoute(const Route &route)=0
Set the route associated with this connection.
Represents how to reach a part of a YARP network.
Definition: Contact.h:33
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:25
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:20
virtual void interrupt()
Interrupt the stream.
Definition: InputStream.cpp:42
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:21
This is a base class for objects that can be both read from and be written to the YARP network.
Definition: Portable.h:25
Information about a connection between two ports.
Definition: Route.h:28
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:103
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:93
void setFromName(const std::string &fromName)
Set the source of the route.
Definition: Route.cpp:98
Minimal requirements for an efficient Writer.
Definition: SizedWriter.h:32
virtual Portable * getReference()=0
Coordinate ports communicating locally within a process.
Definition: LocalCarrier.h:27
void setSender(LocalCarrier *sender)
void revoke(LocalCarrier *carrier)
LocalCarrier * getSender(LocalCarrier *receiver)
A stream for communicating locally within a process.
Definition: LocalCarrier.h:51
bool isOk() const override
Check if the stream is ok or in an error state.
void interrupt() override
Interrupt the stream.
InputStream & getInputStream() override
Get an InputStream to read from.
const Contact & getLocalAddress() const override
Get the address of the local side of the stream.
OutputStream & getOutputStream() override
Get an OutputStream to write to.
void close() override
Terminate the stream.
void beginPacket() override
Mark the beginning of a logical packet.
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
void attach(LocalCarrier *owner, bool sender)
void endPacket() override
Mark the end of a logical packet (see beginPacket).
bool setTypeOfService(int tos) override
const Contact & getRemoteAddress() const override
Get the address of the remote side of the stream.
void reset() override
Reset the stream.
A carrier for communicating locally within a process.
Definition: LocalCarrier.h:86
bool sendHeader(ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
static LocalCarrierManager manager
Definition: LocalCarrier.h:125
bool write(ConnectionState &proto, SizedWriter &writer) override
Write a message.
void accept(yarp::os::Portable *ref)
bool expectReplyToHeader(ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
bool expectExtraHeader(ConnectionState &proto) override
Receive any carrier-specific header.
bool requireAck() const override
Check if carrier has flow control, requiring sent messages to be acknowledged by recipient.
bool checkHeader(const Bytes &header) override
Given the first 8 bytes received on a connection, decide if this is the right carrier type to use for the rest of the connection.
yarp::os::Portable * ref
Definition: LocalCarrier.h:118
bool canEscape() const override
Check if carrier can encode administrative messages, as opposed to just user data.
void setParameters(const Bytes &header) override
Configure this carrier based on the first 8 bytes of the connection.
Carrier * create() const override
Factory method.
virtual bool becomeLocal(ConnectionState &proto, bool sender)
virtual std::string getSpecifierName() const
bool expectIndex(ConnectionState &proto) override
Expect a message header, if there is one for this carrier.
std::string getName() const override
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
void getHeader(Bytes &header) const override
Provide 8 bytes describing this connection sufficiently to allow the other side of a connection to select it.
bool isLocal() const override
Check if carrier operates within a single process.
bool respondToHeader(ConnectionState &proto) override
Respond to the header.
bool isConnectionless() const override
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
#define yCError(component,...)
Definition: LogComponent.h:213
#define yCAssert(component, x)
Definition: LogComponent.h:240
#define yCDebug(component,...)
Definition: LogComponent.h:128
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:29
::ssize_t ssize_t
Definition: numeric.h:86
An interface to the operating system, including Port based communication.
#define YARP_UNUSED(var)
Definition: api.h:162