YARP
Yet Another Robot Platform
AbstractCarrier.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
11 
13 #include <yarp/os/ManagedBytes.h>
14 #include <yarp/os/OutputStream.h>
15 #include <yarp/os/Route.h>
16 #include <yarp/os/SizedWriter.h>
18 
19 using namespace yarp::os;
20 using namespace yarp::os::impl;
21 
22 namespace {
23 YARP_OS_LOG_COMPONENT(ABSTRACTCARRIER, "yarp.os.AbstractCarrier")
24 } // namespace
25 
27 {
28  YARP_UNUSED(header);
29  // default - no parameters
30 }
31 
33 {
34  // conservative choice - shortcuts are taken for connection
35  return true;
36 }
37 
39 {
40  return !isConnectionless();
41 }
42 
44 {
45  return true;
46 }
47 
49 {
50  return true;
51 }
52 
54 {
55  return false;
56 }
57 
59 {
60  return false;
61 }
62 
64 {
65  return true;
66 }
67 
69 {
70  return false;
71 }
72 
// Textual representation of this carrier: simply forwards to getName()
// and returns whatever string that call produces.
73 std::string AbstractCarrier::toString() const
74 {
75  return getName();
76 }
77 
79 {
80  YARP_UNUSED(proto);
81  return true;
82 }
83 
85 {
86  YARP_UNUSED(proto);
87  return defaultSendHeader(proto);
88 }
89 
91 {
92  YARP_UNUSED(proto);
93  return true;
94 }
95 
97 {
98  return defaultSendIndex(proto, writer);
99 }
100 
102 {
103  YARP_UNUSED(proto);
104  return true;
105 }
106 
108 {
109  return defaultExpectIndex(proto);
110 }
111 
113 {
    // Reads a length-prefixed sender name from the input stream and stores
    // it as the "from" name of the connection's route.
    // Returns false if either the length field or the name cannot be read
    // in full, true otherwise.
114  NetInt32 numberSrc;
115  Bytes number((char*)&numberSrc, sizeof(NetInt32));
116  int len = 0;
    // First field: sizeof(NetInt32) bytes holding the length of the name.
117  yarp::conf::ssize_t r = proto.is().readFull(number);
    // A negative r becomes a huge size_t after the cast, so read errors
    // fail this comparison as well as short reads.
118  if ((size_t)r != number.length()) {
119  yCDebug(ABSTRACTCARRIER, "did not get sender name length");
120  return false;
121  }
122  len = NetType::netInt(number);
    // Clamp the announced length to the range [1, 1000].
    // NOTE(review): when the peer announces more than 1000 bytes only 1000
    // are consumed below; the remainder presumably stays in the stream --
    // confirm this is intended.
123  if (len > 1000) {
124  len = 1000;
125  }
126  if (len < 1) {
127  len = 1;
128  }
    // The buffer has one byte more than is read into it.
129  ManagedBytes b(len + 1);
130  Bytes bytes(b.get(), len);
131  r = proto.is().readFull(bytes);
132  if ((int)r != len) {
133  yCDebug(ABSTRACTCARRIER, "did not get sender name");
134  return false;
135  }
    // The buffer is interpreted as a C string.
    // NOTE(review): the byte at index len is never written in this block,
    // so termination relies on the received data containing a NUL or on
    // ManagedBytes zero-filling its storage -- confirm; otherwise this
    // conversion can read past the buffer.
136  std::string s = b.get();
    // Work on a copy of the route, then install the modified copy.
137  Route route = proto.getRoute();
138  route.setFromName(s);
139  proto.setRoute(route);
140  return true;
141 }
142 
144 {
145  return defaultSendAck(proto);
146 }
147 
149 {
150  return defaultExpectAck(proto);
151 }
152 
154 {
155  return true;
156 }
157 
159 {
160  YARP_UNUSED(params);
161 }
162 
164 {
165  YARP_UNUSED(params);
166 }
167 
169 {
    // Decodes b with interpretYarpNumber() and removes the 7777 offset
    // from non-negative results. Negative results are returned unchanged.
170  int x = interpretYarpNumber(b);
171  if (x >= 0) {
172  return x - 7777;
173  }
174  return x;
175 }
176 
// Fills `header` with the encoded value (7777 + specifier) by delegating
// to createYarpNumber(). The 7777 offset is the same constant that the
// decoding code earlier in this file subtracts again.
177 void AbstractCarrier::createStandardHeader(int specifier, Bytes& header) const
178 {
179  createYarpNumber(7777 + specifier, header);
180 }
181 
183 {
    // Sends the index via sendIndex(), then the writer's content, then
    // flushes. Returns false if sendIndex() fails; otherwise returns the
    // state of the output stream after the flush.
184  bool ok = sendIndex(proto, writer);
185  if (!ok) {
186  return false;
187  }
188  writer.write(proto.os());
189  proto.os().flush();
190  return proto.os().isOk();
191 }
192 
194 {
    // Two-step header: sendConnectionStateSpecifier() first, and only if
    // that succeeds sendSenderSpecifier(), whose result is returned.
195  bool ok = sendConnectionStateSpecifier(proto);
196  if (!ok) {
197  return false;
198  }
199  return sendSenderSpecifier(proto);
200 }
201 
203 {
    // Asks the connection to fill an 8-byte header, writes it to the output
    // stream and flushes. Returns the stream state after the flush.
    // NOTE(review): buf is not initialized here; this assumes getHeader()
    // writes all 8 bytes -- confirm.
204  char buf[8];
205  Bytes header((char*)&buf[0], sizeof(buf));
206  OutputStream& os = proto.os();
207  proto.getConnection().getHeader(header);
208  os.write(header);
209  os.flush();
210  return os.isOk();
211 }
212 
214 {
    // Writes the sender name in length-prefixed form, flushes, and returns
    // the stream state after the flush.
215  NetInt32 numberSrc;
216  Bytes number((char*)&numberSrc, sizeof(NetInt32));
217  const std::string senderName = proto.getSenderSpecifier();
218  //const std::string& senderName = getRoute().getFromName();
    // The announced length is the name length + 1, i.e. it counts the
    // terminating NUL that is sent along with the name below.
219  NetType::netInt((int)senderName.length() + 1, number);
220  OutputStream& os = proto.os();
221  os.write(number);
    // c_str() guarantees the extra NUL byte covered by length() + 1.
222  Bytes b((char*)senderName.c_str(), senderName.length() + 1);
223  os.write(b);
224  os.flush();
225  return os.isOk();
226 }
227 
229 {
    // Writes the message index, in this order:
    //   1. the number 10 via writeYarpInt() (size of the block that follows)
    //   2. a 10-byte block: byte 0 = number of writer blocks, byte 1 = 1,
    //      remaining bytes = -1
    //   3. one NetInt32 per writer block holding that block's length
    //   4. one extra NetInt32 with value 0
    // Returns the state of the output stream. No flush is done here.
230  writeYarpInt(10, proto);
    // Here writer.length() is the number of blocks; writer.length(i) below
    // is the size of block i.
231  int len = (int)writer.length();
    // NOTE(review): the block count is narrowed to a single char, so only
    // its low 8 bits are transmitted.
232  char lens[] = {(char)len, (char)1, (char)-1, (char)-1, (char)-1, (char)-1, (char)-1, (char)-1, (char)-1, (char)-1};
233  Bytes b(lens, 10);
234  OutputStream& os = proto.os();
235  os.write(b);
236  NetInt32 numberSrc;
237  Bytes number((char*)&numberSrc, sizeof(NetInt32));
238  for (int i = 0; i < len; i++) {
239  NetType::netInt((int)writer.length(i), number);
240  os.write(number);
241  }
    // Trailing zero length; presumably the length of the single extra
    // block announced by byte 1 of lens -- TODO confirm.
242  NetType::netInt(0, number);
243  os.write(number);
244  return os.isOk();
245 }
246 
248 {
    // Consumes an acknowledgement from the input stream, but only when the
    // connection reports requireAck(); otherwise nothing is read.
    // Returns false when the 8-byte header is short or invalid, or when
    // fewer payload bytes than announced could be discarded.
249  if (proto.getConnection().requireAck()) {
250  char buf[8];
251  Bytes header((char*)&buf[0], sizeof(buf));
252  yarp::conf::ssize_t hdr = proto.is().readFull(header);
    // A negative hdr becomes a huge size_t after the cast, so read errors
    // are rejected here too.
253  if ((size_t)hdr != header.length()) {
254  yCDebug(ABSTRACTCARRIER, "did not get acknowledgement header");
255  return false;
256  }
    // The header encodes the number of payload bytes that follow.
257  int len = interpretYarpNumber(header);
258  if (len < 0) {
259  yCDebug(ABSTRACTCARRIER, "acknowledgement header is bad");
260  return false;
261  }
    // The payload itself is not inspected, only skipped.
262  size_t len2 = proto.is().readDiscard(len);
263  if ((size_t)len != len2) {
264  yCDebug(ABSTRACTCARRIER, "did not get an acknowledgement of the promised length");
265  return false;
266  }
267  }
268  return true;
269 }
270 
272 {
    // Reads a message index from the input stream and reports the summed
    // block lengths to the connection via setRemainingLength().
    // Expected input, in order:
    //   1. an 8-byte number that must decode to 10
    //   2. a 10-byte secondary header: byte 0 and byte 1 are block counts
    //   3. one NetInt32 length per block of the first count
    //   4. one NetInt32 length per block of the second count
    // Returns false as soon as any read is short or the header is invalid.
273  yCDebug(ABSTRACTCARRIER, "expecting an index");
274  yCDebug(ABSTRACTCARRIER, "ConnectionState::expectIndex for %s", proto.getRoute().toString().c_str());
275  // expect index header
276  char buf[8];
277  Bytes header((char*)&buf[0], sizeof(buf));
278  yarp::conf::ssize_t r = proto.is().readFull(header);
279  if ((size_t)r != header.length()) {
280  yCDebug(ABSTRACTCARRIER, "broken index");
281  return false;
282  }
283  int len = interpretYarpNumber(header);
284  if (len < 0) {
285  yCDebug(ABSTRACTCARRIER, "broken index - header is not a number");
286  return false;
287  }
    // Only a secondary header of exactly 10 bytes is accepted.
288  if (len != 10) {
289  yCDebug(ABSTRACTCARRIER, "broken index - header is wrong length");
290  return false;
291  }
292  yCDebug(ABSTRACTCARRIER, "index coming in happily...");
293  char buf2[10];
294  Bytes indexHeader((char*)&buf2[0], sizeof(buf2));
295  r = proto.is().readFull(indexHeader);
296  if ((size_t)r != indexHeader.length()) {
297  yCDebug(ABSTRACTCARRIER, "broken index, secondary header");
298  return false;
299  }
300  yCDebug(ABSTRACTCARRIER, "secondary header came in happily...");
    // Each count is one unsigned byte, so at most 255 blocks per group.
    // The other 8 bytes of the secondary header are not used here.
301  int inLen = (unsigned char)(indexHeader.get()[0]);
302  int outLen = (unsigned char)(indexHeader.get()[1]);
303  // Big limit on number of blocks here! Inherited from QNX.
304  // should make it go away if it hurts someone.
305 
    // NOTE(review): the block lengths come from the peer and are summed
    // without checking for negative values or int overflow -- confirm that
    // callers of setRemainingLength() tolerate this.
306  int total = 0;
307  NetInt32 numberSrc;
308  Bytes number((char*)&numberSrc, sizeof(NetInt32));
309  for (int i = 0; i < inLen; i++) {
310  yarp::conf::ssize_t l = proto.is().readFull(number);
311  if ((size_t)l != number.length()) {
312  yCDebug(ABSTRACTCARRIER, "bad input block length");
313  return false;
314  }
315  int x = NetType::netInt(number);
316  total += x;
317  }
318  for (int i2 = 0; i2 < outLen; i2++) {
319  yarp::conf::ssize_t l = proto.is().readFull(number);
320  if ((size_t)l != number.length()) {
321  yCDebug(ABSTRACTCARRIER, "bad output block length");
322  return false;
323  }
324  int x = NetType::netInt(number);
325  total += x;
326  }
327  proto.setRemainingLength(total);
328  yCDebug(ABSTRACTCARRIER, "Total message length: %d", total);
329  return true;
330 }
331 
332 
334 {
    // Writes the number 0 with writeYarpInt() when the connection reports
    // requireAck(). Always returns true; the result of the write is not
    // checked here.
335  yCDebug(ABSTRACTCARRIER, "sending an acknowledgment");
336  if (proto.getConnection().requireAck()) {
337  writeYarpInt(0, proto);
338  }
339  return true;
340 }
341 
343 {
    // Reads 8 bytes from the input stream and decodes them with
    // interpretYarpNumber(). Returns -1 when fewer than 8 bytes could be
    // read; otherwise returns the decoder's result.
344  char buf[8];
345  Bytes header(&(buf[0]), sizeof(buf));
346  yarp::conf::ssize_t len = proto.is().readFull(header);
    // A negative len becomes a huge size_t after the cast, so read errors
    // take this branch as well.
347  if ((size_t)len != header.length()) {
348  yCDebug(ABSTRACTCARRIER, "data stream died");
349  return -1;
350  }
351  return interpretYarpNumber(header);
352 }
353 
355 {
    // Encodes n into an 8-byte buffer with createYarpNumber() and writes
    // the buffer to the output stream. No flush is done and no status is
    // returned.
356  char buf[8];
357  Bytes header(&(buf[0]), sizeof(buf));
358  createYarpNumber(n, header);
359  proto.os().write(header);
360 }
361 
363 {
    // Decodes an 8-byte framed integer laid out as
    //   'Y' 'A' <4 bytes of integer> 'R' 'P'
    // Returns the integer from bytes 2..5 (via NetType::netInt), or -1 when
    // b is not exactly 8 bytes long or the marker bytes do not match.
    // NOTE(review): a correctly framed value of -1 cannot be told apart
    // from the failure result.
364  if (b.length() == 8) {
365  const char* base = b.get();
366  if (base[0] == 'Y' && base[1] == 'A' && base[6] == 'R' && base[7] == 'P') {
    // const_cast is used only to build a Bytes view over the 4 payload
    // bytes; this block does not write through it.
367  yarp::os::Bytes b2(const_cast<char*>(b.get()) + 2, 4);
368  int x = NetType::netInt(b2);
369  return x;
370  }
371  }
372  return -1;
373 }
374 
376 {
    // Encodes x into header using the framing
    //   'Y' 'A' <4 bytes of integer> 'R' 'P'
    // If header is not exactly 8 bytes long it is left untouched and the
    // function returns silently.
377  if (header.length() != 8) {
378  return;
379  }
380  char* base = header.get();
381  base[0] = 'Y';
382  base[1] = 'A';
383  base[6] = 'R';
384  base[7] = 'P';
    // Bytes 2..5 receive the integer via NetType::netInt.
385  yarp::os::Bytes code(base + 2, 4);
386  NetType::netInt(x, code);
387 }
bool sendConnectionStateSpecifier(ConnectionState &proto)
virtual bool sendIndex(ConnectionState &proto, SizedWriter &writer)
static int interpretYarpNumber(const yarp::os::Bytes &b)
bool requireAck() const override
Check if carrier has flow control, requiring sent messages to be acknowledged by recipient.
bool defaultSendAck(ConnectionState &proto)
Default implementation for the sendAck method.
bool canAccept() const override
Check if reading is implemented for this carrier.
void setCarrierParams(const yarp::os::Property &params) override
Configure carrier from port administrative commands.
void setParameters(const yarp::os::Bytes &header) override
Configure this carrier based on the first 8 bytes of the connection.
bool expectAck(ConnectionState &proto) override
Receive an acknowledgement, if expected for this carrier.
int getSpecifier(const Bytes &b) const
bool write(ConnectionState &proto, SizedWriter &writer) override
Write a message.
void writeYarpInt(int n, ConnectionState &proto)
Write n as an 8 bytes yarp number.
bool sendHeader(ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
bool prepareSend(ConnectionState &proto) override
Perform any initialization needed before writing on a connection.
void createStandardHeader(int specifier, yarp::os::Bytes &header) const
bool sendAck(ConnectionState &proto) override
Send an acknowledgement, if needed for this carrier.
bool isLocal() const override
Check if carrier operates within a single process.
bool sendSenderSpecifier(ConnectionState &proto)
bool defaultSendIndex(ConnectionState &proto, SizedWriter &writer)
Default implementation for the sendIndex method.
void getCarrierParams(yarp::os::Property &params) const override
Get carrier configuration and deliver it by port administrative commands.
bool expectSenderSpecifier(ConnectionState &proto) override
Expect the name of the sending port.
bool expectReplyToHeader(ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
bool defaultExpectAck(ConnectionState &proto)
Default implementation for the expectAck method.
bool canOffer() const override
Check if writing is implemented for this carrier.
bool isTextMode() const override
Check if carrier is textual in nature.
bool isConnectionless() const override
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
int readYarpInt(ConnectionState &proto)
Read 8 bytes and interpret them as a YARP number.
bool canEscape() const override
Check if carrier can encode administrative messages, as opposed to just user data.
bool defaultExpectIndex(ConnectionState &proto)
Default implementation for the expectIndex method.
bool defaultSendHeader(ConnectionState &proto)
Default implementation for the sendHeader method.
std::string toString() const override
Get name of carrier.
bool supportReply() const override
This flag is used by YARP to determine whether the connection can carry RPC traffic, that is, messages with replies.
bool isActive() const override
Check if carrier is alive and error free.
bool expectIndex(ConnectionState &proto) override
Expect a message header, if there is one for this carrier.
bool expectExtraHeader(ConnectionState &proto) override
Receive any carrier-specific header.
static void createYarpNumber(int x, yarp::os::Bytes &header)
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
size_t length() const
Definition: Bytes.cpp:25
const char * get() const
Definition: Bytes.cpp:30
The basic state of a connection - route, streams in use, etc.
virtual void setRemainingLength(int len)=0
Tell the connection that the given number of bytes are left to be read.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
virtual Connection & getConnection()=0
Access the controller for this connection.
virtual std::string getSenderSpecifier() const =0
Extract a name for the sender, if the connection type supports that.
InputStream & is()
Shorthand for getInputStream()
OutputStream & os()
Shorthand for getOutputStream()
virtual void setRoute(const Route &route)=0
Set the route associated with this connection.
virtual bool requireAck() const =0
Check if carrier has flow control, requiring sent messages to be acknowledged by recipient.
virtual void getHeader(yarp::os::Bytes &header) const =0
Provide 8 bytes describing this connection sufficiently to allow the other side of a connection to select it.
yarp::conf::ssize_t readDiscard(size_t len)
Read and discard a fixed number of bytes.
yarp::conf::ssize_t readFull(Bytes &b)
Keep reading until buffer is full.
Definition: InputStream.cpp:99
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that block.
Definition: ManagedBytes.h:25
const char * get() const
static int netInt(const yarp::os::Bytes &code)
Definition: NetType.cpp:96
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:25
virtual void flush()
Make sure all pending write operations are finished.
virtual bool isOk() const =0
Check if the stream is ok or in an error state.
virtual void write(char ch)
Write a single byte to the stream.
A class for storing options and configuration information.
Definition: Property.h:37
Information about a connection between two ports.
Definition: Route.h:32
std::string toString() const
Render a text form of the route, "source->carrier->dest".
Definition: Route.cpp:141
void setFromName(const std::string &fromName)
Set the source of the route.
Definition: Route.cpp:101
Minimal requirements for an efficient Writer.
Definition: SizedWriter.h:36
virtual void write(OutputStream &os)
Definition: SizedWriter.cpp:19
virtual size_t length() const =0
#define yCDebug(component,...)
Definition: LogComponent.h:112
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
::ssize_t ssize_t
Definition: numeric.h:60
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition: NetInt32.h:33
#define YARP_UNUSED(var)
Definition: api.h:159