YARP
Yet Another Robot Platform
TcpRosStream.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-License-Identifier: BSD-3-Clause
4  */
5 
6 #include "TcpRosStream.h"
7 #include "TcpRosLogComponent.h"
8 
9 #include <cstdio>
10 #include <cstring>
11 #include <cstdlib>
12 #include <yarp/os/NetType.h>
13 #include <yarp/os/NetInt32.h>
14 #include <yarp/os/Bottle.h>
15 #include <yarp/os/Port.h>
16 
18 
19 using namespace yarp::os;
20 using namespace yarp::wire_rep_utils;
21 
// Body of the stream's read routine (its signature line is not part of this
// listing). Copies the next bytes of the incoming stream into `b` and returns
// the number of bytes delivered, or -1 on failure.
//
// State used below:
//   phase      -1 = failed (sticky), 0 = start of a new unit,
//              1 = serving the local `header`, 2 = serving the unit body.
//   cursor     non-null: bytes are copied from local memory;
//              null: bytes are read from the delegate input stream.
//   remaining  bytes still to deliver in the current phase.
//   raw        selects how phase 0 sets up the unit (see the branches below).
//
// The first I/O call fixes the initiative: a read before any write marks this
// side as not having the initiative (write() sets it to true instead).
23  if (!setInitiative) {
24  initiative = false;
25  setInitiative = true;
26  }
27 
// With a configured message kind, reading is handed over to twiddlerReader.
28  if (kind!="") {
29  return twiddlerReader.read(b);
30  }
31 
// A previous failure is permanent: keep reporting -1.
32  if (phase == -1) {
33  return -1;
34  }
// Current phase fully delivered: advance the state machine.
35  if (remaining==0) {
36  if (phase==1) {
// Header has been served; continue with the body, from the scan buffer
// if it holds data, otherwise straight from the delegate stream.
37  phase = 2;
38  if (scan.length()>0) {
39  cursor = scan.get();
40  remaining = scan.length();
41  } else {
42  cursor = nullptr;
43  remaining = header.blobLen;
44  }
45  } else {
// Unit finished (or nothing started yet): drop scanned data, start a new unit.
46  scan.clear();
47  phase = 0;
48  }
49  }
// Phase 0: read the framing of the next unit from the delegate stream.
50  if (phase==0) {
51  if (expectTwiddle&&initiative) {
52  // There's a success/failure byte to check.
53  // Here we just consume it, but if it shows failure
54  // we should in fact expect a string afterwards.
// NOTE(review): the return value of this readFull is not checked.
55  char twiddle[1];
56  Bytes twiddle_buf(twiddle,1);
57  delegate->getInputStream().readFull(twiddle_buf);
58  }
59 
// 4-byte length prefix; fewer than 4 bytes read is treated as a fatal error.
60  char mlen[4];
61  Bytes mlen_buf(mlen,4);
62  int res = delegate->getInputStream().readFull(mlen_buf);
63  if (res<4) {
64  yCTrace(TCPROSCARRIER, "tcpros_carrier failed, %s %d\n", __FILE__, __LINE__);
65  phase = -1;
66  return -1;
67  }
// NOTE(review): len is used below (scan.allocate, header.init) without any
// range check; a negative or very large value is not rejected here.
68  int len = NetType::netInt(mlen_buf);
69  yCTrace(TCPROSCARRIER, "Unit length %d\n", len);
70 
71  // inhibit type scanning for now, it is unreliable
// raw == -1 is rewritten to 2 here, so the scanning branch below only runs
// if raw was set to -2 somewhere outside this listing.
72  if (raw == -1) {
73  raw = 2;
74  }
75  if (raw==-2) {
// Read the whole unit into `scan` and decide between raw = 1 and raw = 0
// based on WireBottle::checkBottle.
// NOTE(review): this `res` shadows the outer one, and only res < 0 is
// treated as failure (a short read is not).
76  scan.allocate(len);
77  int res = delegate->getInputStream().readFull(scan.bytes());
78  yCTrace(TCPROSCARRIER, "Read %d bytes with raw==-2\n", res);
79  if (res<0) {
80  yCTrace(TCPROSCARRIER, "tcpros_carrier failed, %s %d\n", __FILE__, __LINE__);
81  phase = -1;
82  return -1;
83  }
84  int len_scan = scan.length();
85  if (WireBottle::checkBottle(scan.get(),len_scan)) {
86  yCTrace(TCPROSCARRIER, "Looks YARP-compatible\n");
87  raw = 1;
88  } else {
89  yCTrace(TCPROSCARRIER, "Looks strange, blobbing...\n");
90  raw = 0;
91  }
92  }
93 
// Prepare the local header for a unit of `len` bytes, then pick the source of
// the first bytes to deliver according to `raw`.
94  header.init(len);
95  if (raw==1) {
// No header is served: go directly to the body (scan buffer or stream).
96  phase = 2;
97  if (scan.length()>0) {
98  cursor = scan.get();
99  remaining = scan.length();
100  } else {
101  cursor = nullptr;
102  remaining = header.blobLen;
103  }
104  } else if (raw==2) {
// No header is served: pass header.blobLen bytes through from the stream.
105  cursor = nullptr;
106  remaining = header.blobLen;
107  phase = 2;
108  } else {
// Any other raw value: serve the in-memory `header` object first (phase 1).
109  phase = 1;
110  cursor = (char*) &header;
111  remaining = sizeof(header);
112  }
113  }
114  yCTrace(TCPROSCARRIER, "phase %d remaining %d\n", phase, remaining);
115  if (remaining>0) {
116  if (cursor!=nullptr) {
// Serve from local memory, limited to the capacity of `b`.
117  int allow = remaining;
118  if ((int)b.length()<allow) {
119  allow = b.length();
120  }
121  memcpy(b.get(),cursor,allow);
122  cursor+=allow;
123  remaining-=allow;
124  yCTrace(TCPROSCARRIER, "%d bytes of header\n", allow);
125  return allow;
126  } else {
// Serve from the delegate stream; only a positive result counts as progress.
127  int result = delegate->getInputStream().read(b);
128  yCTrace(TCPROSCARRIER, "Read %d bytes\n", result);
129  if (result>0) {
130  remaining-=result;
131  yCTrace(TCPROSCARRIER, "%d bytes of meat\n", result);
132  return result;
133  }
134  }
135  }
// Reached when nothing remains to deliver after setup, or when the delegate
// read returned <= 0: the stream is marked failed for all later calls.
136  phase = -1;
137  return -1;
138 }
139 
140 
141 void TcpRosStream::write(const Bytes& b) {
142  if (!setInitiative) {
143  initiative = true;
144  setInitiative = true;
145  }
146  yCTrace(TCPROSCARRIER, " [[[[[ write %d bytes ]]]]]\n", (int)b.length());
147  delegate->getOutputStream().write(b);
148 }
149 
150 
151 void TcpRosStream::updateKind(const char *kind, bool sender, bool reply) {
152  std::string code = rosToKind(kind);
153  if (code!="") {
154  configureTwiddler(twiddler,code.c_str(),kind,sender,reply);
155  this->kind = code;
156  } else {
157  this->kind = "";
158  }
159 }
160 
161 
162 std::map<std::string, std::string> TcpRosStream::rosToKind() {
163  std::map<std::string, std::string> kinds;
164  kinds["std_msgs/String"] = "vector string 1 *";
165  kinds["std_msgs/Int32"] = "vector int32 1 *";
166  kinds["std_msgs/Float64"] = "vector float64 1 *";
167 
168  // these two are specialized, TODO link them specifically to
169  // yarp/image and yarp/vector
170  kinds["sensor_msgs/Image"] = "list 4 skip uint32 * skip uint32 * skip uint32 * skip string * >height uint32 * >width uint32 * >encoding string * skip int8 * >step int32 * compute image_params <=[mat] vocab * <translated_encoding vocab * item_vector int32 5 <depth item * <img_size item * <quantum item * <width item * <height item * blob *";
171 
172  kinds["test_roscpp/TestStringString"] = "vector string 1 * --- vector string 1 *";
173  // kinds["rospy_tutorials/AddTwoInts"] = "vector int64 2 * --- vector int64 1 *";
174  return kinds;
175 }
176 
177 std::string TcpRosStream::rosToKind(const char *rosname) {
178  if (std::string(rosname) == "") {
179  return {};
180  };
181  std::map<std::string, std::string> kinds = rosToKind();
182 
183  if (kinds.find(rosname)!=kinds.end()) {
184  return kinds[rosname];
185  }
186  Port port;
187  port.openFake("yarpidl_rosmsg");
188  if (port.addOutput("/typ")) {
189  Bottle cmd, resp;
190  cmd.addString(std::string("twiddle ") + rosname);
191  yCTrace(TCPROSCARRIER, "QUERY yarpidl_rosmsg %s\n", cmd.toString().c_str());
192  port.write(cmd,resp);
193  yCTrace(TCPROSCARRIER, "GOT yarpidl_rosmsg %s\n", resp.toString().c_str());
194  std::string txt = resp.get(0).asString();
195  if (txt != "?") {
196  return txt;
197  }
198  }
199  port.close();
200  if (std::string(rosname)!="") {
201  yCError(TCPROSCARRIER, "Do not know anything about type '%s'\n", rosname);
202  yCError(TCPROSCARRIER, "Could not connect to a type server to look up type '%s'\n", rosname);
203  ::exit(1);
204  }
205  return {};
206 }
207 
208 
209 bool TcpRosStream::configureTwiddler(WireTwiddler& twiddler, const char *txt, const char *prompt, bool sender, bool reply) {
210  yCTrace(TCPROSCARRIER, "CONFIGURE AS %s [%s/%s]\n", txt,
211  sender?"sender":"receiver",
212  reply?"reply":"main");
213  std::string str(txt);
214  if (reply) {
215  size_t idx = str.find("---");
216  if (idx!=std::string::npos) {
217  str = str.substr(idx+3,str.length());
218  }
219  }
220  str = std::string("skip int32 * ") + str;
221  if (reply) {
222  str = std::string("skip int8 * ") + str;
223  }
224  return twiddler.configure(str.c_str(),prompt);
225 }
const yarp::os::LogComponent & TCPROSCARRIER()
static char * checkBottle(char *cursor, int &remaining, int ct, int list_tag)
Definition: WireBottle.cpp:22
static std::map< std::string, std::string > rosToKind()
static bool configureTwiddler(yarp::wire_rep_utils::WireTwiddler &twiddler, const char *txt, const char *prompt, bool sender, bool reply)
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
void updateKind(const char *kind, bool sender, bool reply)
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:74
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:246
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:170
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:211
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:20
static int netInt(const yarp::os::Bytes &code)
Definition: NetType.cpp:27
A mini-server for network communication.
Definition: Port.h:47
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
Definition: Port.cpp:427
bool addOutput(const std::string &name) override
Add an output connection to the specified port.
Definition: Port.cpp:344
bool openFake(const std::string &name)
Start port without making it accessible from the network.
Definition: Port.cpp:74
void close() override
Stop port activity.
Definition: Port.cpp:354
virtual std::string asString() const
Get string value.
Definition: Value.cpp:234
bool configure(const char *txt, const char *prompt)
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCTrace(component,...)
Definition: LogComponent.h:85
::ssize_t ssize_t
Definition: numeric.h:86
An interface to the operating system, including Port based communication.