YARP
Yet Another Robot Platform
PortWriterBufferBase.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
8 
9 #include <yarp/os/Port.h>
10 #include <yarp/os/Semaphore.h>
13 
14 using namespace yarp::os::impl;
15 using namespace yarp::os;
16 
// Log component passed to the yCDebug / yCAssert calls in this file.
// The unnamed namespace keeps the component local to this translation unit.
17 namespace {
18 YARP_OS_LOG_COMPONENT(PORTWRITERBUFFERBASE, "yarp.os.PortWriterBufferBase")
19 } // namespace
20 
// Out-of-line definition of the PortWriterBufferManager destructor, using the
// compiler-generated body. PortWriterBufferBase::Private derives from this
// class and overrides the destructor.
21 PortWriterBufferManager::~PortWriterBufferManager() = default;
22 
23 
24 class PortWriterBufferBase::Private : public PortWriterBufferManager
25 {
26 public:
28  owner(owner),
29  stateSema(1),
30  completionSema(0),
31  port(nullptr),
32  current(nullptr),
33  callback(nullptr),
34  finishing(false),
35  outCt(0)
36  {
37  }
38 
39  ~Private() override
40  {
41  release();
42  finishWrites();
43  stateSema.wait();
44  }
45 
46  int getCount()
47  {
48  stateSema.wait();
49  int ct = packets.getCount();
50  stateSema.post();
51  return ct;
52  }
53 
54  void finishWrites()
55  {
56  yCDebug(PORTWRITERBUFFERBASE, "finishing writes");
57  bool done = false;
58  while (!done) {
59  stateSema.wait();
60  if (port != nullptr) {
61  if (!port->isOpen()) {
62  outCt = 0;
63  }
64  }
65  done = (outCt == 0);
66  if (!done) {
67  finishing = true;
68  }
69  stateSema.post();
70  if (!done) {
71  completionSema.wait();
72  }
73  }
74  yCDebug(PORTWRITERBUFFERBASE, "finished writes");
75  }
76 
77  const void* get()
78  {
79  if (callback != nullptr) {
80  // (Safe to check outside mutex)
81  // oops, there is already a prepared and unwritten
82  // object. best remove it.
83  yCDebug(PORTWRITERBUFFERBASE, "releasing unused buffer");
84  release();
85  }
86  stateSema.wait();
87  PortCorePacket* packet = packets.getFreePacket();
88  yCAssert(PORTWRITERBUFFERBASE, packet != nullptr);
89  if (packet->getContent() == nullptr) {
90  yCDebug(PORTWRITERBUFFERBASE, "creating a writer buffer");
91  //packet->setContent(owner.create(*this, packet), true);
92  yarp::os::PortWriterWrapper* wrapper = owner.create(*this, packet);
93  //packet->setContent(wrapper, true);
94  packet->setContent(wrapper->getInternal(), false, wrapper, true);
95  }
96  stateSema.post();
97 
98  current = packet->getContent();
99  callback = packet->getCallback();
100  return callback;
101  }
102 
103  bool release()
104  {
105  stateSema.wait();
106  const PortWriter* cback = callback;
107  current = nullptr;
108  callback = nullptr;
109  stateSema.post();
110  if (cback != nullptr) {
111  stateSema.wait();
112  outCt++;
113  stateSema.post();
114  cback->onCompletion();
115  }
116  return cback != nullptr;
117  }
118 
119  void onCompletion(void* tracker) override
120  {
121  stateSema.wait();
122  yCDebug(PORTWRITERBUFFERBASE, "freeing up a writer buffer");
123  packets.freePacket((PortCorePacket*)tracker, false);
124  outCt--;
125  bool sig = finishing;
126  finishing = false;
127  stateSema.post();
128  if (sig) {
129  completionSema.post();
130  }
131  }
132 
133 
134  void attach(Port& port)
135  {
136  stateSema.wait();
137  this->port = &port;
138  port.enableBackgroundWrite(true);
139  stateSema.post();
140  }
141 
142  void detach()
143  {
144  // nothing to do
145  }
146 
147  void write(bool strict)
148  {
149  if (strict) {
150  finishWrites();
151  }
152  stateSema.wait();
153  const PortWriter* active = current;
154  const PortWriter* cback = callback;
155  current = nullptr;
156  callback = nullptr;
157  stateSema.post();
158  if (active != nullptr && port != nullptr) {
159  stateSema.wait();
160  outCt++;
161  stateSema.post();
162  port->write(*active, cback);
163  }
164  }
165 
166 private:
167  PortWriterBufferBase& owner;
168  PortCorePackets packets;
169  yarp::os::Semaphore stateSema;
170  yarp::os::Semaphore completionSema;
171  Port* port;
172  const PortWriter* current;
173  const PortWriter* callback;
174  bool finishing;
175  int outCt;
176 };
177 
178 
179 
180 
182  mPriv(new Private(*this))
183 {
184 }
185 
187 {
188  delete mPriv;
189 }
190 
192 {
193  return mPriv->get();
194 }
195 
197 {
198  return mPriv->release();
199 }
200 
201 
203 {
204  return mPriv->getCount();
205 }
206 
208 {
209  mPriv->attach(port);
210 }
211 
213 {
214  mPriv->detach();
215 }
216 
218 {
219  mPriv->write(strict);
220 }
221 
223 {
224  mPriv->finishWrites();
225 }
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:24
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
virtual void onCompletion() const
This is called when the port has finished all writing operations.
Definition: PortWriter.cpp:13
A mini-server for network communication.
Definition: Port.h:47
void enableBackgroundWrite(bool backgroundFlag)
control whether writing from this port is done in the background.
Definition: Port.cpp:524
A class for thread synchronization and mutual exclusion.
Definition: Semaphore.h:26
A single message, potentially being transmitted on multiple connections.
void setContent(const yarp::os::PortWriter *writable, bool owned=false, const yarp::os::PortWriter *callback=nullptr, bool ownedCallback=false)
Configure the object being sent and where to send notifications.
const yarp::os::PortWriter * getContent()
const yarp::os::PortWriter * getCallback()
A collection of messages being transmitted over connections.
#define yCAssert(component, x)
Definition: LogComponent.h:169
#define yCDebug(component,...)
Definition: LogComponent.h:109
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:35
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.