YARP
Yet Another Robot Platform
UnixSockTwoWayStream.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-License-Identifier: BSD-3-Clause
4  */
5 
6 #include <yarp/conf/system.h>
7 
8 #include <yarp/os/LogStream.h>
9 #include <yarp/os/NetType.h>
10 #include <yarp/os/SystemClock.h>
11 
12 #include "UnixSockTwoWayStream.h"
13 #include "UnixSocketLogComponent.h"
14 #include <cerrno>
15 #include <cstring>
16 #include <fcntl.h> /* For O_* constants */
17 #include <sys/socket.h>
18 #include <sys/stat.h> /* For mode constants */
19 #include <sys/un.h>
20 #include <unistd.h>
21 
22 using namespace yarp::os;
23 
24 UnixSockTwoWayStream::UnixSockTwoWayStream(const std::string& _socketPath) :
25  socketPath(_socketPath)
26 {
27 }
28 
29 bool UnixSockTwoWayStream::open(bool sender)
30 {
31  openedAsReader = !sender;
32  struct sockaddr_un addr;
33  if ((reader_fd = ::socket(AF_UNIX, SOCK_STREAM, 0)) == -1) {
34  yCError(UNIXSOCK_CARRIER, "%d, %s", errno, strerror(errno));
35  return false;
36  }
37 
38  memset(&addr, 0, sizeof(addr));
39  addr.sun_family = AF_UNIX;
40 
41  if (socketPath.empty()) {
42  *addr.sun_path = '\0';
43  strncpy(addr.sun_path + 1, socketPath.c_str() + 1, sizeof(addr.sun_path) - 2);
44  } else {
45  strncpy(addr.sun_path, socketPath.c_str(), sizeof(addr.sun_path) - 1);
46  if (!sender) {
47  ::unlink(socketPath.c_str());
48  }
49  }
50 
51  if (sender) {
52  size_t attempts = 0;
53  // try connection 5 times, waiting that the receiver bind the socket
54  while (attempts < maxAttempts) {
55  int result = ::connect(reader_fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
56  if (result == 0) {
57  break;
58  }
59  yarp::os::SystemClock::delaySystem(delayBetweenAttempts);
60  attempts++;
61  }
62 
63  if (attempts >= maxAttempts) {
64  yCError(UNIXSOCK_CARRIER, "connect() error, I tried %zu times: %d, %s", maxAttempts, errno, strerror(errno));
65  return false;
66  }
67  } else {
68  if (::bind(reader_fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) == -1) {
69  yCError(UNIXSOCK_CARRIER, "bind() error: %d, %s", errno, strerror(errno));
70  return false;
71  }
72 
73  // the socket will listen only 1 client
74  if (::listen(reader_fd, 2) == -1) {
75  yCError(UNIXSOCK_CARRIER, "listen() error: %d, %s", errno, strerror(errno));
76  return false;
77  }
78  struct sockaddr_un remote;
79  uint lenRemote = sizeof(remote);
80 
81  if ((sender_fd = ::accept(reader_fd, reinterpret_cast<struct sockaddr*>(&remote), &lenRemote)) == -1) {
82  yCError(UNIXSOCK_CARRIER, "accept() error: %d, %s", errno, strerror(errno));
83  return false;
84  }
85  }
86 
87  return true;
88 }
89 
91 {
92  close();
93 }
94 
96 {
97  return *this;
98 }
99 
101 {
102  return *this;
103 }
104 
106 {
107  return localAddress;
108 }
109 
111 {
112  return remoteAddress;
113 }
114 
116 {
117  localAddress = _localAddress;
118 }
119 
121 {
122  remoteAddress = _remoteAddress;
123 }
124 
126 {
127  yCDebug(UNIXSOCK_CARRIER, "Interrupting socket");
128  close();
129 }
130 
132 {
133  // If the connect descriptor is valid close socket
134  // and free the memory dedicated.
135  // socket closure
136  if (sender_fd > 0) {
137  ::shutdown(sender_fd, SHUT_RDWR);
138  ::close(sender_fd);
139  sender_fd = -1;
140  }
141 
142  if (reader_fd > 0) {
143  ::shutdown(reader_fd, SHUT_RDWR);
144  ::close(reader_fd);
145  reader_fd = -1;
146  }
147 
148  ::unlink(socketPath.c_str());
149  happy = false;
150 }
151 
153 {
154  if (closed || !happy) {
155  return -1;
156  }
157  int result;
158  result = ::read(openedAsReader ? sender_fd : reader_fd, b.get(), b.length());
159  if (closed || result == 0) {
160  happy = false;
161  return -1;
162  }
163  if (result < 0) {
164  yCError(UNIXSOCK_CARRIER, "read() error: %d, %s", errno, strerror(errno));
165  return -1;
166  }
167  return result;
168 }
169 
171 {
172  if (reader_fd < 0) {
173  close();
174  return;
175  }
176  int writtenMem = ::write(openedAsReader ? sender_fd : reader_fd, b.get(), b.length());
177  if (writtenMem < 0) {
178  yCError(UNIXSOCK_CARRIER, "write() error: %d, %s", errno, strerror(errno));
179  if (errno != ETIMEDOUT) {
180  close();
181  }
182  return;
183  }
184 }
185 
187 {
188  return happy;
189 }
190 
192 {
193 }
194 
196 {
197 }
198 
200 {
201 }
const yarp::os::LogComponent & UNIXSOCK_CARRIER()
bool isOk() const override
Check if the stream is ok or in an error state.
OutputStream & getOutputStream() override
Get an OutputStream to write to.
InputStream & getInputStream() override
Get an InputStream to read from.
void beginPacket() override
Mark the beginning of a logical packet.
void interrupt() override
Interrupt the stream.
bool open(bool sender=false)
const yarp::os::Contact & getRemoteAddress() const override
Get the address of the remote side of the stream.
UnixSockTwoWayStream(const std::string &_socketPath="")
void reset() override
Reset the stream.
void close() override
Terminate the stream.
void setLocalAddress(yarp::os::Contact &_localAddress)
void endPacket() override
Mark the end of a logical packet (see beginPacket).
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
void setRemoteAddress(yarp::os::Contact &_remoteAddress)
const yarp::os::Contact & getLocalAddress() const override
Get the address of the local side of the stream.
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
Represents how to reach a part of a YARP network.
Definition: Contact.h:36
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:26
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:20
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:22
static void delaySystem(double seconds)
Definition: SystemClock.cpp:29
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCDebug(component,...)
Definition: LogComponent.h:109
::ssize_t ssize_t
Definition: numeric.h:86
An interface to the operating system, including Port based communication.