YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
PortWriterBufferBase.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
8
9#include <yarp/os/Port.h>
10#include <yarp/os/Semaphore.h>
13
14using namespace yarp::os::impl;
15using namespace yarp::os;
16
17namespace {
18YARP_OS_LOG_COMPONENT(PORTWRITERBUFFERBASE, "yarp.os.PortWriterBufferBase")
19} // namespace
20
21PortWriterBufferManager::~PortWriterBufferManager() = default;
22
23
24class PortWriterBufferBase::Private : public PortWriterBufferManager
25{
26public:
28 owner(owner)
29 {
30 }
31
32 ~Private() override
33 {
34 release();
36 }
37
39 {
40 stateMutex.lock();
41 int ct = packets.getCount();
42 stateMutex.unlock();
43 return ct;
44 }
45
47 {
48 yCDebug(PORTWRITERBUFFERBASE, "finishing writes");
49 bool done = false;
50 while (!done) {
51 stateMutex.lock();
52 if (port != nullptr) {
53 if (!port->isOpen()) {
54 outCt = 0;
55 }
56 }
57 done = (outCt == 0);
58 if (!done) {
59 finishing = true;
60 }
61 stateMutex.unlock();
62 if (!done) {
63 completionSema.wait();
64 }
65 }
66 yCDebug(PORTWRITERBUFFERBASE, "finished writes");
67 }
68
69 const void* get()
70 {
71 if (callback != nullptr) {
72 // (Safe to check outside mutex)
73 // oops, there is already a prepared and unwritten
74 // object. best remove it.
75 yCDebug(PORTWRITERBUFFERBASE, "releasing unused buffer");
76 release();
77 }
78 stateMutex.lock();
79 PortCorePacket* packet = packets.getFreePacket();
80 yCAssert(PORTWRITERBUFFERBASE, packet != nullptr);
81 if (packet->getContent() == nullptr) {
82 yCDebug(PORTWRITERBUFFERBASE, "creating a writer buffer");
83 //packet->setContent(owner.create(*this, packet), true);
84 yarp::os::PortWriterWrapper* wrapper = owner.create(*this, packet);
85 //packet->setContent(wrapper, true);
86 packet->setContent(wrapper->getInternal(), false, wrapper, true);
87 }
88 stateMutex.unlock();
89
90 current = packet->getContent();
91 callback = packet->getCallback();
92 return callback;
93 }
94
95 bool release()
96 {
97 stateMutex.lock();
98 const PortWriter* cback = callback;
99 current = nullptr;
100 callback = nullptr;
101 stateMutex.unlock();
102 if (cback != nullptr) {
103 stateMutex.lock();
104 outCt++;
105 stateMutex.unlock();
106 cback->onCompletion();
107 }
108 return cback != nullptr;
109 }
110
111 void onCompletion(void* tracker) override
112 {
113 stateMutex.lock();
114 yCDebug(PORTWRITERBUFFERBASE, "freeing up a writer buffer");
115 packets.freePacket((PortCorePacket*)tracker, false);
116 outCt--;
117 bool sig = finishing;
118 finishing = false;
119 stateMutex.unlock();
120 if (sig) {
121 completionSema.post();
122 }
123 }
124
125
126 void attach(Port& port)
127 {
128 stateMutex.lock();
129 this->port = &port;
130 port.enableBackgroundWrite(true);
131 stateMutex.unlock();
132 }
133
/**
 * Counterpart of attach(); performs no action.
 */
void detach()
{
    // Intentionally empty: there is nothing to do here.
}
138
139 void write(bool strict)
140 {
141 if (strict) {
142 finishWrites();
143 }
144 stateMutex.lock();
145 const PortWriter* active = current;
146 const PortWriter* cback = callback;
147 current = nullptr;
148 callback = nullptr;
149 stateMutex.unlock();
150 if (active != nullptr && port != nullptr) {
151 stateMutex.lock();
152 outCt++;
153 stateMutex.unlock();
154 port->write(*active, cback);
155 }
156 }
157
158private:
160 PortCorePackets packets;
161 std::mutex stateMutex;
162 yarp::os::Semaphore completionSema {0};
163 Port* port {nullptr};
164 const PortWriter* current {nullptr};
165 const PortWriter* callback {nullptr};
166 bool finishing {false};
167 int outCt {0};
168};
169
170
171
172
177
182
184{
185 return mPriv->get();
186}
187
189{
190 return mPriv->release();
191}
192
193
195{
196 return mPriv->getCount();
197}
198
200{
201 mPriv->attach(port);
202}
203
205{
206 mPriv->detach();
207}
208
210{
211 mPriv->write(strict);
212}
213
215{
216 mPriv->finishWrites();
217}
A mini-server for performing network communication in the background.
void release(void *handle) override
Return to YARP the control of an object previously acquired with the acquire() method.
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
virtual PortWriterWrapper * create(PortWriterBufferManager &man, void *tracker)=0
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition PortWriter.h:23
A mini-server for network communication.
Definition Port.h:46
void enableBackgroundWrite(bool backgroundFlag)
Control whether writing from this port is done in the background.
Definition Port.cpp:533
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
Definition Port.cpp:436
bool isOpen() const
Check if the port has been opened.
Definition Port.cpp:677
A class for thread synchronization and mutual exclusion.
Definition Semaphore.h:25
void wait()
Decrement the counter, even if we must wait to do that.
Definition Semaphore.cpp:96
void post()
Increment the counter.
A single message, potentially being transmitted on multiple connections.
void setContent(const yarp::os::PortWriter *writable, bool owned=false, const yarp::os::PortWriter *callback=nullptr, bool ownedCallback=false)
Configure the object being sent and where to send notifications.
const yarp::os::PortWriter * getContent()
const yarp::os::PortWriter * getCallback()
A collection of messages being transmitted over connections.
PortCorePacket * getFreePacket()
Get a packet that we can prepare for sending.
void freePacket(PortCorePacket *packet, bool clear=true)
Force the given packet into an inactive state.
#define yCAssert(component, x)
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.