YARP
Yet Another Robot Platform
UnixSocketCarrier.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * All rights reserved.
4  *
5  * This software may be modified and distributed under the terms of the
6  * BSD-3-Clause license. See the accompanying LICENSE file for details.
7  */
8 
9 #include "UnixSocketCarrier.h"
10 
11 #include <yarp/conf/environment.h>
12 #include <yarp/conf/filesystem.h>
13 
15 #include <yarp/os/Log.h>
16 #include <yarp/os/LogStream.h>
17 #include <yarp/os/Os.h>
18 
19 #include "UnixSocketLogComponent.h"
20 
21 #include <array>
22 #include <mutex>
23 
24 using namespace yarp::os;
25 namespace fs = yarp::conf::filesystem;
26 
27 namespace {
28 
29 // FIXME: This method should be available somewhere in YARP
30 std::string getYARPRuntimeDir()
31 {
32  static std::mutex m;
33  std::lock_guard<std::mutex> lock(m);
34 
35  static std::string yarp_runtime_dir;
36  bool found = false;
37 
38  // If already populated, there is nothing to do
39  if (!yarp_runtime_dir.empty()) {
40  return yarp_runtime_dir;
41  }
42 
43  // Check YARP_RUNTIME_DIR
44  yarp_runtime_dir = yarp::conf::environment::getEnvironment("YARP_RUNTIME_DIR", &found);
45  if (found) {
46  return yarp_runtime_dir;
47  }
48 
49  // Check XDG_RUNTIME_DIR
50  std::string xdg_runtime_dir = yarp::conf::environment::getEnvironment("XDG_RUNTIME_DIR", &found);
51  if (found) {
52  yarp_runtime_dir = xdg_runtime_dir + fs::preferred_separator + "yarp";
53  return yarp_runtime_dir;
54  }
55 
56  // Use /tmp/runtime-user
57  std::string user = yarp::conf::environment::getEnvironment("USER", &found);
58  if (found) {
59  yarp_runtime_dir = "/tmp/runtime-" + user + fs::preferred_separator + "yarp";
60  return yarp_runtime_dir;
61  }
62 
63  // ERROR
64  return {};
65 }
66 
72 bool isUnixSockSupported(ConnectionState& proto) // FIXME Why is this method unused?
73 {
76 
77  if (remote.getHost() != local.getHost()) {
79  "The ports are on different machines, unix socket not supported...");
80  return false;
81  }
82  return true;
83 }
84 
85 } // namespace
86 
88 {
89  return new UnixSocketCarrier();
90 }
91 
92 std::string UnixSocketCarrier::getName() const
93 {
94  return name;
95 }
96 
98 {
99  return requireAckFlag;
100 }
101 
103 {
104  return false;
105 }
106 
108 {
109  if (header.length() != headerSize) {
110  return false;
111  }
112 
113  bool isUnix = true;
114  bool isUnix_ack = true;
115 
116  const char* target = headerCode;
117  const char* target_ack = headerCode_ack;
118  for (size_t i = 0; i < headerSize; i++) {
119  if (header.get()[i] != target[i]) {
120  isUnix = false;
121  }
122  if (header.get()[i] != target_ack[i]) {
123  isUnix_ack = false;
124  }
125  }
126 
127  return (isUnix || isUnix_ack);
128 }
129 
131 {
132  const char* target = requireAckFlag ? headerCode_ack : headerCode;
133  for (size_t i = 0; i < headerSize && i < header.length(); i++) {
134  header.get()[i] = target[i];
135  }
136 }
137 
139 {
140  YARP_UNUSED(proto);
141  YARP_UNUSED(writer);
142 
143  return true;
144 }
145 
147 {
148  YARP_UNUSED(proto);
149 
150  return true;
151 }
152 
154 {
155  if (requireAckFlag) {
156  const Bytes ack_bytes(const_cast<char*>(ack_string), ack_string_size);
157  proto.os().write(ack_bytes);
158  }
159  return true;
160 }
161 
163 {
164  if (requireAckFlag) {
165  std::array<char, ack_string_size> buf;
166  Bytes ack(buf.data(), buf.size());
167  yarp::conf::ssize_t hdr = proto.is().readFull(ack);
168  if (static_cast<size_t>(hdr) != ack.length()) {
169  yCDebug(UNIXSOCK_CARRIER, "Did not get ack");
170  return false;
171  }
172 
173  const char* target = ack_string;
174  for (size_t i = 0; i < ack_string_size; i++) {
175  if (ack.get()[i] != target[i]) {
176  yCDebug(UNIXSOCK_CARRIER, "Bad ack");
177  return false;
178  }
179  }
180  }
181  return true;
182 }
183 
185 {
186  // I am the receiver
187  return becomeUnixSocket(proto, false);
188 }
189 
191 {
192  // I am the sender
193  return becomeUnixSocket(proto, true);
194 }
195 
196 bool UnixSocketCarrier::becomeUnixSocket(ConnectionState& proto, bool sender)
197 {
198  if (!isUnixSockSupported(proto)) {
199  return false;
200  }
201 
202  Contact remote = proto.getStreams().getRemoteAddress();
204 
205  proto.takeStreams(nullptr); // free up port from tcp
206 
207  std::string runtime_dir = getYARPRuntimeDir();
208 
209  // Make sure that the path exists
210  if (runtime_dir.empty() || yarp::os::mkdir_p(runtime_dir.c_str(), 0) != 0) {
211  yCError(UNIXSOCK_CARRIER, "Failed to create directory %s", runtime_dir.c_str());
212  return false;
213  }
214 
215  if (sender) {
216  socketPath = runtime_dir + fs::preferred_separator + std::to_string(remote.getPort()) + "_" + std::to_string(local.getPort()) + ".sock";
217  } else {
218  socketPath = runtime_dir + fs::preferred_separator + std::to_string(local.getPort()) + "_" + std::to_string(remote.getPort()) + ".sock";
219  }
220 
221  stream = new UnixSockTwoWayStream(socketPath);
222  stream->setLocalAddress(local);
223  stream->setRemoteAddress(remote);
224 
225  if (!stream->open(sender)) {
226  delete stream;
227  stream = nullptr;
228  yCError(UNIXSOCK_CARRIER, "Failed to open stream on socket %s as %s", socketPath.c_str(), (sender ? "sender" : "receiver"));
229  return false;
230  }
231  yAssert(stream != nullptr);
232 
233  proto.takeStreams(stream);
234 
235  yCDebug(UNIXSOCK_CARRIER, "Connected on socket %s as %s", socketPath.c_str(), (sender ? "sender" : "receiver"));
236  return true;
237 }
238 
239 
241 {
242  Property options;
243  options.fromString(proto.getSenderSpecifier());
244  return configureFromProperty(options);
245 }
246 
248 {
249  if (options.check("ack")) {
250  yCInfo(UNIXSOCK_CARRIER, "ACK Enabled");
251  requireAckFlag = true;
252  }
253  return true;
254 }
255 
257 {
258  const char* target_ack = headerCode_ack;
259  for (size_t i = 0; i < headerSize; i++) {
260  if (header.get()[i] != target_ack[i]) {
261  return;
262  }
263  }
264  yCInfo(UNIXSOCK_CARRIER, "ACK Enabled");
265  requireAckFlag = true;
266 }
#define yAssert(x)
Definition: Log.h:297
const yarp::os::LogComponent & UNIXSOCK_CARRIER()
A stream abstraction for unix socket communication.
Communicating between two ports(IPC) via Unix Socket.
bool sendAck(yarp::os::ConnectionState &proto) override
Send an acknowledgement, if needed for this carrier.
bool sendIndex(yarp::os::ConnectionState &proto, yarp::os::SizedWriter &writer) override
void setParameters(const yarp::os::Bytes &header) override
Configure this carrier based on the first 8 bytes of the connection.
bool isConnectionless() const override
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
std::string getName() const override
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
bool requireAck() const override
Check if carrier has flow control, requiring sent messages to be acknowledged by recipient.
bool expectAck(yarp::os::ConnectionState &proto) override
Receive an acknowledgement, if expected for this carrier.
bool configure(yarp::os::ConnectionState &proto) override
Give carrier a shot at looking at how the connection is set up.
bool expectIndex(yarp::os::ConnectionState &proto) override
Expect a message header, if there is one for this carrier.
bool respondToHeader(yarp::os::ConnectionState &proto) override
Respond to the header.
bool expectReplyToHeader(yarp::os::ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
bool configureFromProperty(yarp::os::Property &options) override
yarp::os::Carrier * create() const override
Factory method.
void getHeader(yarp::os::Bytes &header) const override
Provide 8 bytes describing this connection sufficiently to allow the other side of a connection to se...
bool checkHeader(const yarp::os::Bytes &header) override
Given the first 8 bytes received on a connection, decide if this is the right carrier type to use for...
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
size_t length() const
Definition: Bytes.cpp:25
const char * get() const
Definition: Bytes.cpp:30
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:48
The basic state of a connection - route, streams in use, etc.
virtual TwoWayStream & getStreams()=0
Access the streams associated with the connection.
virtual std::string getSenderSpecifier() const =0
Extract a name for the sender, if the connection type supports that.
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
InputStream & is()
Shorthand for getInputStream()
OutputStream & os()
Shorthand for getOutputStream()
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:242
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:231
yarp::conf::ssize_t readFull(Bytes &b)
Keep reading until buffer is full.
Definition: InputStream.cpp:99
virtual void write(char ch)
Write a single byte to the stream.
A class for storing options and configuration information.
Definition: Property.h:37
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
Definition: Property.cpp:1046
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Property.cpp:1024
Minimal requirements for an efficient Writer.
Definition: SizedWriter.h:36
virtual const Contact & getLocalAddress() const =0
Get the address of the local side of the stream.
virtual const Contact & getRemoteAddress() const =0
Get the address of the remote side of the stream.
#define yCInfo(component,...)
Definition: LogComponent.h:135
#define yCError(component,...)
Definition: LogComponent.h:157
#define yCDebug(component,...)
Definition: LogComponent.h:112
std::string getEnvironment(const char *key, bool *found=nullptr)
Read a variable from the environment.
Definition: environment.h:31
static constexpr value_type preferred_separator
Definition: filesystem.h:28
::ssize_t ssize_t
Definition: numeric.h:60
An interface to the operating system, including Port based communication.
int mkdir_p(const char *p, int ignoreLevels=0)
Create a directory and all parent directories needed.
Definition: Os.cpp:45
#define YARP_UNUSED(var)
Definition: api.h:159