YARP
Yet Another Robot Platform
PortCoreInputUnit.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
8
9#include <yarp/os/Name.h>
10#include <yarp/os/Os.h>
11#include <yarp/os/PortInfo.h>
12#include <yarp/os/PortReport.h>
13#include <yarp/os/Time.h>
18
19#include <cstdio>
20
21
22using namespace yarp::os::impl;
23using namespace yarp::os;
24
// File-local logging component, registered under the name
// "yarp.os.impl.PortCoreInputUnit". It is the first argument of every
// yCIAssert / yCIDebug / yCIInfo / yCITrace call in this file.
25namespace {
26YARP_OS_LOG_COMPONENT(PORTCOREINPUTUNIT, "yarp.os.impl.PortCoreInputUnit")
27} // namespace
28
/**
 * Construct the handler for one input connection of a port.
 *
 * Forwards `owner` and `index` to the PortCoreUnit base, stores the protocol
 * object and the `reversed` flag, initializes `phase` and `access` with 1,
 * clears the `closing` / `finished` / `running` flags and caches the owner's
 * name (returned later by getName()).
 *
 * @param owner    port core this unit belongs to
 * @param index    index forwarded to the PortCoreUnit base
 * @param ip       input protocol of the connection; asserted to be non-null
 * @param reversed stored in the `reversed` member
 */
