YARP
Yet Another Robot Platform
StreamConnectionReader.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
7 #include <yarp/os/Bottle.h>
8 #include <yarp/os/Log.h>
9 #include <yarp/os/NetType.h>
11 #include <yarp/os/impl/Protocol.h>
13 
14 using namespace yarp::os::impl;
15 using namespace yarp::os;
16 
19  writer(nullptr),
20  in(nullptr),
21  str(nullptr),
22  protocol(nullptr),
23  messageLen(0),
24  textMode(false),
25  bareMode(false),
26  valid(false),
27  err(false),
28  shouldDrop(false),
29  writePending(false),
30  ref(nullptr),
31  convertedTextMode(false),
32  pushedIntFlag(false),
33  pushedInt(-1),
34  parentConnectionReader(nullptr)
35 {
36 }
37 
39 {
40  if (writer != nullptr) {
41  delete writer;
42  writer = nullptr;
43  }
44 }
45 
47  TwoWayStream* str,
48  const Route& route,
49  size_t len,
50  bool textMode,
51  bool bareMode)
52 {
53  this->in = &in;
54  this->str = str;
55  this->route = route;
56  this->messageLen = len;
57  this->textMode = textMode;
58  this->bareMode = bareMode;
59  this->valid = true;
60  ref = nullptr;
61  err = false;
62  convertedTextMode = false;
63  pushedIntFlag = false;
64 }
65 
67 {
68  this->protocol = protocol;
69 }
70 
72 {
73  str = nullptr;
74 }
75 
77 {
78  return shouldDrop;
79 }
80 
82 {
83  if (!isGood()) {
84  return false;
85  }
86  yAssert(in != nullptr);
87  size_t len = b.length();
88  if (len == 0) {
89  return true;
90  }
91  //if (len<0) len = messageLen;
92  if (len > 0) {
93  yarp::conf::ssize_t rlen = in->readFull(b);
94  if (rlen >= 0) {
95  messageLen -= len;
96  return true;
97  }
98  }
99  err = true;
100  return false;
101 }
102 
103 #ifndef YARP_NO_DEPRECATED // Since YARP 3.2
105 {
106  if (!isGood()) {
107  return {};
108  }
109  char* buf = new char[len];
110  yarp::os::Bytes b(buf, len);
111  yAssert(in != nullptr);
112  yarp::conf::ssize_t r = in->read(b);
113  if (r < 0 || static_cast<size_t>(r) < b.length()) {
114  err = true;
115  delete[] buf;
116  return {};
117  }
118  messageLen -= b.length();
119  std::string s = buf;
120  delete[] buf;
121  return s;
122 }
123 #endif // YARP_NO_DEPRECATED
124 
126 {
127  if (!isGood()) {
128  return {};
129  }
130  yAssert(in != nullptr);
131  bool success = false;
132  std::string result = in->readLine('\n', &success);
133  if (!success) {
134  err = true;
135  return {};
136  }
137  messageLen -= result.length() + 1;
138  return result;
139 }
140 
142 {
143  if (writer != nullptr) {
144  if (writePending) {
145  if (str != nullptr) {
146  if (protocol != nullptr) {
147  protocol->reply(*writer);
148  } else {
149  writer->write(str->getOutputStream());
150  }
151  writer->clear();
152  }
153  }
154  }
155  writePending = false;
156 }
157 
159 {
160  ref = obj;
161 }
162 
164 {
165  reset(*in, str, route, len, textMode, bareMode);
166  return true;
167 }
168 
170 {
171  return messageLen + (pushedIntFlag ? sizeof(yarp::os::NetInt32) : 0);
172 }
173 
175 {
176  if (pushedIntFlag) {
177  return false;
178  }
179  pushedIntFlag = true;
180  pushedInt = x;
181  return true;
182 }
183 
184 template <typename T, typename NetT>
185 inline T StreamConnectionReader::expectType()
186 {
187  yAssert(in != nullptr);
188 
189  NetT x = 0;
190  yarp::os::Bytes b(reinterpret_cast<char*>(&x), sizeof(T));
191  yarp::conf::ssize_t r = in->read(b);
192  if (r < 0 || static_cast<size_t>(r) < b.length()) {
193  err = true;
194  return 0;
195  }
196  messageLen -= b.length();
197 
198  return static_cast<T>(x);
199 }
200 
202 {
203  if (!isGood()) {
204  return 0;
205  }
206  return expectType<std::int8_t, NetInt8>();
207 }
208 
210 {
211  if (!isGood()) {
212  return 0;
213  }
214  return expectType<std::int16_t, NetInt16>();
215 }
216 
218 {
219  if (pushedIntFlag) {
220  pushedIntFlag = false;
221  return pushedInt;
222  }
223  if (!isGood()) {
224  return 0;
225  }
226  return expectType<std::int32_t, NetInt32>();
227 }
228 
230 {
231  if (!isGood()) {
232  return 0;
233  }
234  return expectType<std::int64_t, NetInt64>();
235 }
236 
238 {
239  if (!isGood()) {
240  return 0;
241  }
242  return expectType<yarp::conf::float32_t, NetFloat32>();
243 }
244 
246 {
247  if (!isGood()) {
248  return 0;
249  }
250  return expectType<yarp::conf::float64_t, NetFloat64>();
251 }
252 
253 bool StreamConnectionReader::expectBlock(char* data, size_t len)
254 {
255  yarp::os::Bytes bytes(data, len);
256  return expectBlock(bytes);
257 }
258 
259 std::string StreamConnectionReader::expectText(const char terminatingChar)
260 {
261  if (!isGood()) {
262  return {};
263  }
264  yAssert(in != nullptr);
265  bool lsuccess = false;
266  std::string result = in->readLine(terminatingChar, &lsuccess);
267  if (lsuccess) {
268  messageLen -= result.length() + 1;
269  }
270  return result;
271 }
272 
274 {
275  return textMode;
276 }
277 
279 {
280  return bareMode;
281 }
282 
284 {
285  if (isTextMode()) {
286  if (!convertedTextMode) {
287  Bottle bot;
288  bot.read(*this);
290  bot.write(writer);
291  std::string s = writer.toString();
292  altStream.reset(s);
293  in = &altStream;
294  convertedTextMode = true;
295  }
296  }
297 
298  return true;
299 }
300 
302 {
303  if (str == nullptr) {
304  return nullptr;
305  }
306  if (writer == nullptr) {
308  yAssert(writer != nullptr);
309  }
310  writer->clear();
311  writePending = true;
312  if (protocol != nullptr) {
313  protocol->willReply();
314  }
315  return writer;
316 }
317 
319 {
320  if (str != nullptr) {
321  Contact remote = str->getRemoteAddress();
322  remote.setName(route.getFromName());
323  return remote;
324  }
325  Contact remote = yarp::os::Contact(route.getFromName(), route.getCarrierName());
326  return remote;
327 }
328 
330 {
331  if (str != nullptr) {
332  Contact local = str->getLocalAddress();
333  local.setName(route.getToName());
334  return local;
335  }
336  return yarp::os::Contact();
337 }
338 
340 {
341  return valid;
342 }
343 
345 {
346  if (err) {
347  return true;
348  }
349  return !isActive();
350 }
351 
353 {
354  if (shouldDrop) {
355  return false;
356  }
357  if (!isValid()) {
358  return false;
359  }
360  if (in != nullptr) {
361  if (in->isOk()) {
362  return true;
363  }
364  }
365  return false;
366 }
367 
369 {
370  return ref;
371 }
372 
374 {
375  if (protocol != nullptr) {
376  const std::string& env = protocol->getEnvelope();
377  return {const_cast<char*>(env.c_str()), env.length()};
378  }
379  if (parentConnectionReader != nullptr) {
380  return parentConnectionReader->readEnvelope();
381  }
382  return {nullptr, 0};
383 }
384 
386 {
387  shouldDrop = true;
388 }
389 
391 {
392  if (config.size() == 0) {
393  if (protocol != nullptr) {
394  const_cast<Bottle&>(config).fromString(protocol->getSenderSpecifier());
395  }
396  }
397  return config;
398 }
399 
401 {
402  this->parentConnectionReader = parentConnectionReader;
403 }
404 
405 bool StreamConnectionReader::isGood()
406 {
407  return isActive() && isValid() && !isError();
408 }
#define yAssert(x)
Definition: Log.h:294
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:74
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:251
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition: Bottle.cpp:240
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
Definition: Bottle.cpp:230
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
An interface for reading from a network connection.
virtual Bytes readEnvelope()
Read a message envelope, if available.
An interface for writing to a network connection.
Represents how to reach a part of a YARP network.
Definition: Contact.h:36
void setName(const std::string &name)
Set the name associated with this Contact.
Definition: Contact.cpp:222
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:26
virtual bool isOk() const =0
Check if the stream is ok or in an error state.
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:20
yarp::conf::ssize_t readFull(Bytes &b)
Keep reading until buffer is full.
Definition: InputStream.cpp:96
std::string readLine(const char terminal='\n', bool *success=nullptr)
Read a block of text terminated with a specific marker (or EOF).
Definition: InputStream.cpp:54
This is a base class for objects that can be both read from and be written to the YARP network.
Definition: Portable.h:26
Information about a connection between two ports.
Definition: Route.h:29
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:103
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition: Route.cpp:123
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:93
A base class for nested structures that can be searched.
Definition: Searchable.h:66
A stream which can be asked to perform bidirectional communication.
Definition: TwoWayStream.h:26
virtual OutputStream & getOutputStream()=0
Get an OutputStream to write to.
virtual const Contact & getLocalAddress() const =0
Get the address of the local side of the stream.
virtual const Contact & getRemoteAddress() const =0
Get the address of the remote side of the stream.
A helper for creating cached object descriptions.
bool write(ConnectionWriter &connection) const override
Write this object to a network connection.
void clear() override
Clear all cached data.
Connection choreographer.
Definition: Protocol.h:31
std::string getSenderSpecifier() const override
Extract a name for the sender, if the connection type supports that.
Definition: Protocol.cpp:149
const std::string & getEnvelope() const override
Read the envelope associated with the current message.
Definition: Protocol.cpp:178
void reply(SizedWriter &writer) override
Reply to a message we have just read.
Definition: Protocol.cpp:320
void willReply()
Promise that we'll be making a reply.
Definition: Protocol.h:90
virtual std::string expectString()
Read a string from the network connection.
std::int64_t expectInt64() override
Read a 64-bit integer from the network connection.
yarp::os::Contact getRemoteContact() const override
Gets information about who is supplying the data being read, if that information is available.
bool convertTextMode() override
Reads in a standard description in text mode, and converts it to a standard description in binary.
void setParentConnectionReader(ConnectionReader *parentConnectionReader) override
Set ConnectionReader to be used for reading the envelope.
yarp::os::ConnectionWriter * getWriter() override
Gets a way to reply to the message, if possible.
bool pushInt(int x) override
Store an integer to return on the next call to expectInt().
bool isBareMode() const override
Check if the connection is bare mode.
std::int16_t expectInt16() override
Read a 16-bit integer from the network connection.
yarp::conf::float64_t expectFloat64() override
Read a 64-bit floating point number from the network connection.
yarp::os::Contact getLocalContact() const override
Gets information about who is receiving the data, if that information is available.
const yarp::os::Searchable & getConnectionModifiers() const override
Access modifiers associated with the connection, if any.
size_t getSize() const override
Checks how much data is available.
std::string expectText(const char terminatingChar) override
Read some text from the network connection.
std::int8_t expectInt8() override
Read an 8-bit integer from the network connection.
yarp::conf::float32_t expectFloat32() override
Read a 32-bit floating point number from the network connection.
yarp::os::Bytes readEnvelope() override
Read a message envelope, if available.
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
virtual bool expectBlock(yarp::os::Bytes &b)
bool isTextMode() const override
Check if the connection is text mode.
yarp::os::Portable * getReference() const override
Get a direct pointer to the object being sent, if possible.
virtual void setReference(yarp::os::Portable *obj)
std::int32_t expectInt32() override
Read a 32-bit integer from the network connection.
void requestDrop() override
Tag the connection to be dropped after the current message.
::ssize_t ssize_t
Definition: numeric.h:86
double float64_t
Definition: numeric.h:77
float float32_t
Definition: numeric.h:76
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition: NetInt32.h:30