YARP
Yet Another Robot Platform
PortCoreOutputUnit.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
8 
9 #include <yarp/os/Name.h>
10 #include <yarp/os/PortInfo.h>
11 #include <yarp/os/PortReport.h>
12 #include <yarp/os/Portable.h>
13 #include <yarp/os/Time.h>
17 
18 namespace {
19 YARP_OS_LOG_COMPONENT(PORTCOREOUTPUTUNIT, "yarp.os.impl.PortCoreOutputUnit")
20 } // namespace
21 
22 using namespace yarp::os::impl;
23 using namespace yarp::os;
24 
26  PortCoreUnit(owner, index),
27  op(op),
28  closing(false),
29  finished(false),
30  running(false),
31  threaded(false),
32  sending(false),
33  phase(1),
34  activate(0),
35  trackerMutex(),
36  cachedWriter(nullptr),
37  cachedReader(nullptr),
38  cachedCallback(nullptr),
39  cachedTracker(nullptr)
40 {
41  yCAssert(PORTCOREOUTPUTUNIT, op != nullptr);
42 }
43 
45 {
46  closeMain();
47 }
48 
49 
51 {
52  phase.wait();
53 
54  if (!threaded) {
55  running = false;
56  sending = false;
58  phase.post();
59  return true;
60  }
61 
62  bool result = PortCoreUnit::start();
63  if (result) {
64  phase.wait();
65  phase.post();
66  } else {
67  phase.post();
68  }
69 
70  return result;
71 }
72 
73 
75 {
76  running = true;
77  sending = false;
78 
79  // By default, we don't start up a thread for outputs.
80 
81  if (!threaded) {
83  phase.post();
84  } else {
85  phase.post();
86  Route r = getRoute();
87  while (!closing) {
88  yCDebug(PORTCOREOUTPUTUNIT, "waiting");
89  activate.wait();
90  yCDebug(PORTCOREOUTPUTUNIT, "woken");
91  if (!closing) {
92  if (sending) {
93  yCDebug(PORTCOREOUTPUTUNIT, "write something in background");
94  sendHelper();
95  yCDebug(PORTCOREOUTPUTUNIT, "wrote something in background");
96  trackerMutex.lock();
97  if (cachedTracker != nullptr) {
98  void* t = cachedTracker;
99  cachedTracker = nullptr;
100  sending = false;
102  } else {
103  sending = false;
104  }
105  trackerMutex.unlock();
106  }
107  }
108  yCDebug(PORTCOREOUTPUTUNIT, "wrote something in background");
109  }
110  yCDebug(PORTCOREOUTPUTUNIT, "thread closing");
111  sending = false;
112  }
113 }
114 
115 
117 {
118  if (op != nullptr) {
119  Route route = op->getRoute();
120  setMode();
121  getOwner().reportUnit(this, true);
122 
123  std::string msg = std::string("Sending output from ") + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
124  if (Name(route.getToName()).isRooted()) {
125  if (Name(route.getFromName()).isRooted()) {
126  yCInfo(PORTCOREOUTPUTUNIT, "%s", msg.c_str());
127  }
128  }
129 
130  // Report the new connection
131  PortInfo info;
132  info.message = msg;
134  info.incoming = false;
135  info.created = true;
136  info.sourceName = route.getFromName();
137  info.targetName = route.getToName();
138  info.portName = info.sourceName;
139  info.carrierName = route.getCarrierName();
140  getOwner().report(info);
141  }
142 
143  // no thread component
144  running = false;
145 }
146 
// Tear down the connection held in `op`: ask the other side to close,
// report the disappearing connection to the owner, then close and delete
// the protocol object. When `op` is already null nothing is done.
//
// NOTE(review): several listing lines inside this function are not visible
// here (the statements that own the dangling argument lists below), so the
// comments only describe what the visible lines establish.
147 void PortCoreOutputUnit::closeBasic()
148 {
     // Set to the result of writing the quit command; used further down to
     // decide whether to read an acknowledgement.
149  bool waitForOther = false;
150  if (op != nullptr) {
152  Route route = op->getRoute();
     // Connectionless or broadcast carriers take the "out-of-band" path
     // announced by the log message; every other carrier is sent a quit
     // command in-band, provided the carrier can escape.
153  if (op->getConnection().isConnectionless() || op->getConnection().isBroadcast()) {
154  yCInfo(PORTCOREOUTPUTUNIT, "output for route %s asking other side to close by out-of-band means",
155  route.toString().c_str());
157  route.getFromName(),
158  true);
159  } else {
160  if (op->getConnection().canEscape()) {
162  op->getConnection().isBareMode());
     // Port command with key '\0' and text "q", serialized into `buf`
     // and written on the connection.
163  PortCommand pc('\0', std::string("q"));
164  pc.write(buf);
165  //printf("Asked for %s to close...\n",
166  // op->getRoute().toString().c_str());
167  waitForOther = op->write(buf);
168  }
169  }
170 
171  std::string msg = std::string("Removing output from ") + route.getFromName() + " to " + route.getToName();
172 
     // The message is logged only when both endpoint names are rooted.
