YARP
Yet Another Robot Platform
ShmemHybridStream.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
7 #include "ShmemHybridStream.h"
8 #include "ShmemLogComponent.h"
9 
11  m_bLinked(false)
12 {
13 }
14 
16 {
17  close();
18 }
19 
20 int ShmemHybridStream::open(const yarp::os::Contact& yarp_address, bool sender)
21 {
22  m_bLinked = false;
23 
24  ACE_INET_Addr ace_address(yarp_address.getPort(), yarp_address.getHost().c_str());
25 
26  if (sender) {
27  return connect(ace_address);
28  } else {
29  ACE_INET_Addr ace_server_addr(ace_address.get_port_number());
30 
31  int result = m_Acceptor.open(ace_server_addr);
32 
33  if (result < 0) {
34  yCError(SHMEMCARRIER, "ShmemHybridStream open result %d", result);
35  return result;
36  }
37 
38  m_Acceptor.get_local_addr(ace_server_addr);
39 
40  m_LocalAddress = yarp::os::Contact(ace_server_addr.get_host_addr(), ace_server_addr.get_port_number());
41  m_RemoteAddress = m_LocalAddress; // finalized in call to accept()
42 
43  return result;
44  }
45 
46  return 1;
47 }
48 
50 {
51  if (m_bLinked) {
52  return -1;
53  }
54 
55  yarp::conf::ssize_t result = m_Acceptor.accept(m_SockStream);
56 
57  if (result < 0) {
58  yCError(SHMEMCARRIER, "ShmemHybridStream server returned %zd", result);
59  close();
60  return -1;
61  }
62 
63  ACE_INET_Addr local, remote;
64  m_SockStream.get_local_addr(local);
65  m_SockStream.get_remote_addr(remote);
66  m_LocalAddress = yarp::os::Contact(local.get_host_addr(), local.get_port_number());
67  m_RemoteAddress = yarp::os::Contact(remote.get_host_addr(), remote.get_port_number());
68 
69  ShmemPacket_t recv_conn_data;
70  result = m_SockStream.recv_n(&recv_conn_data, sizeof recv_conn_data);
71  if (result <= 0) {
72  yCError(SHMEMCARRIER, "Socket returned %zd", result);
73  close();
74  return -1;
75  }
76 
77  if (!in.open(m_RemoteAddress.getPort(), &m_SockStream)) {
78  yCError(SHMEMCARRIER, "ShmemHybridStream can't create shared memory");
79  close();
80  return -1;
81  }
82 
83  if (!out.open(m_LocalAddress.getPort())) {
84  yCError(SHMEMCARRIER, "ShmemHybridStream can't create shared memory");
85  close();
86  return -1;
87  }
88 
89  ShmemPacket_t send_conn_data;
90  send_conn_data.command = ACKNOWLEDGE;
91  if (m_SockStream.send_n(&send_conn_data, sizeof send_conn_data) <= 0) {
92  yCError(SHMEMCARRIER, "ShmemHybridStream socket writing error");
93  close();
94  return -1;
95  }
96 
97  m_bLinked = true;
98 
99  m_SockStream.enable(ACE_NONBLOCK);
100 
101  return 0;
102 }
103 
104 int ShmemHybridStream::connect(const ACE_INET_Addr& ace_address)
105 {
106  if (m_bLinked) {
107  return -1;
108  }
109 
110  ACE_SOCK_Connector connector;
111  yarp::conf::ssize_t result = connector.connect(m_SockStream, ace_address);
112  if (result < 0) {
113  yCError(SHMEMCARRIER, "ShmemHybridStream client returned %zd", result);
114  close();
115  return -1;
116  }
117 
118  ACE_INET_Addr local, remote;
119  m_SockStream.get_local_addr(local);
120  m_SockStream.get_remote_addr(remote);
121  m_LocalAddress = yarp::os::Contact(local.get_host_addr(), local.get_port_number());
122  m_RemoteAddress = yarp::os::Contact(remote.get_host_addr(), remote.get_port_number());
123 
124  out.open(m_LocalAddress.getPort());
125 
126  ShmemPacket_t send_conn_data;
127  send_conn_data.command = CONNECT;
128  send_conn_data.size = SHMEM_DEFAULT_SIZE;
129  result = m_SockStream.send_n(&send_conn_data, sizeof send_conn_data);
130  if (result <= 0) {
131  yCError(SHMEMCARRIER, "Socket returned %zd", result);
132  close();
133  return -1;
134  }
135 
136  ShmemPacket_t recv_conn_data;
137  result = m_SockStream.recv_n(&recv_conn_data, sizeof recv_conn_data);
138  if (result <= 0) {
139  yCError(SHMEMCARRIER, "Socket returned %zd", result);
140  close();
141  return -1;
142  }
143 
144  in.open(m_RemoteAddress.getPort(), &m_SockStream);
145 
146  m_bLinked = true;
147 
148  m_SockStream.enable(ACE_NONBLOCK);
149 
150  return 0;
151 }
152 
154 {
155  m_bLinked = false;
156  in.close();
157  out.close();
158 }
159 
161 {
162  yCDebug(SHMEMCARRIER, "INTERRUPT");
163  close();
164 }
165 
167 {
168  if (!out.write(b)) {
169  close();
170  }
171 }
172 
174 {
175  yarp::conf::ssize_t ret = in.read(b);
176  if (ret == -1) {
177  close();
178  }
179  return ret;
180 }
181 
183 {
184  return *this;
185 }
186 
188 {
189  return *this;
190 }
191 
193 {
194  return m_bLinked && in.isOk() && out.isOk();
195 }
196 
198 {
199  yCDebug(SHMEMCARRIER, "RECEIVED RESET COMMAND");
200  close();
201 }
202 
204 {
205 }
206 
208 {
209 }
210 
212 {
213  return m_LocalAddress;
214 }
215 
217 {
218  return m_RemoteAddress;
219 }
bool ret
const yarp::os::LogComponent & SHMEMCARRIER()
#define SHMEM_DEFAULT_SIZE
Definition: ShmemTypes.h:10
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
void reset() override
Reset the stream.
yarp::os::OutputStream & getOutputStream() override
Get an OutputStream to write to.
void interrupt() override
Interrupt the stream.
int open(const yarp::os::Contact &yarp_address, bool sender)
bool isOk() const override
Check if the stream is ok or in an error state.
const yarp::os::Contact & getRemoteAddress() const override
Get the address of the remote side of the stream.
void beginPacket() override
Mark the beginning of a logical packet.
const yarp::os::Contact & getLocalAddress() const override
Get the address of the local side of the stream.
void endPacket() override
Mark the end of a logical packet (see beginPacket).
yarp::os::InputStream & getInputStream() override
Get an InputStream to read from.
void close() override
Terminate the stream.
yarp::conf::ssize_t read(yarp::os::Bytes &b)
bool open(int port, ACE_SOCK_Stream *pSock, int size=4096)
bool write(const yarp::os::Bytes &b)
bool open(int port, int size=4096)
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
Represents how to reach a part of a YARP network.
Definition: Contact.h:36
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:239
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:228
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:26
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:20
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:22
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCDebug(component,...)
Definition: LogComponent.h:109
::ssize_t ssize_t
Definition: numeric.h:86