YARP
Yet Another Robot Platform
PortCoreAdapter.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
8 
9 #include <yarp/os/PortReader.h>
10 #include <yarp/os/Time.h>
12 
13 namespace {
14 YARP_OS_LOG_COMPONENT(PORTCOREADAPTER, "yarp.os.impl.PortCoreAdapter")
15 } // namespace
16 
18 {
19  setContactable(&owner);
20 }
21 
23 {
24  stateMutex.lock();
25  closed = false;
26  opened = true;
27  stateMutex.unlock();
28 }
29 
31 {
32  usedForRead = true;
33 }
34 
36 {
37  usedForWrite = true;
38 }
39 
41 {
42  usedForRpc = true;
43 }
44 
46 {
47  commitToRead = true;
48 }
49 
51 {
52  commitToWrite = true;
53 }
54 
56 {
57  commitToRpc = true;
58 }
59 
61 {
62  if (!readBackground) {
63  stateMutex.lock();
64  closed = true;
65  consume.post();
66  consume.post();
67  stateMutex.unlock();
68  }
69 }
70 
72 {
73  if (isWriting()) {
74  double start = SystemClock::nowSystem();
75  double pause = 0.01;
76  do {
78  pause *= 2;
79  } while (isWriting() && (SystemClock::nowSystem() - start < 3));
80  if (isWriting()) {
81  yCError(PORTCOREADAPTER, "Closing port that was sending data (slowly)");
82  }
83  }
84 }
85 
86 
88 {
89  while (produce.check()) {
90  }
91  while (readBlock.check()) {
92  }
93  resume();
94  readBlock.post();
95 }
96 
98 {
99  if (permanentReadDelegate != nullptr) {
100  bool result = permanentReadDelegate->read(reader);
101  return result;
102  }
103 
104  // called by comms code
105  readBlock.wait();
106 
107  if (!reader.isValid()) {
108  // interrupt
109  stateMutex.lock();
110  if (readDelegate != nullptr) {
111  readResult = readDelegate->read(reader);
112  }
113  stateMutex.unlock();
114  produce.post();
115  readBlock.post();
116  return false;
117  }
118 
119  if (closed) {
120  yCDebug(PORTCOREADAPTER, "Port::read shutting down");
121  readBlock.post();
122  return false;
123  }
124 
125  // wait for happy consumer - don't want to miss a packet
126  if (!readBackground) {
127  consume.wait();
128  }
129 
130  stateMutex.lock();
131  readResult = false;
132  if (readDelegate != nullptr) {
133  readResult = readDelegate->read(reader);
134  } else {
135  // read and ignore
136  yCDebug(PORTCOREADAPTER, "data received in Port, no reader for it");
137  Bottle b;
138  b.read(reader);
139  }
140  if (!readBackground) {
141  readDelegate = nullptr;
142  writeDelegate = nullptr;
143  }
144  bool result = readResult;
145  stateMutex.unlock();
146  if (!readBackground) {
147  produce.post();
148  }
149  if (result && willReply) {
150  consume.wait();
151  if (closed) {
152  yCDebug(PORTCOREADAPTER, "Port::read shutting down");
153  readBlock.post();
154  return false;
155  }
156  if (writeDelegate != nullptr) {
157  stateMutex.lock();
158  ConnectionWriter* writer = reader.getWriter();
159  if (writer != nullptr) {
160  result = readResult = writeDelegate->write(*writer);
161  }
162  stateMutex.unlock();
163  }
164  if (dropDue) {
165  reader.requestDrop();
166  }
167  produce.post();
168  }
169  readBlock.post();
170  return result;
171 }
172 
174 {
175  // called by user
176 
177  // user claimed they would reply to last read, but then
178  // decided not to.
179  if (replyDue) {
180  Bottle emptyMessage;
181  reply(emptyMessage, false, false);
182  replyDue = false;
183  dropDue = false;
184  }
185  if (willReply) {
186  replyDue = true;
187  }
188 
189  stateMutex.lock();
190  readActive = true;
191  readDelegate = &reader;
192  checkType(reader);
193  writeDelegate = nullptr;
194  this->willReply = willReply;
195  consume.post(); // happy consumer
196  stateMutex.unlock();
197 
198  produce.wait();
199  stateMutex.lock();
200  if (!readBackground) {
201  readDelegate = nullptr;
202  }
203  bool result = readResult;
204  if (!result) {
205  replyDue = false;
206  }
207  stateMutex.unlock();
208  return result;
209 }
210 
211 bool yarp::os::impl::PortCoreAdapter::reply(PortWriter& writer, bool drop, bool /*interrupted*/)
212 {
213  // send reply even if interrupt has happened in interim
214  if (!replyDue) {
215  return false;
216  }
217 
218  replyDue = false;
219  dropDue = drop;
220  writeDelegate = &writer;
221  consume.post();
222  produce.wait();
223  bool result = readResult;
224  return result;
225 }
226 
227 /*
228  Configuration of a port that should be remembered
229  between opens and closes
230 */
231 
233 {
234  stateMutex.lock();
235  readActive = true;
236  readBackground = true;
237  readDelegate = &reader;
238  permanentReadDelegate = &reader;
239  checkType(reader);
240  consume.post(); // just do this once
241  stateMutex.unlock();
242 }
243 
245 {
246  stateMutex.lock();
247  adminReadDelegate = &reader;
248  setAdminReadHandler(reader);
249  stateMutex.unlock();
250 }
251 
253 {
254  recReadCreator = &creator;
255  setReadCreator(creator);
256 }
257 
259 {
260  if (waitAfterSend && isManual()) {
261  yCError(PORTCOREADAPTER, "Cannot use background-mode writes on a fake port");
262  }
263  recWaitAfterSend = waitAfterSend ? 1 : 0;
264  setWaitAfterSend(waitAfterSend);
265 }
266 
267 #ifndef YARP_NO_DEPRECATED // Since YARP 3.3
271 {
272  recCallbackLock = nullptr;
273  old_recCallbackLock = lock;
274  haveCallbackLock = true;
275  return setCallbackLock(lock);
276 }
278 #endif
279 
281 {
282  recCallbackLock = lock;
283 #ifndef YARP_NO_DEPRECATED // Since YARP 3.3
284  old_recCallbackLock = nullptr;
285 #endif
286  haveCallbackLock = true;
287  return setCallbackLock(lock);
288 }
289 
291 {
292  recCallbackLock = nullptr;
293 #ifndef YARP_NO_DEPRECATED // Since YARP 3.3
294  old_recCallbackLock = nullptr;
295 #endif
296  haveCallbackLock = false;
297  return removeCallbackLock();
298 }
299 
301 {
302  return readDelegate;
303 }
304 
306 {
307  return adminReadDelegate;
308 }
309 
311 {
312  return recReadCreator;
313 }
314 
316 {
317  return recWaitAfterSend;
318 }
319 
320 
322 {
323  return opened;
324 }
325 
327 {
328  this->opened = opened;
329 }
330 
332 {
333  includeNode = flag;
334 }
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:74
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition: Bottle.cpp:240
An interface for reading from a network connection.
virtual void requestDrop()=0
Tag the connection to be dropped after the current message.
virtual bool isValid() const =0
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
An interface for writing to a network connection.
Basic wrapper for mutual exclusion.
Definition: Mutex.h:32
A creator for readers.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:25
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:24
A mini-server for network communication.
Definition: Port.h:47
static double nowSystem()
Definition: SystemClock.cpp:34
static void delaySystem(double seconds)
Definition: SystemClock.cpp:29
PortReaderCreator * checkReadCreator()
void configWaitAfterSend(bool waitAfterSend)
void configReadCreator(PortReaderCreator &creator)
void configAdminReader(PortReader &reader)
bool reply(PortWriter &writer, bool drop, bool interrupted)
void configReader(PortReader &reader)
bool read(ConnectionReader &reader) override
Callback for data.
void setContactable(Contactable *contactable)
Definition: PortCore.h:298
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCDebug(component,...)
Definition: LogComponent.h:109
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:34
#define YARP_WARNING_POP
Ends a temporary alteration of the enabled warnings.
Definition: system.h:332
#define YARP_WARNING_PUSH
Starts a temporary alteration of the enabled warnings.
Definition: system.h:331
#define YARP_DISABLE_DEPRECATED_WARNING
Disable deprecated warnings in the following code.
Definition: system.h:333