YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
ShmemHybridStream.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#include "ShmemHybridStream.h"
8#include "ShmemLogComponent.h"
9
11 m_bLinked(false)
12{
13}
14
19
20int ShmemHybridStream::open(const yarp::os::Contact& yarp_address, bool sender)
21{
22 m_bLinked = false;
23
24 ACE_INET_Addr ace_address(yarp_address.getPort(), yarp_address.getHost().c_str());
25
26 if (sender) {
27 return connect(ace_address);
28 } else {
29 ACE_INET_Addr ace_server_addr(ace_address.get_port_number());
30
31 int result = m_Acceptor.open(ace_server_addr);
32
33 if (result < 0) {
34 yCError(SHMEMCARRIER, "ShmemHybridStream open result %d", result);
35 return result;
36 }
37
38 m_Acceptor.get_local_addr(ace_server_addr);
39
40 m_LocalAddress = yarp::os::Contact(ace_server_addr.get_host_addr(), ace_server_addr.get_port_number());
41 m_RemoteAddress = m_LocalAddress; // finalized in call to accept()
42
43 return result;
44 }
45
46 return 1;
47}
48
50{
51 if (m_bLinked) {
52 return -1;
53 }
54
55 yarp::conf::ssize_t result = m_Acceptor.accept(m_SockStream);
56
57 if (result < 0) {
58 yCError(SHMEMCARRIER, "ShmemHybridStream server returned %zd", result);
59 close();
60 return -1;
61 }
62
63 ACE_INET_Addr local, remote;
64 m_SockStream.get_local_addr(local);
65 m_SockStream.get_remote_addr(remote);
66 m_LocalAddress = yarp::os::Contact(local.get_host_addr(), local.get_port_number());
67 m_RemoteAddress = yarp::os::Contact(remote.get_host_addr(), remote.get_port_number());
68
69 ShmemPacket_t recv_conn_data;
70 result = m_SockStream.recv_n(&recv_conn_data, sizeof recv_conn_data);
71 if (result <= 0) {
72 yCError(SHMEMCARRIER, "Socket returned %zd", result);
73 close();
74 return -1;
75 }
76
77 if (!in.open(m_RemoteAddress.getPort(), &m_SockStream)) {
78 yCError(SHMEMCARRIER, "ShmemHybridStream can't create shared memory");
79 close();
80 return -1;
81 }
82
83 if (!out.open(m_LocalAddress.getPort())) {
84 yCError(SHMEMCARRIER, "ShmemHybridStream can't create shared memory");
85 close();
86 return -1;
87 }
88
89 ShmemPacket_t send_conn_data;
90 send_conn_data.command = ACKNOWLEDGE;
91 if (m_SockStream.send_n(&send_conn_data, sizeof send_conn_data) <= 0) {
92 yCError(SHMEMCARRIER, "ShmemHybridStream socket writing error");
93 close();
94 return -1;
95 }
96
97 m_bLinked = true;
98
99 m_SockStream.enable(ACE_NONBLOCK);
100
101 return 0;
102}
103
104int ShmemHybridStream::connect(const ACE_INET_Addr& ace_address)
105{
106 if (m_bLinked) {
107 return -1;
108 }
109
110 ACE_SOCK_Connector connector;
111 yarp::conf::ssize_t result = connector.connect(m_SockStream, ace_address);
112 if (result < 0) {
113 yCError(SHMEMCARRIER, "ShmemHybridStream client returned %zd", result);
114 close();
115 return -1;
116 }
117
118 ACE_INET_Addr local, remote;
119 m_SockStream.get_local_addr(local);
120 m_SockStream.get_remote_addr(remote);
121 m_LocalAddress = yarp::os::Contact(local.get_host_addr(), local.get_port_number());
122 m_RemoteAddress = yarp::os::Contact(remote.get_host_addr(), remote.get_port_number());
123
124 out.open(m_LocalAddress.getPort());
125
126 ShmemPacket_t send_conn_data;
127 send_conn_data.command = CONNECT;
128 send_conn_data.size = SHMEM_DEFAULT_SIZE;
129 result = m_SockStream.send_n(&send_conn_data, sizeof send_conn_data);
130 if (result <= 0) {
131 yCError(SHMEMCARRIER, "Socket returned %zd", result);
132 close();
133 return -1;
134 }
135
136 ShmemPacket_t recv_conn_data;
137 result = m_SockStream.recv_n(&recv_conn_data, sizeof recv_conn_data);
138 if (result <= 0) {
139 yCError(SHMEMCARRIER, "Socket returned %zd", result);
140 close();
141 return -1;
142 }
143
144 in.open(m_RemoteAddress.getPort(), &m_SockStream);
145
146 m_bLinked = true;
147
148 m_SockStream.enable(ACE_NONBLOCK);
149
150 return 0;
151}
152
154{
155 m_bLinked = false;
156 in.close();
157 out.close();
158}
159
161{
162 yCDebug(SHMEMCARRIER, "INTERRUPT");
163 close();
164}
165
167{
168 if (!out.write(b)) {
169 close();
170 }
171}
172
174{
176 if (ret == -1) {
177 close();
178 }
179 return ret;
180}
181
186
191
193{
194 return m_bLinked && in.isOk() && out.isOk();
195}
196
198{
199 yCDebug(SHMEMCARRIER, "RECEIVED RESET COMMAND");
200 close();
201}
202
206
210
212{
213 return m_LocalAddress;
214}
215
217{
218 return m_RemoteAddress;
219}
bool ret
const yarp::os::LogComponent & SHMEMCARRIER()
#define SHMEM_DEFAULT_SIZE
Definition ShmemTypes.h:10
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
void reset() override
Reset the stream.
yarp::os::OutputStream & getOutputStream() override
Get an OutputStream to write to.
void interrupt() override
Interrupt the stream.
int open(const yarp::os::Contact &yarp_address, bool sender)
bool isOk() const override
Check if the stream is ok or in an error state.
const yarp::os::Contact & getRemoteAddress() const override
Get the address of the remote side of the stream.
void beginPacket() override
Mark the beginning of a logical packet.
const yarp::os::Contact & getLocalAddress() const override
Get the address of the local side of the stream.
void endPacket() override
Mark the end of a logical packet (see beginPacket).
yarp::os::InputStream & getInputStream() override
Get an InputStream to read from.
void close() override
Terminate the stream.
yarp::conf::ssize_t read(yarp::os::Bytes &b)
bool open(int port, ACE_SOCK_Stream *pSock, int size=4096)
bool write(const yarp::os::Bytes &b)
bool open(int port, int size=4096)
A simple abstraction for a block of bytes.
Definition Bytes.h:24
Represents how to reach a part of a YARP network.
Definition Contact.h:33
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition Contact.cpp:239
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition Contact.cpp:228
Simple specification of the minimum functions needed from input streams.
Definition InputStream.h:25
virtual int read()
Read and return a single byte.
Simple specification of the minimum functions needed from output streams.
#define yCError(component,...)
#define yCDebug(component,...)
::ssize_t ssize_t
Definition numeric.h:86