YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
UnixSocketCarrier.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-License-Identifier: BSD-3-Clause
4 */
5
6#include "UnixSocketCarrier.h"
7
10
12#include <yarp/os/Log.h>
13#include <yarp/os/LogStream.h>
14#include <yarp/os/Os.h>
15
17
18#include <array>
19#include <mutex>
20
21using namespace yarp::os;
22namespace fs = yarp::conf::filesystem;
23
24namespace {
25
26// FIXME: This method should be available somewhere in YARP
27std::string getYARPRuntimeDir()
28{
29 static std::mutex m;
30 std::lock_guard<std::mutex> lock(m);
31
32 static std::string yarp_runtime_dir;
33 bool found = false;
34
35 // If already populated, there is nothing to do
36 if (!yarp_runtime_dir.empty()) {
37 return yarp_runtime_dir;
38 }
39
40 // Check YARP_RUNTIME_DIR
42 if (found) {
43 return yarp_runtime_dir;
44 }
45
46 // Check XDG_RUNTIME_DIR
47 std::string xdg_runtime_dir = yarp::conf::environment::get_string("XDG_RUNTIME_DIR", &found);
48 if (found) {
50 return yarp_runtime_dir;
51 }
52
53 // Use /tmp/runtime-user
54 std::string user = yarp::conf::environment::get_string("USER", &found);
55 if (found) {
56 yarp_runtime_dir = "/tmp/runtime-" + user + fs::preferred_separator + "yarp";
57 return yarp_runtime_dir;
58 }
59
60 // ERROR
61 return {};
62}
63
69bool isUnixSockSupported(ConnectionState& proto) // FIXME Why is this method unused?
70{
71 yarp::os::Contact remote = proto.getStreams().getRemoteAddress();
72 yarp::os::Contact local = proto.getStreams().getLocalAddress();
73
74 if (remote.getHost() != local.getHost()) {
76 "The ports are on different machines, unix socket not supported...");
77 return false;
78 }
79 return true;
80}
81
82} // namespace
83
88
89std::string UnixSocketCarrier::getName() const
90{
91 return name;
92}
93
95{
96 return requireAckFlag;
97}
98
100{
101 return false;
102}
103
105{
106 if (header.length() != headerSize) {
107 return false;
108 }
109
110 bool isUnix = true;
111 bool isUnix_ack = true;
112
113 const char* target = headerCode;
114 const char* target_ack = headerCode_ack;
115 for (size_t i = 0; i < headerSize; i++) {
116 if (header.get()[i] != target[i]) {
117 isUnix = false;
118 }
119 if (header.get()[i] != target_ack[i]) {
120 isUnix_ack = false;
121 }
122 }
123
124 return (isUnix || isUnix_ack);
125}
126
128{
129 const char* target = requireAckFlag ? headerCode_ack : headerCode;
130 for (size_t i = 0; i < headerSize && i < header.length(); i++) {
131 header.get()[i] = target[i];
132 }
133}
134
136{
137 YARP_UNUSED(proto);
138 YARP_UNUSED(writer);
139
140 return true;
141}
142
144{
145 YARP_UNUSED(proto);
146
147 return true;
148}
149
151{
152 if (requireAckFlag) {
153 const Bytes ack_bytes(const_cast<char*>(ack_string), ack_string_size);
154 proto.os().write(ack_bytes);
155 }
156 return true;
157}
158
160{
161 if (requireAckFlag) {
162 std::array<char, ack_string_size> buf {'\0', '\0', '\0', '\0'};
163 Bytes ack(buf.data(), buf.size());
165 if (static_cast<size_t>(hdr) != ack.length()) {
166 yCDebug(UNIXSOCK_CARRIER, "Did not get ack");
167 return false;
168 }
169
170 const char* target = ack_string;
171 for (size_t i = 0; i < ack_string_size; i++) {
172 if (ack.get()[i] != target[i]) {
173 yCDebug(UNIXSOCK_CARRIER, "Bad ack");
174 return false;
175 }
176 }
177 }
178 return true;
179}
180
182{
183 // I am the receiver
184 return becomeUnixSocket(proto, false);
185}
186
188{
189 // I am the sender
190 return becomeUnixSocket(proto, true);
191}
192
193bool UnixSocketCarrier::becomeUnixSocket(ConnectionState& proto, bool sender)
194{
195 if (!isUnixSockSupported(proto)) {
196 return false;
197 }
198
199 Contact remote = proto.getStreams().getRemoteAddress();
200 Contact local = proto.getStreams().getLocalAddress();
201
202 proto.takeStreams(nullptr); // free up port from tcp
203
204 std::string runtime_dir = getYARPRuntimeDir();
205
206 // Make sure that the path exists
207 if (runtime_dir.empty() || yarp::os::mkdir_p(runtime_dir.c_str(), 0) != 0) {
208 yCError(UNIXSOCK_CARRIER, "Failed to create directory %s", runtime_dir.c_str());
209 return false;
210 }
211
212 if (sender) {
213 socketPath = runtime_dir + fs::preferred_separator + std::to_string(remote.getPort()) + "_" + std::to_string(local.getPort()) + ".sock";
214 } else {
215 socketPath = runtime_dir + fs::preferred_separator + std::to_string(local.getPort()) + "_" + std::to_string(remote.getPort()) + ".sock";
216 }
217
218 stream = new UnixSockTwoWayStream(socketPath);
219 stream->setLocalAddress(local);
220 stream->setRemoteAddress(remote);
221
222 if (!stream->open(sender)) {
223 delete stream;
224 stream = nullptr;
225 yCError(UNIXSOCK_CARRIER, "Failed to open stream on socket %s as %s", socketPath.c_str(), (sender ? "sender" : "receiver"));
226 return false;
227 }
228 yAssert(stream != nullptr);
229
230 proto.takeStreams(stream);
231
232 yCDebug(UNIXSOCK_CARRIER, "Connected on socket %s as %s", socketPath.c_str(), (sender ? "sender" : "receiver"));
233 return true;
234}
235
236
238{
239 Property options;
240 options.fromString(proto.getSenderSpecifier());
241 return configureFromProperty(options);
242}
243
245{
246 if (options.check("ack")) {
247 yCInfo(UNIXSOCK_CARRIER, "ACK Enabled");
248 requireAckFlag = true;
249 }
250 return true;
251}
252
254{
255 const char* target_ack = headerCode_ack;
256 for (size_t i = 0; i < headerSize; i++) {
257 if (header.get()[i] != target_ack[i]) {
258 return;
259 }
260 }
261 yCInfo(UNIXSOCK_CARRIER, "ACK Enabled");
262 requireAckFlag = true;
263}
#define yAssert(x)
Definition Log.h:388
const yarp::os::LogComponent & UNIXSOCK_CARRIER()
A stream abstraction for unix socket communication.
bool open(bool sender=false)
void setLocalAddress(yarp::os::Contact &_localAddress)
void setRemoteAddress(yarp::os::Contact &_remoteAddress)
bool sendAck(yarp::os::ConnectionState &proto) override
Send an acknowledgement, if needed for this carrier.
bool sendIndex(yarp::os::ConnectionState &proto, yarp::os::SizedWriter &writer) override
void setParameters(const yarp::os::Bytes &header) override
Configure this carrier based on the first 8 bytes of the connection.
bool isConnectionless() const override
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
std::string getName() const override
Get the name of this connection type ("tcp", "mcast", "shmem", ...)
bool requireAck() const override
Check if carrier has flow control, requiring sent messages to be acknowledged by recipient.
bool expectAck(yarp::os::ConnectionState &proto) override
Receive an acknowledgement, if expected for this carrier.
bool configure(yarp::os::ConnectionState &proto) override
Give carrier a shot at looking at how the connection is set up.
UnixSocketCarrier()=default
bool expectIndex(yarp::os::ConnectionState &proto) override
Expect a message header, if there is one for this carrier.
bool respondToHeader(yarp::os::ConnectionState &proto) override
Respond to the header.
bool expectReplyToHeader(yarp::os::ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
bool configureFromProperty(yarp::os::Property &options) override
yarp::os::Carrier * create() const override
Factory method.
void getHeader(yarp::os::Bytes &header) const override
Provide 8 bytes describing this connection sufficiently to allow the other side of a connection to select it.
bool checkHeader(const yarp::os::Bytes &header) override
Given the first 8 bytes received on a connection, decide if this is the right carrier type to use for...
A mini-server for performing network communication in the background.
A simple abstraction for a block of bytes.
Definition Bytes.h:24
size_t length() const
Definition Bytes.cpp:22
const char * get() const
Definition Bytes.cpp:27
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition Carrier.h:44
The basic state of a connection - route, streams in use, etc.
OutputStream & os()
Shorthand for getOutputStream()
virtual std::string getSenderSpecifier() const =0
Extract a name for the sender, if the connection type supports that.
InputStream & is()
Shorthand for getInputStream()
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
virtual TwoWayStream & getStreams()=0
Access the streams associated with the connection.
Represents how to reach a part of a YARP network.
Definition Contact.h:33
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition Contact.cpp:239
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition Contact.cpp:228
yarp::conf::ssize_t readFull(Bytes &b)
Keep reading until buffer is full.
virtual void write(char ch)
Write a single byte to the stream.
A class for storing options and configuration information.
Definition Property.h:33
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Minimal requirements for an efficient Writer.
Definition SizedWriter.h:32
#define yCInfo(component,...)
#define yCError(component,...)
#define yCDebug(component,...)
std::string get_string(const std::string &key, bool *found=nullptr)
Read a string from an environment variable.
Definition environment.h:66
static constexpr value_type preferred_separator
Definition filesystem.h:21
::ssize_t ssize_t
Definition numeric.h:86
An interface to the operating system, including Port based communication.
int mkdir_p(const char *p, int ignoreLevels=0)
Create a directory and all parent directories needed.
Definition Os.cpp:35
#define YARP_UNUSED(var)
Definition api.h:162