29PortCoreInputUnit::PortCoreInputUnit(PortCore& owner,
30 int index,
31 InputProtocol* ip,
32 bool reversed) :
33 PortCoreUnit(owner, index),
34 ip(ip),
35 phase(1),
36 access(1),
37 closing(false),
38 finished(false),
39 running(false),
40 name(owner.getName()),
41 localReader(nullptr),
42 reversed(reversed)
43{
// The connection cannot be served without a protocol object.
44 yCIAssert(PORTCOREINPUTUNIT, getName(), ip != nullptr);
45
// NOTE(review): the declaration of `creator` (original source line 46) is
// missing from this listing - presumably it is obtained from the owner's
// read creator; confirm against the real file.
// When a creator exists, it supplies a reader dedicated to this unit.
47 if (creator != nullptr) {
48 localReader = creator->create();
49 }
50}
51
53{
54 closeMain();
55 if (localReader != nullptr) {
56 delete localReader;
57 localReader = nullptr;
58 }
59}
60
61
63{
64 yCIDebug(PORTCOREINPUTUNIT, getName(), "new input connection to %s starting", getOwner().getName().c_str());
65
66 phase.wait();
67
68 bool result = PortCoreUnit::start();
69 if (result) {
70 yCIDebug(PORTCOREINPUTUNIT, getName(), "new input connection to %s started ok", getOwner().getName().c_str());
71 phase.wait();
72 phase.post();
73 } else {
74 yCIDebug(PORTCOREINPUTUNIT, getName(), "new input connection to %s failed to start", getOwner().getName().c_str());
75 phase.post();
76 }
77
78 return result;
79}
80
81
83{
84 running = true;
85 phase.post();
86
87 Route route;
88 bool wasNoticed = false;
89 bool posted = false;
90
91 bool done = false;
92
93 yCIAssert(PORTCOREINPUTUNIT, getName(), ip != nullptr);
94
95 PortCommand cmd;
96
97 bool ok = true;
98 if (!reversed) {
99 ip->open(getName());
100 }
101 if (!ok) {
102 yCIDebug(PORTCOREINPUTUNIT, getName(), "new input connection to %s is broken", getOwner().getName().c_str());
103 done = true;
104 } else {
105 route = ip->getRoute();
106
107 // just before going official, tag any lurking inputs from
108 // the same source as undesired
109 if (Name(route.getFromName()).isRooted()) {
110 yCIDebug(PORTCOREINPUTUNIT, getName(), "Port %s starting up, flushing routes %s->*->%s",
111 getOwner().getName().c_str(),
112 route.getFromName().c_str(),
113 route.getToName().c_str());
115 route.getToName(),
116 "*"),
117 true);
118 }
119 officialRoute = route;
120 setMode();
121 getOwner().reportUnit(this, true);
122
123 std::string msg = "Receiving input from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
124 if (Name(route.getFromName()).isRooted() && (reversed || ip->getConnection().isPush())) {
125 yCIInfo(PORTCOREINPUTUNIT, getName(), "%s", msg.c_str());
126 posted = true;
127 } else {
128 yCIDebug(PORTCOREINPUTUNIT, getName(), "%s", msg.c_str());
129 }
130
131 // Report the new connection
132 PortInfo info;
133 info.message = msg;
135 info.incoming = true;
136 info.created = true;
137 info.sourceName = route.getFromName();
138 info.targetName = route.getToName();
139 info.portName = info.targetName;
140 info.carrierName = route.getCarrierName();
141
142 if (info.sourceName != "admin" && info.sourceName != "null") {
143 getOwner().report(info);
144 wasNoticed = true;
145 }
146 }
147
148 if (!reversed) {
149 if (!ip->getConnection().isPush()) {
150 /* IP=OP */
151 OutputProtocol* op = &(ip->getOutput());
152 Route r = op->getRoute();
153 // reverse route
154 r.swapNames();
155 op->rename(r);
156
157 getOwner().addOutput(op);
158 ip = nullptr;
159 done = true;
160 }
161 }
162
163 if (closing) {
164 done = true;
165 }
166
167 auto* id = reinterpret_cast<void*>(this);
168
169 if (ip != nullptr && !ip->getConnection().canEscape()) {
170 InputStream* is = &ip->getInputStream();
171 is->setReadEnvelopeCallback(envelopeReadCallback, this);
172 }
173
174 while (!done) {
175 if (ip == nullptr) {
176 break;
177 }
178 ConnectionReader& br = ip->beginRead();
179
180 if (br.getReference() != nullptr) {
181 //printf("HAVE A REFERENCE\n");
182 if (localReader != nullptr) {
183 bool ok = localReader->read(br);
184 if (!br.isActive()) {
185 break;
186 }
187 if (!ok) {
188 continue;
189 }
190 } else {
191 PortCore& man = getOwner();
192 bool ok = man.readBlock(br, id, nullptr);
193 if (!br.isActive()) {
194 break;
195 }
196 if (!ok) {
197 continue;
198 }
199 }
200 //printf("DONE WITH A REFERENCE\n");
201 if (ip != nullptr) {
202 ip->endRead();
203 }
204 continue;
205 }
206
207 if (ip->getConnection().canEscape()) {
208 bool ok = cmd.read(br);
209 if (!br.isActive()) {
210 break;
211 }
212 if (!ok) {
213 continue;
214 }
215 } else {
216 cmd = PortCommand('d', "");
217 if (!ip->isOk()) {
218 break;
219 }
220 }
221
222 if (closing || isDoomed()) {
223 break;
224 }
225 char key = cmd.getKey();
226 //printf("Port command is [%c:%d/%s]\n",
227 // (key>=32)?key:'?', key, cmd.getText().c_str());
228
229 PortCore& man = getOwner();
230 OutputStream* os = nullptr;
231 if (br.isTextMode()) {
232 os = &(ip->getOutputStream());
233 }
234
235 switch (key) {
236 case '/':
237 yCIDebug(PORTCOREINPUTUNIT,
238 getName(),
239 "Port command (%s): %s should add connection: %s",
240 route.toString().c_str(),
241 getOwner().getName().c_str(),
242 cmd.getText().c_str());
243 man.addOutput(cmd.getText(), id, os);
244 break;
245 case '!':
246 yCIDebug(PORTCOREINPUTUNIT,
247 getName(),
248 "Port command (%s): %s should remove output: %s",
249 route.toString().c_str(),
250 getOwner().getName().c_str(),
251 cmd.getText().c_str());
252 man.removeOutput(cmd.getText().substr(1, std::string::npos), id, os);
253 break;
254 case '~':
255 yCIDebug(PORTCOREINPUTUNIT,
256 getName(),
257 "Port command (%s): %s should remove input: %s",
258 route.toString().c_str(),
259 getOwner().getName().c_str(),
260 cmd.getText().c_str());
261 man.removeInput(cmd.getText().substr(1, std::string::npos), id, os);
262 break;
263 case '*':
264 man.describe(id, os);
265 break;
266 case 'D':
267 case 'd': {
268 if (key == 'D') {
269 ip->suppressReply();
270 }
271
272 std::string env = cmd.getText();
273 if (env.length() > 2) {
274 yCITrace(PORTCOREINPUTUNIT, getName(), "***** received an envelope! [%s]", env.c_str());
275 std::string env2 = env.substr(2, env.length());
276 man.setEnvelope(env2);
277 ip->setEnvelope(env2);
278 }
279 if (localReader != nullptr) {
280 localReader->read(br);
281 if (!br.isActive()) {
282 done = true;
283 break;
284 }
285 } else {
286 if (ip->getReceiver().acceptIncomingData(br)) {
289 modifier.inputMutex.lock();
290 if (modifier.inputModifier != nullptr) {
291 if (modifier.inputModifier->acceptIncomingData(*cr)) {
292 cr = &(modifier.inputModifier->modifyIncomingData(*cr));
293 modifier.inputMutex.unlock();
294 man.readBlock(*cr, id, os);
295 } else {
296 modifier.inputMutex.unlock();
297 skipIncomingData(*cr);
298 }
299 } else {
300 modifier.inputMutex.unlock();
301 man.readBlock(*cr, id, os);
302 }
303 } else {
304 skipIncomingData(br);
305 }
306 if (!br.isActive()) {
307 done = true;
308 break;
309 }
310 }
311 } break;
312 case 'a': {
313 man.adminBlock(br, id);
314 } break;
315 case 'r':
316 /*
317 In YARP implementation, OP=IP.
318 (This information is used rarely, and when used
319 is tagged with OP=IP keyword)
320 If it were not true, memory alloc would need to
321 reorganized here
322 */
323 {
324 OutputProtocol* op = &(ip->getOutput());
325 ip->endRead();
326 Route r = op->getRoute();
327 // reverse route
328 r.swapNames();
329 op->rename(r);
330
331 getOwner().addOutput(op);
332 ip = nullptr;
333 done = true;
334 }
335 break;
336 case 'q':
337 done = true;
338 break;
339#if !defined(NDEBUG)
340 case 'i':
341 printf("Interrupt requested\n");
342 //yarp::os::impl::kill(0, 2); // SIGINT
343 //yarp::os::impl::kill(yarp::os::getpid(), 2); // SIGINT
344 yarp::os::impl::kill(yarp::os::getpid(), 15); // SIGTERM
345 break;
346#endif
347 case '?':
348 case 'h':
349 if (os != nullptr) {
351 bw.appendLine("This is a YARP port. Here are the commands it responds to:");
352 bw.appendLine("* Gives a description of this port");
353 bw.appendLine("d Signals the beginning of input for the port's owner");
354 bw.appendLine(R"(do The same as "d" except replies should be suppressed ("data-only"))");
355 bw.appendLine("q Disconnects");
356#if !defined(NDEBUG)
357 bw.appendLine("i Interrupt parent process (unix only)");
358#endif
359 bw.appendLine("r Reverse connection type to be a reader");
360 bw.appendLine("/port Requests to send output to /port");
361 bw.appendLine("!/port Requests to stop sending output to /port");
362 bw.appendLine("~/port Requests to stop receiving input from /port");
363 bw.appendLine("a Signals the beginning of an administrative message");
364 bw.appendLine("? Gives this help");
365 bw.write(*os);
366 }
367 break;
368 default:
369 if (os != nullptr) {
371 bw.appendLine("Port command not understood.");
372 bw.appendLine("Type d to send data to the port's owner.");
373 bw.appendLine("Type ? for help.");
374 bw.write(*os);
375 }
376 break;
377 }
378 if (ip != nullptr) {
379 ip->endRead();
380 }
381 if (ip == nullptr) {
382 break;
383 }
384 if (closing || isDoomed() || (!ip->isOk())) {
385 break;
386 }
387 }
388
389 setDoomed();
390
391 yCIDebug(PORTCOREINPUTUNIT, getName(), "Closing ip");
392 access.wait();
393 if (ip != nullptr) {
394 ip->close();
395 }
396 access.post();
397 yCIDebug(PORTCOREINPUTUNIT, getName(), "Closed ip");
398
399 std::string msg = std::string("Removing input from ") + route.getFromName() + " to " + route.getToName();
400
401 if (Name(route.getFromName()).isRooted()) {
402 if (posted) {
403 yCIInfo(PORTCOREINPUTUNIT, getName(), "%s", msg.c_str());
404 }
405 } else {
406 yCIDebug(PORTCOREINPUTUNIT, getName(), "(unrooted) shutting down");
407 }
408
409 getOwner().reportUnit(this, false);
410
411 if (wasNoticed) {
412 // Report the disappearing connection
413 PortInfo info;
414 info.message = msg;
416 info.incoming = true;
417 info.created = false;
418 info.sourceName = route.getFromName();
419 info.targetName = route.getToName();
420 info.portName = info.targetName;
421 info.carrierName = route.getCarrierName();
422
423 if (info.sourceName != "admin") {
424 getOwner().report(info);
425 }
426 }
427
428 if (localReader != nullptr) {
429 delete localReader;
430 localReader = nullptr;
431 }
432
433 running = false;
434 finished = true;
435
436 // it would be nice to get my entry removed from the port immediately,
437 // but it would be a bit dodgy to delete this object and join this
438 // thread within and from themselves
439}
440
442{
443 return true;
444}
445
447{
448 closeMain();
449}
450
452{
453 return finished;
454}
455
456const std::string& PortCoreInputUnit::getName()
457{
458 return name;
459}
460
462{
463 // give a kick (unfortunately unavoidable)
464 access.wait();
465 if (!closing) {
466 if (ip != nullptr) {
467 ip->interrupt();
468 }
469 closing = true;
470 }
471 access.post();
472 return true;
473}
474
476{
477 if (ip != nullptr) {
478 ip->getReceiver().setCarrierParams(params);
479 }
480}
481
483{
484 if (ip != nullptr) {
485 ip->getReceiver().getCarrierParams(params);
486 }
487}
488
489// return the protocol object
491{
492 return ip;
493}
494
495void PortCoreInputUnit::closeMain()
496{
497 access.wait();
498 Route r = getRoute();
499 access.post();
500
501 yCIDebug(PORTCOREINPUTUNIT, getName(), "[%s] closing", r.toString().c_str());
502
503 if (running) {
504 yCIDebug(PORTCOREINPUTUNIT, getName(), "[%s] joining", r.toString().c_str());
505 interrupt();
506 join();
507 yCIDebug(PORTCOREINPUTUNIT, getName(), "[%s] joined", r.toString().c_str());
508 }
509
510 if (ip != nullptr) {
511 ip->close();
512 delete ip;
513 ip = nullptr;
514 }
515 running = false;
516 closing = false;
517}
518
519
521{
522 return officialRoute;
523}
524
525
526bool PortCoreInputUnit::skipIncomingData(yarp::os::ConnectionReader& reader)
527{
528 size_t pending = reader.getSize();
529 if (pending > 0) {
530 while (pending > 0) {
531 char buf[10000];
532 size_t next = (pending < sizeof(buf)) ? pending : sizeof(buf);
533 reader.expectBlock(&buf[0], next);
534 pending -= next;
535 }
536 return true;
537 }
538 return false;
539}
540
541
543{
544 bool busy = false;
545 access.wait();
546 if (ip != nullptr) {
547 busy = ip->isReplying();
548 }
549 access.post();
550 return busy;
551}
552
553
554void PortCoreInputUnit::envelopeReadCallback(void* data, const Bytes& envelope)
555{
556 auto* p = reinterpret_cast<PortCoreInputUnit*>(data);
557 if (p == nullptr) {
558 return;
559 }
560 p->getOwner().setEnvelope(envelope.get());
561 p->ip->setEnvelope(envelope.get());
562}
A simple abstraction for a block of bytes.
Definition: Bytes.h:24
const char * get() const
Definition: Bytes.cpp:27
ConnectionReader & modifyIncomingData(ConnectionReader &reader) override
Modify incoming payload data, if appropriate.
Definition: Carrier.cpp:59
bool acceptIncomingData(ConnectionReader &reader) override
Determine whether incoming data should be accepted.
Definition: Carrier.cpp:64
An interface for reading from a network connection.
virtual bool isTextMode() const =0
Check if the connection is text mode.
virtual size_t getSize() const =0
Checks how much data is available.
virtual bool expectBlock(char *data, size_t len)=0
Read a block of data from the network connection.
virtual Portable * getReference() const =0
Get a direct pointer to the object being sent, if possible.
virtual bool isActive() const =0
virtual bool acceptIncomingData(yarp::os::ConnectionReader &reader)=0
Determine whether incoming data should be accepted.
virtual void getCarrierParams(yarp::os::Property &params) const =0
Get carrier configuration and deliver it by port administrative commands.
virtual bool canEscape() const =0
Check if carrier can encode administrative messages, as opposed to just user data.
virtual yarp::os::ConnectionReader & modifyIncomingData(yarp::os::ConnectionReader &reader)=0
Modify incoming payload data, if appropriate.
virtual bool isPush() const =0
Check if carrier is "push" or "pull" style.
virtual void setCarrierParams(const yarp::os::Property &params)=0
Configure carrier from port administrative commands.
The input side of an active connection between two ports.
Definition: InputProtocol.h:32
virtual bool open(const std::string &name)=0
Start negotiating a carrier, using the given name as our own if a name is needed (this should general...
virtual bool isOk() const =0
Check if the connection is valid and can be used.
virtual OutputStream & getOutputStream()=0
Access the output stream associated with the connection.
virtual InputStream & getInputStream()=0
Access the input stream associated with the connection.
virtual void close()=0
Negotiate an end to operations.
virtual void setEnvelope(const std::string &str)=0
Set the envelope that will be attached to the next message.
virtual void endRead()=0
End the current read operation, begin by beginRead().
virtual const Route & getRoute() const =0
Get the route associated with this connection.
virtual ConnectionReader & beginRead()=0
Begin a read operation, with bytes read via the returned yarp::os::ConnectionReader object.
virtual void suppressReply()=0
Make sure that any attempt to send a reply to input will be denied.
virtual void interrupt()=0
Try to get operations interrupted.
virtual bool isReplying() const =0
virtual Connection & getReceiver()=0
It is possible to chain a basic connection with a modifier.
virtual OutputProtocol & getOutput()=0
Get an interface for doing write operations on the connection.
virtual Connection & getConnection()=0
Get the basic connection through which we are communicating.
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:25
virtual bool setReadEnvelopeCallback(readEnvelopeCallbackType callback, void *data)
Install a callback that the InputStream will have to call when the envelope is read from a message in...
Simple abstraction for a YARP port name.
Definition: Name.h:18
bool isRooted() const
Check if port name begins with "/".
Definition: Name.cpp:16
The output side of an active connection between two ports.
virtual const Route & getRoute() const =0
virtual void rename(const Route &route)=0
Relabel the route after the fact (e.g.
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:21
Information about a port connection or event.
Definition: PortInfo.h:25
std::string targetName
Name of connection target, if any.
Definition: PortInfo.h:62
std::string carrierName
Name of protocol type, if relevant.
Definition: PortInfo.h:65
bool incoming
True if a connection is incoming, false if outgoing.
Definition: PortInfo.h:50
std::string message
A human-readable description of contents.
Definition: PortInfo.h:68
bool created
True if a connection is created, false if destroyed.
Definition: PortInfo.h:53
std::string portName
Name of port.
Definition: PortInfo.h:56
int tag
Type of information.
Definition: PortInfo.h:47
std::string sourceName
Name of connection source, if any.
Definition: PortInfo.h:59
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
Definition: PortInfo.h:39
A creator for readers.
virtual PortReader * create() const =0
Factory for PortReader objects.
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
A class for storing options and configuration information.
Definition: Property.h:33
Information about a connection between two ports.
Definition: Route.h:28
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:103
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition: Route.cpp:123
std::string toString() const
Render a text form of the route, "source->carrier->dest".
Definition: Route.cpp:138
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:93
void swapNames()
Swap from and to names.
Definition: Route.cpp:133
void wait()
Decrement the counter, even if we must wait to do that.
Definition: Semaphore.cpp:96
void post()
Increment the counter.
Definition: Semaphore.cpp:111
A helper for creating cached object descriptions.
bool write(ConnectionWriter &connection) const override
Write this object to a network connection.
virtual void appendLine(const std::string &data)
Send a string along with a carriage-return-line-feed sequence.
Simple Readable and Writable object representing a command to a YARP port.
Definition: PortCommand.h:24
bool read(yarp::os::ConnectionReader &reader) override
Read this object from a network connection.
Definition: PortCommand.cpp:20
Manager for a single input to a port.
void run() override
The body of the thread associated with this input.
bool start() override
Start a thread running to serve this input.
void getCarrierParams(yarp::os::Property &params) override
void setCarrierParams(const yarp::os::Property &params) override
Set arbitrary parameters for this connection.
bool interrupt() override
Interrupt the connection.
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:25
void setMode()
Check the carrier used for the connection, and see if it has a "log" modifier.
Definition: PortCoreUnit.h:184
void setDoomed()
Request that this connection be shut down as soon as possible.
Definition: PortCoreUnit.h:95
bool readBlock(ConnectionReader &reader, void *id, yarp::os::OutputStream *os)
Read a block of regular payload data.
Definition: PortCore.cpp:1199
yarp::os::PortReaderCreator * getReadCreator()
Get the creator of callbacks.
Definition: PortCore.cpp:2996
void setEnvelope(const std::string &envelope)
Set some envelope information to pass along with a message without actually being part of the message...
Definition: PortCore.cpp:1457
bool removeIO(const Route &route, bool synch=false)
Remove any connection matching the supplied route.
Definition: PortCore.cpp:2961
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc) Generate a description of the connections associ...
Definition: PortCore.cpp:1185
void removeOutput(const std::string &dest, void *id, yarp::os::OutputStream *os)
Remove an output connection.
Definition: PortCore.cpp:1012
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
Definition: PortCore.cpp:2889
void describe(void *id, yarp::os::OutputStream *os)
Produce a text description of the port and its connections.
Definition: PortCore.cpp:1048
bool adminBlock(ConnectionReader &reader, void *id)
Read a block of administrative data.
Definition: PortCore.cpp:1659
void removeInput(const std::string &src, void *id, yarp::os::OutputStream *os)
Remove an input connection.
Definition: PortCore.cpp:1030
yarp::os::impl::PortDataModifier & getPortModifier()
Definition: PortCore.cpp:3074
bool addOutput(const std::string &dest, void *id, yarp::os::OutputStream *os, bool onlyIfNeeded=false)
Add an output connection to this port.
Definition: PortCore.cpp:842
This is the heart of a yarp port.
Definition: PortCore.h:102
yarp::os::Carrier * inputModifier
Definition: PortCore.h:136
int join(double seconds=-1)
Definition: ThreadImpl.cpp:120
#define yCIAssert(component, id, x)
Definition: LogComponent.h:248
#define yCIInfo(component, id,...)
Definition: LogComponent.h:181
#define yCITrace(component, id,...)
Definition: LogComponent.h:94
#define yCIDebug(component, id,...)
Definition: LogComponent.h:138
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:29
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
int getpid()
Portable wrapper for the getpid() function.
Definition: Os.cpp:91