YARP
Yet Another Robot Platform
Protocol.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
8 
9 #include <yarp/os/Bottle.h>
10 #include <yarp/os/Carrier.h>
11 #include <yarp/os/Carriers.h>
12 #include <yarp/os/NetType.h>
13 #include <yarp/os/Portable.h>
14 #include <yarp/os/ShiftStream.h>
15 #include <yarp/os/TwoWayStream.h>
18 
19 #include <cstdio>
20 #include <cstdlib>
21 #include <string>
22 
23 using namespace yarp::os::impl;
24 using namespace yarp::os;
25 
26 namespace {
27 YARP_OS_LOG_COMPONENT(PROTOCOL, "yarp.os.impl.Protocol")
28 } // namespace
29 
31  messageLen(0),
32  pendingAck(false),
33  active(true),
34  delegate(nullptr),
35  recv_delegate(nullptr),
36  send_delegate(nullptr),
37  need_recv_delegate(false),
38  need_send_delegate(false),
39  recv_delegate_fail(false),
40  send_delegate_fail(false),
41  route("null", "null", "tcp"),
42  writer(nullptr),
43  ref(nullptr),
44  envelope(""),
45  port(nullptr),
46  pendingReply(false)
47 {
48  // We start off with the streams used to contact the port that
49  // owns this connection.
50  shift.takeStream(stream);
51 
52  reader.setProtocol(this);
53  yCDebug(PROTOCOL, "Remote contact = %s", reader.getRemoteContact().toURI().c_str());
54 }
55 
56 
58 {
59  closeHelper();
60 }
61 
62 
63 void Protocol::setRoute(const Route& route)
64 {
65  Route r = route;
66 
67  // We reorganize the route to reduce variation in naming.
68  // If there are qualifiers in the source port name, propagate
69  // those qualifiers to the carrier.
70  std::string from = r.getFromName();
71  std::string carrier = r.getCarrierName();
72  if (from.find(' ') != std::string::npos) {
73  Bottle b(from);
74  if (b.size() > 1) {
75  r.setFromName(b.get(0).toString());
76  for (size_t i = 1; i < b.size(); i++) {
77  Value& v = b.get(i);
78  Bottle* lst = v.asList();
79  if (lst != nullptr) {
80  carrier.append("+").append(lst->get(0).toString()).append(".").append(lst->get(1).toString());
81  } else {
82  carrier.append("+").append(v.toString());
83  }
84  }
85  r.setCarrierName(carrier);
86  }
87  }
88 
89  // Record canonicalized route.
90  this->route = r;
91 
92  // Check if we have a receiver modifier.
93  if (recv_delegate == nullptr) {
95  if (b.check("recv")) {
96  need_recv_delegate = true;
97  }
98  }
99 
100  // Check if we have a sender modifier.
101  if (send_delegate == nullptr) {
103  if (b.check("send")) {
104  need_send_delegate = true;
105  }
106  }
107 }
108 
109 
110 const Route& Protocol::getRoute() const
111 {
112  return route;
113 }
114 
115 
117 {
118  return shift;
119 }
120 
121 
123 {
124  shift.takeStream(streams);
125  if (streams != nullptr) {
126  active = true;
127  }
128 }
129 
130 
132 {
133  return shift.giveStream();
134 }
135 
136 
138 {
139  return shift.isOk();
140 }
141 
142 
144 {
145  this->ref = ref;
146 }
147 
148 
149 std::string Protocol::getSenderSpecifier() const
150 {
151  Route r = getRoute();
152  // We pull the sender name from the route.
153  std::string from = r.getFromName();
154  // But we need to add any qualifiers looking in the carrier
155  // name. Ideally, we wouldn't need to bundle that in with
156  // the sender name, but we do it for now in the name of
157  // backwards compatibility.
158  std::string carrier = r.getCarrierName();
159  size_t start = carrier.find('+');
160  if (start != std::string::npos) {
161  from += " (";
162  for (size_t i = start + 1; i < (size_t)carrier.length(); i++) {
163  char ch = carrier[i];
164  if (ch == '+') {
165  from += ") (";
166  } else if (ch == '.') {
167  from += " ";
168  } else {
169  from += ch;
170  }
171  }
172  from += ")";
173  }
174  return from;
175 }
176 
177 
178 const std::string& Protocol::getEnvelope() const
179 {
180  return envelope;
181 }
182 
183 
185 {
186  messageLen = len;
187 }
188 
189 
191 {
192  if (delegate == nullptr) {
193  return nullConnection;
194  }
195  return *delegate;
196 }
197 
198 
200 {
201  return port;
202 }
203 
204 
205 bool Protocol::open(const std::string& name)
206 {
207  if (name.empty()) {
208  return false;
209  }
210  Route r = getRoute();
211  r.setToName(name);
212  setRoute(r);
213  // We are not the initiator of the connection, so we
214  // expect to receive a header (carrier-dependent).
215  bool ok = expectHeader();
216  if (!ok) {
217  return false;
218  }
219  // Respond to header (carrier-dependent).
220  return respondToHeader();
221 }
222 
223 
224 bool Protocol::open(const Route& route)
225 {
226  setRoute(route);
227  setCarrier(route.getCarrierName());
228  if (delegate == nullptr) {
229  return false;
230  }
231  // We are the initiator of the connection, so we
232  // send a header (carrier-dependent).
233  bool ok = sendHeader();
234  if (!ok) {
235  return false;
236  }
237  // Expect a resonse to the header (carrier-dependent).
238  return expectReplyToHeader();
239 }
240 
241 
243 {
244  closeHelper();
245 }
246 
247 
249 {
250  if (!active) {
251  return;
252  }
253  if (pendingAck) {
254  // Don't neglect to send one last acknowledgement if needed.
255  sendAck();
256  }
257  // Break the input stream.
258  shift.getInputStream().interrupt();
259  active = false;
260 }
261 
262 
264 {
265  return shift.getOutputStream();
266 }
267 
268 
270 {
271  return shift.getInputStream();
272 }
273 
274 
275 void Protocol::rename(const Route& route)
276 {
277  setRoute(route);
278 }
279 
280 
281 bool Protocol::isOk() const
282 {
283  return !(!checkStreams() || recv_delegate_fail || send_delegate_fail);
284 }
285 
286 
288 {
289  // End any current write.
290  writer.stopWrite();
291  // Skip if this connection is not active (e.g. when there are several
292  // logical mcast connections but only one write is actually needed).
293  if (!getConnection().isActive()) {
294  return false;
295  }
296  this->writer = &writer;
297  bool replied = false;
298  yCAssert(PROTOCOL, delegate != nullptr);
299  getStreams().beginPacket(); // Message begins.
300  bool ok = delegate->write(*this, writer);
301  getStreams().endPacket(); // Message ends.
302  PortReader* reply = writer.getReplyHandler();
303  if (reply != nullptr) {
304  if (!delegate->supportReply()) {
305  // We are expected to get a reply, but cannot.
306  yCInfo(PROTOCOL, "connection %s does not support replies (try \"tcp\" or \"text_ack\")", getRoute().toString().c_str());
307  }
308  if (ok) {
309  // Read reply.
310  reader.reset(is(), &getStreams(), getRoute(), messageLen, delegate->isTextMode(), delegate->isBareMode());
311  replied = reply->read(reader);
312  }
313  }
314  expectAck(); // Expect acknowledgement (carrier-specific).
315  this->writer = nullptr;
316  return replied;
317 }
318 
319 
321 {
322  writer.stopWrite();
323  delegate->reply(*this, writer);
324  pendingReply = false;
325 }
326 
327 
329 {
330  return *this;
331 }
332 
333 
335 {
336  return *this;
337 }
338 
339 
341 {
342  // We take care of reading the message index
343  // (carrier-specific preamble), then leave it
344  // up to caller to read the actual message payload.
345  getRecvDelegate();
346  if (delegate != nullptr) {
347  bool ok = false;
348  while (!ok) {
349  ok = expectIndex();
350  if (!ok) {
351  if (!is().isOk()) {
352  // Go ahead, we'll be shutting down, it'll
353  // work out.
354  ok = true;
355  }
356  }
357  }
358  respondToIndex();
359  }
360  return reader;
361 }
362 
363 
365 {
366  reader.flushWriter();
367  sendAck(); // acknowledge after reply (if there is one)
368 }
369 
370 
372 {
373  reader.suppressReply();
374 }
375 
376 
377 bool Protocol::setTimeout(double timeout)
378 {
379  bool ok = os().setWriteTimeout(timeout);
380  if (!ok) {
381  return false;
382  }
383  return is().setReadTimeout(timeout);
384 }
385 
386 
387 void Protocol::setEnvelope(const std::string& str)
388 {
389  envelope = str;
390 }
391 
392 
394 {
395  if (recv_delegate == nullptr) {
396  return nullConnection;
397  }
398  return *recv_delegate;
399 }
400 
401 
403 {
404  this->port = port;
405 }
406 
407 
409 {
410  return pendingReply;
411 }
412 
413 
415 {
416  getSendDelegate();
417 }
418 
419 
421 {
422  if (send_delegate == nullptr) {
423  return nullConnection;
424  }
425  return *send_delegate;
426 }
427 
428 
429 bool Protocol::getRecvDelegate()
430 {
431  // If we've already checked for a receiver modifier, return.
432  if (recv_delegate != nullptr) {
433  return true;
434  }
435  if (!need_recv_delegate) {
436  return true;
437  }
438  if (recv_delegate_fail) {
439  return false;
440  }
442  // Check for a "recv" qualifier.
443  std::string tag = b.find("recv").asString();
444  recv_delegate = Carriers::chooseCarrier(tag);
445  if (recv_delegate == nullptr) {
446  fprintf(stderr, "Need carrier \"%s\", but cannot find it.\n", tag.c_str());
447  recv_delegate_fail = true;
448  close();
449  return false;
450  }
451  if (!recv_delegate->modifiesIncomingData()) {
452  fprintf(stderr, "Carrier \"%s\" does not modify incoming data as expected.\n", tag.c_str());
453  recv_delegate_fail = true;
454  close();
455  return false;
456  }
457  // Configure the receiver modifier.
458  if (!recv_delegate->configure(*this)) {
459  fprintf(stderr, "Carrier \"%s\" could not configure the send delegate.\n", tag.c_str());
460  recv_delegate_fail = true;
461  close();
462  return false;
463  }
464  return true;
465 }
466 
467 
468 bool Protocol::getSendDelegate()
469 {
470  // If we've already checked for a sender modifier, return.
471  if (send_delegate != nullptr) {
472  return true;
473  }
474  if (!need_send_delegate) {
475  return true;
476  }
477  if (send_delegate_fail) {
478  return false;
479  }
481  // Check for a "send" qualifier.
482  std::string tag = b.find("send").asString();
483  send_delegate = Carriers::chooseCarrier(tag);
484  if (send_delegate == nullptr) {
485  fprintf(stderr, "Need carrier \"%s\", but cannot find it.\n", tag.c_str());
486  send_delegate_fail = true;
487  close();
488  return false;
489  }
490  if (!send_delegate->modifiesOutgoingData()) {
491  fprintf(stderr, "Carrier \"%s\" does not modify outgoing data as expected.\n", tag.c_str());
492  send_delegate_fail = true;
493  close();
494  return false;
495  }
496  // Configure the sender modifier.
497  if (!send_delegate->configure(*this)) {
498  fprintf(stderr, "Carrier \"%s\" could not configure the send delegate.\n", tag.c_str());
499  send_delegate_fail = true;
500  close();
501  return false;
502  }
503  return true;
504 }
505 
506 
507 bool Protocol::respondToHeader()
508 {
509  yCAssert(PROTOCOL, delegate != nullptr);
510  bool ok = delegate->respondToHeader(*this);
511  if (!ok) {
512  return false;
513  }
514  os().flush();
515  return os().isOk();
516 }
517 
518 
519 bool Protocol::expectAck()
520 {
521  yCAssert(PROTOCOL, delegate != nullptr);
522  if (delegate->requireAck()) {
523  return delegate->expectAck(*this);
524  }
525  return true;
526 }
527 
528 
529 void Protocol::closeHelper()
530 {
531  active = false;
532  if (pendingAck) {
533  sendAck();
534  }
535  shift.close();
536  if (delegate != nullptr) {
537  delegate->close();
538  delete delegate;
539  delegate = nullptr;
540  }
541  if (recv_delegate != nullptr) {
542  recv_delegate->close();
543  delete recv_delegate;
544  recv_delegate = nullptr;
545  }
546  if (send_delegate != nullptr) {
547  send_delegate->close();
548  delete send_delegate;
549  send_delegate = nullptr;
550  }
551 }
552 
553 
554 bool Protocol::sendAck()
555 {
556  bool ok = true;
557  pendingAck = false;
558  if (delegate == nullptr) {
559  return false;
560  }
561  if (delegate->requireAck()) {
562  ok = delegate->sendAck(*this);
563  os().flush();
564  }
565  getStreams().endPacket();
566  return ok;
567 }
568 
569 
570 bool Protocol::expectIndex()
571 {
572  // We'll eventually need to send an acknowledgement
573  // (if the carrier in use requires that).
574  pendingAck = true;
575  messageLen = 0;
576  // This is where a message can be considered to begin.
577  // If things go wrong on an unreliable carrier (e.g. on
578  // udp), we should skip to the beginning of the next
579  // message, as marked by this call.
581  ref = nullptr;
582  bool ok = false;
583  if (delegate != nullptr) {
584  // What we actually do here is carrier-specific.
585  // Perhaps we do nothing at all.
586  ok = delegate->expectIndex(*this);
587  }
588  if (ok) {
589  // Set up a reader for the user payload.
590  reader.reset(is(), &getStreams(), getRoute(), messageLen, delegate->isTextMode(), delegate->isBareMode());
591  // Pass on a reference to the object being
592  // send, if we know it, for local connections.
593  if (ref != nullptr) {
594  reader.setReference(ref);
595  }
596  } else {
597  reader.reset(is(), &getStreams(), getRoute(), 0, false);
598  }
599  return ok;
600 }
601 
602 
603 void Protocol::setCarrier(const std::string& carrierNameBase)
604 {
605  // Set up the carrier for this connection. The carrier
606  // has all the protocol-specific behavior.
607  std::string carrierName = carrierNameBase;
608  if (carrierNameBase.empty()) {
609  carrierName = "tcp";
610  }
611  Route route = getRoute();
612  route.setCarrierName(carrierName);
613  setRoute(route);
614  if (delegate == nullptr) {
615  delegate = Carriers::chooseCarrier(carrierName);
616  if (delegate != nullptr) {
617  if (delegate->modifiesIncomingData()) {
618  if (active) {
619  fprintf(stderr, "Carrier \"%s\" cannot be used this way, try \"tcp+recv.%s\" instead.\n", carrierName.c_str(), carrierName.c_str());
620  }
621  close();
622  return;
623  }
624  // Configure the carrier.
625  if (!delegate->configure(*this)) {
626  fprintf(stderr, "Carrier \"%s\" could not be configured.\n", carrierName.c_str());
627  close();
628  return;
629  }
630  delegate->prepareSend(*this);
631  }
632  }
633 }
634 
635 
636 bool Protocol::expectHeader()
637 {
638  // A header, for historic reasons, is seen as
639  // a protocol fingerprint (at least 8 bytes)
640  // and the name of the sender. In practice,
641  // these callbacks have been stretched to the
642  // point where their names have little bearing
643  // on what the get used for.
644  messageLen = 0;
645  bool ok = expectProtocolSpecifier();
646  if (!ok) {
647  return false;
648  }
649  ok = expectSenderSpecifier();
650  if (!ok) {
651  return false;
652  }
653  yCAssert(PROTOCOL, delegate != nullptr);
654  ok = delegate->expectExtraHeader(*this);
655  return ok;
656 }
657 
658 
659 bool Protocol::expectProtocolSpecifier()
660 {
661  // Historically YARP has used the first 8 bytes of
662  // every connection as a way to identify it. This
663  // assumption is showing its age, and should really
664  // be generalized.
665  char buf[8];
666  yarp::os::Bytes header((char*)&buf[0], sizeof(buf));
667  yarp::conf::ssize_t len = is().readFull(header);
668  if (len == -1) {
669  yCDebug(PROTOCOL, "no connection");
670  return false;
671  }
672  if ((size_t)len != header.length()) {
673  yCDebug(PROTOCOL, "data stream died");
674  return false;
675  }
676  bool already = false;
677  if (delegate != nullptr) {
678  if (delegate->checkHeader(header)) {
679  already = true;
680  }
681  }
682  if (!already) {
683  delegate = Carriers::chooseCarrier(header);
684  if (delegate == nullptr) {
685  // Carrier not found; send a human-readable message.
686  std::string msg = "* Error. Protocol not found.\r\n* Hello. You appear to be trying to communicate with a YARP Port.\r\n* The first 8 bytes sent to a YARP Port are critical for identifying the\r\n* protocol you wish to speak.\r\n* The first 8 bytes you sent were not associated with any particular protocol.\r\n* If you are a human, try typing \"CONNECT foo\" followed by a <RETURN>.\r\n* The 8 bytes \"CONNECT \" correspond to a simple text-mode protocol.\r\n* Goodbye.\r\n";
687  yarp::os::Bytes b((char*)msg.c_str(), msg.length());
688  os().write(b);
689  os().flush();
690  }
691  }
692  if (delegate == nullptr) {
693  yCDebug(PROTOCOL, "unrecognized protocol");
694  return false;
695  }
696  Route r = getRoute();
697  r.setCarrierName(delegate->getName());
698  setRoute(r);
699  delegate->setParameters(header);
700  return true;
701 }
702 
703 
704 bool Protocol::expectSenderSpecifier()
705 {
706  yCAssert(PROTOCOL, delegate != nullptr);
707  return delegate->expectSenderSpecifier(*this);
708 }
709 
710 
711 bool Protocol::sendHeader()
712 {
713  yCAssert(PROTOCOL, delegate != nullptr);
714  return delegate->sendHeader(*this);
715 }
716 
717 
718 bool Protocol::expectReplyToHeader()
719 {
720  yCAssert(PROTOCOL, delegate != nullptr);
721  return delegate->expectReplyToHeader(*this);
722 }
723 
724 
725 bool Protocol::respondToIndex()
726 {
727  return true;
728 }
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:74
void append(const Bottle &alt)
Append the content of the given bottle to the current list.
Definition: Bottle.cpp:380
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:251
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:246
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Bottle.cpp:277
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
bool supportReply() const override=0
This flag is used by YARP to determine whether the connection can carry RPC traffic,...
virtual bool write(ConnectionState &proto, SizedWriter &writer)=0
Write a message.
virtual bool configure(ConnectionState &proto)
Give carrier a shot at looking at how the connection is set up.
Definition: Carrier.cpp:109
virtual bool respondToHeader(ConnectionState &proto)=0
Respond to the header.
bool requireAck() const override=0
Check if carrier has flow control, requiring sent messages to be acknowledged by recipient.
bool isTextMode() const override=0
Check if carrier is textual in nature.
virtual bool expectSenderSpecifier(ConnectionState &proto)=0
Expect the name of the sending port.
virtual bool sendHeader(ConnectionState &proto)=0
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
virtual void close()
Close the carrier.
Definition: Carrier.cpp:91
virtual void setParameters(const Bytes &header)=0
Configure this carrier based on the first 8 bytes of the connection.
virtual bool expectExtraHeader(ConnectionState &proto)=0
Receive any carrier-specific header.
virtual bool expectIndex(ConnectionState &proto)=0
Expect a message header, if there is one for this carrier.
virtual bool expectAck(ConnectionState &proto)=0
Receive an acknowledgement, if expected for this carrier.
bool modifiesIncomingData() const override
Check if this carrier modifies incoming data through the Carrier::modifyIncomingData method.
Definition: Carrier.cpp:53
virtual bool checkHeader(const Bytes &header)=0
Given the first 8 bytes received on a connection, decide if this is the right carrier type to use for...
virtual bool reply(ConnectionState &proto, SizedWriter &writer)
Definition: Carrier.cpp:28
virtual bool expectReplyToHeader(ConnectionState &proto)=0
Process reply to header, if one is expected for this carrier.
virtual bool prepareSend(ConnectionState &proto)=0
Perform any initialization needed before writing on a connection.
virtual bool sendAck(ConnectionState &proto)=0
Send an acknowledgement, if needed for this carrier.
bool modifiesOutgoingData() const override
Check if this carrier modifies outgoing data through the Carrier::modifyOutgoingData method.
Definition: Carrier.cpp:71
static Carrier * chooseCarrier(const std::string &name)
Select a carrier by name.
Definition: Carriers.cpp:233
An interface for reading from a network connection.
InputStream & is()
Shorthand for getInputStream()
OutputStream & os()
Shorthand for getOutputStream()
A controller for an individual connection.
Definition: Connection.h:27
virtual bool isBareMode() const
Check if carrier excludes type information from payload.
Definition: Connection.cpp:17
virtual std::string getName() const =0
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition: Contact.cpp:313
An abstract port.
Definition: Contactable.h:35
The input side of an active connection between two ports.
Definition: InputProtocol.h:35
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:26
yarp::conf::ssize_t readFull(Bytes &b)
Keep reading until buffer is full.
Definition: InputStream.cpp:96
virtual bool setReadTimeout(double timeout)
Set activity timeout.
Definition: InputStream.cpp:46
virtual void interrupt()
Interrupt the stream.
Definition: InputStream.cpp:42
The output side of an active connection between two ports.
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:22
virtual void flush()
Make sure all pending write operations are finished.
virtual bool setWriteTimeout(double timeout)
Set activity timeout.
virtual bool isOk() const =0
Check if the stream is ok or in an error state.
virtual void write(char ch)
Write a single byte to the stream.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:25
This is a base class for objects that can be both read from and be written to the YARP network.
Definition: Portable.h:26
Information about a connection between two ports.
Definition: Route.h:29
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition: Route.cpp:123
void setToName(const std::string &toName)
Set the destination of the route.
Definition: Route.cpp:108
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:93
void setFromName(const std::string &fromName)
Set the source of the route.
Definition: Route.cpp:98
void setCarrierName(const std::string &carrierName)
Set the carrier type of the route.
Definition: Route.cpp:128
virtual void takeStream(TwoWayStream *stream)
Wrap the supplied stream.
Definition: ShiftStream.cpp:94
bool isOk() const override
Check if the stream is ok or in an error state.
virtual TwoWayStream * giveStream()
Removes the wrapped stream and returns it.
InputStream & getInputStream() override
Get an InputStream to read from.
Definition: ShiftStream.cpp:57
void close() override
Terminate the stream.
Definition: ShiftStream.cpp:89
OutputStream & getOutputStream() override
Get an OutputStream to write to.
Definition: ShiftStream.cpp:66
Minimal requirements for an efficient Writer.
Definition: SizedWriter.h:33
virtual void stopWrite() const =0
Call when all writing is finished.
virtual PortReader * getReplyHandler()=0
A stream which can be asked to perform bidirectional communication.
Definition: TwoWayStream.h:26
virtual void endPacket()=0
Mark the end of a logical packet (see beginPacket).
virtual void beginPacket()=0
Mark the beginning of a logical packet.
A single value (typically within a Bottle).
Definition: Value.h:45
virtual Bottle * asList() const
Get list value.
Definition: Value.cpp:240
std::string toString() const override
Return a standard text representation of the content of the object.
Definition: Value.cpp:356
yarp::os::ConnectionReader & beginRead() override
Begin a read operation, with bytes read via the returned yarp::os::ConnectionReader object.
Definition: Protocol.cpp:340
std::string getSenderSpecifier() const override
Extract a name for the sender, if the connection type supports that.
Definition: Protocol.cpp:149
bool isOk() const override
Check if the connection is valid and can be used.
Definition: Protocol.cpp:281
OutputStream & getOutputStream() override
Access the output stream associated with this connection.
Definition: Protocol.cpp:263
TwoWayStream * giveStreams() override
Take ownership of the streams associated with the connection.
Definition: Protocol.cpp:131
void attachPort(yarp::os::Contactable *port) override
Set the port to be associated with the connection.
Definition: Protocol.cpp:402
void endRead() override
End the current read operation, begun by beginRead().
Definition: Protocol.cpp:364
bool setTimeout(double timeout) override
Set the timeout to be used for network operations.
Definition: Protocol.cpp:377
Contactable * getContactable() const override
Get the port associated with the connection.
Definition: Protocol.cpp:199
void rename(const Route &route) override
Relabel the route after the fact (e.g.
Definition: Protocol.cpp:275
bool open(const std::string &name) override
Start negotiating a carrier, using the given name as our own if a name is needed (this should general...
Definition: Protocol.cpp:205
void suppressReply() override
Make sure that any attempt to send a reply to input will be denied.
Definition: Protocol.cpp:371
bool checkStreams() const override
Check whether streams are in a good state.
Definition: Protocol.cpp:137
Connection & getReceiver() override
It is possible to chain a basic connection with a modifier.
Definition: Protocol.cpp:393
bool isReplying() const override
Definition: Protocol.cpp:408
const std::string & getEnvelope() const override
Read the envelope associated with the current message.
Definition: Protocol.cpp:178
void beginWrite() override
Notify connection that we intend to write to it.
Definition: Protocol.cpp:414
TwoWayStream & getStreams() override
Access the streams associated with the connection.
Definition: Protocol.cpp:116
~Protocol() override
Destructor.
Definition: Protocol.cpp:57
void setEnvelope(const std::string &str) override
Set the envelope that will be attached to the next message.
Definition: Protocol.cpp:387
bool write(SizedWriter &writer) override
Write a message on the connection.
Definition: Protocol.cpp:287
void takeStreams(TwoWayStream *streams) override
Provide streams to be used with the connection.
Definition: Protocol.cpp:122
void interrupt() override
Try to get operations interrupted.
Definition: Protocol.cpp:248
InputStream & getInputStream() override
Access the input stream associated with this connection.
Definition: Protocol.cpp:269
void setReference(yarp::os::Portable *ref) override
Give a direct pointer to an object being sent on the connection.
Definition: Protocol.cpp:143
void setRemainingLength(int len) override
Tell the connection that the given number of bytes are left to be read.
Definition: Protocol.cpp:184
Connection & getConnection() override
Access the controller for this connection.
Definition: Protocol.cpp:190
OutputProtocol & getOutput() override
Get an interface for doing write operations on the connection.
Definition: Protocol.cpp:328
void close() override
Negotiate an end to operations.
Definition: Protocol.cpp:242
Connection & getSender() override
It is possible to chain a basic connection with a modifier.
Definition: Protocol.cpp:420
void reply(SizedWriter &writer) override
Reply to a message we have just read.
Definition: Protocol.cpp:320
InputProtocol & getInput() override
Get an interface for doing read operations on the connection.
Definition: Protocol.cpp:334
Protocol(TwoWayStream *stream)
Constructor.
Definition: Protocol.cpp:30
const Route & getRoute() const override
Get the route associated with this connection.
Definition: Protocol.cpp:110
void setRoute(const Route &route) override
Set the route associated with this connection.
Definition: Protocol.cpp:63
yarp::os::Contact getRemoteContact() const override
Gets information about who is supplying the data being read, if that information is available.
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
virtual void setReference(yarp::os::Portable *obj)
std::string toString(const T &value)
convert an arbitrary type to string.
#define yCInfo(component,...)
Definition: LogComponent.h:132
#define yCAssert(component, x)
Definition: LogComponent.h:169
#define yCDebug(component,...)
Definition: LogComponent.h:109
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:35
::ssize_t ssize_t
Definition: numeric.h:86
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.