YARP
Yet Another Robot Platform
PortCoreInputUnit.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
8 
9 #include <yarp/os/Name.h>
10 #include <yarp/os/Os.h>
11 #include <yarp/os/PortInfo.h>
12 #include <yarp/os/PortReport.h>
13 #include <yarp/os/Time.h>
18 
19 #include <cstdio>
20 
21 
22 using namespace yarp::os::impl;
23 using namespace yarp::os;
24 
25 namespace {
26 YARP_OS_LOG_COMPONENT(PORTCOREINPUTUNIT, "yarp.os.impl.PortCoreInputUnit")
27 } // namespace
28 
30  int index,
31  InputProtocol* ip,
32  bool reversed) :
33  PortCoreUnit(owner, index),
34  ip(ip),
35  phase(1),
36  access(1),
37  closing(false),
38  finished(false),
39  running(false),
40  name(owner.getName()),
41  localReader(nullptr),
42  reversed(reversed)
43 {
44  yCAssert(PORTCOREINPUTUNIT, ip != nullptr);
45 
47  if (creator != nullptr) {
48  localReader = creator->create();
49  }
50 }
51 
53 {
54  closeMain();
55  if (localReader != nullptr) {
56  delete localReader;
57  localReader = nullptr;
58  }
59 }
60 
61 
63 {
64  yCDebug(PORTCOREINPUTUNIT, "new input connection to %s starting", getOwner().getName().c_str());
65 
66  phase.wait();
67 
68  bool result = PortCoreUnit::start();
69  if (result) {
70  yCDebug(PORTCOREINPUTUNIT, "new input connection to %s started ok", getOwner().getName().c_str());
71  phase.wait();
72  phase.post();
73  } else {
74  yCDebug(PORTCOREINPUTUNIT, "new input connection to %s failed to start", getOwner().getName().c_str());
75  phase.post();
76  }
77 
78  return result;
79 }
80 
81 
83 {
84  running = true;
85  phase.post();
86 
87  Route route;
88  bool wasNoticed = false;
89  bool posted = false;
90 
91  bool done = false;
92 
93  yCAssert(PORTCOREINPUTUNIT, ip != nullptr);
94 
95  PortCommand cmd;
96 
97  bool ok = true;
98  if (!reversed) {
99  ip->open(getName());
100  }
101  if (!ok) {
102  yCDebug(PORTCOREINPUTUNIT, "new input connection to %s is broken", getOwner().getName().c_str());
103  done = true;
104  } else {
105  route = ip->getRoute();
106 
107  // just before going official, tag any lurking inputs from
108  // the same source as undesired
109  if (Name(route.getFromName()).isRooted()) {
110  yCDebug(PORTCOREINPUTUNIT, "Port %s starting up, flushing routes %s->*->%s",
111  getOwner().getName().c_str(),
112  route.getFromName().c_str(),
113  route.getToName().c_str());
115  route.getToName(),
116  "*"),
117  true);
118  }
119  officialRoute = route;
120  setMode();
121  getOwner().reportUnit(this, true);
122 
123  std::string msg = "Receiving input from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
124  if (Name(route.getFromName()).isRooted() && (reversed || ip->getConnection().isPush())) {
125  yCInfo(PORTCOREINPUTUNIT, "%s", msg.c_str());
126  posted = true;
127  } else {
128  yCDebug(PORTCOREINPUTUNIT, "%s", msg.c_str());
129  }
130 
131  // Report the new connection
132  PortInfo info;
133  info.message = msg;
135  info.incoming = true;
136  info.created = true;
137  info.sourceName = route.getFromName();
138  info.targetName = route.getToName();
139  info.portName = info.targetName;
140  info.carrierName = route.getCarrierName();
141 
142  if (info.sourceName != "admin" && info.sourceName != "null") {
143  getOwner().report(info);
144  wasNoticed = true;
145  }
146  }
147 
148  if (!reversed) {
149  if (!ip->getConnection().isPush()) {
150  /* IP=OP */
151  OutputProtocol* op = &(ip->getOutput());
152  Route r = op->getRoute();
153  // reverse route
154  r.swapNames();
155  op->rename(r);
156 
157  getOwner().addOutput(op);
158  ip = nullptr;
159  done = true;
160  }
161  }
162 
163  if (closing) {
164  done = true;
165  }
166 
167  auto* id = reinterpret_cast<void*>(this);
168 
169  if (ip != nullptr && !ip->getConnection().canEscape()) {
170  InputStream* is = &ip->getInputStream();
171  is->setReadEnvelopeCallback(envelopeReadCallback, this);
172  }
173 
174  while (!done) {
175  if (ip == nullptr) {
176  break;
177  }
178  ConnectionReader& br = ip->beginRead();
179 
180  if (br.getReference() != nullptr) {
181  //printf("HAVE A REFERENCE\n");
182  if (localReader != nullptr) {
183  bool ok = localReader->read(br);
184  if (!br.isActive()) {
185  break;
186  }
187  if (!ok) {
188  continue;
189  }
190  } else {
191  PortCore& man = getOwner();
192  bool ok = man.readBlock(br, id, nullptr);
193  if (!br.isActive()) {
194  break;
195  }
196  if (!ok) {
197  continue;
198  }
199  }
200  //printf("DONE WITH A REFERENCE\n");
201  if (ip != nullptr) {
202  ip->endRead();
203  }
204  continue;
205  }
206 
207  if (ip->getConnection().canEscape()) {
208  bool ok = cmd.read(br);
209  if (!br.isActive()) {
210  break;
211  }
212  if (!ok) {
213  continue;
214  }
215  } else {
216  cmd = PortCommand('d', "");
217  if (!ip->isOk()) {
218  break;
219  }
220  }
221 
222  if (closing || isDoomed()) {
223  break;
224  }
225  char key = cmd.getKey();
226  //printf("Port command is [%c:%d/%s]\n",
227  // (key>=32)?key:'?', key, cmd.getText().c_str());
228 
229  PortCore& man = getOwner();
230  OutputStream* os = nullptr;
231  if (br.isTextMode()) {
232  os = &(ip->getOutputStream());
233  }
234 
235  switch (key) {
236  case '/':
237  yCDebug(PORTCOREINPUTUNIT,
238  "Port command (%s): %s should add connection: %s",
239  route.toString().c_str(),
240  getOwner().getName().c_str(),
241  cmd.getText().c_str());
242  man.addOutput(cmd.getText(), id, os);
243  break;
244  case '!':
245  yCDebug(PORTCOREINPUTUNIT,
246  "Port command (%s): %s should remove output: %s",
247  route.toString().c_str(),
248  getOwner().getName().c_str(),
249  cmd.getText().c_str());
250  man.removeOutput(cmd.getText().substr(1, std::string::npos), id, os);
251  break;
252  case '~':
253  yCDebug(PORTCOREINPUTUNIT,
254  "Port command (%s): %s should remove input: %s",
255  route.toString().c_str(),
256  getOwner().getName().c_str(),
257  cmd.getText().c_str());
258  man.removeInput(cmd.getText().substr(1, std::string::npos), id, os);
259  break;
260  case '*':
261  man.describe(id, os);
262  break;
263  case 'D':
264  case 'd': {
265  if (key == 'D') {
266  ip->suppressReply();
267  }
268 
269  std::string env = cmd.getText();
270  if (env.length() > 2) {
271  yCTrace(PORTCOREINPUTUNIT, "***** received an envelope! [%s]", env.c_str());
272  std::string env2 = env.substr(2, env.length());
273  man.setEnvelope(env2);
274  ip->setEnvelope(env2);
275  }
276  if (localReader != nullptr) {
277  localReader->read(br);
278  if (!br.isActive()) {
279  done = true;
280  break;
281  }
282  } else {
283  if (ip->getReceiver().acceptIncomingData(br)) {
286  modifier.inputMutex.lock();
287  if (modifier.inputModifier != nullptr) {
288  if (modifier.inputModifier->acceptIncomingData(*cr)) {
289  cr = &(modifier.inputModifier->modifyIncomingData(*cr));
290  modifier.inputMutex.unlock();
291  man.readBlock(*cr, id, os);
292  } else {
293  modifier.inputMutex.unlock();
294  skipIncomingData(*cr);
295  }
296  } else {
297  modifier.inputMutex.unlock();
298  man.readBlock(*cr, id, os);
299  }
300  } else {
301  skipIncomingData(br);
302  }
303  if (!br.isActive()) {
304  done = true;
305  break;
306  }
307  }
308  } break;
309  case 'a': {
310  man.adminBlock(br, id);
311  } break;
312  case 'r':
313  /*
314  In YARP implementation, OP=IP.
315  (This information is used rarely, and when used
316  is tagged with OP=IP keyword)
317  If it were not true, memory alloc would need to
318  be reorganized here
319  */
320  {
321  OutputProtocol* op = &(ip->getOutput());
322  ip->endRead();
323  Route r = op->getRoute();
324  // reverse route
325  r.swapNames();
326  op->rename(r);
327 
328  getOwner().addOutput(op);
329  ip = nullptr;
330  done = true;
331  }
332  break;
333  case 'q':
334  done = true;
335  break;
336 #if !defined(NDEBUG)
337  case 'i':
338  printf("Interrupt requested\n");
339  //yarp::os::impl::kill(0, 2); // SIGINT
340  //yarp::os::impl::kill(yarp::os::getpid(), 2); // SIGINT
341  yarp::os::impl::kill(yarp::os::getpid(), 15); // SIGTERM
342  break;
343 #endif
344  case '?':
345  case 'h':
346  if (os != nullptr) {
347  BufferedConnectionWriter bw(true);
348  bw.appendLine("This is a YARP port. Here are the commands it responds to:");
349  bw.appendLine("* Gives a description of this port");
350  bw.appendLine("d Signals the beginning of input for the port's owner");
351  bw.appendLine(R"(do The same as "d" except replies should be suppressed ("data-only"))");
352  bw.appendLine("q Disconnects");
353 #if !defined(NDEBUG)
354  bw.appendLine("i Interrupt parent process (unix only)");
355 #endif
356  bw.appendLine("r Reverse connection type to be a reader");
357  bw.appendLine("/port Requests to send output to /port");
358  bw.appendLine("!/port Requests to stop sending output to /port");
359  bw.appendLine("~/port Requests to stop receiving input from /port");
360  bw.appendLine("a Signals the beginning of an administrative message");
361  bw.appendLine("? Gives this help");
362  bw.write(*os);
363  }
364  break;
365  default:
366  if (os != nullptr) {
367  BufferedConnectionWriter bw(true);
368  bw.appendLine("Port command not understood.");
369  bw.appendLine("Type d to send data to the port's owner.");
370  bw.appendLine("Type ? for help.");
371  bw.write(*os);
372  }
373  break;
374  }
375  if (ip != nullptr) {
376  ip->endRead();
377  }
378  if (ip == nullptr) {
379  break;
380  }
381  if (closing || isDoomed() || (!ip->isOk())) {
382  break;
383  }
384  }
385 
386  setDoomed();
387 
388  yCDebug(PORTCOREINPUTUNIT, "Closing ip");
389  access.wait();
390  if (ip != nullptr) {
391  ip->close();
392  }
393  access.post();
394  yCDebug(PORTCOREINPUTUNIT, "Closed ip");
395 
396  std::string msg = std::string("Removing input from ") + route.getFromName() + " to " + route.getToName();
397 
398  if (Name(route.getFromName()).isRooted()) {
399  if (posted) {
400  yCInfo(PORTCOREINPUTUNIT, "%s", msg.c_str());
401  }
402  } else {
403  yCDebug(PORTCOREINPUTUNIT, "(unrooted) shutting down");
404  }
405 
406  getOwner().reportUnit(this, false);
407 
408  if (wasNoticed) {
409  // Report the disappearing connection
410  PortInfo info;
411  info.message = msg;
413  info.incoming = true;
414  info.created = false;
415  info.sourceName = route.getFromName();
416  info.targetName = route.getToName();
417  info.portName = info.targetName;
418  info.carrierName = route.getCarrierName();
419 
420  if (info.sourceName != "admin") {
421  getOwner().report(info);
422  }
423  }
424 
425  if (localReader != nullptr) {
426  delete localReader;
427  localReader = nullptr;
428  }
429 
430  running = false;
431  finished = true;
432 
433  // it would be nice to get my entry removed from the port immediately,
434  // but it would be a bit dodgy to delete this object and join this
435  // thread within and from themselves
436 }
437 
439 {
440  return true;
441 }
442 
444 {
445  closeMain();
446 }
447 
449 {
450  return finished;
451 }
452 
453 const std::string& PortCoreInputUnit::getName()
454 {
455  return name;
456 }
457 
459 {
460  // give a kick (unfortunately unavoidable)
461  access.wait();
462  if (!closing) {
463  if (ip != nullptr) {
464  ip->interrupt();
465  }
466  closing = true;
467  }
468  access.post();
469  return true;
470 }
471 
473 {
474  if (ip != nullptr) {
475  ip->getReceiver().setCarrierParams(params);
476  }
477 }
478 
480 {
481  if (ip != nullptr) {
482  ip->getReceiver().getCarrierParams(params);
483  }
484 }
485 
486 // return the protocol object
488 {
489  return ip;
490 }
491 
492 void PortCoreInputUnit::closeMain()
493 {
494  access.wait();
495  Route r = getRoute();
496  access.post();
497 
498  yCDebug(PORTCOREINPUTUNIT, "[%s] closing", r.toString().c_str());
499 
500  if (running) {
501  yCDebug(PORTCOREINPUTUNIT, "[%s] joining", r.toString().c_str());
502  interrupt();
503  join();
504  yCDebug(PORTCOREINPUTUNIT, "[%s] joined", r.toString().c_str());
505  }
506 
507  if (ip != nullptr) {
508  ip->close();
509  delete ip;
510  ip = nullptr;
511  }
512  running = false;
513  closing = false;
514 }
515 
516 
518 {
519  return officialRoute;
520 }
521 
522 
523 bool PortCoreInputUnit::skipIncomingData(yarp::os::ConnectionReader& reader)
524 {
525  size_t pending = reader.getSize();
526  if (pending > 0) {
527  while (pending > 0) {
528  char buf[10000];
529  size_t next = (pending < sizeof(buf)) ? pending : sizeof(buf);
530  reader.expectBlock(&buf[0], next);
531  pending -= next;
532  }
533  return true;
534  }
535  return false;
536 }
537 
538 
540 {
541  bool busy = false;
542  access.wait();
543  if (ip != nullptr) {
544  busy = ip->isReplying();
545  }
546  access.post();
547  return busy;
548 }
549 
550 
551 void PortCoreInputUnit::envelopeReadCallback(void* data, const Bytes& envelope)
552 {
553  auto* p = reinterpret_cast<PortCoreInputUnit*>(data);
554  if (p == nullptr) {
555  return;
556  }
557  p->getOwner().setEnvelope(envelope.get());
558  p->ip->setEnvelope(envelope.get());
559 }
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
const char * get() const
Definition: Bytes.cpp:27
ConnectionReader & modifyIncomingData(ConnectionReader &reader) override
Modify incoming payload data, if appropriate.
Definition: Carrier.cpp:59
bool acceptIncomingData(ConnectionReader &reader) override
Determine whether incoming data should be accepted.
Definition: Carrier.cpp:64
An interface for reading from a network connection.
virtual bool isTextMode() const =0
Check if the connection is text mode.
virtual size_t getSize() const =0
Checks how much data is available.
virtual bool expectBlock(char *data, size_t len)=0
Read a block of data from the network connection.
virtual Portable * getReference() const =0
Get a direct pointer to the object being sent, if possible.
virtual bool isActive() const =0
virtual bool acceptIncomingData(yarp::os::ConnectionReader &reader)=0
Determine whether incoming data should be accepted.
virtual void getCarrierParams(yarp::os::Property &params) const =0
Get carrier configuration and deliver it by port administrative commands.
virtual yarp::os::ConnectionReader & modifyIncomingData(yarp::os::ConnectionReader &reader)=0
Modify incoming payload data, if appropriate.
virtual bool canEscape() const =0
Check if carrier can encode administrative messages, as opposed to just user data.
virtual bool isPush() const =0
Check if carrier is "push" or "pull" style.
virtual void setCarrierParams(const yarp::os::Property &params)=0
Configure carrier from port administrative commands.
The input side of an active connection between two ports.
Definition: InputProtocol.h:35
virtual bool open(const std::string &name)=0
Start negotiating a carrier, using the given name as our own if a name is needed (this should general...
virtual bool isOk() const =0
Check if the connection is valid and can be used.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
virtual void close()=0
Negotiate an end to operations.
virtual OutputStream & getOutputStream()=0
Access the output stream associated with the connection.
virtual ConnectionReader & beginRead()=0
Begin a read operation, with bytes read via the returned yarp::os::ConnectionReader object.
virtual Connection & getReceiver()=0
It is possible to chain a basic connection with a modifier.
virtual void setEnvelope(const std::string &str)=0
Set the envelope that will be attached to the next message.
virtual void endRead()=0
End the current read operation, begin by beginRead().
virtual Connection & getConnection()=0
Get the basic connection through which we are communicating.
virtual OutputProtocol & getOutput()=0
Get an interface for doing write operations on the connection.
virtual void suppressReply()=0
Make sure that any attempt to send a reply to input will be denied.
virtual InputStream & getInputStream()=0
Access the input stream associated with the connection.
virtual void interrupt()=0
Try to get operations interrupted.
virtual bool isReplying() const =0
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:26
virtual bool setReadEnvelopeCallback(readEnvelopeCallbackType callback, void *data)
Install a callback that the InputStream will have to call when the envelope is read from a message in...
Simple abstraction for a YARP port name.
Definition: Name.h:19
bool isRooted() const
Check if port name begins with "/".
Definition: Name.cpp:16
The output side of an active connection between two ports.
virtual const Route & getRoute() const =0
virtual void rename(const Route &route)=0
Relabel the route after the fact (e.g.
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:22
Information about a port connection or event.
Definition: PortInfo.h:26
std::string targetName
Name of connection target, if any.
Definition: PortInfo.h:63
std::string carrierName
Name of protocol type, if relevant.
Definition: PortInfo.h:66
bool incoming
True if a connection is incoming, false if outgoing.
Definition: PortInfo.h:51
std::string message
A human-readable description of contents.
Definition: PortInfo.h:69
bool created
True if a connection is created, false if destroyed.
Definition: PortInfo.h:54
std::string portName
Name of port.
Definition: PortInfo.h:57
int tag
Type of information.
Definition: PortInfo.h:48
std::string sourceName
Name of connection source, if any.
Definition: PortInfo.h:60
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
Definition: PortInfo.h:40
A creator for readers.
virtual PortReader * create() const =0
Factory for PortReader objects.
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
A class for storing options and configuration information.
Definition: Property.h:34
Information about a connection between two ports.
Definition: Route.h:29
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:103
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition: Route.cpp:123
std::string toString() const
Render a text form of the route, "source->carrier->dest".
Definition: Route.cpp:138
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:93
void swapNames()
Swap from and to names.
Definition: Route.cpp:133
void wait()
Decrement the counter, even if we must wait to do that.
Definition: Semaphore.cpp:96
void post()
Increment the counter.
Definition: Semaphore.cpp:111
A helper for creating cached object descriptions.
bool write(ConnectionWriter &connection) const override
Write this object to a network connection.
virtual void appendLine(const std::string &data)
Send a string along with a carriage-return-line-feed sequence.
Simple Readable and Writable object representing a command to a YARP port.
Definition: PortCommand.h:26
bool read(yarp::os::ConnectionReader &reader) override
Read this object from a network connection.
Definition: PortCommand.cpp:20
Manager for a single input to a port.
PortCoreInputUnit(PortCore &owner, int index, InputProtocol *ip, bool reversed)
Constructor.
void run() override
The body of the thread associated with this input.
bool start() override
Start a thread running to serve this input.
void getCarrierParams(yarp::os::Property &params) override
void setCarrierParams(const yarp::os::Property &params) override
Set arbitrary parameters for this connection.
bool interrupt() override
Interrupt the connection.
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:27
void setMode()
Check the carrier used for the connection, and see if it has a "log" modifier.
Definition: PortCoreUnit.h:186
void setDoomed()
Request that this connection be shut down as soon as possible.
Definition: PortCoreUnit.h:97
bool readBlock(ConnectionReader &reader, void *id, yarp::os::OutputStream *os)
Read a block of regular payload data.
Definition: PortCore.cpp:1199
yarp::os::PortReaderCreator * getReadCreator()
Get the creator of callbacks.
Definition: PortCore.cpp:2996
void setEnvelope(const std::string &envelope)
Set some envelope information to pass along with a message without actually being part of the message...
Definition: PortCore.cpp:1457
bool removeIO(const Route &route, bool synch=false)
Remove any connection matching the supplied route.
Definition: PortCore.cpp:2961
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc) Generate a description of the connections associ...
Definition: PortCore.cpp:1185
void removeOutput(const std::string &dest, void *id, yarp::os::OutputStream *os)
Remove an output connection.
Definition: PortCore.cpp:1012
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
Definition: PortCore.cpp:2889
void describe(void *id, yarp::os::OutputStream *os)
Produce a text description of the port and its connections.
Definition: PortCore.cpp:1048
bool adminBlock(ConnectionReader &reader, void *id)
Read a block of administrative data.
Definition: PortCore.cpp:1659
void removeInput(const std::string &src, void *id, yarp::os::OutputStream *os)
Remove an input connection.
Definition: PortCore.cpp:1030
yarp::os::impl::PortDataModifier & getPortModifier()
Definition: PortCore.cpp:3117
bool addOutput(const std::string &dest, void *id, yarp::os::OutputStream *os, bool onlyIfNeeded=false)
Add an output connection to this port.
Definition: PortCore.cpp:842
This is the heart of a yarp port.
Definition: PortCore.h:110
yarp::os::Carrier * inputModifier
Definition: PortCore.h:144
int join(double seconds=-1)
Definition: ThreadImpl.cpp:120
#define yCInfo(component,...)
Definition: LogComponent.h:132
#define yCAssert(component, x)
Definition: LogComponent.h:169
#define yCTrace(component,...)
Definition: LogComponent.h:85
#define yCDebug(component,...)
Definition: LogComponent.h:109
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:35
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
int getpid()
Portable wrapper for the getpid() function.
Definition: Os.cpp:91