YARP
Yet Another Robot Platform
BufferedConnectionWriter.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
8 
9 #include <yarp/os/Bottle.h>
10 #include <yarp/os/DummyConnector.h>
11 #include <yarp/os/ManagedBytes.h>
12 #include <yarp/os/NetFloat32.h>
13 #include <yarp/os/NetFloat64.h>
14 #include <yarp/os/NetInt16.h>
15 #include <yarp/os/NetInt32.h>
16 #include <yarp/os/NetInt64.h>
17 #include <yarp/os/NetInt8.h>
18 #include <yarp/os/NetType.h>
19 #include <yarp/os/SizedWriter.h>
21 #include <yarp/os/Vocab.h>
22 
23 #include <cstdlib>
24 #include <cstring>
25 
26 
27 using namespace yarp::os::impl;
28 using namespace yarp::os;
29 
31  bool bareMode) :
32  target(&lst),
33  reader(nullptr),
34  textMode(textMode),
35  bareMode(bareMode),
36  convertTextModePending(false),
37  ref(nullptr),
38  shouldDrop(false),
39  lst_used(0),
40  header_used(0),
41  target_used(&lst_used),
43 {
44  stopPool();
45 }
46 
47 
49 {
50  clear();
51 }
52 
53 
55 {
56  this->textMode = textMode;
57  clear();
58  reader = nullptr;
59  ref = nullptr;
60  convertTextModePending = false;
61 }
62 
64 {
65  lst_used = 0;
66  header_used = 0;
67  reader = nullptr;
68  ref = nullptr;
69  convertTextModePending = false;
70  target = &lst;
71  target_used = &lst_used;
72  stopPool();
73 }
74 
76 {
77  target = &lst;
78  target_used = &lst_used;
79 
80  size_t i;
81  for (i = 0; i < lst.size(); i++) {
82  delete lst[i];
83  }
84  lst.clear();
85  for (i = 0; i < header.size(); i++) {
86  delete header[i];
87  }
88  header.clear();
89  stopPool();
90  lst_used = 0;
91  header_used = 0;
92 }
93 
95 {
96  if (pool != nullptr) {
97  if (data.length() + poolIndex > pool->length()) {
98  pool = nullptr;
99  }
100  }
101  if (pool == nullptr && data.length() < poolLength) {
102  bool add = false;
103  if (*target_used < target->size()) {
104  yarp::os::ManagedBytes*& bytes = (*target)[*target_used];
105  if (bytes->length() < poolLength) {
106  delete bytes;
107  bytes = new yarp::os::ManagedBytes(poolLength);
108  }
109  pool = bytes;
110  if (pool == nullptr) {
111  return false;
112  }
113  } else {
114  pool = new yarp::os::ManagedBytes(poolLength);
115  if (pool == nullptr) {
116  return false;
117  }
118  add = true;
119  }
120  (*target_used)++;
121  poolCount++;
122  poolIndex = 0;
123  if (poolLength < 65536) {
124  poolLength *= 2;
125  }
126  pool->setUsed(0);
127  if (add) {
128  target->push_back(pool);
129  }
130  }
131  if (pool != nullptr) {
132  memcpy(pool->get() + poolIndex, data.get(), data.length());
133  poolIndex += data.length();
134  pool->setUsed(poolIndex);
135  return true;
136  }
137  return false;
138 }
139 
140 
142 {
143  pool = nullptr;
144  poolIndex = 0;
145  poolLength = initialPoolSize;
146  poolCount = 0;
147 }
148 
149 
150 void BufferedConnectionWriter::push(const Bytes& data, bool copy)
151 {
152  if (copy) {
153  if (addPool(data)) {
154  return;
155  }
156  }
157  yarp::os::ManagedBytes* buf = nullptr;
158  if (*target_used < target->size()) {
159  yarp::os::ManagedBytes*& bytes = (*target)[*target_used];
160  if (bytes->isOwner() != copy || bytes->length() < data.length()) {
161  delete bytes;
162  bytes = new yarp::os::ManagedBytes(data, false);
163  if (copy) {
164  bytes->copy();
165  }
166  (*target_used)++;
167  return;
168  }
169  buf = bytes;
170  bytes->setUsed(data.length());
171  }
172  if (buf == nullptr) {
173  buf = new yarp::os::ManagedBytes(data, false);
174  if (copy) {
175  buf->copy();
176  }
177  target->push_back(buf);
178  } else {
179  if (copy) {
180  buf->copy();
181  memmove(buf->get(), data.get(), data.length());
182  } else {
183  *buf = ManagedBytes(data, false);
184  }
185  }
186  (*target_used)++;
187 }
188 
189 
191 {
192  return textMode;
193 }
194 
196 {
197  return bareMode;
198 }
199 
200 
202 {
203  if (isTextMode()) {
204  convertTextModePending = true;
205  }
206  return true;
207 }
208 
209 void BufferedConnectionWriter::declareSizes(int argc, int* argv)
210 {
211  YARP_UNUSED(argc);
212  YARP_UNUSED(argv);
213  // this method is never called yet, so no point using it yet.
214 }
215 
217 {
218  this->reader = &reader;
219 }
220 
221 namespace {
222 template <typename T, typename NetT>
223 inline void appendType(BufferedConnectionWriter* buf, T data)
224 {
225  if (std::is_same<T, NetT>::value) {
226  yarp::os::Bytes b(reinterpret_cast<char*>(&data), sizeof(T));
227  buf->push(b, true);
228  } else {
229  NetT i = data;
230  yarp::os::Bytes b(reinterpret_cast<char*>(&i), sizeof(T));
231  buf->push(b, true);
232  }
233 }
234 } // namespace
235 
237 {
238  appendType<std::int8_t, NetInt8>(this, data);
239 }
240 
242 {
243  appendType<std::int16_t, NetInt16>(this, data);
244 }
245 
247 {
248  appendType<std::int32_t, NetInt32>(this, data);
249 }
250 
252 {
253  appendType<std::int64_t, NetInt64>(this, data);
254 }
255 
257 {
258  appendType<yarp::conf::float32_t, NetFloat32>(this, data);
259 }
260 
262 {
263  appendType<yarp::conf::float64_t, NetFloat64>(this, data);
264 }
265 
266 void BufferedConnectionWriter::appendBlock(const char* data, size_t len)
267 {
268  appendBlockCopy(yarp::os::Bytes(const_cast<char*>(data), len));
269 }
270 
271 void BufferedConnectionWriter::appendText(const std::string& str, const char terminate)
272 {
273  if (terminate == '\n') {
274  appendLine(str);
275  } else if (terminate == 0) {
276  yarp::os::Bytes b(const_cast<char*>(str.data()), str.length() + 1);
277  push(b, true);
278  } else {
279  std::string s = str;
280  s += terminate;
281  appendBlockCopy(yarp::os::Bytes(const_cast<char*>(s.c_str()), s.length()));
282  }
283 }
284 
285 void BufferedConnectionWriter::appendExternalBlock(const char* data, size_t len)
286 {
287  appendBlock(yarp::os::Bytes(const_cast<char*>(data), len));
288 }
289 
291 {
292  stopPool();
293  push(data, false);
294 }
295 
297 {
298  push(data, true);
299 }
300 
301 void BufferedConnectionWriter::appendLine(const std::string& data)
302 {
303  yarp::os::Bytes b(const_cast<char*>(data.c_str()), data.length());
304  push(b, true);
305  const char* eol = "\r\n"; // for windows compatibility
306  yarp::os::Bytes beol(const_cast<char*>(eol), 2);
307  push(beol, true);
308 }
309 
310 
312 {
313  return header_used + lst_used;
314 }
315 
317 {
318  return header_used;
319 }
320 
321 size_t BufferedConnectionWriter::length(size_t index) const
322 {
323  if (index < header_used) {
324  yarp::os::ManagedBytes& b = *(header[index]);
325  return b.used();
326  }
327  yarp::os::ManagedBytes& b = *(lst[index - header.size()]);
328  return b.used();
329 }
330 
331 const char* BufferedConnectionWriter::data(size_t index) const
332 {
333  if (index < header_used) {
334  yarp::os::ManagedBytes& b = *(header[index]);
335  return b.get();
336  }
337  yarp::os::ManagedBytes& b = *(lst[index - header.size()]);
338  return b.get();
339 }
340 
342 {
343  stopWrite();
344  size_t i;
345  for (i = 0; i < header_used; i++) {
346  yarp::os::ManagedBytes& b = *(header[i]);
347  connection.appendBlock(b.get(), b.used());
348  }
349  for (i = 0; i < lst_used; i++) {
350  yarp::os::ManagedBytes& b = *(lst[i]);
351  connection.appendBlock(b.get(), b.used());
352  }
353  return !connection.isError();
354 }
355 
357 {
358  stopWrite();
359  for (size_t i = 0; i < header_used; i++) {
360  yarp::os::ManagedBytes& b = *(header[i]);
361  os.write(b.usedBytes());
362  }
363  for (size_t i = 0; i < lst_used; i++) {
364  yarp::os::ManagedBytes& b = *(lst[i]);
365  os.write(b.usedBytes());
366  }
367  os.flush();
368 }
369 
371 {
372  DummyConnector con;
373  con.setTextMode(isTextMode());
374  if (!write(con.getWriter())) {
375  return false;
376  }
377  return obj.read(con.getReader());
378 }
379 
380 
382 {
383  size_t i;
384  size_t len = 0;
385  for (i = 0; i < header_used; i++) {
386  yarp::os::ManagedBytes& b = *(header[i]);
387  len += b.usedBytes().length();
388  }
389  for (i = 0; i < lst_used; i++) {
390  yarp::os::ManagedBytes& b = *(lst[i]);
391  len += b.usedBytes().length();
392  }
393  return len;
394 }
395 
397 {
398  return header.size() + lst.size();
399 }
400 
401 
403 {
404  return reader;
405 }
406 
407 
409 {
410  stopPool();
411  target = &header;
412  target_used = &header_used;
413 }
414 
416 {
417  return ref;
418 }
419 
421 {
422  ref = obj;
423 }
424 
426 {
427  return true;
428 }
429 
431 {
432  return true;
433 }
434 
436 {
437  return false; // output errors are of no significance at user level
438 }
439 
441 {
442  shouldDrop = true;
443 }
444 
446 {
447  return shouldDrop;
448 }
449 
451 {
452 }
453 
455 {
456  // convert, last thing, if requested
457  applyConvertTextMode();
458 }
459 
461 {
462  return const_cast<BufferedConnectionWriter*>(this);
463 }
464 
465 
467 {
468  initialPoolSize = size;
469 }
470 
471 
473 {
474  stopWrite();
475  size_t total_size = dataSize();
476  std::string output(total_size, 0);
477  char* dest = const_cast<char*>(output.c_str());
478  for (size_t i = 0; i < header_used; i++) {
479  const char* data = header[i]->get();
480  size_t len = header[i]->used();
481  memmove(dest, data, len);
482  dest += len;
483  }
484  for (size_t i = 0; i < lst_used; i++) {
485  const char* data = lst[i]->get();
486  size_t len = lst[i]->used();
487  memmove(dest, data, len);
488  dest += len;
489  }
490  return output;
491 }
492 
493 bool BufferedConnectionWriter::applyConvertTextMode() const
494 {
495  return const_cast<BufferedConnectionWriter*>(this)->applyConvertTextMode();
496 }
497 
498 bool BufferedConnectionWriter::applyConvertTextMode()
499 {
500  if (convertTextModePending) {
501  convertTextModePending = false;
502 
503  Bottle b;
504  StringOutputStream sos;
505  for (size_t i = 0; i < lst_used; i++) {
506  yarp::os::ManagedBytes& m = *(lst[i]);
507  sos.write(m.usedBytes());
508  }
509  const std::string& str = sos.str();
510  b.fromBinary(str.c_str(), static_cast<int>(str.length()));
511  std::string replacement = b.toString() + "\n";
512  for (auto& i : lst) {
513  delete i;
514  }
515  lst_used = 0;
516  target = &lst;
517  lst.clear();
518  stopPool();
519  Bytes data(const_cast<char*>(replacement.c_str()), replacement.length());
521  }
522  return true;
523 }
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:74
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:211
void fromBinary(const char *buf, size_t len)
Initializes bottle from a binary representation.
Definition: Bottle.cpp:216
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
An interface for writing to a network connection.
virtual bool isError() const =0
virtual void appendBlock(const char *data, size_t len)=0
Send a block of data to the network connection.
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written to the ConnectionWriter since it was las...
void setTextMode(bool textmode)
Set the textMode of the dummy connection.
ConnectionReader & getReader(ConnectionWriter *replyWriter=nullptr)
Get the dummy ConnectionReader loaded with whatever was written to the ConnectionWriter since it was las...
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
Definition: ManagedBytes.h:22
size_t setUsed(size_t used)
explicitly declare how many of the bytes are in use.
void copy()
Makes sure data block is owned, making a copy if necessary.
const char * get() const
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:22
virtual void flush()
Make sure all pending write operations are finished.
virtual void write(char ch)
Write a single byte to the stream.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:25
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
This is a base class for objects that can be both read from and be written to the YARP network.
Definition: Portable.h:26
Minimal requirements for an efficient Writer.
Definition: SizedWriter.h:33
An OutputStream that produces a string.
void write(const Bytes &b) override
Write a block of bytes to the stream.
const std::string & str() const
A helper for creating cached object descriptions.
void requestDrop() override
Tag the connection to be dropped after the current message.
void startWrite() const override
Call when writing is about to begin.
virtual void appendBlockCopy(const Bytes &data)
Add a buffer by copying its contents.
void appendInt32(std::int32_t data) override
Send a representation of a 32-bit integer to the network connection.
void appendBlock(const char *data, size_t len) override
Send a block of data to the network connection.
void declareSizes(int argc, int *argv) override
If you can easily determine how many blocks there are in a message, call this first,...
void appendInt8(std::int8_t data) override
Send a representation of an 8-bit integer to the network connection.
void setReplyHandler(PortReader &reader) override
This sets a handler to deal with replies to the message.
const char * data(size_t index) const override
bool isBareMode() const override
Check if the connection is bare mode.
void stopPool()
Stop adding to the current pool buffer.
bool convertTextMode() override
Converts a standard description in binary into a textual description, if the connection is in text-mo...
bool write(ConnectionWriter &connection) const override
Write this object to a network connection.
void stopWrite() const override
Call when all writing is finished.
void push(const Bytes &data, bool copy)
Add the specified buffer to the list of buffers to be written.
void appendInt16(std::int16_t data) override
Send a representation of a 16-bit integer to the network connection.
void setInitialPoolSize(size_t size)
Set a custom initial pool size, which affects the size of buffers created for temporary data storage.
bool addPool(const yarp::os::Bytes &data)
Add the specified bytes to the current pool buffer.
void appendFloat64(yarp::conf::float64_t data) override
Send a representation of a 64-bit floating point number to the network connection.
void clear() override
Clear all cached data.
BufferedConnectionWriter(bool textMode=false, bool bareMode=false)
Constructor.
bool isTextMode() const override
Check if the connection is text mode.
void restart()
Tell the writer that we will be serializing a new object, but to keep any cached buffers that already...
void addToHeader()
Switch to storing a header.
void appendFloat32(yarp::conf::float32_t data) override
Send a representation of a 32-bit floating point number to the network connection.
void appendInt64(std::int64_t data) override
Send a representation of a 64-bit integer to the network connection.
void setReference(yarp::os::Portable *obj) override
Stores a direct pointer to the object being sent.
void appendText(const std::string &str, const char terminate='\n') override
Send a terminated string to the network connection.
virtual void appendLine(const std::string &data)
Send a string along with a carriage-return-line-feed sequence.
void reset(bool textMode)
Completely clear the writer and start afresh.
void appendExternalBlock(const char *data, size_t len) override
Send a block of data to the network connection, without making a copy.
double float64_t
Definition: numeric.h:77
float float32_t
Definition: numeric.h:76
The components from which ports and connections are built.
constexpr size_t BUFFERED_CONNECTION_INITIAL_POOL_SIZE
An interface to the operating system, including Port based communication.
#define YARP_UNUSED(var)
Definition: api.h:162