YARP
Yet Another Robot Platform
AbstractCarrier.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
8 
10 #include <yarp/os/ManagedBytes.h>
11 #include <yarp/os/OutputStream.h>
12 #include <yarp/os/Route.h>
13 #include <yarp/os/SizedWriter.h>
15 
16 using namespace yarp::os;
17 using namespace yarp::os::impl;
18 
19 namespace {
20 YARP_OS_LOG_COMPONENT(ABSTRACTCARRIER, "yarp.os.AbstractCarrier")
21 } // namespace
22 
24 {
25  YARP_UNUSED(header);
26  // default - no parameters
27 }
28 
30 {
31  // conservative choice - shortcuts are taken for connection
32  return true;
33 }
34 
36 {
37  return !isConnectionless();
38 }
39 
41 {
42  return true;
43 }
44 
46 {
47  return true;
48 }
49 
51 {
52  return false;
53 }
54 
56 {
57  return false;
58 }
59 
61 {
62  return true;
63 }
64 
66 {
67  return false;
68 }
69 
// Textual form of this carrier: returns exactly what getName() returns.
70 std::string AbstractCarrier::toString() const
71 {
72  return getName();
73 }
74 
76 {
77  YARP_UNUSED(proto);
78  return true;
79 }
80 
82 {
83  YARP_UNUSED(proto);
84  return defaultSendHeader(proto);
85 }
86 
88 {
89  YARP_UNUSED(proto);
90  return true;
91 }
92 
94 {
95  return defaultSendIndex(proto, writer);
96 }
97 
99 {
100  YARP_UNUSED(proto);
101  return true;
102 }
103 
105 {
106  return defaultExpectIndex(proto);
107 }
108 
110 {
     // Reads the sender's port name from the input stream and stores it as the
     // "from" name of the connection's route.
     // Wire format read here: a NetInt32 length followed by that many name bytes.
     // Returns false if either read comes up short, true otherwise.
111  NetInt32 numberSrc;
112  Bytes number((char*)&numberSrc, sizeof(NetInt32));
113  int len = 0;
114  yarp::conf::ssize_t r = proto.is().readFull(number);
115  if ((size_t)r != number.length()) {
116  yCDebug(ABSTRACTCARRIER, "did not get sender name length");
117  return false;
118  }
119  len = NetType::netInt(number);
     // The declared length comes from the peer: clamp it to [1, 1000] so a bad
     // value cannot drive a huge or non-positive allocation.
     // NOTE(review): when clamping kicks in, the number of bytes read no longer
     // matches what the peer declared, so the stream is presumably out of sync
     // afterwards - confirm this is acceptable.
120  if (len > 1000) {
121  len = 1000;
122  }
123  if (len < 1) {
124  len = 1;
125  }
126  ManagedBytes b(len + 1);
127  Bytes bytes(b.get(), len);
128  r = proto.is().readFull(bytes);
129  if ((int)r != len) {
130  yCDebug(ABSTRACTCARRIER, "did not get sender name");
131  return false;
132  }
     // Only the first len bytes of the buffer were filled; the byte at index len
     // is never written here. Copy exactly len bytes into a temporary string
     // (which is always NUL-terminated) and then cut at the first NUL, so a peer
     // that omits the terminator cannot make this read past the received data.
133  std::string s = std::string(b.get(), static_cast<size_t>(len)).c_str();
     // Route is copied, updated and written back (getRoute() hands out a const reference).
134  Route route = proto.getRoute();
135  route.setFromName(s);
136  proto.setRoute(route);
137  return true;
138 }
139 
141 {
142  return defaultSendAck(proto);
143 }
144 
146 {
147  return defaultExpectAck(proto);
148 }
149 
151 {
152  return true;
153 }
154 
156 {
157  YARP_UNUSED(params);
158 }
159 
161 {
162  YARP_UNUSED(params);
163 }
164 
166 {
     // Decodes the carrier specifier stored in an 8-byte header.
     // The header is parsed as a YARP number; a negative result is passed
     // through unchanged (presumably the "not a valid header" case - confirm
     // against interpretYarpNumber).
167  int x = interpretYarpNumber(b);
168  if (x >= 0) {
     // Undo the 7777 offset that createStandardHeader() adds when encoding.
     // NOTE(review): a valid number below 7777 yields a negative result here,
     // which callers cannot tell apart from a parse failure.
169  return x - 7777;
170  }
171  return x;
172 }
173 
// Writes the standard carrier header into `header`: the value 7777 + specifier
// encoded by createYarpNumber().
// NOTE(review): the 'Y','A',...,'R','P' writer in this file returns without
// touching the buffer when it is not exactly 8 bytes long - presumably that is
// createYarpNumber, in which case a wrongly sized `header` is silently left as is.
174 void AbstractCarrier::createStandardHeader(int specifier, Bytes& header) const
175 {
176  createYarpNumber(7777 + specifier, header);
177 }
178 
180 {
     // Sends one message: first the index (via sendIndex), then the payload
     // held by `writer`, then flushes the output stream.
     // Returns false if sendIndex fails; otherwise returns the stream's isOk()
     // state after the flush.
181  bool ok = sendIndex(proto, writer);
182  if (!ok) {
     // Nothing of the payload is written when the index could not be sent.
183  return false;
184  }
185  writer.write(proto.os());
186  proto.os().flush();
187  return proto.os().isOk();
188 }
189 
191 {
     // Sends the connection header in two steps: the connection-state
     // specifier, then the sender specifier. Stops and returns false as soon
     // as the first step fails; otherwise returns the result of the second.
192  bool ok = sendConnectionStateSpecifier(proto);
193  if (!ok) {
194  return false;
195  }
196  return sendSenderSpecifier(proto);
197 }
198 
200 {
     // Asks the connection to fill an 8-byte header buffer (getHeader), writes
     // those 8 bytes to the output stream and flushes.
     // Returns the stream's isOk() state after the flush.
201  char buf[8];
202  Bytes header((char*)&buf[0], sizeof(buf));
203  OutputStream& os = proto.os();
     // NOTE(review): buf is not initialized here; all 8 bytes are written
     // regardless of what getHeader() stored - confirm getHeader always fills it.
204  proto.getConnection().getHeader(header);
205  os.write(header);
206  os.flush();
207  return os.isOk();
208 }
209 
211 {
     // Sends the sender's name: a NetInt32 holding (name length + 1), followed
     // by the name bytes including the terminating NUL, then flushes.
     // Returns the stream's isOk() state after the flush.
212  NetInt32 numberSrc;
213  Bytes number((char*)&numberSrc, sizeof(NetInt32));
214  const std::string senderName = proto.getSenderSpecifier();
215  //const std::string& senderName = getRoute().getFromName();
     // The +1 accounts for the NUL terminator that is sent along with the name.
216  NetType::netInt((int)senderName.length() + 1, number);
217  OutputStream& os = proto.os();
218  os.write(number);
     // c_str() is cast to non-const only to build the Bytes view; the buffer
     // is only passed to os.write() here.
219  Bytes b((char*)senderName.c_str(), senderName.length() + 1);
220  os.write(b);
221  os.flush();
222  return os.isOk();
223 }
224 
226 {
     // Sends the message index that precedes a payload:
     //   1. the YARP number 10 (the size of the secondary header below),
     //   2. a 10-byte secondary header: byte 0 = number of blocks in `writer`,
     //      byte 1 = 1, remaining bytes = -1,
     //   3. one NetInt32 per block holding that block's length,
     //   4. one final NetInt32 with value 0.
     // Returns the output stream's isOk() state; no flush is done here.
227  writeYarpInt(10, proto);
228  int len = (int)writer.length();
     // NOTE(review): the block count is squeezed into a single char, so a
     // writer with more than 255 blocks announces a truncated count while the
     // loop below still sends all `len` lengths. The index reader in this file
     // takes the count from that byte as an unsigned char, so the two sides
     // would disagree - confirm whether such writers can occur.
229  char lens[] = {(char)len, (char)1, (char)-1, (char)-1, (char)-1, (char)-1, (char)-1, (char)-1, (char)-1, (char)-1};
230  Bytes b(lens, 10);
231  OutputStream& os = proto.os();
232  os.write(b);
233  NetInt32 numberSrc;
234  Bytes number((char*)&numberSrc, sizeof(NetInt32));
235  for (int i = 0; i < len; i++) {
236  NetType::netInt((int)writer.length(i), number);
237  os.write(number);
238  }
     // Trailing zero length: matches the count of 1 placed in byte 1 of the
     // secondary header above.
239  NetType::netInt(0, number);
240  os.write(number);
241  return os.isOk();
242 }
243 
245 {
     // Consumes an acknowledgement from the input stream, but only when the
     // connection reports requireAck(). The acknowledgement is an 8-byte YARP
     // number giving a length, followed by that many bytes which are discarded.
     // Returns false on a short read, a bad header or a short discard; returns
     // true otherwise, including when no acknowledgement is required.
246  if (proto.getConnection().requireAck()) {
247  char buf[8];
248  Bytes header((char*)&buf[0], sizeof(buf));
249  yarp::conf::ssize_t hdr = proto.is().readFull(header);
250  if ((size_t)hdr != header.length()) {
251  yCDebug(ABSTRACTCARRIER, "did not get acknowledgement header");
252  return false;
253  }
254  int len = interpretYarpNumber(header);
255  if (len < 0) {
256  yCDebug(ABSTRACTCARRIER, "acknowledgement header is bad");
257  return false;
258  }
     // readDiscard() returns a signed ssize_t; storing it in size_t means a
     // negative return becomes a very large value, which still fails the
     // comparison below and is reported as a length mismatch.
259  size_t len2 = proto.is().readDiscard(len);
260  if ((size_t)len != len2) {
261  yCDebug(ABSTRACTCARRIER, "did not get an acknowledgement of the promised length");
262  return false;
263  }
264  }
265  return true;
266 }
267 
269 {
     // Reads a message index from the input stream and tells the connection
     // how many payload bytes follow (setRemainingLength).
     // Expected layout: an 8-byte YARP number that must equal 10, a 10-byte
     // secondary header whose bytes 0 and 1 are block counts, then one
     // NetInt32 length per counted block.
     // Returns false on any short read or malformed header, true otherwise.
270  yCDebug(ABSTRACTCARRIER, "expecting an index");
271  yCDebug(ABSTRACTCARRIER, "ConnectionState::expectIndex for %s", proto.getRoute().toString().c_str());
272  // expect index header
273  char buf[8];
274  Bytes header((char*)&buf[0], sizeof(buf));
275  yarp::conf::ssize_t r = proto.is().readFull(header);
276  if ((size_t)r != header.length()) {
277  yCDebug(ABSTRACTCARRIER, "broken index");
278  return false;
279  }
280  int len = interpretYarpNumber(header);
281  if (len < 0) {
282  yCDebug(ABSTRACTCARRIER, "broken index - header is not a number");
283  return false;
284  }
     // Only a secondary header of exactly 10 bytes is accepted (size of buf2).
285  if (len != 10) {
286  yCDebug(ABSTRACTCARRIER, "broken index - header is wrong length");
287  return false;
288  }
289  yCDebug(ABSTRACTCARRIER, "index coming in happily...");
290  char buf2[10];
291  Bytes indexHeader((char*)&buf2[0], sizeof(buf2));
292  r = proto.is().readFull(indexHeader);
293  if ((size_t)r != indexHeader.length()) {
294  yCDebug(ABSTRACTCARRIER, "broken index, secondary header");
295  return false;
296  }
297  yCDebug(ABSTRACTCARRIER, "secondary header came in happily...");
     // Each count is a single byte read as unsigned, so it is in 0..255.
     // Bytes 2..9 of the secondary header are not examined.
298  int inLen = (unsigned char)(indexHeader.get()[0]);
299  int outLen = (unsigned char)(indexHeader.get()[1]);
300  // Big limit on number of blocks here! Inherited from QNX.
301  // should make it go away if it hurts someone.
302 
     // NOTE(review): the block lengths come straight from the peer and are
     // summed without validation; negative values or a sum beyond the int
     // range are not rejected before being handed to setRemainingLength().
303  int total = 0;
304  NetInt32 numberSrc;
305  Bytes number((char*)&numberSrc, sizeof(NetInt32));
306  for (int i = 0; i < inLen; i++) {
307  yarp::conf::ssize_t l = proto.is().readFull(number);
308  if ((size_t)l != number.length()) {
309  yCDebug(ABSTRACTCARRIER, "bad input block length");
310  return false;
311  }
312  int x = NetType::netInt(number);
313  total += x;
314  }
315  for (int i2 = 0; i2 < outLen; i2++) {
316  yarp::conf::ssize_t l = proto.is().readFull(number);
317  if ((size_t)l != number.length()) {
318  yCDebug(ABSTRACTCARRIER, "bad output block length");
319  return false;
320  }
321  int x = NetType::netInt(number);
322  total += x;
323  }
324  proto.setRemainingLength(total);
325  yCDebug(ABSTRACTCARRIER, "Total message length: %d", total);
326  return true;
327 }
328 
329 
331 {
     // Sends an acknowledgement when the connection reports requireAck():
     // the YARP number 0, i.e. a header announcing zero following bytes.
     // Always returns true; the outcome of the write is not checked and the
     // debug message is logged even when nothing is sent.
332  yCDebug(ABSTRACTCARRIER, "sending an acknowledgment");
333  if (proto.getConnection().requireAck()) {
334  writeYarpInt(0, proto);
335  }
336  return true;
337 }
338 
340 {
     // Reads exactly 8 bytes from the input stream and decodes them with
     // interpretYarpNumber(). Returns -1 when fewer than 8 bytes arrive.
     // NOTE(review): the 8-byte decoder in this file also returns -1 for a
     // malformed header, so callers presumably cannot tell a dead stream from
     // bad data - confirm if that distinction matters.
341  char buf[8];
342  Bytes header(&(buf[0]), sizeof(buf));
343  yarp::conf::ssize_t len = proto.is().readFull(header);
344  if ((size_t)len != header.length()) {
345  yCDebug(ABSTRACTCARRIER, "data stream died");
346  return -1;
347  }
348  return interpretYarpNumber(header);
349 }
350 
352 {
     // Encodes n into an 8-byte buffer with createYarpNumber() and writes the
     // buffer to the output stream. No flush is performed and the stream
     // state is not checked here.
353  char buf[8];
354  Bytes header(&(buf[0]), sizeof(buf));
355  createYarpNumber(n, header);
356  proto.os().write(header);
357 }
358 
360 {
     // Decodes an 8-byte "YARP number": bytes 0-1 must be 'Y','A', bytes 6-7
     // must be 'R','P', and bytes 2-5 carry the integer, decoded with
     // NetType::netInt(). Returns -1 when the buffer is not 8 bytes long or
     // the marker bytes do not match.
     // NOTE(review): the decoded integer is returned as is, so a payload that
     // decodes to a negative value is indistinguishable from the -1 failure.
361  if (b.length() == 8) {
362  const char* base = b.get();
363  if (base[0] == 'Y' && base[1] == 'A' && base[6] == 'R' && base[7] == 'P') {
     // const_cast is used only to build a Bytes view over the 4 payload
     // bytes; the view is passed to NetType::netInt() for decoding.
364  yarp::os::Bytes b2(const_cast<char*>(b.get()) + 2, 4);
365  int x = NetType::netInt(b2);
366  return x;
367  }
368  }
369  return -1;
370 }
371 
373 {
     // Encodes x as an 8-byte "YARP number" into `header`: 'Y','A' in bytes
     // 0-1, the integer written by NetType::netInt() into bytes 2-5, and
     // 'R','P' in bytes 6-7.
     // If `header` is not exactly 8 bytes long the function returns without
     // writing anything and without reporting the problem.
374  if (header.length() != 8) {
375  return;
376  }
377  char* base = header.get();
378  base[0] = 'Y';
379  base[1] = 'A';
380  base[6] = 'R';
381  base[7] = 'P';
382  yarp::os::Bytes code(base + 2, 4);
383  NetType::netInt(x, code);
384 }
bool sendConnectionStateSpecifier(ConnectionState &proto)
virtual bool sendIndex(ConnectionState &proto, SizedWriter &writer)
static int interpretYarpNumber(const yarp::os::Bytes &b)
bool requireAck() const override
Check if carrier has flow control, requiring sent messages to be acknowledged by recipient.
bool defaultSendAck(ConnectionState &proto)
Default implementation for the sendAck method.
bool canAccept() const override
Check if reading is implemented for this carrier.
void setCarrierParams(const yarp::os::Property &params) override
Configure carrier from port administrative commands.
void setParameters(const yarp::os::Bytes &header) override
Configure this carrier based on the first 8 bytes of the connection.
bool expectAck(ConnectionState &proto) override
Receive an acknowledgement, if expected for this carrier.
int getSpecifier(const Bytes &b) const
bool write(ConnectionState &proto, SizedWriter &writer) override
Write a message.
void writeYarpInt(int n, ConnectionState &proto)
Write n as an 8 bytes yarp number.
bool sendHeader(ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
bool prepareSend(ConnectionState &proto) override
Perform any initialization needed before writing on a connection.
void createStandardHeader(int specifier, yarp::os::Bytes &header) const
bool sendAck(ConnectionState &proto) override
Send an acknowledgement, if needed for this carrier.
bool isLocal() const override
Check if carrier operates within a single process.
bool sendSenderSpecifier(ConnectionState &proto)
bool defaultSendIndex(ConnectionState &proto, SizedWriter &writer)
Default implementation for the sendIndex method.
void getCarrierParams(yarp::os::Property &params) const override
Get carrier configuration and deliver it by port administrative commands.
bool expectSenderSpecifier(ConnectionState &proto) override
Expect the name of the sending port.
bool expectReplyToHeader(ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
bool defaultExpectAck(ConnectionState &proto)
Default implementation for the expectAck method.
bool canOffer() const override
Check if writing is implemented for this carrier.
bool isTextMode() const override
Check if carrier is textual in nature.
bool isConnectionless() const override
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
int readYarpInt(ConnectionState &proto)
Read 8 bytes and interpret them as a YARP number.
bool canEscape() const override
Check if carrier can encode administrative messages, as opposed to just user data.
bool defaultExpectIndex(ConnectionState &proto)
Default implementation for the expectIndex method.
bool defaultSendHeader(ConnectionState &proto)
Default implementation for the sendHeader method.
std::string toString() const override
Get name of carrier.
bool supportReply() const override
This flag is used by YARP to determine whether the connection can carry RPC traffic, that is, messages with replies.
bool isActive() const override
Check if carrier is alive and error free.
bool expectIndex(ConnectionState &proto) override
Expect a message header, if there is one for this carrier.
bool expectExtraHeader(ConnectionState &proto) override
Receive any carrier-specific header.
static void createYarpNumber(int x, yarp::os::Bytes &header)
A simple abstraction for a block of bytes.
Definition: Bytes.h:24
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
The basic state of a connection - route, streams in use, etc.
virtual void setRemainingLength(int len)=0
Tell the connection that the given number of bytes are left to be read.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
virtual Connection & getConnection()=0
Access the controller for this connection.
virtual std::string getSenderSpecifier() const =0
Extract a name for the sender, if the connection type supports that.
InputStream & is()
Shorthand for getInputStream()
OutputStream & os()
Shorthand for getOutputStream()
virtual void setRoute(const Route &route)=0
Set the route associated with this connection.
virtual bool requireAck() const =0
Check if carrier has flow control, requiring sent messages to be acknowledged by recipient.
virtual void getHeader(yarp::os::Bytes &header) const =0
Provide 8 bytes describing this connection sufficiently to allow the other side of a connection to select it.
yarp::conf::ssize_t readDiscard(size_t len)
Read and discard a fixed number of bytes.
yarp::conf::ssize_t readFull(Bytes &b)
Keep reading until buffer is full.
Definition: InputStream.cpp:96
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that block.
Definition: ManagedBytes.h:21
const char * get() const
static int netInt(const yarp::os::Bytes &code)
Definition: NetType.cpp:27
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:21
virtual void flush()
Make sure all pending write operations are finished.
virtual bool isOk() const =0
Check if the stream is ok or in an error state.
virtual void write(char ch)
Write a single byte to the stream.
A class for storing options and configuration information.
Definition: Property.h:33
Information about a connection between two ports.
Definition: Route.h:28
std::string toString() const
Render a text form of the route, "source->carrier->dest".
Definition: Route.cpp:138
void setFromName(const std::string &fromName)
Set the source of the route.
Definition: Route.cpp:98
Minimal requirements for an efficient Writer.
Definition: SizedWriter.h:32
virtual void write(OutputStream &os)
Definition: SizedWriter.cpp:16
virtual size_t length() const =0
#define yCDebug(component,...)
Definition: LogComponent.h:128
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:29
::ssize_t ssize_t
Definition: numeric.h:86
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition: NetInt32.h:29
#define YARP_UNUSED(var)
Definition: api.h:162