173  if (Name(route.getToName()).isRooted()) {
174  if (Name(route.getFromName()).isRooted()) {
175  yCInfo(PORTCOREOUTPUTUNIT, "%s", msg.c_str());
176  }
177  }
178 
     // Tell the owner this unit is no longer active.
179  getOwner().reportUnit(this, false);
180 
181  // Report the disappearing connection
182  PortInfo info;
183  info.message = msg;
185  info.incoming = false;
186  info.created = false;
187  info.sourceName = route.getFromName();
188  info.targetName = route.getToName();
189  info.portName = info.sourceName;
190  info.carrierName = route.getCarrierName();
191  getOwner().report(info);
192  }
193 
194 
195  if (op != nullptr) {
196  if (waitForOther) {
197  // quit is only acknowledged in certain conditions
     // (text-mode carrier that supports replies): read into a one-byte
     // buffer from the input stream before closing.
198  if (op->getConnection().isTextMode() && op->getConnection().supportReply()) {
199  InputStream& is = op->getInputStream();
200  ManagedBytes dummy(1);
201  is.read(dummy.bytes());
202  }
203  }
     // This unit deletes the protocol object itself; nulling `op` makes
     // later `op != nullptr` checks skip the connection.
204  op->close();
205  delete op;
206  op = nullptr;
207  }
208 }
209 
// Shut this output unit down. A call made after `finished` has been set
// returns immediately.
//
// Sequence, as written below: when `running` is set, interrupt the protocol
// object (if any), raise `closing`, post the `phase` and `activate`
// semaphores and call join(). Afterwards closeBasic() is invoked and the
// state flags are updated, ending with `finished = true`.
210 void PortCoreOutputUnit::closeMain()
211 {
     // Already closed: nothing to do.
212  if (finished) {
213  return;
214  }
215 
216  yCDebug(PORTCOREOUTPUTUNIT, "closing");
217 
218  if (running) {
219  // give a kick (unfortunately unavoidable)
220 
221  if (op != nullptr) {
222  op->interrupt();
223  }
224 
     // `closing` is written before the semaphores are posted.
     // NOTE(review): presumably this lets the background send loop, which
     // tests `closing` right after activate.wait(), leave its loop so that
     // join() can return - confirm against run().
225  closing = true;
226  phase.post();
227  activate.post();
228  join();
229  }
230 
231  yCDebug(PORTCOREOUTPUTUNIT, "internal join");
232 
     // Close the connection itself, then record the final state.
233  closeBasic();
234  running = false;
235  closing = false;
236  finished = true;
237 
238  yCDebug(PORTCOREOUTPUTUNIT, "closed");
239 }
240 
241 
243 {
244  if (op != nullptr) {
245  Route r = op->getRoute();
246  op->beginWrite();
247  return r;
248  }
249  return PortCoreUnit::getRoute();
250 }
251 
// Send the cached message (cachedWriter / cachedReader / cachedEnvelope)
// over the connection held in `op`.
//
// Returns `replied`: false by default, otherwise the result of op->write()
// when the connection is active. Two early returns bypass that: a failed
// cast on the local path returns false, and rejected outgoing data returns
// true (see the note at that statement).
//
// Whenever `done` ends up set (write failure, connection no longer ok, or a
// drop requested on `buf`) the unit closes itself via closeBasic() and is
// marked closing, finished and doomed.
//
// NOTE(review): the listing line that starts the declaration of `buf` is
// not visible here; only its trailing argument is.
252 bool PortCoreOutputUnit::sendHelper()
253 {
254  bool replied = false;
255  if (op != nullptr) {
256  bool done = false;
258  op->getConnection().isBareMode());
     // Attach the reader for a reply, if the caller supplied one.
259  if (cachedReader != nullptr) {
260  buf.setReplyHandler(*cachedReader);
261  }
262 
     // Let the sender accept and replace the outgoing data, if it
     // declares that it modifies it.
263  if (op->getSender().modifiesOutgoingData()) {
264  if (op->getSender().acceptOutgoingData(*cachedWriter)) {
265  cachedWriter = &op->getSender().modifyOutgoingData(*cachedWriter);
266  } else {
     // NOTE(review): this returns the value of the assignment, i.e. true,
     // even though nothing was written, and it skips the `if (done)`
     // cleanup at the end of the function - confirm this is intended.
267  return (done = true);
268  }
269  }
270 
271  if (op->getConnection().isLocal()) {
272  // WARNING Cast away const qualifier.
273  // This may actually cause bugs when using the local carrier
274  // with something that is actually const (i.e. that is using
275  // some parts of memory that cannot be written.
276  auto* pw = const_cast<yarp::os::PortWriter*>(cachedWriter);
277  auto* p = dynamic_cast<yarp::os::Portable*>(pw);
278  if (p == nullptr) {
279  yCError(PORTCOREOUTPUTUNIT, "cast failed.");
280  return false;
281  }
     // Local carrier: hand over a reference to the object instead of
     // serializing it.
282  buf.setReference(p);
283  } else {
284  yCAssert(PORTCOREOUTPUTUNIT, cachedWriter != nullptr);
     // Serialize the payload into `buf`; a failed write ends the send.
285  bool ok = cachedWriter->write(buf);
286  if (!ok) {
287  done = true;
288  }
289 
     // No reply handler on `buf` selects the upper-case 'D' command
     // variants below instead of 'd'.
290  bool suppressReply = (buf.getReplyHandler() == nullptr);
291 
292  if (!done) {
293  if (!op->getConnection().canEscape()) {
     // Carrier cannot escape: a non-empty envelope is passed to the
     // connection's handleEnvelope() rather than written as a command.
294  if (!cachedEnvelope.empty()) {
295  op->getConnection().handleEnvelope(cachedEnvelope);
296  }
297  } else {
298  buf.addToHeader();
299 
     // Command written after addToHeader():
     //   envelope "__ADMIN"   -> key 'a', empty text
     //   other envelope       -> key '\0', text "D "/"d " + envelope
     //   no envelope          -> key 'D'/'d', empty text
300  if (!cachedEnvelope.empty()) {
301  if (cachedEnvelope == "__ADMIN") {
302  PortCommand pc('a', "");
303  pc.write(buf);
304  } else {
305  PortCommand pc('\0', std::string(suppressReply ? "D " : "d ") + cachedEnvelope);
306  pc.write(buf);
307  }
308  } else {
309  PortCommand pc(suppressReply ? 'D' : 'd', "");
310  pc.write(buf);
311  }
312  }
313  }
314  }
315 
316  if (!done) {
     // Only write when the connection reports itself active; the result
     // of the write becomes the return value.
317  if (op->getConnection().isActive()) {
318  replied = op->write(buf);
     // Give the sender the chance to substitute the reply reader.
319  if (replied && op->getSender().modifiesReply() && cachedReader != nullptr) {
320  cachedReader = &op->getSender().modifyReply(*cachedReader);
321  }
322  }
323  if (!op->isOk()) {
324  done = true;
325  }
326  }
327 
328  if (buf.dropRequested()) {
329  done = true;
330  }
     // The connection is finished: close it and flag this unit so it is
     // not used again.
331  if (done) {
332  closeBasic();
333  closing = true;
334  finished = true;
335  setDoomed();
336  }
337  }
338 
339 
340  return replied;
341 }
342 
344  yarp::os::PortReader* reader,
345  const yarp::os::PortWriter* callback,
346  void* tracker,
347  const std::string& envelopeString,
348  bool waitAfter,
349  bool waitBefore,
350  bool* gotReply)
351 {
352  bool replied = false;
353 
354  if (op != nullptr) {
355  if (!op->getConnection().isActive()) {
356  return tracker;
357  }
358  }
359 
360  if (!waitBefore || !waitAfter) {
361  if (!running) {
362  // we must have a thread if we're going to be skipping waits
363  threaded = true;
364  yCDebug(PORTCOREOUTPUTUNIT, "starting a thread for output");
365  start();
366  yCDebug(PORTCOREOUTPUTUNIT, "started a thread for output");
367  }
368  }
369 
370  if ((!waitBefore) && waitAfter) {
371  yCError(PORTCOREOUTPUTUNIT, "chosen port wait combination not yet implemented");
372  }
373  if (!sending) {
374  cachedWriter = &writer;
375  cachedReader = reader;
376  cachedCallback = callback;
377  cachedEnvelope = envelopeString;
378 
379  sending = true;
380  if (waitAfter) {
381  replied = sendHelper();
382  sending = false;
383  } else {
384  trackerMutex.lock();
385  void* nextTracker = tracker;
386  tracker = cachedTracker;
387  cachedTracker = nextTracker;
388  activate.post();
389  trackerMutex.unlock();
390  }
391  } else {
392  yCDebug(PORTCOREOUTPUTUNIT, "skipping connection tagged as sending something");
393  }
394 
395  if (waitAfter) {
396  if (gotReply != nullptr) {
397  *gotReply = replied;
398  }
399  }
400 
401  // return tracker that we no longer need
402  return tracker;
403 }
404 
405 
407 {
408  void* tracker = nullptr;
409  trackerMutex.lock();
410  if (!sending) {
411  tracker = cachedTracker;
412  cachedTracker = nullptr;
413  }
414  trackerMutex.unlock();
415  return tracker;
416 }
417 
419 {
420  return sending;
421 }
422 
424 {
425  if (op != nullptr) {
426  op->getConnection().setCarrierParams(params);
427  }
428 }
429 
431 {
432  if (op != nullptr) {
433  op->getConnection().getCarrierParams(params);
434  }
435 }
436 
438 {
439  return op;
440 }
float t
virtual bool isLocal() const =0
Check if carrier operates within a single process.
virtual bool isTextMode() const =0
Check if carrier is textual in nature.
virtual void prepareDisconnect()=0
Do cleanup and preparation for the coming disconnect, if necessary.
virtual bool isConnectionless() const =0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
virtual void getCarrierParams(yarp::os::Property &params) const =0
Get carrier configuration and deliver it by port administrative commands.
virtual void handleEnvelope(const std::string &envelope)=0
Carriers that do not distinguish data from administrative headers (i.e.
virtual bool acceptOutgoingData(const PortWriter &writer)=0
Determine whether outgoing data should be accepted.
virtual bool modifiesReply() const =0
Check if this carrier modifies reply data through the Carrier::modifyReply method.
virtual PortReader & modifyReply(PortReader &reader)=0
Modify reply payload data, if appropriate.
virtual bool isActive() const =0
Check if carrier is alive and error free.
virtual bool isBareMode() const
Check if carrier excludes type information from payload.
Definition: Connection.cpp:17
virtual bool canEscape() const =0
Check if carrier can encode administrative messages, as opposed to just user data.
virtual const PortWriter & modifyOutgoingData(const PortWriter &writer)=0
Modify outgoing payload data, if appropriate.
virtual bool supportReply() const =0
This flag is used by YARP to determine whether the connection can carry RPC traffic,...
virtual bool modifiesOutgoingData() const =0
Check if this carrier modifies outgoing data through the Carrier::modifyOutgoingData method.
virtual bool isBroadcast() const =0
Check if this carrier uses a broadcast mechanism.
virtual void setCarrierParams(const yarp::os::Property &params)=0
Configure carrier from port administrative commands.
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:26
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:20
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
Definition: ManagedBytes.h:22
Simple abstraction for a YARP port name.
Definition: Name.h:19
bool isRooted() const
Check if port name begins with "/".
Definition: Name.cpp:16
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
Definition: Network.cpp:1552
The output side of an active connection between two ports.
virtual const Route & getRoute() const =0
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
virtual Connection & getSender()=0
It is possible to chain a basic connection with a modifier.
virtual InputStream & getInputStream()=0
Access the input stream associated with the connection.
virtual void close()=0
Negotiate an end to operations.
virtual void interrupt()=0
virtual void beginWrite()=0
Notify connection that we intend to write to it.
virtual bool isOk() const =0
Check if the connection is valid and can be used.
virtual bool write(SizedWriter &writer)=0
Write a message on the connection.
Information about a port connection or event.
Definition: PortInfo.h:26
std::string targetName
Name of connection target, if any.
Definition: PortInfo.h:63
std::string carrierName
Name of protocol type, if relevant.
Definition: PortInfo.h:66
bool incoming
True if a connection is incoming, false if outgoing.
Definition: PortInfo.h:51
std::string message
A human-readable description of contents.
Definition: PortInfo.h:69
bool created
True if a connection is created, false if destroyed.
Definition: PortInfo.h:54
std::string portName
Name of port.
Definition: PortInfo.h:57
int tag
Type of information.
Definition: PortInfo.h:48
std::string sourceName
Name of connection source, if any.
Definition: PortInfo.h:60
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
Definition: PortInfo.h:40
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:25
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:24
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
This is a base class for objects that can be both read from and be written to the YARP network.
Definition: Portable.h:26
A class for storing options and configuration information.
Definition: Property.h:34
Information about a connection between two ports.
Definition: Route.h:29
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:103
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition: Route.cpp:123
std::string toString() const
Render a text form of the route, "source->carrier->dest".
Definition: Route.cpp:138
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:93
void wait()
Decrement the counter, even if we must wait to do that.
Definition: Semaphore.cpp:96
void post()
Increment the counter.
Definition: Semaphore.cpp:111
A helper for creating cached object descriptions.
Simple Readable and Writable object representing a command to a YARP port.
Definition: PortCommand.h:26
void getCarrierParams(yarp::os::Property &params) override
void * takeTracker() override
Reacquire a tracker previously passed via send().
void setCarrierParams(const yarp::os::Property &params) override
Set arbitrary parameters for this connection.
void * send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader, const yarp::os::PortWriter *callback, void *tracker, const std::string &envelopeString, bool waitAfter, bool waitBefore, bool *gotReply) override
Send a message on the connection.
PortCoreOutputUnit(PortCore &owner, int index, OutputProtocol *op)
Constructor.
bool start() override
Prepare to serve this output.
void run() override
The body of a thread managing background sends.
virtual void runSingleThreaded()
Perform send operations without a separate thread.
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:27
void setMode()
Check the carrier used for the connection, and see if it has a "log" modifier.
Definition: PortCoreUnit.h:186
void setDoomed()
Request that this connection be shut down as soon as possible.
Definition: PortCoreUnit.h:97
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
Definition: PortCore.cpp:1433
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc.). Generate a description of the connections associ...
Definition: PortCore.cpp:1185
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
Definition: PortCore.cpp:2889
int join(double seconds=-1)
Definition: ThreadImpl.cpp:120
#define yCInfo(component,...)
Definition: LogComponent.h:132
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCAssert(component, x)
Definition: LogComponent.h:169
#define yCDebug(component,...)
Definition: LogComponent.h:109
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:35
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.