YARP
Yet Another Robot Platform
SocketTwoWayStream.h
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#ifndef YARP_OS_IMPL_SOCKETTWOWAYSTREAM_H
8#define YARP_OS_IMPL_SOCKETTWOWAYSTREAM_H
9
10#include <yarp/conf/system.h>
11
12#include <yarp/os/Bytes.h>
18
19#ifdef YARP_HAS_ACE // For TCP_CORK definition
20# include <ace/os_include/netinet/os_tcp.h>
21// In one of the ACE headers there is a definition of "main" for WIN32
22# ifdef main
23# undef main
24# endif
25#else
26# include <netinet/tcp.h>
27#endif
28
30
31namespace yarp::os::impl {
32
37 public TwoWayStream,
38 public InputStream,
39 public OutputStream
40{
41public:
// Constructor member initialisers and body (the line carrying the constructor
// signature is missing from this listing). Both timeout flags and the `happy`
// health flag start out false; the constructor body itself is empty.
43 haveWriteTimeout(false),
44 haveReadTimeout(false),
45 happy(false)
46 {
47 }
48
// Declaration only; the definition is not part of this header listing.
// NOTE(review): presumably connects the stream to `address` and returns a
// status code - confirm against the implementation file.
49 int open(const Contact& address);
50
// Declaration only; the definition is not part of this header listing.
// NOTE(review): presumably takes a connection from `acceptor` - confirm
// against the implementation file.
51 int open(yarp::os::impl::TcpAcceptor& acceptor);
52
// Body of a member whose signature line is missing from this listing
// (presumably the destructor - TODO confirm). It only calls close(), which
// closes the underlying stream and clears the `happy` flag.
54 {
55 close();
56 }
57
// Body of an accessor whose signature line is missing from this listing
// (presumably getInputStream() - TODO confirm). The object returns itself,
// so no separate stream object is handed out.
59 {
60 return *this;
61 }
62
// Body of an accessor whose signature line is missing from this listing
// (presumably getOutputStream() - TODO confirm). The object returns itself,
// so no separate stream object is handed out.
64 {
65 return *this;
66 }
67
68 const Contact& getLocalAddress() const override
69 {
70 return localAddress;
71 }
72
73 const Contact& getRemoteAddress() const override
74 {
75 return remoteAddress;
76 }
77
78 void interrupt() override
79 {
80 yCDebug(SOCKETTWOWAYSTREAM, "Interrupting socket");
81 if (happy) {
82 happy = false;
83 stream.close_reader();
84 yCDebug(SOCKETTWOWAYSTREAM, "Interrupting socket reader");
85 stream.close_writer();
86 yCDebug(SOCKETTWOWAYSTREAM, "Interrupting socket writer");
87 stream.close();
88 yCDebug(SOCKETTWOWAYSTREAM, "Interrupting socket fully");
89 }
90 }
91
92 void close() override
93 {
94 stream.close();
95 happy = false;
96 }
97
// Body of a read method whose signature line is missing from this listing
// (presumably read(Bytes& b), given the use of recv_n - TODO confirm).
// Returns -1 when the stream is not ok before or after the receive call;
// otherwise returns the value reported by recv_n.
100 {
101 if (!isOk()) {
102 return -1;
103 }
104 yarp::conf::ssize_t result;
// The timeout is only handed to recv_n when one was configured through
// setReadTimeout().
105 if (haveReadTimeout) {
106 result = stream.recv_n(b.get(), b.length(), &readTimeout);
107 } else {
108 result = stream.recv_n(b.get(), b.length());
109 }
// `happy` is re-checked after the receive call: interrupt() and close()
// clear it (presumably from another thread while this call is blocked).
110 if (!happy) {
111 return -1;
112 }
// A non-positive result marks the stream as failed; the raw result is still
// returned to the caller.
113 if (result <= 0) {
114 happy = false;
115 yCDebug(SOCKETTWOWAYSTREAM, "bad socket read");
116 }
117 return result;
118 }
119
// Body of a read method whose signature line is missing from this listing
// (presumably partialRead(Bytes& b), given the use of recv rather than
// recv_n - TODO confirm).
// Returns -1 when the stream is not ok before or after the receive call;
// otherwise returns the value reported by recv.
121 {
122 if (!isOk()) {
123 return -1;
124 }
125 yarp::conf::ssize_t result;
// The timeout is only handed to recv when one was configured through
// setReadTimeout().
126 if (haveReadTimeout) {
127 result = stream.recv(b.get(), b.length(), &readTimeout);
128 } else {
129 result = stream.recv(b.get(), b.length());
130 }
// `happy` is re-checked after the receive call: interrupt() and close()
// clear it (presumably from another thread while this call is blocked).
131 if (!happy) {
132 return -1;
133 }
// A non-positive result marks the stream as failed; the raw result is still
// returned to the caller.
134 if (result <= 0) {
135 happy = false;
136 yCDebug(SOCKETTWOWAYSTREAM, "bad socket read");
137 }
138 return result;
139 }
140
142 void write(const Bytes& b) override
143 {
144 if (!isOk()) {
145 return;
146 }
147 yarp::conf::ssize_t result;
148 if (haveWriteTimeout) {
149 result = stream.send_n(b.get(), b.length(), &writeTimeout);
150 } else {
151 result = stream.send_n(b.get(), b.length());
152 }
153 if (result < 0) {
154 happy = false;
155 yCDebug(SOCKETTWOWAYSTREAM, "bad socket write");
156 }
157 }
158
159 void flush() override
160 {
161#ifdef TCP_CORK
162 int status = 0;
163 int sizeInt = sizeof(int);
164 stream.get_option(IPPROTO_TCP, TCP_CORK, &status, &sizeInt);
165 if (status == 1) {
166 // Remove CORK
167 int zero = 0;
168 stream.set_option(IPPROTO_TCP, TCP_CORK, &zero, sizeof(int));
169 // Set CORK
170 int one = 1;
171 stream.set_option(IPPROTO_TCP, TCP_CORK, &one, sizeof(int));
172 }
173#endif
174 }
175
176 bool isOk() const override
177 {
178 return happy;
179 }
180
181 void reset() override
182 {
183 }
184
185 void beginPacket() override
186 {
187#ifdef TCP_CORK
188 // Set CORK
189 int one = 1;
190 stream.set_option(IPPROTO_TCP, TCP_CORK, &one, sizeof(int));
191#endif
192 }
193
194 void endPacket() override
195 {
196#ifdef TCP_CORK
197 // Remove CORK
198 int zero = 0;
199 stream.set_option(IPPROTO_TCP, TCP_CORK, &zero, sizeof(int));
200#endif
201 }
202
203 bool setWriteTimeout(double timeout) override
204 {
205 if (timeout < 1e-12) {
206 haveWriteTimeout = false;
207 } else {
208 PLATFORM_TIME_SET(writeTimeout, timeout);
209 haveWriteTimeout = true;
210 }
211 return true;
212 }
213
214 bool setReadTimeout(double timeout) override
215 {
216 if (timeout < 1e-12) {
217 haveReadTimeout = false;
218 } else {
219 PLATFORM_TIME_SET(readTimeout, timeout);
220 haveReadTimeout = true;
221 }
222 return true;
223 }
224
// Declarations only; both definitions live outside this header listing.
// NOTE(review): presumably these set and query the IP type-of-service value
// of the underlying socket - confirm against the implementation file.
225 bool setTypeOfService(int tos) override;
226 int getTypeOfService() override;
227
228private:
// Underlying TCP stream; every send, receive, close and socket-option call
// in this class goes through it.
229 yarp::os::impl::TcpStream stream;
// True when writeTimeout should be passed to send_n (see setWriteTimeout).
230 bool haveWriteTimeout;
// True when readTimeout should be passed to recv / recv_n (see setReadTimeout).
231 bool haveReadTimeout;
// Timeout values filled in through PLATFORM_TIME_SET; only used while the
// matching have*Timeout flag is true.
232 YARP_timeval writeTimeout;
233 YARP_timeval readTimeout;
// Endpoints returned by getLocalAddress() and getRemoteAddress().
234 Contact localAddress, remoteAddress;
// Health flag reported by isOk(); cleared by close(), interrupt() and by
// failed reads or writes.
235 bool happy;
// Declared only; definition not visible in this listing.
// NOTE(review): presumably refreshes localAddress / remoteAddress - confirm.
236 void updateAddresses();
237};
238
239} // namespace yarp::os::impl
240
241#endif // YARP_OS_IMPL_SOCKETTWOWAYSTREAM_H
#define PLATFORM_TIME_SET(x, y)
Definition: PlatformTime.h:22
const yarp::os::LogComponent & SOCKETTWOWAYSTREAM()
A simple abstraction for a block of bytes.
Definition: Bytes.h:24
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
Represents how to reach a part of a YARP network.
Definition: Contact.h:33
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:25
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:20
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:21
virtual void write(char ch)
Write a single byte to the stream.
A stream which can be asked to perform bidirectional communication.
Definition: TwoWayStream.h:25
A stream abstraction for socket communication.
void endPacket() override
Mark the end of a logical packet (see beginPacket).
bool isOk() const override
Check if the stream is ok or in an error state.
void beginPacket() override
Mark the beginning of a logical packet.
yarp::conf::ssize_t read(Bytes &b) override
Read a block of data from the stream.
bool setWriteTimeout(double timeout) override
Set activity timeout.
yarp::conf::ssize_t partialRead(Bytes &b) override
Like read, but solicit partial responses.
bool setReadTimeout(double timeout) override
Set activity timeout.
InputStream & getInputStream() override
Get an InputStream to read from.
const Contact & getLocalAddress() const override
Get the address of the local side of the stream.
void close() override
Terminate the stream.
void flush() override
Make sure all pending write operations are finished.
void interrupt() override
Interrupt the stream.
void write(const Bytes &b) override
Write a block of bytes to the stream.
void reset() override
Reset the stream.
const Contact & getRemoteAddress() const override
Get the address of the remote side of the stream.
OutputStream & getOutputStream() override
Get an OutputStream to write to.
#define YARP_DECLARE_LOG_COMPONENT(name)
Definition: LogComponent.h:73
#define yCDebug(component,...)
Definition: LogComponent.h:128
::ssize_t ssize_t
Definition: numeric.h:86
The components from which ports and connections are built.
struct timeval YARP_timeval
Definition: PlatformTime.h:30
#define YARP_os_impl_API
Definition: api.h:46