YARP
Yet Another Robot Platform
PortReaderBufferBase.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
8 
9 #include <yarp/os/Os.h>
11 #include <yarp/os/Portable.h>
12 #include <yarp/os/Semaphore.h>
14 #include <yarp/os/Thread.h>
15 #include <yarp/os/Time.h>
19 
20 #include <list>
21 #include <mutex>
22 
23 using namespace yarp::os::impl;
24 using namespace yarp::os;
25 
26 namespace {
27 YARP_OS_LOG_COMPONENT(PORTREADERBUFFERBASE, "yarp.os.PortReaderBufferBase")
28 } // namespace
29 
30 #ifndef DOXYGEN_SHOULD_SKIP_THIS
31 class PortReaderPacket
32 {
33 public:
34  PortReaderPacket *prev_, *next_;
35 
36  // if non-null, contains a buffer that the packet owns
37  PortReader* reader;
38 
39  std::string envelope;
40 
41  // if nun-null, refers to an external buffer
42  // by convention, overrides reader
43  PortReader* external;
44  PortWriter* writer; // if a callback is needed
45 
46  PortReaderPacket()
47  {
48  prev_ = next_ = nullptr;
49  reader = nullptr;
50  external = nullptr;
51  writer = nullptr;
52  reset();
53  }
54 
55  virtual ~PortReaderPacket()
56  {
57  resetExternal();
58  reset();
59  }
60 
61  void reset()
62  {
63  if (reader != nullptr) {
64  delete reader;
65  reader = nullptr;
66  }
67  writer = nullptr;
68  envelope = "";
69  }
70 
71  PortReader* getReader()
72  {
73  return reader;
74  }
75 
76  void setReader(PortReader* reader)
77  {
78  resetExternal();
79  reset();
80  this->reader = reader;
81  }
82 
83  PortReader* getExternal()
84  {
85  return external;
86  }
87 
88  void setExternal(PortReader* reader, PortWriter* writer)
89  {
90  resetExternal();
91  this->external = reader;
92  this->writer = writer;
93  }
94 
95  void setEnvelope(const Bytes& bytes)
96  {
97  envelope = std::string(bytes.get(), bytes.length());
98  //envelope.set(bytes.get(), bytes.length(), 1);
99  }
100 
101  void resetExternal()
102  {
103  if (writer != nullptr) {
104  writer->onCompletion();
105  writer = nullptr;
106  }
107  external = nullptr;
108  }
109 };
110 
111 
112 class PortReaderPool
113 {
114 private:
115  std::list<PortReaderPacket*> inactive;
116  std::list<PortReaderPacket*> active;
117 
118 public:
119  size_t getCount()
120  {
121  return active.size();
122  }
123 
124  size_t getFree()
125  {
126  return inactive.size();
127  }
128 
129  PortReaderPacket* getInactivePacket()
130  {
131  if (inactive.empty()) {
132  PortReaderPacket* obj = nullptr;
133  obj = new PortReaderPacket();
134  inactive.push_back(obj);
135  }
136  PortReaderPacket* next = inactive.front();
137  yCAssert(PORTREADERBUFFERBASE, next != nullptr);
138  inactive.remove(next);
139  return next;
140  }
141 
142  PortReaderPacket* getActivePacket()
143  {
144  PortReaderPacket* next = nullptr;
145  if (getCount() >= 1) {
146  next = active.front();
147  yCAssert(PORTREADERBUFFERBASE, next != nullptr);
148  active.remove(next);
149  }
150  return next;
151  }
152 
153  void addActivePacket(PortReaderPacket* packet)
154  {
155  if (packet != nullptr) {
156  active.push_back(packet);
157  }
158  }
159 
160  void addInactivePacket(PortReaderPacket* packet)
161  {
162  if (packet != nullptr) {
163  inactive.push_back(packet);
164  }
165  }
166 
167  void reset()
168  {
169  while (!active.empty()) {
170  delete active.back();
171  active.pop_back();
172  }
173  while (!inactive.empty()) {
174  delete inactive.back();
175  inactive.pop_back();
176  }
177  }
178 };
179 
180 
181 class PortReaderBufferBase::Private
182 {
183 private:
184  PortReaderBufferBase& owner;
185  PortReaderPacket* prev;
186 
187 public:
189  unsigned int maxBuffer;
190  bool prune;
191  yarp::os::PortReader* replier;
192  double period;
193  double last_recv;
194 
195  PortReaderPool pool;
196 
197  int ct;
198  Port* port;
199  yarp::os::Semaphore contentSema;
200  yarp::os::Semaphore consumeSema;
201  std::mutex stateMutex;
202 
203  Private(PortReaderBufferBase& owner, unsigned int maxBuffer) :
204  owner(owner),
205  prev(nullptr),
206  creator(nullptr),
207  maxBuffer(maxBuffer),
208  prune(false),
209  replier(nullptr),
210  period(-1),
211  last_recv(-1),
212  ct(0),
213  port(nullptr),
214  contentSema(0),
215  consumeSema(0),
216  stateMutex()
217  {
218  }
219 
220  virtual ~Private()
221  {
222  Port* closePort = nullptr;
223  stateMutex.lock();
224  if (port != nullptr) {
225  closePort = port;
226  }
227  stateMutex.unlock();
228  if (closePort != nullptr) {
229  closePort->close();
230  }
231  stateMutex.lock();
232  clear();
233  stateMutex.unlock();
234  }
235 
236  void clear()
237  {
238  if (prev != nullptr) {
239  pool.addInactivePacket(prev);
240  prev = nullptr;
241  }
242  pool.reset();
243  ct = 0;
244  }
245 
246 
247  std::string getName()
248  {
249  if (port != nullptr) {
250  return port->getName();
251  }
252  return {};
253  }
254 
255  PortReaderPacket* get()
256  {
257  PortReaderPacket* result = nullptr;
258  bool grab = true;
259  if (pool.getFree() == 0) {
260  grab = false;
261  if (maxBuffer == 0 || pool.getCount() < maxBuffer) {
262  grab = true;
263  } else {
264  // ok, can't get free, clean space.
265  // here would be a good place to do buffer reuse.
266  }
267  }
268  if (grab) {
269  result = pool.getInactivePacket();
270  }
271 
272  return result;
273  }
274 
275  int checkContent()
276  {
277  return (int)pool.getCount();
278  }
279 
280  PortReaderPacket* getContent()
281  {
282  if (prev != nullptr) {
283  pool.addInactivePacket(prev);
284  prev = nullptr;
285  }
286  if (pool.getCount() >= 1) {
287  prev = pool.getActivePacket();
288  ct--;
289  }
290  return prev;
291  }
292 
293 
294  bool getEnvelope(PortReader& envelope)
295  {
296  if (prev == nullptr) {
297  return false;
298  }
299  StringInputStream sis;
300  sis.add(prev->envelope);
301  sis.add("\r\n");
303  Route route;
304  sbr.reset(sis, nullptr, route, 0, true);
305  return envelope.read(sbr);
306  }
307 
308  PortReaderPacket* dropContent()
309  {
310  // don't affect "prev"
311  PortReaderPacket* drop = nullptr;
312 
313  if (pool.getCount() >= 1) {
314  drop = pool.getActivePacket();
315  if (drop != nullptr) {
316  pool.addInactivePacket(drop);
317  }
318  ct--;
319  }
320  return drop;
321  }
322 
323  void attach(Port& port)
324  {
325  this->port = &port;
326  port.setReader(owner);
327  }
328 
329  void* acquire()
330  {
331  if (prev != nullptr) {
332  void* result = prev;
333  prev = nullptr;
334  return result;
335  }
336  return nullptr;
337  }
338 
339  void release(void* key)
340  {
341  if (key != nullptr) {
342  pool.addInactivePacket((PortReaderPacket*)key);
343  }
344  }
345 };
346 #endif // DOXYGEN_SHOULD_SKIP_THIS
347 
348 
349 
// Constructor initializer and body. The signature line is missing from this
// listing (presumably PortReaderBufferBase(unsigned int maxBuffer) -- TODO
// confirm). Allocates the private implementation, passing it this object and
// the buffer limit.
351  mPriv(new Private(*this, maxBuffer))
352 {
353 }
354 
// Body whose signature line is missing from this listing (presumably the
// destructor -- TODO confirm). Deletes the Private object held in mPriv.
356 {
357  delete mPriv;
358 }
359 
// Body whose signature line is missing from this listing (presumably
// create(), which read() calls to obtain a new PortReader -- TODO confirm).
// Delegates to the installed creator; returns nullptr when none is set.
361 {
362  if (mPriv->creator != nullptr) {
363  return mPriv->creator->create();
364  }
365  return nullptr;
366 }
367 
// Body whose signature line is missing from this listing (name not visible
// here -- TODO confirm). Returns the number of queued packets, read while
// holding stateMutex.
369 {
370  mPriv->stateMutex.lock();
371  int count = mPriv->checkContent();
372  mPriv->stateMutex.unlock();
373  return count;
374 }
375 
376 
// Body whose signature line is missing from this listing (name not visible
// here -- TODO confirm). Posts contentSema without queuing any data, so a
// readBase() call blocked on that semaphore can resume.
378 {
379  // give read a chance
380  mPriv->contentSema.post();
381 }
382 
383 PortReader* PortReaderBufferBase::readBase(bool& missed, bool cleanup)
384 {
385  missed = false;
386  if (mPriv->period < 0 || cleanup) {
387  mPriv->contentSema.wait();
388  } else {
389  bool ok = false;
390  double now = SystemClock::nowSystem();
391  double target = now + mPriv->period;
392  if (mPriv->last_recv > 0) {
393  target = mPriv->last_recv + mPriv->period;
394  }
395  double diff = target - now;
396  if (diff > 0) {
397  ok = mPriv->contentSema.waitWithTimeout(diff);
398  } else {
399  ok = mPriv->contentSema.check();
400  if (ok) {
401  mPriv->contentSema.wait();
402  }
403  }
404  if (!ok) {
405  missed = true;
406  if (mPriv->last_recv > 0) {
407  mPriv->last_recv += mPriv->period;
408  }
409  return nullptr;
410  }
412  if (mPriv->last_recv < 0) {
413  mPriv->last_recv = now;
414  } else {
415  diff = target - now;
416  if (diff > 0) {
418  }
419  mPriv->last_recv = target;
420  }
421  }
422  mPriv->stateMutex.lock();
423  PortReaderPacket* readerPacket = mPriv->getContent();
424  PortReader* reader = nullptr;
425  if (readerPacket != nullptr) {
426  PortReader* external = readerPacket->getExternal();
427  if (external == nullptr) {
428  reader = readerPacket->getReader();
429  } else {
430  reader = external;
431  }
432  }
433  mPriv->stateMutex.unlock();
434  if (reader != nullptr) {
435  mPriv->consumeSema.post();
436  }
437  return reader;
438 }
439 
440 
// Body whose signature line is missing from this listing (presumably
// bool read(ConnectionReader& connection) -- TODO confirm).
// Receives one incoming message into a pooled packet and queues it for
// readBase(). Returns the result of acceptObjectBase() or of the replier when
// one of those takes over; otherwise true only if the message was read.
442 {
  // The connection carries a direct object reference: queue that object
  // instead of reading from the connection.
443  if (connection.getReference() != nullptr) {
444  //printf("REF %ld %d\n", (long int)connection.getReference(),
445  // connection.isValid());
446  return acceptObjectBase(connection.getReference(), nullptr);
447  }
448 
  // A replier is installed and the connection has a writer: let the replier
  // handle the whole message.
449  if (mPriv->replier != nullptr) {
450  if (connection.getWriter() != nullptr) {
451  return mPriv->replier->read(connection);
452  }
453  }
  // Obtain a packet. get() returns nullptr when the buffer is full; in that
  // case wait on consumeSema (posted by readBase()) and retry.
454  PortReaderPacket* reader = nullptr;
455  while (reader == nullptr) {
456  mPriv->stateMutex.lock();
457  reader = mPriv->get();
  // A packet without an owned reader gets one from create(), under the lock.
458  if ((reader != nullptr) && reader->getReader() == nullptr) {
459  PortReader* next = create();
460  yCAssert(PORTREADERBUFFERBASE, next != nullptr);
461  reader->setReader(next);
462  }
463 
464  mPriv->stateMutex.unlock();
465  if (reader == nullptr) {
466  mPriv->consumeSema.wait();
467  }
468  }
469  bool ok = false;
470  if (connection.isValid()) {
471  yCAssert(PORTREADERBUFFERBASE, reader->getReader() != nullptr);
472  ok = reader->getReader()->read(connection);
473  reader->setEnvelope(connection.readEnvelope());
474  } else {
475  // this is a disconnection
476  // don't talk to this port ever again
  // NOTE(review): port is written here without holding stateMutex, while
  // ~Private() reads it under the lock -- confirm this is intentional.
477  mPriv->port = nullptr;
478  }
479  if (ok) {
480  mPriv->stateMutex.lock();
481  bool pruned = false;
  // Prune mode: move the oldest queued packet back to the free list before
  // adding the new one.
482  if (mPriv->ct > 0 && mPriv->prune) {
483  PortReaderPacket* readerPacket = mPriv->dropContent();
484  pruned = (readerPacket != nullptr);
485  }
486  //mPriv->configure(reader, false, true);
487  mPriv->pool.addActivePacket(reader);
488  mPriv->ct++;
489  mPriv->stateMutex.unlock();
  // After a prune the number of queued packets is unchanged, so contentSema
  // is not posted again.
490  if (!pruned) {
491  mPriv->contentSema.post();
492  }
493  yCTrace(PORTREADERBUFFERBASE, ">>>>>>>>>>>>>>>>> adding data");
494  } else {
  // Nothing usable was read: return the packet to the free list.
495  mPriv->stateMutex.lock();
496  mPriv->pool.addInactivePacket(reader);
497  mPriv->stateMutex.unlock();
498  yCTrace(PORTREADERBUFFERBASE, ">>>>>>>>>>>>>>>>> skipping data");
499 
500  // important to give reader a shot anyway, allowing proper closing
501  yCDebug(PORTREADERBUFFERBASE, "giving PortReaderBuffer chance to close");
502  mPriv->contentSema.post();
503  }
504  return ok;
505 }
506 
507 
// Body whose signature line is missing from this listing (presumably
// setCreator(PortReaderBufferBaseCreator* creator) -- TODO confirm).
// Stores the factory pointer; the object is not copied.
509 {
510  mPriv->creator = creator;
511 }
512 
// Body whose signature line is missing from this listing (presumably
// setReplier(PortReader& reader) -- TODO confirm). Stores the address of
// `reader`, so the referenced object must outlive its use by read().
514 {
515  mPriv->replier = &reader;
516 }
517 
// Body whose signature line is missing from this listing (name not visible
// here -- TODO confirm). Sets the prune flag that read() and
// acceptObjectBase() check before queuing a new packet.
519 {
520  mPriv->prune = flag;
521 }
522 
// Body whose signature line is missing from this listing (name not visible
// here -- TODO confirm). Sets the period used by readBase(); a negative
// value makes readBase() wait without a deadline.
524 {
525  mPriv->period = period;
526 }
527 
528 std::string PortReaderBufferBase::getName() const
529 {
530  return mPriv->getName();
531 }
532 
// Body whose signature line is missing from this listing (name not visible
// here -- TODO confirm). Returns the buffer limit given at construction.
534 {
535  return mPriv->maxBuffer;
536 }
537 
// Body whose signature line is missing from this listing (name not visible
// here -- TODO confirm). True when no port pointer is held: either none was
// attached, or read() cleared it after seeing an invalid connection.
539 {
540  return mPriv->port == nullptr;
541 }
542 
// Body whose signature line is missing from this listing (presumably
// attachBase(Port& port) -- TODO confirm). Records the port and installs
// this buffer as the port's reader via Private::attach().
544 {
545  mPriv->attach(port);
546 }
547 
548 
553 
555  yarp::os::PortWriter* wrapper)
556 {
  // The first signature line is missing from this listing (presumably
  // bool acceptObjectBase(PortReader* obj, PortWriter* wrapper) -- TODO
  // confirm). Queues an externally owned object as if it had been received.
  // The packet only borrows `obj`; `wrapper`, if non-null, gets
  // onCompletion() when the packet releases it. Always returns true.
557  // getting an object here should be basically the same as
558  // receiving from a Port -- except no need to create/read
559  // the object
560 
  // Obtain a packet. get() returns nullptr when the buffer is full; in that
  // case wait on consumeSema (posted by readBase()) and retry.
561  PortReaderPacket* reader = nullptr;
562  while (reader == nullptr) {
563  mPriv->stateMutex.lock();
564  reader = mPriv->get();
565  mPriv->stateMutex.unlock();
566  if (reader == nullptr) {
567  mPriv->consumeSema.wait();
568  }
569  }
570 
571  reader->setExternal(obj, wrapper);
572 
573  mPriv->stateMutex.lock();
574  bool pruned = false;
  // Prune mode: move the oldest queued packet back to the free list before
  // adding the new one.
575  if (mPriv->ct > 0 && mPriv->prune) {
576  PortReaderPacket* readerPacket = mPriv->dropContent();
577  pruned = (readerPacket != nullptr);
578  }
579  //mPriv->configure(reader, false, true);
580  mPriv->pool.addActivePacket(reader);
581  mPriv->ct++;
582  mPriv->stateMutex.unlock();
  // After a prune the number of queued packets is unchanged, so contentSema
  // is not posted again.
583  if (!pruned) {
584  mPriv->contentSema.post();
585  }
586  yCTrace(PORTREADERBUFFERBASE, ">>>>>>>>>>>>>>>>> adding data");
587 
588  return true;
589 }
590 
591 
593  yarp::os::PortWriter* wrapper)
594 {
  // The first signature line is missing from this listing (presumably
  // bool forgetObjectBase(PortReader* obj, PortWriter* wrapper) -- TODO
  // confirm). Not implemented: ignores both arguments, prints a notice to
  // stdout and returns false.
595  YARP_UNUSED(obj);
596  YARP_UNUSED(wrapper);
597  printf("Sorry, forgetting not implemented yet\n");
598  return false;
599 }
600 
601 
// Body whose signature line is missing from this listing (name not visible
// here -- TODO confirm). Detaches the current packet from the buffer and
// returns it as an opaque key, or nullptr if there is none.
// NOTE(review): unlike the body that calls mPriv->release(key), this call is
// not made under stateMutex -- confirm that is intentional.
603 {
604  return mPriv->acquire();
605 }
606 
// Body whose signature line is missing from this listing (name not visible
// here -- TODO confirm). Returns a packet key obtained from
// mPriv->acquire() to the free list while holding stateMutex; a null key is
// ignored by Private::release().
608 {
609  mPriv->stateMutex.lock();
610  mPriv->release(key);
611  mPriv->stateMutex.unlock();
612 }
613 
614 
// Body whose signature line is missing from this listing (presumably
// bool getEnvelope(PortReader& envelope) -- TODO confirm). Parses the
// envelope of the most recently read packet into `envelope`; false when
// there is no such packet.
// NOTE(review): not called under stateMutex -- confirm that is intentional.
616 {
617  return mPriv->getEnvelope(envelope);
618 }
619 
// Body whose signature line is missing from this listing (name not visible
// here -- TODO confirm). Discards all buffered data: Private::clear()
// deletes every pooled packet and resets the queued count.
// NOTE(review): called without stateMutex, whereas ~Private() takes the lock
// around the same call -- confirm no reader/writer thread can be active here.
621 {
622  mPriv->clear();
623 }
624 
// Body whose signature line is missing from this listing (presumably
// void typedReaderMissingCallback() -- TODO confirm). Only logs an error on
// the PORTREADERBUFFERBASE component.
626 {
627  yCError(PORTREADERBUFFERBASE, "Missing or incorrectly typed onRead function");
628 }
void typedReaderMissingCallback()
A simple abstraction for a block of bytes.
Definition: Bytes.h:24
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
An interface for reading from a network connection.
virtual Portable * getReference() const =0
Get a direct pointer to the object being sent, if possible.
virtual bool isValid() const =0
virtual Bytes readEnvelope()
Read a message envelope, if available.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
virtual std::string getName() const
Get name of port.
Definition: Contactable.cpp:14
virtual yarp::os::PortReader * create()
PortReaderBufferBase(unsigned int maxBuffer)
virtual bool getEnvelope(PortReader &envelope)
virtual bool forgetObjectBase(yarp::os::PortReader *obj, yarp::os::PortWriter *wrapper)
void setCreator(PortReaderBufferBaseCreator *creator)
void setReplier(yarp::os::PortReader &reader)
virtual bool acceptObjectBase(yarp::os::PortReader *obj, yarp::os::PortWriter *wrapper)
Careful! merge with ::read – very similar code Until merge, don't change one without looking at other...
void attachBase(yarp::os::Port &port)
bool read(yarp::os::ConnectionReader &connection) override
Read this object from a network connection.
yarp::os::PortReader * readBase(bool &missed, bool cleanup)
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:24
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:23
virtual void onCompletion() const
This is called when the port has finished all writing operations.
Definition: PortWriter.cpp:13
A mini-server for network communication.
Definition: Port.h:46
void setReader(PortReader &reader) override
Set an external reader for port data.
Definition: Port.cpp:511
void close() override
Stop port activity.
Definition: Port.cpp:363
Information about a connection between two ports.
Definition: Route.h:28
A class for thread synchronization and mutual exclusion.
Definition: Semaphore.h:25
An InputStream that reads from a string.
void add(const std::string &txt)
static double nowSystem()
Definition: SystemClock.cpp:34
static void delaySystem(double seconds)
Definition: SystemClock.cpp:29
Lets Readable objects read from the underlying InputStream associated with the connection between two...
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
#define yCError(component,...)
Definition: LogComponent.h:213
#define yCAssert(component, x)
Definition: LogComponent.h:240
#define yCTrace(component,...)
Definition: LogComponent.h:84
#define yCDebug(component,...)
Definition: LogComponent.h:128
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:29
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition: Time.cpp:121
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
#define YARP_UNUSED(var)
Definition: api.h:162