YARP
Yet Another Robot Platform
PortReaderBufferBase.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
8
9#include <yarp/os/Os.h>
11#include <yarp/os/Portable.h>
12#include <yarp/os/Semaphore.h>
14#include <yarp/os/Thread.h>
15#include <yarp/os/Time.h>
19
20#include <list>
21#include <mutex>
22
23using namespace yarp::os::impl;
24using namespace yarp::os;
25
// File-local log component. The identifier PORTREADERBUFFERBASE is the one
// passed to the yCAssert / yCTrace / yCDebug / yCError calls in this file;
// the string is the component name registered with it.
26namespace {
27YARP_OS_LOG_COMPONENT(PORTREADERBUFFERBASE, "yarp.os.PortReaderBufferBase")
28} // namespace
29
30#ifndef DOXYGEN_SHOULD_SKIP_THIS
31class PortReaderPacket
32{
33public:
34 PortReaderPacket *prev_, *next_;
35
36 // if non-null, contains a buffer that the packet owns
37 PortReader* reader;
38
39 std::string envelope;
40
41 // if nun-null, refers to an external buffer
42 // by convention, overrides reader
43 PortReader* external;
44 PortWriter* writer; // if a callback is needed
45
46 PortReaderPacket()
47 {
48 prev_ = next_ = nullptr;
49 reader = nullptr;
50 external = nullptr;
51 writer = nullptr;
52 reset();
53 }
54
55 virtual ~PortReaderPacket()
56 {
57 resetExternal();
58 reset();
59 }
60
61 void reset()
62 {
63 if (reader != nullptr) {
64 delete reader;
65 reader = nullptr;
66 }
67 writer = nullptr;
68 envelope = "";
69 }
70
71 PortReader* getReader()
72 {
73 return reader;
74 }
75
76 void setReader(PortReader* reader)
77 {
78 resetExternal();
79 reset();
80 this->reader = reader;
81 }
82
83 PortReader* getExternal()
84 {
85 return external;
86 }
87
88 void setExternal(PortReader* reader, PortWriter* writer)
89 {
90 resetExternal();
91 this->external = reader;
92 this->writer = writer;
93 }
94
95 void setEnvelope(const Bytes& bytes)
96 {
97 envelope = std::string(bytes.get(), bytes.length());
98 //envelope.set(bytes.get(), bytes.length(), 1);
99 }
100
101 void resetExternal()
102 {
103 if (writer != nullptr) {
104 writer->onCompletion();
105 writer = nullptr;
106 }
107 external = nullptr;
108 }
109};
110
111
112class PortReaderPool
113{
114private:
115 std::list<PortReaderPacket*> inactive;
116 std::list<PortReaderPacket*> active;
117
118public:
119 size_t getCount()
120 {
121 return active.size();
122 }
123
124 size_t getFree()
125 {
126 return inactive.size();
127 }
128
129 PortReaderPacket* getInactivePacket()
130 {
131 if (inactive.empty()) {
132 PortReaderPacket* obj = nullptr;
133 obj = new PortReaderPacket();
134 inactive.push_back(obj);
135 }
136 PortReaderPacket* next = inactive.front();
137 yCAssert(PORTREADERBUFFERBASE, next != nullptr);
138 inactive.remove(next);
139 return next;
140 }
141
142 PortReaderPacket* getActivePacket()
143 {
144 PortReaderPacket* next = nullptr;
145 if (getCount() >= 1) {
146 next = active.front();
147 yCAssert(PORTREADERBUFFERBASE, next != nullptr);
148 active.remove(next);
149 }
150 return next;
151 }
152
153 void addActivePacket(PortReaderPacket* packet)
154 {
155 if (packet != nullptr) {
156 active.push_back(packet);
157 }
158 }
159
160 void addInactivePacket(PortReaderPacket* packet)
161 {
162 if (packet != nullptr) {
163 inactive.push_back(packet);
164 }
165 }
166
167 void reset()
168 {
169 while (!active.empty()) {
170 delete active.back();
171 active.pop_back();
172 }
173 while (!inactive.empty()) {
174 delete inactive.back();
175 inactive.pop_back();
176 }
177 }
178};
179
180
// Implementation state of PortReaderBufferBase (the public class reaches it
// through mPriv). Holds the packet pool, the two semaphores used to hand
// packets between the receiving side and the reading side, and the mutex
// that the public methods take around pool accesses.
//
// NOTE(review): the constructor below initializes `owner` and `creator`, and
// getEnvelope() uses `sis` and `sbr`, but none of those declarations appear
// in this listing (the embedded line numbers skip 184, 188, 299 and 302).
// They were presumably lost when the listing was extracted — confirm against
// the real source before editing this class.
181class PortReaderBufferBase::Private
182{
183private:
// Packet most recently handed out by getContent(); it is recycled to the
// free list on the next getContent() or clear(), unless acquire() detaches it.
185 PortReaderPacket* prev;
186
187public:
// Upper bound on active packets used by get(); 0 disables the bound.
189 unsigned int maxBuffer;
// When true, the receiving side drops a waiting packet before queueing a new one.
190 bool prune;
191 yarp::os::PortReader* replier;
// Target period used by readBase(); negative means "no period".
192 double period;
193 double last_recv;
194
195 PortReaderPool pool;
196
// Counter incremented when a packet is queued and decremented by
// getContent() / dropContent().
197 int ct;
198 Port* port;
// Posted when content is queued (or to wake a reader); waited on by readBase().
199 yarp::os::Semaphore contentSema;
// Posted when a reader has taken a packet; waited on when get() yields nothing.
200 yarp::os::Semaphore consumeSema;
201 std::mutex stateMutex;
202
// Both semaphores start at 0; period and last_recv start at -1 (unset).
203 Private(PortReaderBufferBase& owner, unsigned int maxBuffer) :
204 owner(owner),
205 prev(nullptr),
206 creator(nullptr),
207 maxBuffer(maxBuffer),
208 prune(false),
209 replier(nullptr),
210 period(-1),
211 last_recv(-1),
212 ct(0),
213 port(nullptr),
214 contentSema(0),
215 consumeSema(0),
216 stateMutex()
217 {
218 }
219
// Closes the attached port (if any) and then destroys all packets.
// The port pointer is read under stateMutex, but close() itself is called
// with the mutex released; clear() runs with the mutex held again.
220 virtual ~Private()
221 {
222 Port* closePort = nullptr;
223 stateMutex.lock();
224 if (port != nullptr) {
225 closePort = port;
226 }
227 stateMutex.unlock();
228 if (closePort != nullptr) {
229 closePort->close();
230 }
231 stateMutex.lock();
232 clear();
233 stateMutex.unlock();
234 }
235
// Returns `prev` to the pool, deletes every pooled packet and zeroes ct.
// Does not lock stateMutex itself.
236 void clear()
237 {
238 if (prev != nullptr) {
239 pool.addInactivePacket(prev);
240 prev = nullptr;
241 }
242 pool.reset();
243 ct = 0;
244 }
245
246
// Name of the attached port, or an empty string when no port is attached.
247 std::string getName()
248 {
249 if (port != nullptr) {
250 return port->getName();
251 }
252 return {};
253 }
254
// Obtain a packet to receive into.
// Returns a packet from the pool's free list; if the free list is empty a
// new packet is only requested when maxBuffer is 0 or fewer than maxBuffer
// packets are active. Otherwise returns nullptr.
255 PortReaderPacket* get()
256 {
257 PortReaderPacket* result = nullptr;
258 bool grab = true;
259 if (pool.getFree() == 0) {
260 grab = false;
261 if (maxBuffer == 0 || pool.getCount() < maxBuffer) {
262 grab = true;
263 } else {
264 // ok, can't get free, clean space.
265 // here would be a good place to do buffer reuse.
266 }
267 }
268 if (grab) {
269 result = pool.getInactivePacket();
270 }
271
272 return result;
273 }
274
// Number of active packets, narrowed to int.
275 int checkContent()
276 {
277 return (int)pool.getCount();
278 }
279
// Recycle the previously returned packet, then take the next active packet
// (if any) and remember it in `prev`. Returns nullptr when nothing is active.
280 PortReaderPacket* getContent()
281 {
282 if (prev != nullptr) {
283 pool.addInactivePacket(prev);
284 prev = nullptr;
285 }
286 if (pool.getCount() >= 1) {
287 prev = pool.getActivePacket();
288 ct--;
289 }
290 return prev;
291 }
292
293
// Parse the envelope string of the packet in `prev` into `envelope`.
// Returns false when there is no current packet; otherwise the result of
// envelope.read(). The stored text is fed with a trailing "\r\n".
// NOTE(review): `sis` and `sbr` are not declared in this listing — see the
// note at the top of the class.
294 bool getEnvelope(PortReader& envelope)
295 {
296 if (prev == nullptr) {
297 return false;
298 }
300 sis.add(prev->envelope);
301 sis.add("\r\n");
303 Route route;
304 sbr.reset(sis, nullptr, route, 0, true);
305 return envelope.read(sbr);
306 }
307
// Move one active packet straight to the free list without touching `prev`.
// Returns the dropped packet (now on the free list), or nullptr when no
// packet was active. ct is decremented whenever the active count was >= 1.
308 PortReaderPacket* dropContent()
309 {
310 // don't affect "prev"
311 PortReaderPacket* drop = nullptr;
312
313 if (pool.getCount() >= 1) {
314 drop = pool.getActivePacket();
315 if (drop != nullptr) {
316 pool.addInactivePacket(drop);
317 }
318 ct--;
319 }
320 return drop;
321 }
322
// Remember the port and register `owner` as its reader.
323 void attach(Port& port)
324 {
325 this->port = &port;
326 port.setReader(owner);
327 }
328
// Detach the current packet from the buffer and return it as an opaque key,
// so the next getContent() will not recycle it. Returns nullptr when there
// is no current packet. Pair with release().
329 void* acquire()
330 {
331 if (prev != nullptr) {
332 void* result = prev;
333 prev = nullptr;
334 return result;
335 }
336 return nullptr;
337 }
338
// Put a packet obtained from acquire() back on the free list.
// A null key is ignored. The key is cast back to PortReaderPacket* unchecked.
339 void release(void* key)
340 {
341 if (key != nullptr) {
342 pool.addInactivePacket((PortReaderPacket*)key);
343 }
344 }
345};
346#endif // DOXYGEN_SHOULD_SKIP_THIS
347
348
349
// Allocates the Private implementation object, handing it a reference to
// this buffer and the requested maxBuffer value unchanged.
350PortReaderBufferBase::PortReaderBufferBase(unsigned int maxBuffer) :
351 mPriv(new Private(*this, maxBuffer))
352{
353}
354
// NOTE(review): the signature line (listing line 355) is missing from this
// extract. The body deletes the Private object allocated by the constructor,
// so this is presumably the PortReaderBufferBase destructor — confirm.
356{
357 delete mPriv;
358}
359
// NOTE(review): signature line (listing line 360) is missing; the index at
// the end of this listing shows `virtual yarp::os::PortReader * create()`,
// which presumably belongs here — confirm.
// Asks the registered creator for a new object; returns nullptr when no
// creator has been set.
361{
362 if (mPriv->creator != nullptr) {
363 return mPriv->creator->create();
364 }
365 return nullptr;
366}
367
// NOTE(review): signature line (listing line 368) is missing from this extract.
// Returns Private::checkContent() sampled while holding stateMutex.
369{
370 mPriv->stateMutex.lock();
371 int count = mPriv->checkContent();
372 mPriv->stateMutex.unlock();
373 return count;
374}
375
376
// NOTE(review): signature line (listing line 377) is missing from this extract.
// Posts contentSema without queueing any packet, so a caller blocked in
// readBase() wakes up (and will find no content).
378{
379 // give read a chance
380 mPriv->contentSema.post();
381}
382
383PortReader* PortReaderBufferBase::readBase(bool& missed, bool cleanup)
384{
385 missed = false;
386 if (mPriv->period < 0 || cleanup) {
387 mPriv->contentSema.wait();
388 } else {
389 bool ok = false;
390 double now = SystemClock::nowSystem();
391 double target = now + mPriv->period;
392 if (mPriv->last_recv > 0) {
393 target = mPriv->last_recv + mPriv->period;
394 }
395 double diff = target - now;
396 if (diff > 0) {
397 ok = mPriv->contentSema.waitWithTimeout(diff);
398 } else {
399 ok = mPriv->contentSema.check();
400 if (ok) {
401 mPriv->contentSema.wait();
402 }
403 }
404 if (!ok) {
405 missed = true;
406 if (mPriv->last_recv > 0) {
407 mPriv->last_recv += mPriv->period;
408 }
409 return nullptr;
410 }
412 if (mPriv->last_recv < 0) {
413 mPriv->last_recv = now;
414 } else {
415 diff = target - now;
416 if (diff > 0) {
418 }
419 mPriv->last_recv = target;
420 }
421 }
422 mPriv->stateMutex.lock();
423 PortReaderPacket* readerPacket = mPriv->getContent();
424 PortReader* reader = nullptr;
425 if (readerPacket != nullptr) {
426 PortReader* external = readerPacket->getExternal();
427 if (external == nullptr) {
428 reader = readerPacket->getReader();
429 } else {
430 reader = external;
431 }
432 }
433 mPriv->stateMutex.unlock();
434 if (reader != nullptr) {
435 mPriv->consumeSema.post();
436 }
437 return reader;
438}
439
440
// NOTE(review): signature line (listing line 441) is missing; the index at
// the end of this listing shows
// `bool read(yarp::os::ConnectionReader &connection) override`, which
// presumably belongs here — confirm.
//
// Receives one message from `connection` into a pooled packet.
//  - If the connection carries a direct object reference, the object is
//    forwarded to acceptObjectBase() and nothing is deserialized.
//  - If a replier is set and the connection offers a writer, the replier
//    handles the message instead.
//  - Otherwise a packet is taken from the pool (blocking on consumeSema
//    while get() returns nullptr), filled via its reader, and queued.
// Returns true only when the packet's reader reported a successful read.
442{
443 if (connection.getReference() != nullptr) {
444 //printf("REF %ld %d\n", (long int)connection.getReference(),
445 // connection.isValid());
446 return acceptObjectBase(connection.getReference(), nullptr);
447 }
448
449 if (mPriv->replier != nullptr) {
450 if (connection.getWriter() != nullptr) {
451 return mPriv->replier->read(connection);
452 }
453 }
454 PortReaderPacket* reader = nullptr;
// Loop until a packet is available; a packet without a reader object gets
// one from create() while stateMutex is still held.
455 while (reader == nullptr) {
456 mPriv->stateMutex.lock();
457 reader = mPriv->get();
458 if ((reader != nullptr) && reader->getReader() == nullptr) {
459 PortReader* next = create();
460 yCAssert(PORTREADERBUFFERBASE, next != nullptr);
461 reader->setReader(next);
462 }
463
464 mPriv->stateMutex.unlock();
465 if (reader == nullptr) {
466 mPriv->consumeSema.wait();
467 }
468 }
469 bool ok = false;
470 if (connection.isValid()) {
471 yCAssert(PORTREADERBUFFERBASE, reader->getReader() != nullptr);
472 ok = reader->getReader()->read(connection);
473 reader->setEnvelope(connection.readEnvelope());
474 } else {
475 // this is a disconnection
476 // don't talk to this port ever again
// NOTE(review): port is cleared here without holding stateMutex — confirm
// this is intentional.
477 mPriv->port = nullptr;
478 }
479 if (ok) {
480 mPriv->stateMutex.lock();
481 bool pruned = false;
482 if (mPriv->ct > 0 && mPriv->prune) {
483 PortReaderPacket* readerPacket = mPriv->dropContent();
484 pruned = (readerPacket != nullptr);
485 }
486 //mPriv->configure(reader, false, true);
487 mPriv->pool.addActivePacket(reader);
488 mPriv->ct++;
489 mPriv->stateMutex.unlock();
// When a waiting packet was pruned, its earlier post is reused for the new
// packet, so contentSema is only posted when nothing was dropped.
490 if (!pruned) {
491 mPriv->contentSema.post();
492 }
493 yCTrace(PORTREADERBUFFERBASE, ">>>>>>>>>>>>>>>>> adding data");
494 } else {
// Failed read or disconnection: give the packet back to the free list.
495 mPriv->stateMutex.lock();
496 mPriv->pool.addInactivePacket(reader);
497 mPriv->stateMutex.unlock();
498 yCTrace(PORTREADERBUFFERBASE, ">>>>>>>>>>>>>>>>> skipping data");
499
500 // important to give reader a shot anyway, allowing proper closing
501 yCDebug(PORTREADERBUFFERBASE, "giving PortReaderBuffer chance to close");
502 mPriv->contentSema.post();
503 }
504 return ok;
505}
506
507
// NOTE(review): signature line (listing line 508) is missing; the index at
// the end of this listing shows
// `void setCreator(PortReaderBufferBaseCreator *creator)` — presumably this body.
// Stores the creator pointer; no ownership handling is visible here.
509{
510 mPriv->creator = creator;
511}
512
// NOTE(review): signature line (listing line 513) is missing; the index at
// the end of this listing shows `void setReplier(yarp::os::PortReader &reader)`
// — presumably this body.
// Stores the address of `reader`; the referenced object must outlive its use.
514{
515 mPriv->replier = &reader;
516}
517
// NOTE(review): signature line (listing line 518) is missing from this extract.
// Sets the prune flag that the receiving code checks before queueing a packet.
519{
520 mPriv->prune = flag;
521}
522
// NOTE(review): signature line (listing line 523) is missing from this extract.
// Sets the target period consulted by readBase(); a negative value makes
// readBase() block without a deadline.
524{
525 mPriv->period = period;
526}
527
// NOTE(review): signature line (listing line 528) is missing from this extract.
// Forwards to Private::getName(): the attached port's name, or an empty
// string when no port is attached.
529{
530 return mPriv->getName();
531}
532
// NOTE(review): signature line (listing line 533) is missing from this extract.
// Returns the maxBuffer value given at construction.
534{
535 return mPriv->maxBuffer;
536}
537
// NOTE(review): signature line (listing line 538) is missing from this extract.
// True when no port is attached — either none was ever attached or the
// receiving code cleared it after a disconnection.
539{
540 return mPriv->port == nullptr;
541}
542
// NOTE(review): signature line (listing line 543) is missing; the index at
// the end of this listing shows `void attachBase(yarp::os::Port &port)` —
// presumably this body.
// Forwards to Private::attach(), which records the port and installs this
// buffer as the port's reader.
544{
545 mPriv->attach(port);
546}
547
548
553
// NOTE(review): the first line of the signature (listing line 554) is
// missing; the index at the end of this listing shows
// `virtual bool acceptObjectBase(yarp::os::PortReader *obj, yarp::os::PortWriter *wrapper)`
// — presumably this definition.
//
// Queues an already-constructed object: takes a packet from the pool
// (blocking on consumeSema while none is available), points it at `obj`
// with `wrapper` as completion callback, and adds it to the active list.
// Always returns true. The queueing half mirrors the success path of the
// message-receiving body earlier in this file; keep the two in step.
555 yarp::os::PortWriter* wrapper)
556{
557 // getting an object here should be basically the same as
558 // receiving from a Port -- except no need to create/read
559 // the object
560
561 PortReaderPacket* reader = nullptr;
562 while (reader == nullptr) {
563 mPriv->stateMutex.lock();
564 reader = mPriv->get();
565 mPriv->stateMutex.unlock();
566 if (reader == nullptr) {
567 mPriv->consumeSema.wait();
568 }
569 }
570
571 reader->setExternal(obj, wrapper);
572
573 mPriv->stateMutex.lock();
574 bool pruned = false;
575 if (mPriv->ct > 0 && mPriv->prune) {
576 PortReaderPacket* readerPacket = mPriv->dropContent();
577 pruned = (readerPacket != nullptr);
578 }
579 //mPriv->configure(reader, false, true);
580 mPriv->pool.addActivePacket(reader);
581 mPriv->ct++;
582 mPriv->stateMutex.unlock();
// contentSema is posted only when no waiting packet was dropped to make room.
583 if (!pruned) {
584 mPriv->contentSema.post();
585 }
586 yCTrace(PORTREADERBUFFERBASE, ">>>>>>>>>>>>>>>>> adding data");
587
588 return true;
589}
590
591
// NOTE(review): the first line of the signature (listing line 592) is
// missing; the index at the end of this listing shows
// `virtual bool forgetObjectBase(yarp::os::PortReader *obj, yarp::os::PortWriter *wrapper)`
// — presumably this definition.
// Not implemented: ignores both arguments, prints a notice to stdout and
// returns false.
593 yarp::os::PortWriter* wrapper)
594{
595 YARP_UNUSED(obj);
596 YARP_UNUSED(wrapper);
597 printf("Sorry, forgetting not implemented yet\n");
598 return false;
599}
600
601
// NOTE(review): signature line (listing line 602) is missing from this extract.
// Forwards to Private::acquire(): detaches the current packet and returns it
// as an opaque key (nullptr if there is none).
// Unlike the release body that follows, this does not take stateMutex —
// confirm that callers serialize access.
603{
604 return mPriv->acquire();
605}
606
// NOTE(review): signature line (listing line 607) is missing from this extract.
// Returns a key previously obtained from acquire to the pool's free list,
// holding stateMutex for the duration.
608{
609 mPriv->stateMutex.lock();
610 mPriv->release(key);
611 mPriv->stateMutex.unlock();
612}
613
614
// NOTE(review): signature line (listing line 615) is missing; the index at
// the end of this listing shows `virtual bool getEnvelope(PortReader &envelope)`
// — presumably this body.
// Forwards to Private::getEnvelope(), which returns false when there is no
// current packet. stateMutex is not taken here.
616{
617 return mPriv->getEnvelope(envelope);
618}
619
// NOTE(review): signature line (listing line 620) is missing from this extract.
// Forwards to Private::clear(), which deletes every pooled packet and resets
// the counter. stateMutex is not taken here, whereas the Private destructor
// does take it around clear() — confirm callers cannot race with a receiver.
621{
622 mPriv->clear();
623}
624
// NOTE(review): signature line (listing line 625) is missing; the first
// entry of the index that follows is `void typedReaderMissingCallback()`,
// which presumably belongs here — confirm.
// Only logs an error through the file's log component; no other effect.
626{
627 yCError(PORTREADERBUFFERBASE, "Missing or incorrectly typed onRead function");
628}
void typedReaderMissingCallback()
A simple abstraction for a block of bytes.
Definition: Bytes.h:24
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
An interface for reading from a network connection.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
virtual Portable * getReference() const =0
Get a direct pointer to the object being sent, if possible.
virtual bool isValid() const =0
virtual Bytes readEnvelope()
Read a message envelope, if available.
virtual std::string getName() const
Get name of port.
Definition: Contactable.cpp:14
virtual yarp::os::PortReader * create()
virtual bool getEnvelope(PortReader &envelope)
virtual bool forgetObjectBase(yarp::os::PortReader *obj, yarp::os::PortWriter *wrapper)
void setCreator(PortReaderBufferBaseCreator *creator)
void setReplier(yarp::os::PortReader &reader)
virtual bool acceptObjectBase(yarp::os::PortReader *obj, yarp::os::PortWriter *wrapper)
Careful! Merge with ::read – very similar code. Until merged, don't change one without looking at the other.
void attachBase(yarp::os::Port &port)
bool read(yarp::os::ConnectionReader &connection) override
Read this object from a network connection.
yarp::os::PortReader * readBase(bool &missed, bool cleanup)
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:24
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:23
virtual void onCompletion() const
This is called when the port has finished all writing operations.
Definition: PortWriter.cpp:13
A mini-server for network communication.
Definition: Port.h:46
void setReader(PortReader &reader) override
Set an external reader for port data.
Definition: Port.cpp:511
void close() override
Stop port activity.
Definition: Port.cpp:363
Information about a connection between two ports.
Definition: Route.h:28
A class for thread synchronization and mutual exclusion.
Definition: Semaphore.h:25
An InputStream that reads from a string.
void add(const std::string &txt)
static double nowSystem()
Definition: SystemClock.cpp:34
static void delaySystem(double seconds)
Definition: SystemClock.cpp:29
Lets Readable objects read from the underlying InputStream associated with the connection between two...
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
#define yCError(component,...)
Definition: LogComponent.h:213
#define yCAssert(component, x)
Definition: LogComponent.h:240
#define yCTrace(component,...)
Definition: LogComponent.h:84
#define yCDebug(component,...)
Definition: LogComponent.h:128
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:29
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition: Time.cpp:121
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
#define YARP_UNUSED(var)
Definition: api.h:162