YARP
Yet Another Robot Platform
PortWriterBufferBase.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
11 
12 #include <yarp/os/Port.h>
13 #include <yarp/os/Semaphore.h>
16 
17 using namespace yarp::os::impl;
18 using namespace yarp::os;
19 
20 namespace {
21 YARP_OS_LOG_COMPONENT(PORTWRITERBUFFERBASE, "yarp.os.PortWriterBufferBase")
22 } // namespace
23 
// Out-of-line, compiler-generated destructor for PortWriterBufferManager,
// the interface that PortWriterBufferBase::Private derives from below.
24 PortWriterBufferManager::~PortWriterBufferManager() = default;
25 
26 
27 class PortWriterBufferBase::Private : public PortWriterBufferManager
28 {
29 public:
31  owner(owner),
32  stateSema(1),
33  completionSema(0),
34  port(nullptr),
35  current(nullptr),
36  callback(nullptr),
37  finishing(false),
38  outCt(0)
39  {
40  }
41 
// Tear-down sequence: give back any prepared-but-unwritten buffer, wait
// for outstanding writes, then take the state semaphore one last time.
42  ~Private() override
43  {
// Hand back a buffer that was obtained through get() but never written.
44  release();
// Block until outCt has dropped to zero (or the port is no longer open).
45  finishWrites();
// The semaphore is acquired here and never posted again.
// NOTE(review): presumably deliberate, so that nothing else can enter a
// critical section while the members are destroyed -- confirm.
46  stateSema.wait();
47  }
48 
49  int getCount()
50  {
51  stateSema.wait();
52  int ct = packets.getCount();
53  stateSema.post();
54  return ct;
55  }
56 
57  void finishWrites()
58  {
59  yCDebug(PORTWRITERBUFFERBASE, "finishing writes");
60  bool done = false;
61  while (!done) {
62  stateSema.wait();
63  if (port != nullptr) {
64  if (!port->isOpen()) {
65  outCt = 0;
66  }
67  }
68  done = (outCt == 0);
69  if (!done) {
70  finishing = true;
71  }
72  stateSema.post();
73  if (!done) {
74  completionSema.wait();
75  }
76  }
77  yCDebug(PORTWRITERBUFFERBASE, "finished writes");
78  }
79 
80  const void* get()
81  {
82  if (callback != nullptr) {
83  // (Safe to check outside mutex)
84  // oops, there is already a prepared and unwritten
85  // object. best remove it.
86  yCDebug(PORTWRITERBUFFERBASE, "releasing unused buffer");
87  release();
88  }
89  stateSema.wait();
90  PortCorePacket* packet = packets.getFreePacket();
91  yCAssert(PORTWRITERBUFFERBASE, packet != nullptr);
92  if (packet->getContent() == nullptr) {
93  yCDebug(PORTWRITERBUFFERBASE, "creating a writer buffer");
94  //packet->setContent(owner.create(*this, packet), true);
95  yarp::os::PortWriterWrapper* wrapper = owner.create(*this, packet);
96  //packet->setContent(wrapper, true);
97  packet->setContent(wrapper->getInternal(), false, wrapper, true);
98  }
99  stateSema.post();
100 
101  current = packet->getContent();
102  callback = packet->getCallback();
103  return callback;
104  }
105 
106  bool release()
107  {
108  stateSema.wait();
109  const PortWriter* cback = callback;
110  current = nullptr;
111  callback = nullptr;
112  stateSema.post();
113  if (cback != nullptr) {
114  stateSema.wait();
115  outCt++;
116  stateSema.post();
117  cback->onCompletion();
118  }
119  return cback != nullptr;
120  }
121 
122  void onCompletion(void* tracker) override
123  {
124  stateSema.wait();
125  yCDebug(PORTWRITERBUFFERBASE, "freeing up a writer buffer");
126  packets.freePacket((PortCorePacket*)tracker, false);
127  outCt--;
128  bool sig = finishing;
129  finishing = false;
130  stateSema.post();
131  if (sig) {
132  completionSema.post();
133  }
134  }
135 
136 
137  void attach(Port& port)
138  {
139  stateSema.wait();
140  this->port = &port;
141  port.enableBackgroundWrite(true);
142  stateSema.post();
143  }
144 
// Counterpart of attach(). Intentionally empty: the stored port pointer is
// left untouched by this call.
145  void detach()
146  {
147  // nothing to do
148  }
149 
150  void write(bool strict)
151  {
152  if (strict) {
153  finishWrites();
154  }
155  stateSema.wait();
156  const PortWriter* active = current;
157  const PortWriter* cback = callback;
158  current = nullptr;
159  callback = nullptr;
160  stateSema.post();
161  if (active != nullptr && port != nullptr) {
162  stateSema.wait();
163  outCt++;
164  stateSema.post();
165  port->write(*active, cback);
166  }
167  }
168 
169 private:
170  PortWriterBufferBase& owner;
171  PortCorePackets packets;
172  yarp::os::Semaphore stateSema;
173  yarp::os::Semaphore completionSema;
174  Port* port;
175  const PortWriter* current;
176  const PortWriter* callback;
177  bool finishing;
178  int outCt;
179 };
180 
181 
182 
183 
185  mPriv(new Private(*this))
186 {
187 }
188 
190 {
191  delete mPriv;
192 }
193 
195 {
196  return mPriv->get();
197 }
198 
200 {
201  return mPriv->release();
202 }
203 
204 
206 {
207  return mPriv->getCount();
208 }
209 
211 {
212  mPriv->attach(port);
213 }
214 
216 {
217  mPriv->detach();
218 }
219 
221 {
222  mPriv->write(strict);
223 }
224 
226 {
227  mPriv->finishWrites();
228 }
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:27
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
virtual void onCompletion() const
This is called when the port has finished all writing operations.
Definition: PortWriter.cpp:16
A mini-server for network communication.
Definition: Port.h:50
void enableBackgroundWrite(bool backgroundFlag)
Control whether writing from this port is done in the background.
Definition: Port.cpp:527
A class for thread synchronization and mutual exclusion.
Definition: Semaphore.h:29
A single message, potentially being transmitted on multiple connections.
void setContent(const yarp::os::PortWriter *writable, bool owned=false, const yarp::os::PortWriter *callback=nullptr, bool ownedCallback=false)
Configure the object being sent and where to send notifications.
const yarp::os::PortWriter * getContent()
const yarp::os::PortWriter * getCallback()
A collection of messages being transmitted over connections.
#define yCAssert(component, x)
Definition: LogComponent.h:172
#define yCDebug(component,...)
Definition: LogComponent.h:112
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.