39 cachedWriter(nullptr),
40 cachedReader(nullptr),
41 cachedCallback(nullptr),
42 cachedTracker(nullptr)
44 yCAssert(PORTCOREOUTPUTUNIT, op !=
nullptr);
91 yCDebug(PORTCOREOUTPUTUNIT,
"waiting");
93 yCDebug(PORTCOREOUTPUTUNIT,
"woken");
96 yCDebug(PORTCOREOUTPUTUNIT,
"write something in background");
98 yCDebug(PORTCOREOUTPUTUNIT,
"wrote something in background");
100 if (cachedTracker !=
nullptr) {
101 void*
t = cachedTracker;
102 cachedTracker =
nullptr;
108 trackerMutex.unlock();
111 yCDebug(PORTCOREOUTPUTUNIT,
"wrote something in background");
113 yCDebug(PORTCOREOUTPUTUNIT,
"thread closing");
129 yCInfo(PORTCOREOUTPUTUNIT,
"%s", msg.c_str());
150 void PortCoreOutputUnit::closeBasic()
152 bool waitForOther =
false;
157 yCInfo(PORTCOREOUTPUTUNIT,
"output for route %s asking other side to close by out-of-band means",
170 waitForOther = op->
write(buf);
174 std::string msg = std::string(
"Removing output from ") + route.
getFromName() +
" to " + route.
getToName();
178 yCInfo(PORTCOREOUTPUTUNIT,
"%s", msg.c_str());
204 is.
read(dummy.bytes());
213 void PortCoreOutputUnit::closeMain()
219 yCDebug(PORTCOREOUTPUTUNIT,
"closing");
234 yCDebug(PORTCOREOUTPUTUNIT,
"internal join");
241 yCDebug(PORTCOREOUTPUTUNIT,
"closed");
255 bool PortCoreOutputUnit::sendHelper()
257 bool replied =
false;
262 if (cachedReader !=
nullptr) {
263 buf.setReplyHandler(*cachedReader);
270 return (done =
true);
282 yCError(PORTCOREOUTPUTUNIT,
"cast failed.");
287 yCAssert(PORTCOREOUTPUTUNIT, cachedWriter !=
nullptr);
288 bool ok = cachedWriter->
write(buf);
293 bool suppressReply = (buf.getReplyHandler() ==
nullptr);
297 if (!cachedEnvelope.empty()) {
303 if (!cachedEnvelope.empty()) {
304 if (cachedEnvelope ==
"__ADMIN") {
308 PortCommand pc(
'\0', std::string(suppressReply ?
"D " :
"d ") + cachedEnvelope);
321 replied = op->
write(buf);
331 if (buf.dropRequested()) {
350 const std::string& envelopeString,
355 bool replied =
false;
363 if (!waitBefore || !waitAfter) {
367 yCDebug(PORTCOREOUTPUTUNIT,
"starting a thread for output");
369 yCDebug(PORTCOREOUTPUTUNIT,
"started a thread for output");
373 if ((!waitBefore) && waitAfter) {
374 yCError(PORTCOREOUTPUTUNIT,
"chosen port wait combination not yet implemented");
377 cachedWriter = &writer;
378 cachedReader = reader;
379 cachedCallback = callback;
380 cachedEnvelope = envelopeString;
384 replied = sendHelper();
388 void* nextTracker = tracker;
389 tracker = cachedTracker;
390 cachedTracker = nextTracker;
392 trackerMutex.unlock();
395 yCDebug(PORTCOREOUTPUTUNIT,
"skipping connection tagged as sending something");
399 if (gotReply !=
nullptr) {
411 void* tracker =
nullptr;
414 tracker = cachedTracker;
415 cachedTracker =
nullptr;
417 trackerMutex.unlock();
virtual bool isLocal() const =0
Check if carrier operates within a single process.
virtual bool isTextMode() const =0
Check if carrier is textual in nature.
virtual void prepareDisconnect()=0
Do cleanup and preparation for the coming disconnect, if necessary.
virtual bool isConnectionless() const =0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
virtual void getCarrierParams(yarp::os::Property &params) const =0
Get carrier configuration and deliver it by port administrative commands.
virtual void handleEnvelope(const std::string &envelope)=0
Carriers that do not distinguish data from administrative headers (i.e.
virtual bool acceptOutgoingData(const PortWriter &writer)=0
Determine whether outgoing data should be accepted.
virtual bool modifiesReply() const =0
Check if this carrier modifies outgoing data through the Carrier::modifyReply method.
virtual PortReader & modifyReply(PortReader &reader)=0
Modify reply payload data, if appropriate.
virtual bool isActive() const =0
Check if carrier is alive and error free.
virtual bool isBareMode() const
Check if carrier excludes type information from payload.
virtual bool canEscape() const =0
Check if carrier can encode administrative messages, as opposed to just user data.
virtual const PortWriter & modifyOutgoingData(const PortWriter &writer)=0
Modify outgoing payload data, if appropriate.
virtual bool supportReply() const =0
This flag is used by YARP to determine whether the connection can carry RPC traffic,...
virtual bool modifiesOutgoingData() const =0
Check if this carrier modifies outgoing data through the Carrier::modifyOutgoingData method.
virtual bool isBroadcast() const =0
Check if this carrier uses a broadcast mechanism.
virtual void setCarrierParams(const yarp::os::Property &params)=0
Configure carrier from port administrative commands.
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
Simple abstraction for a YARP port name.
bool isRooted() const
Check if port name begins with "/".
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
The output side of an active connection between two ports.
virtual const Route & getRoute() const =0
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
virtual Connection & getSender()=0
It is possible to chain a basic connection with a modifier.
virtual InputStream & getInputStream()=0
Access the input stream associated with the connection.
virtual void close()=0
Negotiate an end to operations.
virtual void interrupt()=0
virtual void beginWrite()=0
Notify connection that we intend to write to it.
virtual bool isOk() const =0
Check if the connection is valid and can be used.
virtual bool write(SizedWriter &writer)=0
Write a message on the connection.
Information about a port connection or event.
std::string targetName
Name of connection target, if any.
std::string carrierName
Name of protocol type, if relevant.
bool incoming
True if a connection is incoming, false if outgoing.
std::string message
A human-readable description of contents.
bool created
True if a connection is created, false if destroyed.
std::string portName
Name of port.
int tag
Type of information.
std::string sourceName
Name of connection source, if any.
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
This is a base class for objects that can be both read from and be written to the YARP network.
A class for storing options and configuration information.
Information about a connection between two ports.
const std::string & getToName() const
Get the destination of the route.
const std::string & getCarrierName() const
Get the carrier type of the route.
std::string toString() const
Render a text form of the route, "source->carrier->dest".
const std::string & getFromName() const
Get the source of the route.
void wait()
Decrement the counter, even if we must wait to do that.
void post()
Increment the counter.
A helper for creating cached object descriptions.
Simple Readable and Writable object representing a command to a YARP port.
OutputProtocol * getOutPutProtocol()
void getCarrierParams(yarp::os::Property &params) override
void * takeTracker() override
Reacquire a tracker previously passed via send().
void setCarrierParams(const yarp::os::Property &params) override
Set arbitrary parameters for this connection.
void * send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader, const yarp::os::PortWriter *callback, void *tracker, const std::string &envelopeString, bool waitAfter, bool waitBefore, bool *gotReply) override
Send a message on the connection.
~PortCoreOutputUnit() override
Destructor.
PortCoreOutputUnit(PortCore &owner, int index, OutputProtocol *op)
Constructor.
bool start() override
Prepare to serve this output.
void run() override
The body of a thread managing background sends.
virtual void runSingleThreaded()
Perform send operations without a separate thread.
Route getRoute() override
This manages a single threaded resource related to a single input or output connection.
void setMode()
Check the carrier used for the connection, and see if it has a "log" modifier.
void setDoomed()
Request that this connection be shut down as soon as possible.
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc.). Generate a description of the connections associ...
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
int join(double seconds=-1)
#define yCInfo(component,...)
#define yCError(component,...)
#define yCAssert(component, x)
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.