30#ifndef DOXYGEN_SHOULD_SKIP_THIS
34 PortReaderPacket *prev_, *next_;
48 prev_ = next_ =
nullptr;
55 virtual ~PortReaderPacket()
63 if (reader !=
nullptr) {
80 this->reader = reader;
91 this->external = reader;
92 this->writer = writer;
95 void setEnvelope(
const Bytes& bytes)
97 envelope = std::string(bytes.
get(), bytes.
length());
103 if (writer !=
nullptr) {
115 std::list<PortReaderPacket*> inactive;
116 std::list<PortReaderPacket*> active;
121 return active.size();
126 return inactive.size();
129 PortReaderPacket* getInactivePacket()
131 if (inactive.empty()) {
132 PortReaderPacket* obj =
nullptr;
133 obj =
new PortReaderPacket();
134 inactive.push_back(obj);
136 PortReaderPacket* next = inactive.front();
137 yCAssert(PORTREADERBUFFERBASE, next !=
nullptr);
138 inactive.remove(next);
142 PortReaderPacket* getActivePacket()
144 PortReaderPacket* next =
nullptr;
145 if (getCount() >= 1) {
146 next = active.front();
147 yCAssert(PORTREADERBUFFERBASE, next !=
nullptr);
153 void addActivePacket(PortReaderPacket* packet)
155 if (packet !=
nullptr) {
156 active.push_back(packet);
160 void addInactivePacket(PortReaderPacket* packet)
162 if (packet !=
nullptr) {
163 inactive.push_back(packet);
169 while (!active.empty()) {
170 delete active.back();
173 while (!inactive.empty()) {
174 delete inactive.back();
181class PortReaderBufferBase::Private
185 PortReaderPacket* prev;
189 unsigned int maxBuffer;
201 std::mutex stateMutex;
207 maxBuffer(maxBuffer),
222 Port* closePort =
nullptr;
224 if (port !=
nullptr) {
228 if (closePort !=
nullptr) {
238 if (prev !=
nullptr) {
239 pool.addInactivePacket(prev);
247 std::string getName()
249 if (port !=
nullptr) {
255 PortReaderPacket* get()
257 PortReaderPacket* result =
nullptr;
259 if (pool.getFree() == 0) {
261 if (maxBuffer == 0 || pool.getCount() < maxBuffer) {
269 result = pool.getInactivePacket();
277 return (
int)pool.getCount();
280 PortReaderPacket* getContent()
282 if (prev !=
nullptr) {
283 pool.addInactivePacket(prev);
286 if (pool.getCount() >= 1) {
287 prev = pool.getActivePacket();
296 if (prev ==
nullptr) {
300 sis.
add(prev->envelope);
304 sbr.
reset(sis,
nullptr, route, 0,
true);
305 return envelope.
read(sbr);
308 PortReaderPacket* dropContent()
311 PortReaderPacket* drop =
nullptr;
313 if (pool.getCount() >= 1) {
314 drop = pool.getActivePacket();
315 if (drop !=
nullptr) {
316 pool.addInactivePacket(drop);
323 void attach(
Port& port)
331 if (prev !=
nullptr) {
339 void release(
void* key)
341 if (key !=
nullptr) {
342 pool.addInactivePacket((PortReaderPacket*)key);
350PortReaderBufferBase::PortReaderBufferBase(
unsigned int maxBuffer) :
351 mPriv(new Private(*this, maxBuffer))
362 if (mPriv->creator !=
nullptr) {
363 return mPriv->creator->create();
370 mPriv->stateMutex.lock();
371 int count = mPriv->checkContent();
372 mPriv->stateMutex.unlock();
380 mPriv->contentSema.post();
386 if (mPriv->period < 0 || cleanup) {
387 mPriv->contentSema.wait();
391 double target =
now + mPriv->period;
392 if (mPriv->last_recv > 0) {
393 target = mPriv->last_recv + mPriv->period;
395 double diff = target -
now;
397 ok = mPriv->contentSema.waitWithTimeout(diff);
399 ok = mPriv->contentSema.check();
401 mPriv->contentSema.wait();
406 if (mPriv->last_recv > 0) {
407 mPriv->last_recv += mPriv->period;
412 if (mPriv->last_recv < 0) {
413 mPriv->last_recv =
now;
419 mPriv->last_recv = target;
422 mPriv->stateMutex.lock();
423 PortReaderPacket* readerPacket = mPriv->getContent();
425 if (readerPacket !=
nullptr) {
426 PortReader* external = readerPacket->getExternal();
427 if (external ==
nullptr) {
428 reader = readerPacket->getReader();
433 mPriv->stateMutex.unlock();
434 if (reader !=
nullptr) {
435 mPriv->consumeSema.post();
449 if (mPriv->replier !=
nullptr) {
451 return mPriv->replier->read(connection);
454 PortReaderPacket* reader =
nullptr;
455 while (reader ==
nullptr) {
456 mPriv->stateMutex.lock();
457 reader = mPriv->get();
458 if ((reader !=
nullptr) && reader->getReader() ==
nullptr) {
460 yCAssert(PORTREADERBUFFERBASE, next !=
nullptr);
461 reader->setReader(next);
464 mPriv->stateMutex.unlock();
465 if (reader ==
nullptr) {
466 mPriv->consumeSema.wait();
471 yCAssert(PORTREADERBUFFERBASE, reader->getReader() !=
nullptr);
472 ok = reader->getReader()->read(connection);
477 mPriv->port =
nullptr;
480 mPriv->stateMutex.lock();
482 if (mPriv->ct > 0 && mPriv->prune) {
483 PortReaderPacket* readerPacket = mPriv->dropContent();
484 pruned = (readerPacket !=
nullptr);
487 mPriv->pool.addActivePacket(reader);
489 mPriv->stateMutex.unlock();
491 mPriv->contentSema.post();
493 yCTrace(PORTREADERBUFFERBASE,
">>>>>>>>>>>>>>>>> adding data");
495 mPriv->stateMutex.lock();
496 mPriv->pool.addInactivePacket(reader);
497 mPriv->stateMutex.unlock();
498 yCTrace(PORTREADERBUFFERBASE,
">>>>>>>>>>>>>>>>> skipping data");
501 yCDebug(PORTREADERBUFFERBASE,
"giving PortReaderBuffer chance to close");
502 mPriv->contentSema.post();
510 mPriv->creator = creator;
515 mPriv->replier = &reader;
525 mPriv->period = period;
530 return mPriv->getName();
535 return mPriv->maxBuffer;
540 return mPriv->port ==
nullptr;
561 PortReaderPacket* reader =
nullptr;
562 while (reader ==
nullptr) {
563 mPriv->stateMutex.lock();
564 reader = mPriv->get();
565 mPriv->stateMutex.unlock();
566 if (reader ==
nullptr) {
567 mPriv->consumeSema.wait();
571 reader->setExternal(obj, wrapper);
573 mPriv->stateMutex.lock();
575 if (mPriv->ct > 0 && mPriv->prune) {
576 PortReaderPacket* readerPacket = mPriv->dropContent();
577 pruned = (readerPacket !=
nullptr);
580 mPriv->pool.addActivePacket(reader);
582 mPriv->stateMutex.unlock();
584 mPriv->contentSema.post();
586 yCTrace(PORTREADERBUFFERBASE,
">>>>>>>>>>>>>>>>> adding data");
597 printf(
"Sorry, forgetting not implemented yet\n");
604 return mPriv->acquire();
609 mPriv->stateMutex.lock();
611 mPriv->stateMutex.unlock();
617 return mPriv->getEnvelope(envelope);
627 yCError(PORTREADERBUFFERBASE,
"Missing or incorrectly typed onRead function");
void typedReaderMissingCallback()
A simple abstraction for a block of bytes.
An interface for reading from a network connection.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
virtual Portable * getReference() const =0
Get a direct pointer to the object being sent, if possible.
virtual bool isValid() const =0
virtual Bytes readEnvelope()
Read a message envelope, if available.
virtual yarp::os::PortReader * create()
std::string getName() const
virtual bool getEnvelope(PortReader &envelope)
void setPrune(bool flag=true)
virtual bool forgetObjectBase(yarp::os::PortReader *obj, yarp::os::PortWriter *wrapper)
void setCreator(PortReaderBufferBaseCreator *creator)
void setReplier(yarp::os::PortReader &reader)
virtual bool acceptObjectBase(yarp::os::PortReader *obj, yarp::os::PortWriter *wrapper)
Careful! merge with ::read – very similar code Until merge, don't change one without looking at other...
void attachBase(yarp::os::Port &port)
bool read(yarp::os::ConnectionReader &connection) override
Read this object from a network connection.
yarp::os::PortReader * readBase(bool &missed, bool cleanup)
void setTargetPeriod(double period)
unsigned int getMaxBuffer()
virtual ~PortReaderBufferBase()
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
virtual void onCompletion() const
This is called when the port has finished all writing operations.
A mini-server for network communication.
void setReader(PortReader &reader) override
Set an external reader for port data.
void close() override
Stop port activity.
Information about a connection between two ports.
A class for thread synchronization and mutual exclusion.
static double nowSystem()
static void delaySystem(double seconds)
Lets Readable objects read from the underlying InputStream associated with the connection between two...
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
#define yCError(component,...)
#define yCAssert(component, x)
#define yCTrace(component,...)
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
double now()
Return the current time in seconds, relative to an arbitrary starting point.
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.