38 recv_delegate(nullptr),
39 send_delegate(nullptr),
40 need_recv_delegate(false),
41 need_send_delegate(false),
42 recv_delegate_fail(false),
43 send_delegate_fail(false),
44 route(
"null",
"null",
"tcp"),
75 if (from.find(
' ') != std::string::npos) {
79 for (
size_t i = 1; i < b.
size(); i++) {
85 carrier.append(
"+").append(v.
toString());
96 if (recv_delegate ==
nullptr) {
98 if (b.
check(
"recv")) {
99 need_recv_delegate =
true;
104 if (send_delegate ==
nullptr) {
106 if (b.
check(
"send")) {
107 need_send_delegate =
true;
128 if (streams !=
nullptr) {
162 size_t start = carrier.find(
'+');
163 if (start != std::string::npos) {
165 for (
size_t i = start + 1; i < (size_t)carrier.length(); i++) {
166 char ch = carrier[i];
169 }
else if (ch ==
'.') {
195 if (delegate ==
nullptr) {
196 return nullConnection;
218 bool ok = expectHeader();
223 return respondToHeader();
231 if (delegate ==
nullptr) {
236 bool ok = sendHeader();
241 return expectReplyToHeader();
286 return !(!
checkStreams() || recv_delegate_fail || send_delegate_fail);
299 this->writer = &writer;
300 bool replied =
false;
301 yCAssert(PROTOCOL, delegate !=
nullptr);
303 bool ok = delegate->
write(*
this, writer);
306 if (
reply !=
nullptr) {
309 yCInfo(PROTOCOL,
"connection %s does not support replies (try \"tcp\" or \"text_ack\")",
getRoute().
toString().c_str());
314 replied =
reply->read(reader);
318 this->writer =
nullptr;
326 delegate->
reply(*
this, writer);
327 pendingReply =
false;
349 if (delegate !=
nullptr) {
398 if (recv_delegate ==
nullptr) {
399 return nullConnection;
401 return *recv_delegate;
425 if (send_delegate ==
nullptr) {
426 return nullConnection;
428 return *send_delegate;
432 bool Protocol::getRecvDelegate()
435 if (recv_delegate !=
nullptr) {
438 if (!need_recv_delegate) {
441 if (recv_delegate_fail) {
446 std::string tag = b.find(
"recv").asString();
448 if (recv_delegate ==
nullptr) {
449 fprintf(stderr,
"Need carrier \"%s\", but cannot find it.\n", tag.c_str());
450 recv_delegate_fail =
true;
455 fprintf(stderr,
"Carrier \"%s\" does not modify incoming data as expected.\n", tag.c_str());
456 recv_delegate_fail =
true;
462 fprintf(stderr,
"Carrier \"%s\" could not configure the send delegate.\n", tag.c_str());
463 recv_delegate_fail =
true;
471 bool Protocol::getSendDelegate()
474 if (send_delegate !=
nullptr) {
477 if (!need_send_delegate) {
480 if (send_delegate_fail) {
485 std::string tag = b.find(
"send").asString();
487 if (send_delegate ==
nullptr) {
488 fprintf(stderr,
"Need carrier \"%s\", but cannot find it.\n", tag.c_str());
489 send_delegate_fail =
true;
494 fprintf(stderr,
"Carrier \"%s\" does not modify outgoing data as expected.\n", tag.c_str());
495 send_delegate_fail =
true;
501 fprintf(stderr,
"Carrier \"%s\" could not configure the send delegate.\n", tag.c_str());
502 send_delegate_fail =
true;
510 bool Protocol::respondToHeader()
512 yCAssert(PROTOCOL, delegate !=
nullptr);
522 bool Protocol::expectAck()
524 yCAssert(PROTOCOL, delegate !=
nullptr);
532 void Protocol::closeHelper()
539 if (delegate !=
nullptr) {
544 if (recv_delegate !=
nullptr) {
545 recv_delegate->
close();
546 delete recv_delegate;
547 recv_delegate =
nullptr;
549 if (send_delegate !=
nullptr) {
550 send_delegate->
close();
551 delete send_delegate;
552 send_delegate =
nullptr;
557 bool Protocol::sendAck()
561 if (delegate ==
nullptr) {
573 bool Protocol::expectIndex()
586 if (delegate !=
nullptr) {
596 if (ref !=
nullptr) {
606 void Protocol::setCarrier(
const std::string& carrierNameBase)
610 std::string carrierName = carrierNameBase;
611 if (carrierNameBase.empty()) {
617 if (delegate ==
nullptr) {
619 if (delegate !=
nullptr) {
622 fprintf(stderr,
"Carrier \"%s\" cannot be used this way, try \"tcp+recv.%s\" instead.\n", carrierName.c_str(), carrierName.c_str());
629 fprintf(stderr,
"Carrier \"%s\" could not be configured.\n", carrierName.c_str());
639 bool Protocol::expectHeader()
648 bool ok = expectProtocolSpecifier();
652 ok = expectSenderSpecifier();
656 yCAssert(PROTOCOL, delegate !=
nullptr);
662 bool Protocol::expectProtocolSpecifier()
672 yCDebug(PROTOCOL,
"no connection");
675 if ((
size_t)len != header.length()) {
676 yCDebug(PROTOCOL,
"data stream died");
679 bool already =
false;
680 if (delegate !=
nullptr) {
687 if (delegate ==
nullptr) {
689 std::string msg =
"* Error. Protocol not found.\r\n* Hello. You appear to be trying to communicate with a YARP Port.\r\n* The first 8 bytes sent to a YARP Port are critical for identifying the\r\n* protocol you wish to speak.\r\n* The first 8 bytes you sent were not associated with any particular protocol.\r\n* If you are a human, try typing \"CONNECT foo\" followed by a <RETURN>.\r\n* The 8 bytes \"CONNECT \" correspond to a simple text-mode protocol.\r\n* Goodbye.\r\n";
695 if (delegate ==
nullptr) {
696 yCDebug(PROTOCOL,
"unrecognized protocol");
707 bool Protocol::expectSenderSpecifier()
709 yCAssert(PROTOCOL, delegate !=
nullptr);
714 bool Protocol::sendHeader()
716 yCAssert(PROTOCOL, delegate !=
nullptr);
721 bool Protocol::expectReplyToHeader()
723 yCAssert(PROTOCOL, delegate !=
nullptr);
728 bool Protocol::respondToIndex()
A simple collection of objects that can be described and transmitted in a portable way.
void append(const Bottle &alt)
Append the content of the given bottle to the current list.
size_type size() const
Gets the number of elements in the bottle.
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
A simple abstraction for a block of bytes.
bool supportReply() const override=0
This flag is used by YARP to determine whether the connection can carry RPC traffic, that is, messages with replies.
virtual bool write(ConnectionState &proto, SizedWriter &writer)=0
Write a message.
virtual bool configure(ConnectionState &proto)
Give carrier a shot at looking at how the connection is set up.
virtual bool respondToHeader(ConnectionState &proto)=0
Respond to the header.
bool requireAck() const override=0
Check if carrier has flow control, requiring sent messages to be acknowledged by recipient.
bool isTextMode() const override=0
Check if carrier is textual in nature.
virtual bool expectSenderSpecifier(ConnectionState &proto)=0
Expect the name of the sending port.
virtual bool sendHeader(ConnectionState &proto)=0
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
virtual void close()
Close the carrier.
virtual void setParameters(const Bytes &header)=0
Configure this carrier based on the first 8 bytes of the connection.
virtual bool expectExtraHeader(ConnectionState &proto)=0
Receive any carrier-specific header.
virtual bool expectIndex(ConnectionState &proto)=0
Expect a message header, if there is one for this carrier.
virtual bool expectAck(ConnectionState &proto)=0
Receive an acknowledgement, if expected for this carrier.
bool modifiesIncomingData() const override
Check if this carrier modifies incoming data through the Carrier::modifyIncomingData method.
virtual bool checkHeader(const Bytes &header)=0
Given the first 8 bytes received on a connection, decide if this is the right carrier type to use for the rest of the connection.
virtual bool reply(ConnectionState &proto, SizedWriter &writer)
virtual bool expectReplyToHeader(ConnectionState &proto)=0
Process reply to header, if one is expected for this carrier.
virtual bool prepareSend(ConnectionState &proto)=0
Perform any initialization needed before writing on a connection.
virtual bool sendAck(ConnectionState &proto)=0
Send an acknowledgement, if needed for this carrier.
bool modifiesOutgoingData() const override
Check if this carrier modifies outgoing data through the Carrier::modifyOutgoingData method.
static Carrier * chooseCarrier(const std::string &name)
Select a carrier by name.
An interface for reading from a network connection.
InputStream & is()
Shorthand for getInputStream()
OutputStream & os()
Shorthand for getOutputStream()
A controller for an individual connection.
virtual bool isBareMode() const
Check if carrier excludes type information from payload.
virtual std::string getName() const =0
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
The output side of an active connection between two ports.
Simple specification of the minimum functions needed from output streams.
virtual void flush()
Make sure all pending write operations are finished.
virtual bool setWriteTimeout(double timeout)
Set activity timeout.
virtual bool isOk() const =0
Check if the stream is ok or in an error state.
virtual void write(char ch)
Write a single byte to the stream.
Interface implemented by all objects that can read themselves from the network, such as Bottle objects.
This is a base class for objects that can be both read from and be written to the YARP network.
Information about a connection between two ports.
const std::string & getCarrierName() const
Get the carrier type of the route.
void setToName(const std::string &toName)
Set the destination of the route.
const std::string & getFromName() const
Get the source of the route.
void setFromName(const std::string &fromName)
Set the source of the route.
void setCarrierName(const std::string &carrierName)
Set the carrier type of the route.
virtual void takeStream(TwoWayStream *stream)
Wrap the supplied stream.
bool isOk() const override
Check if the stream is ok or in an error state.
virtual TwoWayStream * giveStream()
Removes the wrapped stream and returns it.
InputStream & getInputStream() override
Get an InputStream to read from.
void close() override
Terminate the stream.
OutputStream & getOutputStream() override
Get an OutputStream to write to.
Minimal requirements for an efficient Writer.
virtual void stopWrite() const =0
Call when all writing is finished.
virtual PortReader * getReplyHandler()=0
A stream which can be asked to perform bidirectional communication.
virtual void endPacket()=0
Mark the end of a logical packet (see beginPacket).
virtual void beginPacket()=0
Mark the beginning of a logical packet.
A single value (typically within a Bottle).
virtual Bottle * asList() const
Get list value.
std::string toString() const override
Return a standard text representation of the content of the object.
yarp::os::ConnectionReader & beginRead() override
Begin a read operation, with bytes read via the returned yarp::os::ConnectionReader object.
std::string getSenderSpecifier() const override
Extract a name for the sender, if the connection type supports that.
bool isOk() const override
Check if the connection is valid and can be used.
OutputStream & getOutputStream() override
Access the output stream associated with this connection.
TwoWayStream * giveStreams() override
Take ownership of the streams associated with the connection.
void attachPort(yarp::os::Contactable *port) override
Set the port to be associated with the connection.
void endRead() override
End the current read operation, begin by beginRead().
bool setTimeout(double timeout) override
Set the timeout to be used for network operations.
Contactable * getContactable() const override
Get the port associated with the connection.
void rename(const Route &route) override
Relabel the route after the fact (e.g.
bool open(const std::string &name) override
Start negotiating a carrier, using the given name as our own if a name is needed (this should general...
void suppressReply() override
Make sure that any attempt to send a reply to input will be denied.
bool checkStreams() const override
Check whether streams are in a good state.
Connection & getReceiver() override
It is possible to chain a basic connection with a modifier.
bool isReplying() const override
const std::string & getEnvelope() const override
Read the envelope associated with the current message.
void beginWrite() override
Notify connection that we intend to write to it.
TwoWayStream & getStreams() override
Access the streams associated with the connection.
~Protocol() override
Destructor.
void setEnvelope(const std::string &str) override
Set the envelope that will be attached to the next message.
bool write(SizedWriter &writer) override
Write a message on the connection.
void takeStreams(TwoWayStream *streams) override
Provide streams to be used with the connection.
void interrupt() override
Try to get operations interrupted.
InputStream & getInputStream() override
Access the input stream associated with this connection.
void setReference(yarp::os::Portable *ref) override
Give a direct pointer to an object being sent on the connection.
void setRemainingLength(int len) override
Tell the connection that the given number of bytes are left to be read.
Connection & getConnection() override
Access the controller for this connection.
OutputProtocol & getOutput() override
Get an interface for doing write operations on the connection.
void close() override
Negotiate an end to operations.
Connection & getSender() override
It is possible to chain a basic connection with a modifier.
void reply(SizedWriter &writer) override
Reply to a message we have just read.
InputProtocol & getInput() override
Get an interface for doing read operations on the connection.
Protocol(TwoWayStream *stream)
Constructor.
const Route & getRoute() const override
Get the route associated with this connection.
void setRoute(const Route &route) override
Set the route associated with this connection.
yarp::os::Contact getRemoteContact() const override
Gets information about who is supplying the data being read, if that information is available.
void setProtocol(Protocol *protocol)
virtual void flushWriter()
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
virtual void setReference(yarp::os::Portable *obj)
std::string toString(const T &value)
convert an arbitrary type to string.
#define yCInfo(component,...)
#define yCAssert(component, x)
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.