33 #ifndef DOXYGEN_SHOULD_SKIP_THIS
34 class PortReaderPacket
37 PortReaderPacket *prev_, *next_;
51 prev_ = next_ =
nullptr;
58 virtual ~PortReaderPacket()
66 if (reader !=
nullptr) {
83 this->reader = reader;
94 this->external = reader;
95 this->writer = writer;
98 void setEnvelope(
const Bytes& bytes)
100 envelope = std::string(bytes.
get(), bytes.
length());
106 if (writer !=
nullptr) {
118 std::list<PortReaderPacket*> inactive;
119 std::list<PortReaderPacket*> active;
124 return active.size();
129 return inactive.size();
132 PortReaderPacket* getInactivePacket()
134 if (inactive.empty()) {
135 PortReaderPacket* obj =
nullptr;
136 obj =
new PortReaderPacket();
137 inactive.push_back(obj);
139 PortReaderPacket* next = inactive.front();
140 yCAssert(PORTREADERBUFFERBASE, next !=
nullptr);
141 inactive.remove(next);
145 PortReaderPacket* getActivePacket()
147 PortReaderPacket* next =
nullptr;
148 if (getCount() >= 1) {
149 next = active.front();
150 yCAssert(PORTREADERBUFFERBASE, next !=
nullptr);
156 void addActivePacket(PortReaderPacket* packet)
158 if (packet !=
nullptr) {
159 active.push_back(packet);
163 void addInactivePacket(PortReaderPacket* packet)
165 if (packet !=
nullptr) {
166 inactive.push_back(packet);
172 while (!active.empty()) {
173 delete active.back();
176 while (!inactive.empty()) {
177 delete inactive.back();
184 class PortReaderBufferBase::Private
188 PortReaderPacket* prev;
192 unsigned int maxBuffer;
204 std::mutex stateMutex;
210 maxBuffer(maxBuffer),
225 Port* closePort =
nullptr;
227 if (port !=
nullptr) {
231 if (closePort !=
nullptr) {
241 if (prev !=
nullptr) {
242 pool.addInactivePacket(prev);
250 std::string getName()
252 if (port !=
nullptr) {
258 PortReaderPacket* get()
260 PortReaderPacket* result =
nullptr;
262 if (pool.getFree() == 0) {
264 if (maxBuffer == 0 || pool.getCount() < maxBuffer) {
272 result = pool.getInactivePacket();
280 return (
int)pool.getCount();
283 PortReaderPacket* getContent()
285 if (prev !=
nullptr) {
286 pool.addInactivePacket(prev);
289 if (pool.getCount() >= 1) {
290 prev = pool.getActivePacket();
299 if (prev ==
nullptr) {
303 sis.
add(prev->envelope);
307 sbr.
reset(sis,
nullptr, route, 0,
true);
308 return envelope.
read(sbr);
311 PortReaderPacket* dropContent()
314 PortReaderPacket* drop =
nullptr;
316 if (pool.getCount() >= 1) {
317 drop = pool.getActivePacket();
318 if (drop !=
nullptr) {
319 pool.addInactivePacket(drop);
326 void attach(
Port& port)
334 if (prev !=
nullptr) {
342 void release(
void* key)
344 if (key !=
nullptr) {
345 pool.addInactivePacket((PortReaderPacket*)key);
354 mPriv(new Private(*this, maxBuffer))
365 if (mPriv->creator !=
nullptr) {
366 return mPriv->creator->create();
373 mPriv->stateMutex.lock();
374 int count = mPriv->checkContent();
375 mPriv->stateMutex.unlock();
383 mPriv->contentSema.post();
389 if (mPriv->period < 0 || cleanup) {
390 mPriv->contentSema.wait();
394 double target =
now + mPriv->period;
395 if (mPriv->last_recv > 0) {
396 target = mPriv->last_recv + mPriv->period;
398 double diff = target -
now;
400 ok = mPriv->contentSema.waitWithTimeout(diff);
402 ok = mPriv->contentSema.check();
404 mPriv->contentSema.wait();
409 if (mPriv->last_recv > 0) {
410 mPriv->last_recv += mPriv->period;
415 if (mPriv->last_recv < 0) {
416 mPriv->last_recv =
now;
422 mPriv->last_recv = target;
425 mPriv->stateMutex.lock();
426 PortReaderPacket* readerPacket = mPriv->getContent();
428 if (readerPacket !=
nullptr) {
429 PortReader* external = readerPacket->getExternal();
430 if (external ==
nullptr) {
431 reader = readerPacket->getReader();
436 mPriv->stateMutex.unlock();
437 if (reader !=
nullptr) {
438 mPriv->consumeSema.post();
452 if (mPriv->replier !=
nullptr) {
454 return mPriv->replier->read(connection);
457 PortReaderPacket* reader =
nullptr;
458 while (reader ==
nullptr) {
459 mPriv->stateMutex.lock();
460 reader = mPriv->get();
461 if ((reader !=
nullptr) && reader->getReader() ==
nullptr) {
463 yCAssert(PORTREADERBUFFERBASE, next !=
nullptr);
464 reader->setReader(next);
467 mPriv->stateMutex.unlock();
468 if (reader ==
nullptr) {
469 mPriv->consumeSema.wait();
474 yCAssert(PORTREADERBUFFERBASE, reader->getReader() !=
nullptr);
475 ok = reader->getReader()->read(connection);
480 mPriv->port =
nullptr;
483 mPriv->stateMutex.lock();
485 if (mPriv->ct > 0 && mPriv->prune) {
486 PortReaderPacket* readerPacket = mPriv->dropContent();
487 pruned = (readerPacket !=
nullptr);
490 mPriv->pool.addActivePacket(reader);
492 mPriv->stateMutex.unlock();
494 mPriv->contentSema.post();
496 yCTrace(PORTREADERBUFFERBASE,
">>>>>>>>>>>>>>>>> adding data");
498 mPriv->stateMutex.lock();
499 mPriv->pool.addInactivePacket(reader);
500 mPriv->stateMutex.unlock();
501 yCTrace(PORTREADERBUFFERBASE,
">>>>>>>>>>>>>>>>> skipping data");
504 yCDebug(PORTREADERBUFFERBASE,
"giving PortReaderBuffer chance to close");
505 mPriv->contentSema.post();
513 mPriv->creator = creator;
518 mPriv->replier = &reader;
528 mPriv->period = period;
533 return mPriv->getName();
538 return mPriv->maxBuffer;
543 return mPriv->port ==
nullptr;
564 PortReaderPacket* reader =
nullptr;
565 while (reader ==
nullptr) {
566 mPriv->stateMutex.lock();
567 reader = mPriv->get();
568 mPriv->stateMutex.unlock();
569 if (reader ==
nullptr) {
570 mPriv->consumeSema.wait();
574 reader->setExternal(obj, wrapper);
576 mPriv->stateMutex.lock();
578 if (mPriv->ct > 0 && mPriv->prune) {
579 PortReaderPacket* readerPacket = mPriv->dropContent();
580 pruned = (readerPacket !=
nullptr);
583 mPriv->pool.addActivePacket(reader);
585 mPriv->stateMutex.unlock();
587 mPriv->contentSema.post();
589 yCTrace(PORTREADERBUFFERBASE,
">>>>>>>>>>>>>>>>> adding data");
600 printf(
"Sorry, forgetting not implemented yet\n");
607 return mPriv->acquire();
612 mPriv->stateMutex.lock();
614 mPriv->stateMutex.unlock();
620 return mPriv->getEnvelope(envelope);
630 yCError(PORTREADERBUFFERBASE,
"Missing or incorrectly typed onRead function");
void typedReaderMissingCallback()
A simple abstraction for a block of bytes.
An interface for reading from a network connection.
virtual Portable * getReference() const =0
Get a direct pointer to the object being sent, if possible.
virtual bool isValid() const =0
virtual Bytes readEnvelope()
Read a message envelope, if available.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
virtual yarp::os::PortReader * create()
std::string getName() const
PortReaderBufferBase(unsigned int maxBuffer)
virtual bool getEnvelope(PortReader &envelope)
void setPrune(bool flag=true)
virtual bool forgetObjectBase(yarp::os::PortReader *obj, yarp::os::PortWriter *wrapper)
void setCreator(PortReaderBufferBaseCreator *creator)
void setReplier(yarp::os::PortReader &reader)
virtual bool acceptObjectBase(yarp::os::PortReader *obj, yarp::os::PortWriter *wrapper)
Careful! Merge with ::read – very similar code. Until merge, don't change one without looking at the other...
void attachBase(yarp::os::Port &port)
bool read(yarp::os::ConnectionReader &connection) override
Read this object from a network connection.
yarp::os::PortReader * readBase(bool &missed, bool cleanup)
void setTargetPeriod(double period)
unsigned int getMaxBuffer()
virtual ~PortReaderBufferBase()
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
virtual void onCompletion() const
This is called when the port has finished all writing operations.
A mini-server for network communication.
void setReader(PortReader &reader) override
Set an external reader for port data.
void close() override
Stop port activity.
Information about a connection between two ports.
A class for thread synchronization and mutual exclusion.
static double nowSystem()
static void delaySystem(double seconds)
Lets Readable objects read from the underlying InputStream associated with the connection between two...
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
#define yCError(component,...)
#define yCAssert(component, x)
#define yCTrace(component,...)
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
double now()
Return the current time in seconds, relative to an arbitrary starting point.
The components from which ports and connections are built.
An interface to the operating system, including Port-based communication.