YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
AbstractCarrier.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
8
12#include <yarp/os/Route.h>
13#include <yarp/os/SizedWriter.h>
15
16using namespace yarp::os;
17using namespace yarp::os::impl;
18
19namespace {
20YARP_OS_LOG_COMPONENT(ABSTRACTCARRIER, "yarp.os.AbstractCarrier")
21} // namespace
22
24{
25 YARP_UNUSED(header);
26 // default - no parameters
27}
28
30{
31 // conservative choice - shortcuts are taken for connection
32 return true;
33}
34
36{
37 return !isConnectionless();
38}
39
41{
42 return true;
43}
44
46{
47 return true;
48}
49
51{
52 return false;
53}
54
56{
57 return false;
58}
59
61{
62 return true;
63}
64
66{
67 return false;
68}
69
70std::string AbstractCarrier::toString() const
71{
72 return getName();
73}
74
76{
77 YARP_UNUSED(proto);
78 return true;
79}
80
82{
83 YARP_UNUSED(proto);
84 return defaultSendHeader(proto);
85}
86
88{
89 YARP_UNUSED(proto);
90 return true;
91}
92
94{
95 return defaultSendIndex(proto, writer);
96}
97
99{
100 YARP_UNUSED(proto);
101 return true;
102}
103
105{
106 return defaultExpectIndex(proto);
107}
108
110{
112 Bytes number((char*)&numberSrc, sizeof(NetInt32));
113 int len = 0;
115 if ((size_t)r != number.length()) {
116 yCDebug(ABSTRACTCARRIER, "did not get sender name length");
117 return false;
118 }
119 len = NetType::netInt(number);
120 if (len > 1000) {
121 len = 1000;
122 }
123 if (len < 1) {
124 len = 1;
125 }
126 ManagedBytes b(len + 1);
127 Bytes bytes(b.get(), len);
128 r = proto.is().readFull(bytes);
129 if ((int)r != len) {
130 yCDebug(ABSTRACTCARRIER, "did not get sender name");
131 return false;
132 }
133 std::string s = b.get();
134 Route route = proto.getRoute();
135 route.setFromName(s);
136 proto.setRoute(route);
137 return true;
138}
139
141{
142 return defaultSendAck(proto);
143}
144
146{
147 return defaultExpectAck(proto);
148}
149
151{
152 return true;
153}
154
156{
157 YARP_UNUSED(params);
158}
159
161{
162 YARP_UNUSED(params);
163}
164
166{
167 int x = interpretYarpNumber(b);
168 if (x >= 0) {
169 return x - 7777;
170 }
171 return x;
172}
173
175{
176 createYarpNumber(7777 + specifier, header);
177}
178
180{
181 bool ok = sendIndex(proto, writer);
182 if (!ok) {
183 return false;
184 }
185 writer.write(proto.os());
186 proto.os().flush();
187 return proto.os().isOk();
188}
189
191{
192 bool ok = sendConnectionStateSpecifier(proto);
193 if (!ok) {
194 return false;
195 }
196 return sendSenderSpecifier(proto);
197}
198
200{
201 char buf[8];
202 Bytes header((char*)&buf[0], sizeof(buf));
203 OutputStream& os = proto.os();
204 proto.getConnection().getHeader(header);
205 os.write(header);
206 os.flush();
207 return os.isOk();
208}
209
211{
213 Bytes number((char*)&numberSrc, sizeof(NetInt32));
214 const std::string senderName = proto.getSenderSpecifier();
215 //const std::string& senderName = getRoute().getFromName();
216 NetType::netInt((int)senderName.length() + 1, number);
217 OutputStream& os = proto.os();
218 os.write(number);
219 Bytes b((char*)senderName.c_str(), senderName.length() + 1);
220 os.write(b);
221 os.flush();
222 return os.isOk();
223}
224
226{
227 writeYarpInt(10, proto);
228 int len = (int)writer.length();
229 char lens[] = {(char)len, (char)1, (char)-1, (char)-1, (char)-1, (char)-1, (char)-1, (char)-1, (char)-1, (char)-1};
230 Bytes b(lens, 10);
231 OutputStream& os = proto.os();
232 os.write(b);
234 Bytes number((char*)&numberSrc, sizeof(NetInt32));
235 for (int i = 0; i < len; i++) {
236 NetType::netInt((int)writer.length(i), number);
237 os.write(number);
238 }
240 os.write(number);
241 return os.isOk();
242}
243
245{
246 if (proto.getConnection().requireAck()) {
247 char buf[8];
248 Bytes header((char*)&buf[0], sizeof(buf));
249 yarp::conf::ssize_t hdr = proto.is().readFull(header);
250 if ((size_t)hdr != header.length()) {
251 yCDebug(ABSTRACTCARRIER, "did not get acknowledgement header");
252 return false;
253 }
254 int len = interpretYarpNumber(header);
255 if (len < 0) {
256 yCDebug(ABSTRACTCARRIER, "acknowledgement header is bad");
257 return false;
258 }
259 size_t len2 = proto.is().readDiscard(len);
260 if ((size_t)len != len2) {
261 yCDebug(ABSTRACTCARRIER, "did not get an acknowledgement of the promised length");
262 return false;
263 }
264 }
265 return true;
266}
267
269{
270 yCDebug(ABSTRACTCARRIER, "expecting an index");
271 yCDebug(ABSTRACTCARRIER, "ConnectionState::expectIndex for %s", proto.getRoute().toString().c_str());
272 // expect index header
273 char buf[8];
274 Bytes header((char*)&buf[0], sizeof(buf));
275 yarp::conf::ssize_t r = proto.is().readFull(header);
276 if ((size_t)r != header.length()) {
277 yCDebug(ABSTRACTCARRIER, "broken index");
278 return false;
279 }
280 int len = interpretYarpNumber(header);
281 if (len < 0) {
282 yCDebug(ABSTRACTCARRIER, "broken index - header is not a number");
283 return false;
284 }
285 if (len != 10) {
286 yCDebug(ABSTRACTCARRIER, "broken index - header is wrong length");
287 return false;
288 }
289 yCDebug(ABSTRACTCARRIER, "index coming in happily...");
290 char buf2[10];
291 Bytes indexHeader((char*)&buf2[0], sizeof(buf2));
292 r = proto.is().readFull(indexHeader);
293 if ((size_t)r != indexHeader.length()) {
294 yCDebug(ABSTRACTCARRIER, "broken index, secondary header");
295 return false;
296 }
297 yCDebug(ABSTRACTCARRIER, "secondary header came in happily...");
298 int inLen = (unsigned char)(indexHeader.get()[0]);
299 int outLen = (unsigned char)(indexHeader.get()[1]);
300 // Big limit on number of blocks here! Inherited from QNX.
301 // should make it go away if it hurts someone.
302
303 int total = 0;
305 Bytes number((char*)&numberSrc, sizeof(NetInt32));
306 for (int i = 0; i < inLen; i++) {
308 if ((size_t)l != number.length()) {
309 yCDebug(ABSTRACTCARRIER, "bad input block length");
310 return false;
311 }
312 int x = NetType::netInt(number);
313 total += x;
314 }
315 for (int i2 = 0; i2 < outLen; i2++) {
317 if ((size_t)l != number.length()) {
318 yCDebug(ABSTRACTCARRIER, "bad output block length");
319 return false;
320 }
321 int x = NetType::netInt(number);
322 total += x;
323 }
325 yCDebug(ABSTRACTCARRIER, "Total message length: %d", total);
326 return true;
327}
328
329
331{
332 yCDebug(ABSTRACTCARRIER, "sending an acknowledgment");
333 if (proto.getConnection().requireAck()) {
334 writeYarpInt(0, proto);
335 }
336 return true;
337}
338
340{
341 char buf[8];
342 Bytes header(&(buf[0]), sizeof(buf));
343 yarp::conf::ssize_t len = proto.is().readFull(header);
344 if ((size_t)len != header.length()) {
345 yCDebug(ABSTRACTCARRIER, "data stream died");
346 return -1;
347 }
348 return interpretYarpNumber(header);
349}
350
352{
353 char buf[8];
354 Bytes header(&(buf[0]), sizeof(buf));
355 createYarpNumber(n, header);
356 proto.os().write(header);
357}
358
360{
361 if (b.length() == 8) {
362 const char* base = b.get();
363 if (base[0] == 'Y' && base[1] == 'A' && base[6] == 'R' && base[7] == 'P') {
364 yarp::os::Bytes b2(const_cast<char*>(b.get()) + 2, 4);
365 int x = NetType::netInt(b2);
366 return x;
367 }
368 }
369 return -1;
370}
371
373{
374 if (header.length() != 8) {
375 return;
376 }
377 char* base = header.get();
378 base[0] = 'Y';
379 base[1] = 'A';
380 base[6] = 'R';
381 base[7] = 'P';
382 yarp::os::Bytes code(base + 2, 4);
383 NetType::netInt(x, code);
384}
bool sendConnectionStateSpecifier(ConnectionState &proto)
virtual bool sendIndex(ConnectionState &proto, SizedWriter &writer)
static int interpretYarpNumber(const yarp::os::Bytes &b)
bool requireAck() const override
Check if carrier has flow control, requiring sent messages to be acknowledged by recipient.
bool defaultSendAck(ConnectionState &proto)
Default implementation for the sendAck method.
bool canAccept() const override
Check if reading is implemented for this carrier.
void setCarrierParams(const yarp::os::Property &params) override
Configure carrier from port administrative commands.
void setParameters(const yarp::os::Bytes &header) override
Configure this carrier based on the first 8 bytes of the connection.
bool expectAck(ConnectionState &proto) override
Receive an acknowledgement, if expected for this carrier.
int getSpecifier(const Bytes &b) const
bool write(ConnectionState &proto, SizedWriter &writer) override
Write a message.
void writeYarpInt(int n, ConnectionState &proto)
Write n as an 8 bytes yarp number.
bool sendHeader(ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
bool prepareSend(ConnectionState &proto) override
Perform any initialization needed before writing on a connection.
void createStandardHeader(int specifier, yarp::os::Bytes &header) const
bool sendAck(ConnectionState &proto) override
Send an acknowledgement, if needed for this carrier.
bool isLocal() const override
Check if carrier operates within a single process.
bool sendSenderSpecifier(ConnectionState &proto)
bool defaultSendIndex(ConnectionState &proto, SizedWriter &writer)
Default implementation for the sendIndex method.
void getCarrierParams(yarp::os::Property &params) const override
Get carrier configuration and deliver it by port administrative commands.
bool expectSenderSpecifier(ConnectionState &proto) override
Expect the name of the sending port.
bool expectReplyToHeader(ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
bool defaultExpectAck(ConnectionState &proto)
Default implementation for the expectAck method.
bool canOffer() const override
Check if writing is implemented for this carrier.
bool isTextMode() const override
Check if carrier is textual in nature.
std::string getName() const override=0
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
bool isConnectionless() const override
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
int readYarpInt(ConnectionState &proto)
Read 8 bytes and interpret them as a YARP number.
bool canEscape() const override
Check if carrier can encode administrative messages, as opposed to just user data.
bool defaultExpectIndex(ConnectionState &proto)
Default implementation for the expectIndex method.
bool defaultSendHeader(ConnectionState &proto)
Default implementation for the sendHeader method.
std::string toString() const override
Get name of carrier.
bool supportReply() const override
This flag is used by YARP to determine whether the connection can carry RPC traffic,...
bool isActive() const override
Check if carrier is alive and error free.
bool expectIndex(ConnectionState &proto) override
Expect a message header, if there is one for this carrier.
bool expectExtraHeader(ConnectionState &proto) override
Receive any carrier-specific header.
static void createYarpNumber(int x, yarp::os::Bytes &header)
A mini-server for performing network communication in the background.
A simple abstraction for a block of bytes.
Definition Bytes.h:24
size_t length() const
Definition Bytes.cpp:22
const char * get() const
Definition Bytes.cpp:27
The basic state of a connection - route, streams in use, etc.
OutputStream & os()
Shorthand for getOutputStream()
virtual void setRemainingLength(int len)=0
Tell the connection that the given number of bytes are left to be read.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
virtual std::string getSenderSpecifier() const =0
Extract a name for the sender, if the connection type supports that.
InputStream & is()
Shorthand for getInputStream()
virtual void setRoute(const Route &route)=0
Set the route associated with this connection.
virtual Connection & getConnection()=0
Access the controller for this connection.
yarp::conf::ssize_t readDiscard(size_t len)
Read and discard a fixed number of bytes.
yarp::conf::ssize_t readFull(Bytes &b)
Keep reading until buffer is full.
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
const char * get() const
static int netInt(const yarp::os::Bytes &code)
Definition NetType.cpp:27
Simple specification of the minimum functions needed from output streams.
virtual void flush()
Make sure all pending write operations are finished.
virtual bool isOk() const =0
Check if the stream is ok or in an error state.
virtual void write(char ch)
Write a single byte to the stream.
A class for storing options and configuration information.
Definition Property.h:33
Information about a connection between two ports.
Definition Route.h:28
void setFromName(const std::string &fromName)
Set the source of the route.
Definition Route.cpp:98
Minimal requirements for an efficient Writer.
Definition SizedWriter.h:32
virtual void write(OutputStream &os)
virtual size_t length() const =0
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
::ssize_t ssize_t
Definition numeric.h:86
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition NetInt32.h:29
#define YARP_UNUSED(var)
Definition api.h:162