YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
BufferedConnectionWriter.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
8
9#include <yarp/os/Bottle.h>
12#include <yarp/os/NetFloat32.h>
13#include <yarp/os/NetFloat64.h>
14#include <yarp/os/NetInt16.h>
15#include <yarp/os/NetInt32.h>
16#include <yarp/os/NetInt64.h>
17#include <yarp/os/NetInt8.h>
18#include <yarp/os/NetType.h>
19#include <yarp/os/SizedWriter.h>
21#include <yarp/os/Vocab.h>
22
23#include <cstdlib>
24#include <cstring>
25
26
27using namespace yarp::os::impl;
28using namespace yarp::os;
29
31 bool bareMode) :
32 target(&lst),
33 reader(nullptr),
34 textMode(textMode),
35 bareMode(bareMode),
36 convertTextModePending(false),
37 ref(nullptr),
38 shouldDrop(false),
39 lst_used(0),
40 header_used(0),
41 target_used(&lst_used),
43{
44 stopPool();
45}
46
47
52
53
55{
56 this->textMode = textMode;
57 clear();
58 reader = nullptr;
59 ref = nullptr;
60 convertTextModePending = false;
61}
62
64{
65 lst_used = 0;
66 header_used = 0;
67 reader = nullptr;
68 ref = nullptr;
69 convertTextModePending = false;
70 target = &lst;
71 target_used = &lst_used;
72 stopPool();
73}
74
76{
77 target = &lst;
78 target_used = &lst_used;
79
80 size_t i;
81 for (i = 0; i < lst.size(); i++) {
82 delete lst[i];
83 }
84 lst.clear();
85 for (i = 0; i < header.size(); i++) {
86 delete header[i];
87 }
88 header.clear();
89 stopPool();
90 lst_used = 0;
91 header_used = 0;
92}
93
95{
96 if (pool != nullptr) {
97 if (data.length() + poolIndex > pool->length()) {
98 pool = nullptr;
99 }
100 }
101 if (pool == nullptr && data.length() < poolLength) {
102 bool add = false;
103 if (*target_used < target->size()) {
104 yarp::os::ManagedBytes*& bytes = (*target)[*target_used];
105 if (bytes->length() < poolLength) {
106 delete bytes;
107 bytes = new yarp::os::ManagedBytes(poolLength);
108 }
109 pool = bytes;
110 if (pool == nullptr) {
111 return false;
112 }
113 } else {
114 pool = new yarp::os::ManagedBytes(poolLength);
115 if (pool == nullptr) {
116 return false;
117 }
118 add = true;
119 }
120 (*target_used)++;
121 poolCount++;
122 poolIndex = 0;
123 if (poolLength < 65536) {
124 poolLength *= 2;
125 }
126 pool->setUsed(0);
127 if (add) {
128 target->push_back(pool);
129 }
130 }
131 if (pool != nullptr) {
132 memcpy(pool->get() + poolIndex, data.get(), data.length());
133 poolIndex += data.length();
134 pool->setUsed(poolIndex);
135 return true;
136 }
137 return false;
138}
139
140
142{
143 pool = nullptr;
144 poolIndex = 0;
145 poolLength = initialPoolSize;
146 poolCount = 0;
147}
148
149
150void BufferedConnectionWriter::push(const Bytes& data, bool copy)
151{
152 if (copy) {
153 if (addPool(data)) {
154 return;
155 }
156 }
157 yarp::os::ManagedBytes* buf = nullptr;
158 if (*target_used < target->size()) {
159 yarp::os::ManagedBytes*& bytes = (*target)[*target_used];
160 if (bytes->isOwner() != copy || bytes->length() < data.length()) {
161 delete bytes;
162 bytes = new yarp::os::ManagedBytes(data, false);
163 if (copy) {
164 bytes->copy();
165 }
166 (*target_used)++;
167 return;
168 }
169 buf = bytes;
170 bytes->setUsed(data.length());
171 }
172 if (buf == nullptr) {
173 buf = new yarp::os::ManagedBytes(data, false);
174 if (copy) {
175 buf->copy();
176 }
177 target->push_back(buf);
178 } else {
179 if (copy) {
180 buf->copy();
181 memmove(buf->get(), data.get(), data.length());
182 } else {
183 *buf = ManagedBytes(data, false);
184 }
185 }
186 (*target_used)++;
187}
188
189
191{
192 return textMode;
193}
194
196{
197 return bareMode;
198}
199
200
202{
203 if (isTextMode()) {
204 convertTextModePending = true;
205 }
206 return true;
207}
208
210{
211 YARP_UNUSED(argc);
212 YARP_UNUSED(argv);
213 // this method is never called yet, so no point using it yet.
214}
215
217{
218 this->reader = &reader;
219}
220
221namespace {
222template <typename T, typename NetT>
223inline void appendType(BufferedConnectionWriter* buf, T data)
224{
225 if (std::is_same<T, NetT>::value) {
226 yarp::os::Bytes b(reinterpret_cast<char*>(&data), sizeof(T));
227 buf->push(b, true);
228 } else {
229 NetT i = data;
230 yarp::os::Bytes b(reinterpret_cast<char*>(&i), sizeof(T));
231 buf->push(b, true);
232 }
233}
234} // namespace
235
240
245
250
255
260
265
266void BufferedConnectionWriter::appendBlock(const char* data, size_t len)
267{
268 appendBlockCopy(yarp::os::Bytes(const_cast<char*>(data), len));
269}
270
271void BufferedConnectionWriter::appendText(const std::string& str, const char terminate)
272{
273 if (terminate == '\n') {
274 appendLine(str);
275 } else if (terminate == 0) {
276 yarp::os::Bytes b(const_cast<char*>(str.data()), str.length() + 1);
277 push(b, true);
278 } else {
279 std::string s = str;
280 s += terminate;
281 appendBlockCopy(yarp::os::Bytes(const_cast<char*>(s.c_str()), s.length()));
282 }
283}
284
285void BufferedConnectionWriter::appendExternalBlock(const char* data, size_t len)
286{
287 appendBlock(yarp::os::Bytes(const_cast<char*>(data), len));
288}
289
291{
292 stopPool();
293 push(data, false);
294}
295
300
301void BufferedConnectionWriter::appendLine(const std::string& data)
302{
303 yarp::os::Bytes b(const_cast<char*>(data.c_str()), data.length());
304 push(b, true);
305 const char* eol = "\r\n"; // for windows compatibility
306 yarp::os::Bytes beol(const_cast<char*>(eol), 2);
307 push(beol, true);
308}
309
310
312{
313 return header_used + lst_used;
314}
315
317{
318 return header_used;
319}
320
321size_t BufferedConnectionWriter::length(size_t index) const
322{
323 if (index < header_used) {
324 yarp::os::ManagedBytes& b = *(header[index]);
325 return b.used();
326 }
327 yarp::os::ManagedBytes& b = *(lst[index - header.size()]);
328 return b.used();
329}
330
331const char* BufferedConnectionWriter::data(size_t index) const
332{
333 if (index < header_used) {
334 yarp::os::ManagedBytes& b = *(header[index]);
335 return b.get();
336 }
337 yarp::os::ManagedBytes& b = *(lst[index - header.size()]);
338 return b.get();
339}
340
342{
343 stopWrite();
344 size_t i;
345 for (i = 0; i < header_used; i++) {
346 yarp::os::ManagedBytes& b = *(header[i]);
347 connection.appendBlock(b.get(), b.used());
348 }
349 for (i = 0; i < lst_used; i++) {
350 yarp::os::ManagedBytes& b = *(lst[i]);
351 connection.appendBlock(b.get(), b.used());
352 }
353 return !connection.isError();
354}
355
357{
358 stopWrite();
359 for (size_t i = 0; i < header_used; i++) {
360 yarp::os::ManagedBytes& b = *(header[i]);
361 os.write(b.usedBytes());
362 }
363 for (size_t i = 0; i < lst_used; i++) {
364 yarp::os::ManagedBytes& b = *(lst[i]);
365 os.write(b.usedBytes());
366 }
367 os.flush();
368}
369
371{
372 DummyConnector con;
373 con.setTextMode(isTextMode());
374 if (!write(con.getWriter())) {
375 return false;
376 }
377 return obj.read(con.getReader());
378}
379
380
382{
383 size_t i;
384 size_t len = 0;
385 for (i = 0; i < header_used; i++) {
386 yarp::os::ManagedBytes& b = *(header[i]);
387 len += b.usedBytes().length();
388 }
389 for (i = 0; i < lst_used; i++) {
390 yarp::os::ManagedBytes& b = *(lst[i]);
391 len += b.usedBytes().length();
392 }
393 return len;
394}
395
397{
398 return header.size() + lst.size();
399}
400
401
406
407
409{
410 stopPool();
411 target = &header;
412 target_used = &header_used;
413}
414
419
424
426{
427 return true;
428}
429
431{
432 return true;
433}
434
436{
437 return false; // output errors are of no significance at user level
438}
439
441{
442 shouldDrop = true;
443}
444
446{
447 return shouldDrop;
448}
449
453
455{
456 // convert, last thing, if requested
457 applyConvertTextMode();
458}
459
464
466{
467 return this;
468}
469
471{
472 initialPoolSize = size;
473}
474
475
477{
478 stopWrite();
479 size_t total_size = dataSize();
480 std::string output(total_size, 0);
481 char* dest = const_cast<char*>(output.c_str());
482 for (size_t i = 0; i < header_used; i++) {
483 const char* data = header[i]->get();
484 size_t len = header[i]->used();
485 memmove(dest, data, len);
486 dest += len;
487 }
488 for (size_t i = 0; i < lst_used; i++) {
489 const char* data = lst[i]->get();
490 size_t len = lst[i]->used();
491 memmove(dest, data, len);
492 dest += len;
493 }
494 return output;
495}
496
497bool BufferedConnectionWriter::applyConvertTextMode() const
498{
499 return const_cast<BufferedConnectionWriter*>(this)->applyConvertTextMode();
500}
501
502bool BufferedConnectionWriter::applyConvertTextMode()
503{
504 if (convertTextModePending) {
505 convertTextModePending = false;
506
507 Bottle b;
509 for (size_t i = 0; i < lst_used; i++) {
510 yarp::os::ManagedBytes& m = *(lst[i]);
511 sos.write(m.usedBytes());
512 }
513 const std::string& str = sos.str();
514 b.fromBinary(str.c_str(), static_cast<int>(str.length()));
515 std::string replacement = b.toString() + "\n";
516 for (auto& i : lst) {
517 delete i;
518 }
519 lst_used = 0;
520 target = &lst;
521 lst.clear();
522 stopPool();
523 Bytes data(const_cast<char*>(replacement.c_str()), replacement.length());
525 }
526 return true;
527}
A simple collection of objects that can be described and transmitted in a portable way.
Definition Bottle.h:64
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition Bottle.cpp:211
void fromBinary(const char *buf, size_t len)
Initializes bottle from a binary representation.
Definition Bottle.cpp:216
A mini-server for performing network communication in the background.
T * read(bool shouldWait=true) override
Read an available object from the port.
A simple abstraction for a block of bytes.
Definition Bytes.h:24
size_t length() const
Definition Bytes.cpp:22
An interface for writing to a network connection.
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written the ConnectionWriter since it was las...
void setTextMode(bool textmode)
Set the textMode of the dummy connection.
ConnectionReader & getReader(ConnectionWriter *replyWriter=nullptr)
Get the dummy ConnectionReader loaded with whatever was written the ConnectionWriter since it was las...
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
size_t setUsed(size_t used)
explicitly declare how many of the bytes are in use.
void copy()
Makes sure data block is owned, making a copy if necessary.
const char * get() const
Simple specification of the minimum functions needed from output streams.
virtual void flush()
Make sure all pending write operations are finished.
virtual void write(char ch)
Write a single byte to the stream.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition PortReader.h:24
This is a base class for objects that can be both read from and be written to the YARP network.
Definition Portable.h:25
Minimal requirements for an efficient Writer.
Definition SizedWriter.h:32
An OutputStream that produces a string.
const std::string & str() const
void write(const Bytes &b) override
Write a block of bytes to the stream.
A helper for creating cached object descriptions.
void requestDrop() override
Tag the connection to be dropped after the current message.
void startWrite() const override
Call when writing is about to begin.
virtual void appendBlockCopy(const Bytes &data)
Add a buffer by copying its contents.
void appendInt32(std::int32_t data) override
Send a representation of a 32-bit integer to the network connection.
void appendBlock(const char *data, size_t len) override
Send a block of data to the network connection.
void declareSizes(int argc, int *argv) override
If you can easily determine how many blocks there are in a message, call this first,...
void appendInt8(std::int8_t data) override
Send a representation of an 8-bit integer to the network connection.
void setReplyHandler(PortReader &reader) override
This sets a handler to deal with replies to the message.
const char * data(size_t index) const override
bool isBareMode() const override
Check if the connection is bare mode.
void stopPool()
Stop adding to the current pool buffer.
bool convertTextMode() override
Converts a standard description in binary into a textual description, if the connection is in text-mo...
bool write(ConnectionWriter &connection) const override
Write this object to a network connection.
void stopWrite() const override
Call when all writing is finished.
void push(const Bytes &data, bool copy)
Add the specified buffer to the list of buffers to be written.
void appendInt16(std::int16_t data) override
Send a representation of a 16-bit integer to the network connection.
void setInitialPoolSize(size_t size)
Set a custom initial pool size, which affects the size of buffers created for temporary data storage.
bool addPool(const yarp::os::Bytes &data)
Add the specified bytes to the current pool buffer.
void appendFloat64(yarp::conf::float64_t data) override
Send a representation of a 64-bit floating point number to the network connection.
void clear() override
Clear all cached data.
BufferedConnectionWriter(bool textMode=false, bool bareMode=false)
Constructor.
bool isTextMode() const override
Check if the connection is text mode.
void restart()
Tell the writer that we will be serializing a new object, but to keep any cached buffers that already...
void appendFloat32(yarp::conf::float32_t data) override
Send a representation of a 32-bit floating point number to the network connection.
void appendInt64(std::int64_t data) override
Send a representation of a 64-bit integer to the network connection.
void setReference(yarp::os::Portable *obj) override
Stores a direct pointer to the object being sent.
void appendText(const std::string &str, const char terminate='\n') override
Send a terminated string to the network connection.
virtual void appendLine(const std::string &data)
Send a string along with a carriage-return-line-feed sequence.
void reset(bool textMode)
Completely clear the writer and start afresh.
void appendExternalBlock(const char *data, size_t len) override
Send a block of data to the network connection, without making a copy.
The components from which ports and connections are built.
constexpr size_t BUFFERED_CONNECTION_INITIAL_POOL_SIZE
An interface to the operating system, including Port based communication.
#define YARP_UNUSED(var)
Definition api.h:162