YARP
Yet Another Robot Platform
UnixSocketCarrier.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-License-Identifier: BSD-3-Clause
4  */
5 
6 #include "UnixSocketCarrier.h"
7 
9 #include <yarp/conf/filesystem.h>
10 
12 #include <yarp/os/Log.h>
13 #include <yarp/os/LogStream.h>
14 #include <yarp/os/Os.h>
15 
16 #include "UnixSocketLogComponent.h"
17 
18 #include <array>
19 #include <mutex>
20 
21 using namespace yarp::os;
22 namespace fs = yarp::conf::filesystem;
23 
24 namespace {
25 
26 // FIXME: This method should be available somewhere in YARP
27 std::string getYARPRuntimeDir()
28 {
29  static std::mutex m;
30  std::lock_guard<std::mutex> lock(m);
31 
32  static std::string yarp_runtime_dir;
33  bool found = false;
34 
35  // If already populated, there is nothing to do
36  if (!yarp_runtime_dir.empty()) {
37  return yarp_runtime_dir;
38  }
39 
40  // Check YARP_RUNTIME_DIR
41  yarp_runtime_dir = yarp::conf::environment::get_string("YARP_RUNTIME_DIR", &found);
42  if (found) {
43  return yarp_runtime_dir;
44  }
45 
46  // Check XDG_RUNTIME_DIR
47  std::string xdg_runtime_dir = yarp::conf::environment::get_string("XDG_RUNTIME_DIR", &found);
48  if (found) {
49  yarp_runtime_dir = xdg_runtime_dir + fs::preferred_separator + "yarp";
50  return yarp_runtime_dir;
51  }
52 
53  // Use /tmp/runtime-user
54  std::string user = yarp::conf::environment::get_string("USER", &found);
55  if (found) {
56  yarp_runtime_dir = "/tmp/runtime-" + user + fs::preferred_separator + "yarp";
57  return yarp_runtime_dir;
58  }
59 
60  // ERROR
61  return {};
62 }
63 
69 bool isUnixSockSupported(ConnectionState& proto) // FIXME Why is this method unused?
70 {
73 
74  if (remote.getHost() != local.getHost()) {
76  "The ports are on different machines, unix socket not supported...");
77  return false;
78  }
79  return true;
80 }
81 
82 } // namespace
83 
85 {
86  return new UnixSocketCarrier();
87 }
88 
89 std::string UnixSocketCarrier::getName() const
90 {
91  return name;
92 }
93 
95 {
96  return requireAckFlag;
97 }
98 
100 {
101  return false;
102 }
103 
105 {
106  if (header.length() != headerSize) {
107  return false;
108  }
109 
110  bool isUnix = true;
111  bool isUnix_ack = true;
112 
113  const char* target = headerCode;
114  const char* target_ack = headerCode_ack;
115  for (size_t i = 0; i < headerSize; i++) {
116  if (header.get()[i] != target[i]) {
117  isUnix = false;
118  }
119  if (header.get()[i] != target_ack[i]) {
120  isUnix_ack = false;
121  }
122  }
123 
124  return (isUnix || isUnix_ack);
125 }
126 
128 {
129  const char* target = requireAckFlag ? headerCode_ack : headerCode;
130  for (size_t i = 0; i < headerSize && i < header.length(); i++) {
131  header.get()[i] = target[i];
132  }
133 }
134 
136 {
137  YARP_UNUSED(proto);
138  YARP_UNUSED(writer);
139 
140  return true;
141 }
142 
144 {
145  YARP_UNUSED(proto);
146 
147  return true;
148 }
149 
151 {
152  if (requireAckFlag) {
153  const Bytes ack_bytes(const_cast<char*>(ack_string), ack_string_size);
154  proto.os().write(ack_bytes);
155  }
156  return true;
157 }
158 
160 {
161  if (requireAckFlag) {
162  std::array<char, ack_string_size> buf;
163  Bytes ack(buf.data(), buf.size());
164  yarp::conf::ssize_t hdr = proto.is().readFull(ack);
165  if (static_cast<size_t>(hdr) != ack.length()) {
166  yCDebug(UNIXSOCK_CARRIER, "Did not get ack");
167  return false;
168  }
169 
170  const char* target = ack_string;
171  for (size_t i = 0; i < ack_string_size; i++) {
172  if (ack.get()[i] != target[i]) {
173  yCDebug(UNIXSOCK_CARRIER, "Bad ack");
174  return false;
175  }
176  }
177  }
178  return true;
179 }
180 
182 {
183  // I am the receiver
184  return becomeUnixSocket(proto, false);
185 }
186 
188 {
189  // I am the sender
190  return becomeUnixSocket(proto, true);
191 }
192 
193 bool UnixSocketCarrier::becomeUnixSocket(ConnectionState& proto, bool sender)
194 {
195  if (!isUnixSockSupported(proto)) {
196  return false;
197  }
198 
199  Contact remote = proto.getStreams().getRemoteAddress();
201 
202  proto.takeStreams(nullptr); // free up port from tcp
203 
204  std::string runtime_dir = getYARPRuntimeDir();
205 
206  // Make sure that the path exists
207  if (runtime_dir.empty() || yarp::os::mkdir_p(runtime_dir.c_str(), 0) != 0) {
208  yCError(UNIXSOCK_CARRIER, "Failed to create directory %s", runtime_dir.c_str());
209  return false;
210  }
211 
212  if (sender) {
213  socketPath = runtime_dir + fs::preferred_separator + std::to_string(remote.getPort()) + "_" + std::to_string(local.getPort()) + ".sock";
214  } else {
215  socketPath = runtime_dir + fs::preferred_separator + std::to_string(local.getPort()) + "_" + std::to_string(remote.getPort()) + ".sock";
216  }
217 
218  stream = new UnixSockTwoWayStream(socketPath);
219  stream->setLocalAddress(local);
220  stream->setRemoteAddress(remote);
221 
222  if (!stream->open(sender)) {
223  delete stream;
224  stream = nullptr;
225  yCError(UNIXSOCK_CARRIER, "Failed to open stream on socket %s as %s", socketPath.c_str(), (sender ? "sender" : "receiver"));
226  return false;
227  }
228  yAssert(stream != nullptr);
229 
230  proto.takeStreams(stream);
231 
232  yCDebug(UNIXSOCK_CARRIER, "Connected on socket %s as %s", socketPath.c_str(), (sender ? "sender" : "receiver"));
233  return true;
234 }
235 
236 
238 {
239  Property options;
240  options.fromString(proto.getSenderSpecifier());
241  return configureFromProperty(options);
242 }
243 
245 {
246  if (options.check("ack")) {
247  yCInfo(UNIXSOCK_CARRIER, "ACK Enabled");
248  requireAckFlag = true;
249  }
250  return true;
251 }
252 
254 {
255  const char* target_ack = headerCode_ack;
256  for (size_t i = 0; i < headerSize; i++) {
257  if (header.get()[i] != target_ack[i]) {
258  return;
259  }
260  }
261  yCInfo(UNIXSOCK_CARRIER, "ACK Enabled");
262  requireAckFlag = true;
263 }
#define yAssert(x)
Definition: Log.h:294
const yarp::os::LogComponent & UNIXSOCK_CARRIER()
A stream abstraction for unix socket communication.
Communicating between two ports(IPC) via Unix Socket.
bool sendAck(yarp::os::ConnectionState &proto) override
Send an acknowledgement, if needed for this carrier.
bool sendIndex(yarp::os::ConnectionState &proto, yarp::os::SizedWriter &writer) override
void setParameters(const yarp::os::Bytes &header) override
Configure this carrier based on the first 8 bytes of the connection.
bool isConnectionless() const override
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
std::string getName() const override
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
bool requireAck() const override
Check if carrier has flow control, requiring sent messages to be acknowledged by recipient.
bool expectAck(yarp::os::ConnectionState &proto) override
Receive an acknowledgement, if expected for this carrier.
bool configure(yarp::os::ConnectionState &proto) override
Give carrier a shot at looking at how the connection is set up.
bool expectIndex(yarp::os::ConnectionState &proto) override
Expect a message header, if there is one for this carrier.
bool respondToHeader(yarp::os::ConnectionState &proto) override
Respond to the header.
bool expectReplyToHeader(yarp::os::ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
bool configureFromProperty(yarp::os::Property &options) override
yarp::os::Carrier * create() const override
Factory method.
void getHeader(yarp::os::Bytes &header) const override
Provide 8 bytes describing this connection sufficiently to allow the other side of a connection to se...
bool checkHeader(const yarp::os::Bytes &header) override
Given the first 8 bytes received on a connection, decide if this is the right carrier type to use for...
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:45
The basic state of a connection - route, streams in use, etc.
virtual TwoWayStream & getStreams()=0
Access the streams associated with the connection.
virtual std::string getSenderSpecifier() const =0
Extract a name for the sender, if the connection type supports that.
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
InputStream & is()
Shorthand for getInputStream()
OutputStream & os()
Shorthand for getOutputStream()
Represents how to reach a part of a YARP network.
Definition: Contact.h:36
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:239
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:228
yarp::conf::ssize_t readFull(Bytes &b)
Keep reading until buffer is full.
Definition: InputStream.cpp:96
virtual void write(char ch)
Write a single byte to the stream.
A class for storing options and configuration information.
Definition: Property.h:34
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
Definition: Property.cpp:1063
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Property.cpp:1041
Minimal requirements for an efficient Writer.
Definition: SizedWriter.h:33
virtual const Contact & getLocalAddress() const =0
Get the address of the local side of the stream.
virtual const Contact & getRemoteAddress() const =0
Get the address of the remote side of the stream.
#define yCInfo(component,...)
Definition: LogComponent.h:132
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCDebug(component,...)
Definition: LogComponent.h:109
std::string get_string(const std::string &key, bool *found=nullptr)
Read a string from an environment variable.
Definition: environment.h:68
static constexpr value_type preferred_separator
Definition: filesystem.h:23
std::string to_string(IntegerType x)
Definition: numeric.h:115
::ssize_t ssize_t
Definition: numeric.h:86
An interface to the operating system, including Port based communication.
int mkdir_p(const char *p, int ignoreLevels=0)
Create a directory and all parent directories needed.
Definition: Os.cpp:42
#define YARP_UNUSED(var)
Definition: api.h:162