YARP
Yet Another Robot Platform
BufferedConnectionWriter.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
11 
12 #include <yarp/os/Bottle.h>
13 #include <yarp/os/DummyConnector.h>
14 #include <yarp/os/ManagedBytes.h>
15 #include <yarp/os/NetFloat32.h>
16 #include <yarp/os/NetFloat64.h>
17 #include <yarp/os/NetInt16.h>
18 #include <yarp/os/NetInt32.h>
19 #include <yarp/os/NetInt64.h>
20 #include <yarp/os/NetInt8.h>
21 #include <yarp/os/NetType.h>
22 #include <yarp/os/SizedWriter.h>
24 #include <yarp/os/Vocab.h>
25 
26 #include <cstdlib>
27 #include <cstring>
28 
29 
30 using namespace yarp::os::impl;
31 using namespace yarp::os;
32 
34  bool bareMode) :
35  target(&lst),
36  reader(nullptr),
37  textMode(textMode),
38  bareMode(bareMode),
39  convertTextModePending(false),
40  ref(nullptr),
41  shouldDrop(false),
42  lst_used(0),
43  header_used(0),
44  target_used(&lst_used),
46 {
47  stopPool();
48 }
49 
50 
52 {
53  clear();
54 }
55 
56 
58 {
59  this->textMode = textMode;
60  clear();
61  reader = nullptr;
62  ref = nullptr;
63  convertTextModePending = false;
64 }
65 
67 {
68  lst_used = 0;
69  header_used = 0;
70  reader = nullptr;
71  ref = nullptr;
72  convertTextModePending = false;
73  target = &lst;
74  target_used = &lst_used;
75  stopPool();
76 }
77 
79 {
80  target = &lst;
81  target_used = &lst_used;
82 
83  size_t i;
84  for (i = 0; i < lst.size(); i++) {
85  delete lst[i];
86  }
87  lst.clear();
88  for (i = 0; i < header.size(); i++) {
89  delete header[i];
90  }
91  header.clear();
92  stopPool();
93  lst_used = 0;
94  header_used = 0;
95 }
96 
98 {
99  if (pool != nullptr) {
100  if (data.length() + poolIndex > pool->length()) {
101  pool = nullptr;
102  }
103  }
104  if (pool == nullptr && data.length() < poolLength) {
105  bool add = false;
106  if (*target_used < target->size()) {
107  yarp::os::ManagedBytes*& bytes = (*target)[*target_used];
108  if (bytes->length() < poolLength) {
109  delete bytes;
110  bytes = new yarp::os::ManagedBytes(poolLength);
111  }
112  pool = bytes;
113  if (pool == nullptr) {
114  return false;
115  }
116  } else {
117  pool = new yarp::os::ManagedBytes(poolLength);
118  if (pool == nullptr) {
119  return false;
120  }
121  add = true;
122  }
123  (*target_used)++;
124  poolCount++;
125  poolIndex = 0;
126  if (poolLength < 65536) {
127  poolLength *= 2;
128  }
129  pool->setUsed(0);
130  if (add) {
131  target->push_back(pool);
132  }
133  }
134  if (pool != nullptr) {
135  memcpy(pool->get() + poolIndex, data.get(), data.length());
136  poolIndex += data.length();
137  pool->setUsed(poolIndex);
138  return true;
139  }
140  return false;
141 }
142 
143 
145 {
146  pool = nullptr;
147  poolIndex = 0;
148  poolLength = initialPoolSize;
149  poolCount = 0;
150 }
151 
152 
153 void BufferedConnectionWriter::push(const Bytes& data, bool copy)
154 {
155  if (copy) {
156  if (addPool(data)) {
157  return;
158  }
159  }
160  yarp::os::ManagedBytes* buf = nullptr;
161  if (*target_used < target->size()) {
162  yarp::os::ManagedBytes*& bytes = (*target)[*target_used];
163  if (bytes->isOwner() != copy || bytes->length() < data.length()) {
164  delete bytes;
165  bytes = new yarp::os::ManagedBytes(data, false);
166  if (copy) {
167  bytes->copy();
168  }
169  (*target_used)++;
170  return;
171  }
172  buf = bytes;
173  bytes->setUsed(data.length());
174  }
175  if (buf == nullptr) {
176  buf = new yarp::os::ManagedBytes(data, false);
177  if (copy) {
178  buf->copy();
179  }
180  target->push_back(buf);
181  } else {
182  if (copy) {
183  buf->copy();
184  memmove(buf->get(), data.get(), data.length());
185  } else {
186  *buf = ManagedBytes(data, false);
187  }
188  }
189  (*target_used)++;
190 }
191 
192 
194 {
195  return textMode;
196 }
197 
199 {
200  return bareMode;
201 }
202 
203 
205 {
206  if (isTextMode()) {
207  convertTextModePending = true;
208  }
209  return true;
210 }
211 
212 void BufferedConnectionWriter::declareSizes(int argc, int* argv)
213 {
214  YARP_UNUSED(argc);
215  YARP_UNUSED(argv);
216  // this method is never called yet, so no point using it yet.
217 }
218 
220 {
221  this->reader = &reader;
222 }
223 
224 namespace {
225 template <typename T, typename NetT>
226 inline void appendType(BufferedConnectionWriter* buf, T data)
227 {
228  if (std::is_same<T, NetT>::value) {
229  yarp::os::Bytes b(reinterpret_cast<char*>(&data), sizeof(T));
230  buf->push(b, true);
231  } else {
232  NetT i = data;
233  yarp::os::Bytes b(reinterpret_cast<char*>(&i), sizeof(T));
234  buf->push(b, true);
235  }
236 }
237 } // namespace
238 
240 {
241  appendType<std::int8_t, NetInt8>(this, data);
242 }
243 
245 {
246  appendType<std::int16_t, NetInt16>(this, data);
247 }
248 
250 {
251  appendType<std::int32_t, NetInt32>(this, data);
252 }
253 
255 {
256  appendType<std::int64_t, NetInt64>(this, data);
257 }
258 
260 {
261  appendType<yarp::conf::float32_t, NetFloat32>(this, data);
262 }
263 
265 {
266  appendType<yarp::conf::float64_t, NetFloat64>(this, data);
267 }
268 
269 void BufferedConnectionWriter::appendBlock(const char* data, size_t len)
270 {
271  appendBlockCopy(yarp::os::Bytes(const_cast<char*>(data), len));
272 }
273 
274 void BufferedConnectionWriter::appendText(const std::string& str, const char terminate)
275 {
276  if (terminate == '\n') {
277  appendLine(str);
278  } else if (terminate == 0) {
279  yarp::os::Bytes b(const_cast<char*>(str.data()), str.length() + 1);
280  push(b, true);
281  } else {
282  std::string s = str;
283  s += terminate;
284  appendBlockCopy(yarp::os::Bytes(const_cast<char*>(s.c_str()), s.length()));
285  }
286 }
287 
288 void BufferedConnectionWriter::appendExternalBlock(const char* data, size_t len)
289 {
290  appendBlock(yarp::os::Bytes(const_cast<char*>(data), len));
291 }
292 
294 {
295  stopPool();
296  push(data, false);
297 }
298 
300 {
301  push(data, true);
302 }
303 
304 void BufferedConnectionWriter::appendLine(const std::string& data)
305 {
306  yarp::os::Bytes b(const_cast<char*>(data.c_str()), data.length());
307  push(b, true);
308  const char* eol = "\r\n"; // for windows compatibility
309  yarp::os::Bytes beol(const_cast<char*>(eol), 2);
310  push(beol, true);
311 }
312 
313 
315 {
316  return header_used + lst_used;
317 }
318 
320 {
321  return header_used;
322 }
323 
324 size_t BufferedConnectionWriter::length(size_t index) const
325 {
326  if (index < header_used) {
327  yarp::os::ManagedBytes& b = *(header[index]);
328  return b.used();
329  }
330  yarp::os::ManagedBytes& b = *(lst[index - header.size()]);
331  return b.used();
332 }
333 
334 const char* BufferedConnectionWriter::data(size_t index) const
335 {
336  if (index < header_used) {
337  yarp::os::ManagedBytes& b = *(header[index]);
338  return b.get();
339  }
340  yarp::os::ManagedBytes& b = *(lst[index - header.size()]);
341  return b.get();
342 }
343 
345 {
346  stopWrite();
347  size_t i;
348  for (i = 0; i < header_used; i++) {
349  yarp::os::ManagedBytes& b = *(header[i]);
350  connection.appendBlock(b.get(), b.used());
351  }
352  for (i = 0; i < lst_used; i++) {
353  yarp::os::ManagedBytes& b = *(lst[i]);
354  connection.appendBlock(b.get(), b.used());
355  }
356  return !connection.isError();
357 }
358 
360 {
361  stopWrite();
362  for (size_t i = 0; i < header_used; i++) {
363  yarp::os::ManagedBytes& b = *(header[i]);
364  os.write(b.usedBytes());
365  }
366  for (size_t i = 0; i < lst_used; i++) {
367  yarp::os::ManagedBytes& b = *(lst[i]);
368  os.write(b.usedBytes());
369  }
370  os.flush();
371 }
372 
374 {
375  DummyConnector con;
376  con.setTextMode(isTextMode());
377  if (!write(con.getWriter())) {
378  return false;
379  }
380  return obj.read(con.getReader());
381 }
382 
383 
385 {
386  size_t i;
387  size_t len = 0;
388  for (i = 0; i < header_used; i++) {
389  yarp::os::ManagedBytes& b = *(header[i]);
390  len += b.usedBytes().length();
391  }
392  for (i = 0; i < lst_used; i++) {
393  yarp::os::ManagedBytes& b = *(lst[i]);
394  len += b.usedBytes().length();
395  }
396  return len;
397 }
398 
400 {
401  return header.size() + lst.size();
402 }
403 
404 
406 {
407  return reader;
408 }
409 
410 
412 {
413  stopPool();
414  target = &header;
415  target_used = &header_used;
416 }
417 
419 {
420  return ref;
421 }
422 
424 {
425  ref = obj;
426 }
427 
429 {
430  return true;
431 }
432 
434 {
435  return true;
436 }
437 
439 {
440  return false; // output errors are of no significance at user level
441 }
442 
444 {
445  shouldDrop = true;
446 }
447 
449 {
450  return shouldDrop;
451 }
452 
454 {
455 }
456 
458 {
459  // convert, last thing, if requested
460  applyConvertTextMode();
461 }
462 
464 {
465  return const_cast<BufferedConnectionWriter*>(this);
466 }
467 
468 
470 {
471  initialPoolSize = size;
472 }
473 
474 
476 {
477  stopWrite();
478  size_t total_size = dataSize();
479  std::string output(total_size, 0);
480  char* dest = const_cast<char*>(output.c_str());
481  for (size_t i = 0; i < header_used; i++) {
482  const char* data = header[i]->get();
483  size_t len = header[i]->used();
484  memmove(dest, data, len);
485  dest += len;
486  }
487  for (size_t i = 0; i < lst_used; i++) {
488  const char* data = lst[i]->get();
489  size_t len = lst[i]->used();
490  memmove(dest, data, len);
491  dest += len;
492  }
493  return output;
494 }
495 
496 bool BufferedConnectionWriter::applyConvertTextMode() const
497 {
498  return const_cast<BufferedConnectionWriter*>(this)->applyConvertTextMode();
499 }
500 
501 bool BufferedConnectionWriter::applyConvertTextMode()
502 {
503  if (convertTextModePending) {
504  convertTextModePending = false;
505 
506  Bottle b;
507  StringOutputStream sos;
508  for (size_t i = 0; i < lst_used; i++) {
509  yarp::os::ManagedBytes& m = *(lst[i]);
510  sos.write(m.usedBytes());
511  }
512  const std::string& str = sos.str();
513  b.fromBinary(str.c_str(), static_cast<int>(str.length()));
514  std::string replacement = b.toString() + "\n";
515  for (auto& i : lst) {
516  delete i;
517  }
518  lst_used = 0;
519  target = &lst;
520  lst.clear();
521  stopPool();
522  Bytes data(const_cast<char*>(replacement.c_str()), replacement.length());
524  }
525  return true;
526 }
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:73
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:214
void fromBinary(const char *buf, size_t len)
Initializes bottle from a binary representation.
Definition: Bottle.cpp:219
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
size_t length() const
Definition: Bytes.cpp:25
An interface for writing to a network connection.
virtual bool isError() const =0
virtual void appendBlock(const char *data, size_t len)=0
Send a block of data to the network connection.
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written to the ConnectionWriter since it was last reset.
void setTextMode(bool textmode)
Set the textMode of the dummy connection.
ConnectionReader & getReader()
Get the dummy ConnectionReader loaded with whatever was written to the ConnectionWriter since it was last reset.
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that block.
Definition: ManagedBytes.h:25
size_t setUsed(size_t used)
explicitly declare how many of the bytes are in use.
void copy()
Makes sure data block is owned, making a copy if necessary.
const char * get() const
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:25
virtual void flush()
Make sure all pending write operations are finished.
virtual void write(char ch)
Write a single byte to the stream.
Interface implemented by all objects that can read themselves from the network, such as Bottle objects.
Definition: PortReader.h:28
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
This is a base class for objects that can be both read from and be written to the YARP network.
Definition: Portable.h:29
Minimal requirements for an efficient Writer.
Definition: SizedWriter.h:36
An OutputStream that produces a string.
void write(const Bytes &b) override
Write a block of bytes to the stream.
const std::string & str() const
A helper for creating cached object descriptions.
void requestDrop() override
Tag the connection to be dropped after the current message.
void startWrite() const override
Call when writing is about to begin.
virtual void appendBlockCopy(const Bytes &data)
Add a buffer by copying its contents.
void appendInt32(std::int32_t data) override
Send a representation of a 32-bit integer to the network connection.
void appendBlock(const char *data, size_t len) override
Send a block of data to the network connection.
void declareSizes(int argc, int *argv) override
If you can easily determine how many blocks there are in a message, call this first,...
void appendInt8(std::int8_t data) override
Send a representation of a 8-bit integer to the network connection.
void setReplyHandler(PortReader &reader) override
This sets a handler to deal with replies to the message.
const char * data(size_t index) const override
bool isBareMode() const override
Check if the connection is bare mode.
void stopPool()
Stop adding to the current pool buffer.
bool convertTextMode() override
Converts a standard description in binary into a textual description, if the connection is in text-mode.
bool write(ConnectionWriter &connection) const override
Write this object to a network connection.
void stopWrite() const override
Call when all writing is finished.
void push(const Bytes &data, bool copy)
Add the specified buffer to the list of buffers to be written.
void appendInt16(std::int16_t data) override
Send a representation of a 16-bit integer to the network connection.
void setInitialPoolSize(size_t size)
Set a custom initial pool size, which affects the size of buffers created for temporary data storage.
bool addPool(const yarp::os::Bytes &data)
Add the specified bytes to the current pool buffer.
void appendFloat64(yarp::conf::float64_t data) override
Send a representation of a 64-bit floating point number to the network connection.
void clear() override
Clear all cached data.
BufferedConnectionWriter(bool textMode=false, bool bareMode=false)
Constructor.
bool isTextMode() const override
Check if the connection is text mode.
void restart()
Tell the writer that we will be serializing a new object, but to keep any cached buffers that already...
void addToHeader()
Switch to storing a header.
void appendFloat32(yarp::conf::float32_t data) override
Send a representation of a 32-bit floating point number to the network connection.
void appendInt64(std::int64_t data) override
Send a representation of a 64-bit integer to the network connection.
void setReference(yarp::os::Portable *obj) override
Stores a direct pointer to the object being sent.
void appendText(const std::string &str, const char terminate='\n') override
Send a terminated string to the network connection.
virtual void appendLine(const std::string &data)
Send a string along with a carriage-return-line-feed sequence.
void reset(bool textMode)
Completely clear the writer and start afresh.
void appendExternalBlock(const char *data, size_t len) override
Send a block of data to the network connection, without making a copy.
double float64_t
Definition: numeric.h:51
float float32_t
Definition: numeric.h:50
The components from which ports and connections are built.
constexpr size_t BUFFERED_CONNECTION_INITIAL_POOL_SIZE
An interface to the operating system, including Port based communication.
#define YARP_UNUSED(var)
Definition: api.h:159