YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
PortCoreAdapter.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
8
10#include <yarp/os/Time.h>
12
13namespace {
14YARP_OS_LOG_COMPONENT(PORTCOREADAPTER, "yarp.os.impl.PortCoreAdapter")
15} // namespace
16
21
23{
24 stateMutex.lock();
25 closed = false;
26 opened = true;
27 stateMutex.unlock();
28}
29
31{
32 usedForRead = true;
33}
34
36{
37 usedForWrite = true;
38}
39
41{
42 usedForRpc = true;
43}
44
46{
47 commitToRead = true;
48}
49
51{
52 commitToWrite = true;
53}
54
56{
57 commitToRpc = true;
58}
59
61{
62 if (!readBackground) {
63 stateMutex.lock();
64 closed = true;
65 consume.post();
66 consume.post();
67 stateMutex.unlock();
68 }
69}
70
72{
73 if (isWriting()) {
74 double start = SystemClock::nowSystem();
75 double pause = 0.01;
76 do {
78 pause *= 2;
79 } while (isWriting() && (SystemClock::nowSystem() - start < 3));
80 if (isWriting()) {
81 yCError(PORTCOREADAPTER, "Closing port that was sending data (slowly)");
82 }
83 }
84}
85
86
88{
89 while (produce.check()) {
90 }
91 while (readBlock.check()) {
92 }
93 resume();
94 readBlock.post();
95}
96
98{
99 if (permanentReadDelegate != nullptr) {
100 bool result = permanentReadDelegate->read(reader);
101 return result;
102 }
103
104 // called by comms code
105 readBlock.wait();
106
107 if (!reader.isValid()) {
108 // interrupt
109 stateMutex.lock();
110 if (readDelegate != nullptr) {
111 readResult = readDelegate->read(reader);
112 }
113 stateMutex.unlock();
114 produce.post();
115 readBlock.post();
116 return false;
117 }
118
119 if (closed) {
120 yCDebug(PORTCOREADAPTER, "Port::read shutting down");
121 readBlock.post();
122 return false;
123 }
124
125 // wait for happy consumer - don't want to miss a packet
126 if (!readBackground) {
127 consume.wait();
128 }
129
130 stateMutex.lock();
131 readResult = false;
132 if (readDelegate != nullptr) {
133 readResult = readDelegate->read(reader);
134 } else {
135 // read and ignore
136 yCDebug(PORTCOREADAPTER, "data received in Port, no reader for it");
137 Bottle b;
138 b.read(reader);
139 }
140 if (!readBackground) {
141 readDelegate = nullptr;
142 writeDelegate = nullptr;
143 }
144 bool result = readResult;
145 stateMutex.unlock();
146 if (!readBackground) {
147 produce.post();
148 }
149 if (result && willReply) {
150 consume.wait();
151 if (closed) {
152 yCDebug(PORTCOREADAPTER, "Port::read shutting down");
153 readBlock.post();
154 return false;
155 }
156 if (writeDelegate != nullptr) {
157 stateMutex.lock();
158 ConnectionWriter* writer = reader.getWriter();
159 if (writer != nullptr) {
160 result = readResult = writeDelegate->write(*writer);
161 }
162 stateMutex.unlock();
163 }
164 if (dropDue) {
165 reader.requestDrop();
166 }
167 produce.post();
168 }
169 readBlock.post();
170 return result;
171}
172
174{
175 // called by user
176
177 // user claimed they would reply to last read, but then
178 // decided not to.
179 if (replyDue) {
181 reply(emptyMessage, false, false);
182 replyDue = false;
183 dropDue = false;
184 }
185 if (willReply) {
186 replyDue = true;
187 }
188
189 stateMutex.lock();
190 readActive = true;
191 readDelegate = &reader;
192 checkType(reader);
193 writeDelegate = nullptr;
194 this->willReply = willReply;
195 consume.post(); // happy consumer
196 stateMutex.unlock();
197
198 produce.wait();
199 stateMutex.lock();
200 if (!readBackground) {
201 readDelegate = nullptr;
202 }
203 bool result = readResult;
204 if (!result) {
205 replyDue = false;
206 }
207 stateMutex.unlock();
208 return result;
209}
210
211bool yarp::os::impl::PortCoreAdapter::reply(PortWriter& writer, bool drop, bool /*interrupted*/)
212{
213 // send reply even if interrupt has happened in interim
214 if (!replyDue) {
215 return false;
216 }
217
218 replyDue = false;
219 dropDue = drop;
220 writeDelegate = &writer;
221 consume.post();
222 produce.wait();
223 bool result = readResult;
224 return result;
225}
226
227/*
228 Configuration of a port that should be remembered
229 between opens and closes
230*/
231
233{
234 stateMutex.lock();
235 readActive = true;
236 readBackground = true;
237 readDelegate = &reader;
238 permanentReadDelegate = &reader;
239 checkType(reader);
240 consume.post(); // just do this once
241 stateMutex.unlock();
242}
243
245{
246 stateMutex.lock();
247 adminReadDelegate = &reader;
248 setAdminReadHandler(reader);
249 stateMutex.unlock();
250}
251
253{
254 recReadCreator = &creator;
255 setReadCreator(creator);
256}
257
259{
260 if (waitAfterSend && isManual()) {
261 yCError(PORTCOREADAPTER, "Cannot use background-mode writes on a fake port");
262 }
263 recWaitAfterSend = waitAfterSend ? 1 : 0;
264 setWaitAfterSend(waitAfterSend);
265}
266
268{
269 recCallbackLock = lock;
270 haveCallbackLock = true;
271 return setCallbackLock(lock);
272}
273
275{
276 recCallbackLock = nullptr;
277 haveCallbackLock = false;
278 return removeCallbackLock();
279}
280
285
290
295
297{
298 return recWaitAfterSend;
299}
300
301
303{
304 return opened;
305}
306
308{
309 this->opened = opened;
310}
311
313{
314 includeNode = flag;
315}
A simple collection of objects that can be described and transmitted in a portable way.
Definition Bottle.h:64
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition Bottle.cpp:240
A mini-server for performing network communication in the background.
T * read(bool shouldWait=true) override
Read an available object from the port.
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
An interface for reading from a network connection.
virtual void requestDrop()=0
Tag the connection to be dropped after the current message.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
virtual bool isValid() const =0
An interface for writing to a network connection.
A creator for readers.
Interface implemented by all objects that can read themselves from the network, such as Bottle objects.
Definition PortReader.h:24
Interface implemented by all objects that can write themselves to the network, such as Bottle objects.
Definition PortWriter.h:23
A mini-server for network communication.
Definition Port.h:46
static double nowSystem()
static void delaySystem(double seconds)
PortReaderCreator * checkReadCreator()
void configWaitAfterSend(bool waitAfterSend)
void configReadCreator(PortReaderCreator &creator)
void configAdminReader(PortReader &reader)
bool configCallbackLock(std::mutex *lock)
bool reply(PortWriter &writer, bool drop, bool interrupted)
void configReader(PortReader &reader)
bool read(ConnectionReader &reader) override
Callback for data.
void setContactable(Contactable *contactable)
Definition PortCore.h:290
#define yCError(component,...)
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)