YARP
Yet Another Robot Platform
PortCoreOutputUnit.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
11 
12 #include <yarp/os/Name.h>
13 #include <yarp/os/PortInfo.h>
14 #include <yarp/os/PortReport.h>
15 #include <yarp/os/Portable.h>
16 #include <yarp/os/Time.h>
20 
21 namespace {
22 YARP_OS_LOG_COMPONENT(PORTCOREOUTPUTUNIT, "yarp.os.impl.PortCoreOutputUnit")
23 } // namespace
24 
25 using namespace yarp::os::impl;
26 using namespace yarp::os;
27 
29  PortCoreUnit(owner, index),
30  op(op),
31  closing(false),
32  finished(false),
33  running(false),
34  threaded(false),
35  sending(false),
36  phase(1),
37  activate(0),
38  trackerMutex(),
39  cachedWriter(nullptr),
40  cachedReader(nullptr),
41  cachedCallback(nullptr),
42  cachedTracker(nullptr)
43 {
44  yCAssert(PORTCOREOUTPUTUNIT, op != nullptr);
45 }
46 
48 {
49  closeMain();
50 }
51 
52 
54 {
55  phase.wait();
56 
57  if (!threaded) {
58  running = false;
59  sending = false;
61  phase.post();
62  return true;
63  }
64 
65  bool result = PortCoreUnit::start();
66  if (result) {
67  phase.wait();
68  phase.post();
69  } else {
70  phase.post();
71  }
72 
73  return result;
74 }
75 
76 
78 {
79  running = true;
80  sending = false;
81 
82  // By default, we don't start up a thread for outputs.
83 
84  if (!threaded) {
86  phase.post();
87  } else {
88  phase.post();
89  Route r = getRoute();
90  while (!closing) {
91  yCDebug(PORTCOREOUTPUTUNIT, "waiting");
92  activate.wait();
93  yCDebug(PORTCOREOUTPUTUNIT, "woken");
94  if (!closing) {
95  if (sending) {
96  yCDebug(PORTCOREOUTPUTUNIT, "write something in background");
97  sendHelper();
98  yCDebug(PORTCOREOUTPUTUNIT, "wrote something in background");
99  trackerMutex.lock();
100  if (cachedTracker != nullptr) {
101  void* t = cachedTracker;
102  cachedTracker = nullptr;
103  sending = false;
105  } else {
106  sending = false;
107  }
108  trackerMutex.unlock();
109  }
110  }
111  yCDebug(PORTCOREOUTPUTUNIT, "wrote something in background");
112  }
113  yCDebug(PORTCOREOUTPUTUNIT, "thread closing");
114  sending = false;
115  }
116 }
117 
118 
120 {
121  if (op != nullptr) {
122  Route route = op->getRoute();
123  setMode();
124  getOwner().reportUnit(this, true);
125 
126  std::string msg = std::string("Sending output from ") + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
127  if (Name(route.getToName()).isRooted()) {
128  if (Name(route.getFromName()).isRooted()) {
129  yCInfo(PORTCOREOUTPUTUNIT, "%s", msg.c_str());
130  }
131  }
132 
133  // Report the new connection
134  PortInfo info;
135  info.message = msg;
137  info.incoming = false;
138  info.created = true;
139  info.sourceName = route.getFromName();
140  info.targetName = route.getToName();
141  info.portName = info.sourceName;
142  info.carrierName = route.getCarrierName();
143  getOwner().report(info);
144  }
145 
146  // no thread component
147  running = false;
148 }
149 
150 void PortCoreOutputUnit::closeBasic()
151 {
152  bool waitForOther = false;
153  if (op != nullptr) {
155  Route route = op->getRoute();
156  if (op->getConnection().isConnectionless() || op->getConnection().isBroadcast()) {
157  yCInfo(PORTCOREOUTPUTUNIT, "output for route %s asking other side to close by out-of-band means",
158  route.toString().c_str());
160  route.getFromName(),
161  true);
162  } else {
163  if (op->getConnection().canEscape()) {
165  op->getConnection().isBareMode());
166  PortCommand pc('\0', std::string("q"));
167  pc.write(buf);
168  //printf("Asked for %s to close...\n",
169  // op->getRoute().toString().c_str());
170  waitForOther = op->write(buf);
171  }
172  }
173 
174  std::string msg = std::string("Removing output from ") + route.getFromName() + " to " + route.getToName();
175 
176  if (Name(route.getToName()).isRooted()) {
177  if (Name(route.getFromName()).isRooted()) {
178  yCInfo(PORTCOREOUTPUTUNIT, "%s", msg.c_str());
179  }
180  }
181 
182  getOwner().reportUnit(this, false);
183 
184  // Report the disappearing connection
185  PortInfo info;
186  info.message = msg;
188  info.incoming = false;
189  info.created = false;
190  info.sourceName = route.getFromName();
191  info.targetName = route.getToName();
192  info.portName = info.sourceName;
193  info.carrierName = route.getCarrierName();
194  getOwner().report(info);
195  }
196 
197 
198  if (op != nullptr) {
199  if (waitForOther) {
200  // quit is only acknowledged in certain conditions
201  if (op->getConnection().isTextMode() && op->getConnection().supportReply()) {
202  InputStream& is = op->getInputStream();
203  ManagedBytes dummy(1);
204  is.read(dummy.bytes());
205  }
206  }
207  op->close();
208  delete op;
209  op = nullptr;
210  }
211 }
212 
213 void PortCoreOutputUnit::closeMain()
214 {
215  if (finished) {
216  return;
217  }
218 
219  yCDebug(PORTCOREOUTPUTUNIT, "closing");
220 
221  if (running) {
222  // give a kick (unfortunately unavoidable)
223 
224  if (op != nullptr) {
225  op->interrupt();
226  }
227 
228  closing = true;
229  phase.post();
230  activate.post();
231  join();
232  }
233 
234  yCDebug(PORTCOREOUTPUTUNIT, "internal join");
235 
236  closeBasic();
237  running = false;
238  closing = false;
239  finished = true;
240 
241  yCDebug(PORTCOREOUTPUTUNIT, "closed");
242 }
243 
244 
246 {
247  if (op != nullptr) {
248  Route r = op->getRoute();
249  op->beginWrite();
250  return r;
251  }
252  return PortCoreUnit::getRoute();
253 }
254 
255 bool PortCoreOutputUnit::sendHelper()
256 {
257  bool replied = false;
258  if (op != nullptr) {
259  bool done = false;
261  op->getConnection().isBareMode());
262  if (cachedReader != nullptr) {
263  buf.setReplyHandler(*cachedReader);
264  }
265 
266  if (op->getSender().modifiesOutgoingData()) {
267  if (op->getSender().acceptOutgoingData(*cachedWriter)) {
268  cachedWriter = &op->getSender().modifyOutgoingData(*cachedWriter);
269  } else {
270  return (done = true);
271  }
272  }
273 
274  if (op->getConnection().isLocal()) {
275  // WARNING Cast away const qualifier.
276  // This may actually cause bugs when using the local carrier
277  // with something that is actually const (i.e. that is using
278  // some parts of memory that cannot be written.
279  auto* pw = const_cast<yarp::os::PortWriter*>(cachedWriter);
280  auto* p = dynamic_cast<yarp::os::Portable*>(pw);
281  if (p == nullptr) {
282  yCError(PORTCOREOUTPUTUNIT, "cast failed.");
283  return false;
284  }
285  buf.setReference(p);
286  } else {
287  yCAssert(PORTCOREOUTPUTUNIT, cachedWriter != nullptr);
288  bool ok = cachedWriter->write(buf);
289  if (!ok) {
290  done = true;
291  }
292 
293  bool suppressReply = (buf.getReplyHandler() == nullptr);
294 
295  if (!done) {
296  if (!op->getConnection().canEscape()) {
297  if (!cachedEnvelope.empty()) {
298  op->getConnection().handleEnvelope(cachedEnvelope);
299  }
300  } else {
301  buf.addToHeader();
302 
303  if (!cachedEnvelope.empty()) {
304  if (cachedEnvelope == "__ADMIN") {
305  PortCommand pc('a', "");
306  pc.write(buf);
307  } else {
308  PortCommand pc('\0', std::string(suppressReply ? "D " : "d ") + cachedEnvelope);
309  pc.write(buf);
310  }
311  } else {
312  PortCommand pc(suppressReply ? 'D' : 'd', "");
313  pc.write(buf);
314  }
315  }
316  }
317  }
318 
319  if (!done) {
320  if (op->getConnection().isActive()) {
321  replied = op->write(buf);
322  if (replied && op->getSender().modifiesReply() && cachedReader != nullptr) {
323  cachedReader = &op->getSender().modifyReply(*cachedReader);
324  }
325  }
326  if (!op->isOk()) {
327  done = true;
328  }
329  }
330 
331  if (buf.dropRequested()) {
332  done = true;
333  }
334  if (done) {
335  closeBasic();
336  closing = true;
337  finished = true;
338  setDoomed();
339  }
340  }
341 
342 
343  return replied;
344 }
345 
347  yarp::os::PortReader* reader,
348  const yarp::os::PortWriter* callback,
349  void* tracker,
350  const std::string& envelopeString,
351  bool waitAfter,
352  bool waitBefore,
353  bool* gotReply)
354 {
355  bool replied = false;
356 
357  if (op != nullptr) {
358  if (!op->getConnection().isActive()) {
359  return tracker;
360  }
361  }
362 
363  if (!waitBefore || !waitAfter) {
364  if (!running) {
365  // we must have a thread if we're going to be skipping waits
366  threaded = true;
367  yCDebug(PORTCOREOUTPUTUNIT, "starting a thread for output");
368  start();
369  yCDebug(PORTCOREOUTPUTUNIT, "started a thread for output");
370  }
371  }
372 
373  if ((!waitBefore) && waitAfter) {
374  yCError(PORTCOREOUTPUTUNIT, "chosen port wait combination not yet implemented");
375  }
376  if (!sending) {
377  cachedWriter = &writer;
378  cachedReader = reader;
379  cachedCallback = callback;
380  cachedEnvelope = envelopeString;
381 
382  sending = true;
383  if (waitAfter) {
384  replied = sendHelper();
385  sending = false;
386  } else {
387  trackerMutex.lock();
388  void* nextTracker = tracker;
389  tracker = cachedTracker;
390  cachedTracker = nextTracker;
391  activate.post();
392  trackerMutex.unlock();
393  }
394  } else {
395  yCDebug(PORTCOREOUTPUTUNIT, "skipping connection tagged as sending something");
396  }
397 
398  if (waitAfter) {
399  if (gotReply != nullptr) {
400  *gotReply = replied;
401  }
402  }
403 
404  // return tracker that we no longer need
405  return tracker;
406 }
407 
408 
410 {
411  void* tracker = nullptr;
412  trackerMutex.lock();
413  if (!sending) {
414  tracker = cachedTracker;
415  cachedTracker = nullptr;
416  }
417  trackerMutex.unlock();
418  return tracker;
419 }
420 
422 {
423  return sending;
424 }
425 
427 {
428  if (op != nullptr) {
429  op->getConnection().setCarrierParams(params);
430  }
431 }
432 
434 {
435  if (op != nullptr) {
436  op->getConnection().getCarrierParams(params);
437  }
438 }
439 
441 {
442  return op;
443 }
float t
virtual bool isLocal() const =0
Check if carrier operates within a single process.
virtual bool isTextMode() const =0
Check if carrier is textual in nature.
virtual void prepareDisconnect()=0
Do cleanup and preparation for the coming disconnect, if necessary.
virtual bool isConnectionless() const =0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
virtual void getCarrierParams(yarp::os::Property &params) const =0
Get carrier configuration and deliver it by port administrative commands.
virtual void handleEnvelope(const std::string &envelope)=0
Carriers that do not distinguish data from administrative headers (i.e.
virtual bool acceptOutgoingData(const PortWriter &writer)=0
Determine whether outgoing data should be accepted.
virtual bool modifiesReply() const =0
Check if this carrier modifies outgoing data through the Carrier::modifyReply method.
virtual PortReader & modifyReply(PortReader &reader)=0
Modify reply payload data, if appropriate.
virtual bool isActive() const =0
Check if carrier is alive and error free.
virtual bool isBareMode() const
Check if carrier excludes type information from payload.
Definition: Connection.cpp:20
virtual bool canEscape() const =0
Check if carrier can encode administrative messages, as opposed to just user data.
virtual const PortWriter & modifyOutgoingData(const PortWriter &writer)=0
Modify outgoing payload data, if appropriate.
virtual bool supportReply() const =0
This flag is used by YARP to determine whether the connection can carry RPC traffic,...
virtual bool modifiesOutgoingData() const =0
Check if this carrier modifies outgoing data through the Carrier::modifyOutgoingData method.
virtual bool isBroadcast() const =0
Check if this carrier uses a broadcast mechanism.
virtual void setCarrierParams(const yarp::os::Property &params)=0
Configure carrier from port administrative commands.
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:29
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:23
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
Definition: ManagedBytes.h:25
Simple abstraction for a YARP port name.
Definition: Name.h:22
bool isRooted() const
Check if port name begins with "/".
Definition: Name.cpp:19
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
Definition: Network.cpp:1555
The output side of an active connection between two ports.
virtual const Route & getRoute() const =0
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
virtual Connection & getSender()=0
It is possible to chain a basic connection with a modifier.
virtual InputStream & getInputStream()=0
Access the input stream associated with the connection.
virtual void close()=0
Negotiate an end to operations.
virtual void interrupt()=0
virtual void beginWrite()=0
Notify connection that we intend to write to it.
virtual bool isOk() const =0
Check if the connection is valid and can be used.
virtual bool write(SizedWriter &writer)=0
Write a message on the connection.
Information about a port connection or event.
Definition: PortInfo.h:29
std::string targetName
Name of connection target, if any.
Definition: PortInfo.h:66
std::string carrierName
Name of protocol type, if releveant.
Definition: PortInfo.h:69
bool incoming
True if a connection is incoming, false if outgoing.
Definition: PortInfo.h:54
std::string message
A human-readable description of contents.
Definition: PortInfo.h:72
bool created
True if a connection is created, false if destroyed.
Definition: PortInfo.h:57
std::string portName
Name of port.
Definition: PortInfo.h:60
int tag
Type of information.
Definition: PortInfo.h:51
std::string sourceName
Name of connection source, if any.
Definition: PortInfo.h:63
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
Definition: PortInfo.h:43
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:28
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:27
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
This is a base class for objects that can be both read from and be written to the YARP network.
Definition: Portable.h:29
A class for storing options and configuration information.
Definition: Property.h:37
Information about a connection between two ports.
Definition: Route.h:32
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:106
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition: Route.cpp:126
std::string toString() const
Render a text form of the route, "source->carrier->dest".
Definition: Route.cpp:141
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:96
void wait()
Decrement the counter, even if we must wait to do that.
Definition: Semaphore.cpp:99
void post()
Increment the counter.
Definition: Semaphore.cpp:114
A helper for creating cached object descriptions.
Simple Readable and Writable object representing a command to a YARP port.
Definition: PortCommand.h:29
void getCarrierParams(yarp::os::Property &params) override
void * takeTracker() override
Reacquire a tracker previously passed via send().
void setCarrierParams(const yarp::os::Property &params) override
Set arbitrary parameters for this connection.
void * send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader, const yarp::os::PortWriter *callback, void *tracker, const std::string &envelopeString, bool waitAfter, bool waitBefore, bool *gotReply) override
Send a message on the connection.
PortCoreOutputUnit(PortCore &owner, int index, OutputProtocol *op)
Constructor.
bool start() override
Prepare to serve this output.
void run() override
The body of a thread managing background sends.
virtual void runSingleThreaded()
Perform send operations without a separate thread.
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:30
void setMode()
Check the carrier used for the connection, and see if it has a "log" modifier.
Definition: PortCoreUnit.h:189
void setDoomed()
Request that this connection be shut down as soon as possible.
Definition: PortCoreUnit.h:100
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
Definition: PortCore.cpp:1456
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc) Generate a description of the connections associ...
Definition: PortCore.cpp:1204
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
Definition: PortCore.cpp:2914
int join(double seconds=-1)
Definition: ThreadImpl.cpp:123
#define yCInfo(component,...)
Definition: LogComponent.h:135
#define yCError(component,...)
Definition: LogComponent.h:157
#define yCAssert(component, x)
Definition: LogComponent.h:172
#define yCDebug(component,...)
Definition: LogComponent.h:112
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.