30#ifndef DOXYGEN_SHOULD_SKIP_THIS
48 prev_ = next_ =
nullptr;
63 if (reader !=
nullptr) {
80 this->reader = reader;
91 this->external = reader;
92 this->writer = writer;
97 envelope = std::string(bytes.
get(), bytes.
length());
103 if (writer !=
nullptr) {
115 std::list<PortReaderPacket*> inactive;
116 std::list<PortReaderPacket*> active;
121 return active.size();
126 return inactive.size();
131 if (inactive.empty()) {
134 inactive.push_back(obj);
138 inactive.remove(next);
145 if (getCount() >= 1) {
146 next = active.front();
163 inactive.push_back(
packet);
169 while (!active.empty()) {
170 delete active.back();
173 while (!inactive.empty()) {
174 delete inactive.back();
181class PortReaderBufferBase::Private
201 std::mutex stateMutex;
222 Port* closePort =
nullptr;
224 if (port !=
nullptr) {
228 if (closePort !=
nullptr) {
238 if (prev !=
nullptr) {
239 pool.addInactivePacket(prev);
247 std::string getName()
249 if (port !=
nullptr) {
259 if (pool.getFree() == 0) {
269 result = pool.getInactivePacket();
277 return (
int)pool.getCount();
282 if (prev !=
nullptr) {
283 pool.addInactivePacket(prev);
286 if (pool.getCount() >= 1) {
287 prev = pool.getActivePacket();
296 if (prev ==
nullptr) {
300 sis.
add(prev->envelope);
304 sbr.reset(sis,
nullptr, route, 0,
true);
313 if (pool.getCount() >= 1) {
314 drop = pool.getActivePacket();
315 if (drop !=
nullptr) {
316 pool.addInactivePacket(drop);
323 void attach(
Port& port)
331 if (prev !=
nullptr) {
339 void release(
void* key)
341 if (key !=
nullptr) {
362 if (mPriv->creator !=
nullptr) {
363 return mPriv->creator->create();
370 mPriv->stateMutex.lock();
371 int count = mPriv->checkContent();
372 mPriv->stateMutex.unlock();
380 mPriv->contentSema.post();
386 if (mPriv->period < 0 || cleanup) {
387 mPriv->contentSema.wait();
391 double target = now + mPriv->period;
392 if (mPriv->last_recv > 0) {
393 target = mPriv->last_recv + mPriv->period;
395 double diff = target - now;
397 ok = mPriv->contentSema.waitWithTimeout(
diff);
399 ok = mPriv->contentSema.check();
401 mPriv->contentSema.wait();
406 if (mPriv->last_recv > 0) {
407 mPriv->last_recv += mPriv->period;
412 if (mPriv->last_recv < 0) {
413 mPriv->last_recv = now;
419 mPriv->last_recv = target;
422 mPriv->stateMutex.lock();
433 mPriv->stateMutex.unlock();
434 if (reader !=
nullptr) {
435 mPriv->consumeSema.post();
449 if (mPriv->replier !=
nullptr) {
451 return mPriv->replier->read(connection);
455 while (reader ==
nullptr) {
456 mPriv->stateMutex.lock();
457 reader = mPriv->get();
458 if ((reader !=
nullptr) && reader->getReader() ==
nullptr) {
464 mPriv->stateMutex.unlock();
465 if (reader ==
nullptr) {
466 mPriv->consumeSema.wait();
472 ok = reader->getReader()->
read(connection);
477 mPriv->port =
nullptr;
480 mPriv->stateMutex.lock();
482 if (mPriv->ct > 0 && mPriv->prune) {
487 mPriv->pool.addActivePacket(reader);
489 mPriv->stateMutex.unlock();
491 mPriv->contentSema.post();
495 mPriv->stateMutex.lock();
496 mPriv->pool.addInactivePacket(reader);
497 mPriv->stateMutex.unlock();
502 mPriv->contentSema.post();
515 mPriv->replier = &reader;
525 mPriv->period = period;
535 return mPriv->maxBuffer;
540 return mPriv->port ==
nullptr;
562 while (reader ==
nullptr) {
563 mPriv->stateMutex.lock();
564 reader = mPriv->get();
565 mPriv->stateMutex.unlock();
566 if (reader ==
nullptr) {
567 mPriv->consumeSema.wait();
571 reader->setExternal(obj,
wrapper);
573 mPriv->stateMutex.lock();
575 if (mPriv->ct > 0 && mPriv->prune) {
580 mPriv->pool.addActivePacket(reader);
582 mPriv->stateMutex.unlock();
584 mPriv->contentSema.post();
597 printf(
"Sorry, forgetting not implemented yet\n");
609 mPriv->stateMutex.lock();
611 mPriv->stateMutex.unlock();
void typedReaderMissingCallback()
A mini-server for performing network communication in the background.
void * acquire() override
Take control of the last object read.
std::string getName() const override
Get name of port.
bool getEnvelope(PortReader &envelope) override
Get the envelope information (e.g., a timestamp) from the last message received on the port.
bool setEnvelope(PortWriter &envelope) override
Set an envelope (e.g., a timestamp) to the next message which will be sent.
void release(void *handle) override
Return control to YARP of an object previously taken control of with the acquire() method.
void setReader(PortReader &reader) override
Set an external reader for port data.
BufferedPort()
Constructor.
T * read(bool shouldWait=true) override
Read an available object from the port.
A simple abstraction for a block of bytes.
An interface for reading from a network connection.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
virtual Portable * getReference() const =0
Get a direct pointer to the object being sent, if possible.
virtual bool isValid() const =0
virtual Bytes readEnvelope()
Read a message envelope, if available.
virtual yarp::os::PortReader * create()
std::string getName() const
PortReaderBufferBase(unsigned int maxBuffer)
virtual bool getEnvelope(PortReader &envelope)
void setPrune(bool flag=true)
virtual bool forgetObjectBase(yarp::os::PortReader *obj, yarp::os::PortWriter *wrapper)
void setCreator(PortReaderBufferBaseCreator *creator)
void setReplier(yarp::os::PortReader &reader)
virtual bool acceptObjectBase(yarp::os::PortReader *obj, yarp::os::PortWriter *wrapper)
Careful! merge with read – very similar code Until merge, don't change one without looking at other :...
void attachBase(yarp::os::Port &port)
bool read(yarp::os::ConnectionReader &connection) override
Read this object from a network connection.
yarp::os::PortReader * readBase(bool &missed, bool cleanup)
void setTargetPeriod(double period)
unsigned int getMaxBuffer()
virtual ~PortReaderBufferBase()
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
virtual void onCompletion() const
This is called when the port has finished all writing operations.
A mini-server for network communication.
void setReader(PortReader &reader) override
Set an external reader for port data.
void close() override
Stop port activity.
Information about a connection between two ports.
A class for thread synchronization and mutual exclusion.
static double nowSystem()
static void delaySystem(double seconds)
Lets Readable objects read from the underlying InputStream associated with the connection between two...
#define yCError(component,...)
#define yCAssert(component, x)
#define yCTrace(component,...)
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
int diff(std::string contextName, folderType fType, bool verbose)