YARP
Yet Another Robot Platform
TcpRosStream.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-License-Identifier: BSD-3-Clause
4 */
5
6#include "TcpRosStream.h"
8
9#include <cstdio>
10#include <cstring>
11#include <cstdlib>
12#include <yarp/os/NetType.h>
13#include <yarp/os/NetInt32.h>
14#include <yarp/os/Bottle.h>
15#include <yarp/os/Port.h>
16
18
19using namespace yarp::os;
20using namespace yarp::wire_rep_utils;
21
23 if (!setInitiative) {
24 initiative = false;
25 setInitiative = true;
26 }
27
28 if (kind!="") {
29 return twiddlerReader.read(b);
30 }
31
32 if (phase == -1) {
33 return -1;
34 }
35 if (remaining==0) {
36 if (phase==1) {
37 phase = 2;
38 if (scan.length()>0) {
39 cursor = scan.get();
40 remaining = scan.length();
41 } else {
42 cursor = nullptr;
43 remaining = header.blobLen;
44 }
45 } else {
46 scan.clear();
47 phase = 0;
48 }
49 }
50 if (phase==0) {
51 if (expectTwiddle&&initiative) {
52 // There's a success/failure byte to check.
53 // Here we just consume it, but if it shows failure
54 // we should in fact expect a string afterwards.
55 char twiddle[1];
56 Bytes twiddle_buf(twiddle,1);
57 delegate->getInputStream().readFull(twiddle_buf);
58 }
59
60 char mlen[4];
61 Bytes mlen_buf(mlen,4);
62 int res = delegate->getInputStream().readFull(mlen_buf);
63 if (res<4) {
64 yCTrace(TCPROSCARRIER, "tcpros_carrier failed, %s %d\n", __FILE__, __LINE__);
65 phase = -1;
66 return -1;
67 }
68 int len = NetType::netInt(mlen_buf);
69 yCTrace(TCPROSCARRIER, "Unit length %d\n", len);
70
71 // inhibit type scanning for now, it is unreliable
72 if (raw == -1) {
73 raw = 2;
74 }
75 if (raw==-2) {
76 scan.allocate(len);
77 int res = delegate->getInputStream().readFull(scan.bytes());
78 yCTrace(TCPROSCARRIER, "Read %d bytes with raw==-2\n", res);
79 if (res<0) {
80 yCTrace(TCPROSCARRIER, "tcpros_carrier failed, %s %d\n", __FILE__, __LINE__);
81 phase = -1;
82 return -1;
83 }
84 int len_scan = scan.length();
85 if (WireBottle::checkBottle(scan.get(),len_scan)) {
86 yCTrace(TCPROSCARRIER, "Looks YARP-compatible\n");
87 raw = 1;
88 } else {
89 yCTrace(TCPROSCARRIER, "Looks strange, blobbing...\n");
90 raw = 0;
91 }
92 }
93
94 header.init(len);
95 if (raw==1) {
96 phase = 2;
97 if (scan.length()>0) {
98 cursor = scan.get();
99 remaining = scan.length();
100 } else {
101 cursor = nullptr;
102 remaining = header.blobLen;
103 }
104 } else if (raw==2) {
105 cursor = nullptr;
106 remaining = header.blobLen;
107 phase = 2;
108 } else {
109 phase = 1;
110 cursor = (char*) &header;
111 remaining = sizeof(header);
112 }
113 }
114 yCTrace(TCPROSCARRIER, "phase %d remaining %d\n", phase, remaining);
115 if (remaining>0) {
116 if (cursor!=nullptr) {
117 int allow = remaining;
118 if ((int)b.length()<allow) {
119 allow = b.length();
120 }
121 memcpy(b.get(),cursor,allow);
122 cursor+=allow;
123 remaining-=allow;
124 yCTrace(TCPROSCARRIER, "%d bytes of header\n", allow);
125 return allow;
126 } else {
127 int result = delegate->getInputStream().read(b);
128 yCTrace(TCPROSCARRIER, "Read %d bytes\n", result);
129 if (result>0) {
130 remaining-=result;
131 yCTrace(TCPROSCARRIER, "%d bytes of meat\n", result);
132 return result;
133 }
134 }
135 }
136 phase = -1;
137 return -1;
138}
139
140
142 if (!setInitiative) {
143 initiative = true;
144 setInitiative = true;
145 }
146 yCTrace(TCPROSCARRIER, " [[[[[ write %d bytes ]]]]]\n", (int)b.length());
147 delegate->getOutputStream().write(b);
148}
149
150
151void TcpRosStream::updateKind(const char *kind, bool sender, bool reply) {
152 std::string code = rosToKind(kind);
153 if (code!="") {
154 configureTwiddler(twiddler,code.c_str(),kind,sender,reply);
155 this->kind = code;
156 } else {
157 this->kind = "";
158 }
159}
160
161
162std::map<std::string, std::string> TcpRosStream::rosToKind() {
163 std::map<std::string, std::string> kinds;
164 kinds["std_msgs/String"] = "vector string 1 *";
165 kinds["std_msgs/Int32"] = "vector int32 1 *";
166 kinds["std_msgs/Float64"] = "vector float64 1 *";
167
168 // these two are specialized, TODO link them specifically to
169 // yarp/image and yarp/vector
170 kinds["sensor_msgs/Image"] = "list 4 skip uint32 * skip uint32 * skip uint32 * skip string * >height uint32 * >width uint32 * >encoding string * skip int8 * >step int32 * compute image_params <=[mat] vocab * <translated_encoding vocab * item_vector int32 5 <depth item * <img_size item * <quantum item * <width item * <height item * blob *";
171
172 kinds["test_roscpp/TestStringString"] = "vector string 1 * --- vector string 1 *";
173 // kinds["rospy_tutorials/AddTwoInts"] = "vector int64 2 * --- vector int64 1 *";
174 return kinds;
175}
176
177std::string TcpRosStream::rosToKind(const char *rosname) {
178 if (std::string(rosname) == "") {
179 return {};
180 };
181 std::map<std::string, std::string> kinds = rosToKind();
182
183 if (kinds.find(rosname)!=kinds.end()) {
184 return kinds[rosname];
185 }
186 Port port;
187 port.openFake("yarpidl_rosmsg");
188 if (port.addOutput("/typ")) {
189 Bottle cmd, resp;
190 cmd.addString(std::string("twiddle ") + rosname);
191 yCTrace(TCPROSCARRIER, "QUERY yarpidl_rosmsg %s\n", cmd.toString().c_str());
192 port.write(cmd,resp);
193 yCTrace(TCPROSCARRIER, "GOT yarpidl_rosmsg %s\n", resp.toString().c_str());
194 std::string txt = resp.get(0).asString();
195 if (txt != "?") {
196 return txt;
197 }
198 }
199 port.close();
200 if (std::string(rosname)!="") {
201 yCError(TCPROSCARRIER, "Do not know anything about type '%s'\n", rosname);
202 yCError(TCPROSCARRIER, "Could not connect to a type server to look up type '%s'\n", rosname);
203 ::exit(1);
204 }
205 return {};
206}
207
208
209bool TcpRosStream::configureTwiddler(WireTwiddler& twiddler, const char *txt, const char *prompt, bool sender, bool reply) {
210 yCTrace(TCPROSCARRIER, "CONFIGURE AS %s [%s/%s]\n", txt,
211 sender?"sender":"receiver",
212 reply?"reply":"main");
213 std::string str(txt);
214 if (reply) {
215 size_t idx = str.find("---");
216 if (idx!=std::string::npos) {
217 str = str.substr(idx+3,str.length());
218 }
219 }
220 str = std::string("skip int32 * ") + str;
221 if (reply) {
222 str = std::string("skip int8 * ") + str;
223 }
224 return twiddler.configure(str.c_str(),prompt);
225}
const yarp::os::LogComponent & TCPROSCARRIER()
static char * checkBottle(char *cursor, int &remaining, int ct, int list_tag)
Definition: WireBottle.cpp:22
static std::map< std::string, std::string > rosToKind()
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
static bool configureTwiddler(yarp::wire_rep_utils::WireTwiddler &twiddler, const char *txt, const char *prompt, bool sender, bool reply)
void updateKind(const char *kind, bool sender, bool reply)
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:64
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:246
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:170
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:211
A simple abstraction for a block of bytes.
Definition: Bytes.h:24
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:20
yarp::conf::ssize_t readFull(Bytes &b)
Keep reading until buffer is full.
Definition: InputStream.cpp:96
void clear()
Disassociate object with any data block (deleting block if appropriate).
const Bytes & bytes() const
void allocate(size_t len)
Makes a data block of the specified length that will be deleted if this object is destroyed.
const char * get() const
virtual void write(char ch)
Write a single byte to the stream.
A mini-server for network communication.
Definition: Port.h:46
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
Definition: Port.cpp:436
bool addOutput(const std::string &name) override
Add an output connection to the specified port.
Definition: Port.cpp:353
bool openFake(const std::string &name)
Start port without making it accessible from the network.
Definition: Port.cpp:74
void close() override
Stop port activity.
Definition: Port.cpp:363
virtual InputStream & getInputStream()=0
Get an InputStream to read from.
virtual OutputStream & getOutputStream()=0
Get an OutputStream to write to.
virtual std::string asString() const
Get string value.
Definition: Value.cpp:234
yarp::conf::ssize_t read(yarp::os::Bytes &b) override
Read a block of data from the stream.
bool configure(const char *txt, const char *prompt)
#define yCError(component,...)
Definition: LogComponent.h:213
#define yCTrace(component,...)
Definition: LogComponent.h:84
::ssize_t ssize_t
Definition: numeric.h:86
An interface to the operating system, including Port based communication.