YARP
Yet Another Robot Platform
SocketTwoWayStream.h
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
7 #ifndef YARP_OS_IMPL_SOCKETTWOWAYSTREAM_H
8 #define YARP_OS_IMPL_SOCKETTWOWAYSTREAM_H
9 
10 #include <yarp/conf/system.h>
11 
12 #include <yarp/os/Bytes.h>
13 #include <yarp/os/LogComponent.h>
14 #include <yarp/os/TwoWayStream.h>
17 #include <yarp/os/impl/TcpStream.h>
18 
19 #ifdef YARP_HAS_ACE // For TCP_CORK definition
20 # include <ace/os_include/netinet/os_tcp.h>
21 // In one the ACE headers there is a definition of "main" for WIN32
22 # ifdef main
23 # undef main
24 # endif
25 #else
26 # include <netinet/tcp.h>
27 #endif
28 
30 
31 namespace yarp {
32 namespace os {
33 namespace impl {
34 
39  public TwoWayStream,
40  public InputStream,
41  public OutputStream
42 {
43 public:
45  haveWriteTimeout(false),
46  haveReadTimeout(false),
47  happy(false)
48  {
49  }
50 
51  int open(const Contact& address);
52 
53  int open(yarp::os::impl::TcpAcceptor& acceptor);
54 
56  {
57  close();
58  }
59 
61  {
62  return *this;
63  }
64 
66  {
67  return *this;
68  }
69 
70  const Contact& getLocalAddress() const override
71  {
72  return localAddress;
73  }
74 
75  const Contact& getRemoteAddress() const override
76  {
77  return remoteAddress;
78  }
79 
80  void interrupt() override
81  {
82  yCDebug(SOCKETTWOWAYSTREAM, "Interrupting socket");
83  if (happy) {
84  happy = false;
85  stream.close_reader();
86  yCDebug(SOCKETTWOWAYSTREAM, "Interrupting socket reader");
87  stream.close_writer();
88  yCDebug(SOCKETTWOWAYSTREAM, "Interrupting socket writer");
89  stream.close();
90  yCDebug(SOCKETTWOWAYSTREAM, "Interrupting socket fully");
91  }
92  }
93 
94  void close() override
95  {
96  stream.close();
97  happy = false;
98  }
99 
102  {
103  if (!isOk()) {
104  return -1;
105  }
106  yarp::conf::ssize_t result;
107  if (haveReadTimeout) {
108  result = stream.recv_n(b.get(), b.length(), &readTimeout);
109  } else {
110  result = stream.recv_n(b.get(), b.length());
111  }
112  if (!happy) {
113  return -1;
114  }
115  if (result <= 0) {
116  happy = false;
117  yCDebug(SOCKETTWOWAYSTREAM, "bad socket read");
118  }
119  return result;
120  }
121 
123  {
124  if (!isOk()) {
125  return -1;
126  }
127  yarp::conf::ssize_t result;
128  if (haveReadTimeout) {
129  result = stream.recv(b.get(), b.length(), &readTimeout);
130  } else {
131  result = stream.recv(b.get(), b.length());
132  }
133  if (!happy) {
134  return -1;
135  }
136  if (result <= 0) {
137  happy = false;
138  yCDebug(SOCKETTWOWAYSTREAM, "bad socket read");
139  }
140  return result;
141  }
142 
144  void write(const Bytes& b) override
145  {
146  if (!isOk()) {
147  return;
148  }
149  yarp::conf::ssize_t result;
150  if (haveWriteTimeout) {
151  result = stream.send_n(b.get(), b.length(), &writeTimeout);
152  } else {
153  result = stream.send_n(b.get(), b.length());
154  }
155  if (result < 0) {
156  happy = false;
157  yCDebug(SOCKETTWOWAYSTREAM, "bad socket write");
158  }
159  }
160 
161  void flush() override
162  {
163 #ifdef TCP_CORK
164  int status = 0;
165  int sizeInt = sizeof(int);
166  stream.get_option(IPPROTO_TCP, TCP_CORK, &status, &sizeInt);
167  if (status == 1) {
168  // Remove CORK
169  int zero = 0;
170  stream.set_option(IPPROTO_TCP, TCP_CORK, &zero, sizeof(int));
171  // Set CORK
172  int one = 1;
173  stream.set_option(IPPROTO_TCP, TCP_CORK, &one, sizeof(int));
174  }
175 #endif
176  }
177 
178  bool isOk() const override
179  {
180  return happy;
181  }
182 
183  void reset() override
184  {
185  }
186 
187  void beginPacket() override
188  {
189 #ifdef TCP_CORK
190  // Set CORK
191  int one = 1;
192  stream.set_option(IPPROTO_TCP, TCP_CORK, &one, sizeof(int));
193 #endif
194  }
195 
196  void endPacket() override
197  {
198 #ifdef TCP_CORK
199  // Remove CORK
200  int zero = 0;
201  stream.set_option(IPPROTO_TCP, TCP_CORK, &zero, sizeof(int));
202 #endif
203  }
204 
205  bool setWriteTimeout(double timeout) override
206  {
207  if (timeout < 1e-12) {
208  haveWriteTimeout = false;
209  } else {
210  PLATFORM_TIME_SET(writeTimeout, timeout);
211  haveWriteTimeout = true;
212  }
213  return true;
214  }
215 
216  bool setReadTimeout(double timeout) override
217  {
218  if (timeout < 1e-12) {
219  haveReadTimeout = false;
220  } else {
221  PLATFORM_TIME_SET(readTimeout, timeout);
222  haveReadTimeout = true;
223  }
224  return true;
225  }
226 
227  bool setTypeOfService(int tos) override;
228  int getTypeOfService() override;
229 
230 private:
231  yarp::os::impl::TcpStream stream;
232  bool haveWriteTimeout;
233  bool haveReadTimeout;
234  YARP_timeval writeTimeout;
235  YARP_timeval readTimeout;
236  Contact localAddress, remoteAddress;
237  bool happy;
238  void updateAddresses();
239 };
240 
241 } // namespace impl
242 } // namespace os
243 } // namespace yarp
244 
245 #endif // YARP_OS_IMPL_SOCKETTWOWAYSTREAM_H
#define PLATFORM_TIME_SET(x, y)
Definition: PlatformTime.h:22
const yarp::os::LogComponent & SOCKETTWOWAYSTREAM()
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
Represents how to reach a part of a YARP network.
Definition: Contact.h:36
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:26
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:20
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:22
virtual void write(char ch)
Write a single byte to the stream.
A stream which can be asked to perform bidirectional communication.
Definition: TwoWayStream.h:26
A stream abstraction for socket communication.
void endPacket() override
Mark the end of a logical packet (see beginPacket).
bool isOk() const override
Check if the stream is ok or in an error state.
void beginPacket() override
Mark the beginning of a logical packet.
yarp::conf::ssize_t read(Bytes &b) override
Read a block of data from the stream.
bool setWriteTimeout(double timeout) override
Set activity timeout.
yarp::conf::ssize_t partialRead(Bytes &b) override
Like read, but solicit partial responses.
const Contact & getRemoteAddress() const override
Get the address of the remote side of the stream.
bool setReadTimeout(double timeout) override
Set activity timeout.
OutputStream & getOutputStream() override
Get an OutputStream to write to.
InputStream & getInputStream() override
Get an InputStream to read from.
void close() override
Terminate the stream.
void flush() override
Make sure all pending write operations are finished.
const Contact & getLocalAddress() const override
Get the address of the local side of the stream.
void interrupt() override
Interrupt the stream.
void write(const Bytes &b) override
Write a block of bytes to the stream.
void reset() override
Reset the stream.
#define YARP_DECLARE_LOG_COMPONENT(name)
Definition: LogComponent.h:74
#define yCDebug(component,...)
Definition: LogComponent.h:109
::ssize_t ssize_t
Definition: numeric.h:86
struct timeval YARP_timeval
Definition: PlatformTime.h:32
The main, catch-all namespace for YARP.
Definition: dirs.h:16
#define YARP_os_impl_API
Definition: api.h:46