YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
ShmemInputStream.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#include "ShmemInputStream.h"
8#include "ShmemLogComponent.h"
9
10#include <yarp/os/Bytes.h>
12
13#include <ace/Lib_Find.h>
14// In one of the ACE headers there is a definition of "main" for WIN32
15# ifdef main
16# undef main
17# endif
18
19
20
22 m_bOpen(false),
23 m_ResizeNum(0),
24 m_Port(-1),
25 m_pAccessMutex(nullptr),
26 m_pWaitDataMutex(nullptr),
27 m_pMap(nullptr),
28 m_pData(nullptr),
29 m_pHeader(nullptr),
30 m_pSock(nullptr)
31{
32}
33
38
40{
41 return m_bOpen;
42}
43
44bool ShmemInputStreamImpl::open(int port, ACE_SOCK_Stream* pSock, int size)
45{
46 std::lock_guard<std::recursive_mutex> l_guard(m_generalMutex);
47
48 m_pSock = pSock;
49
51 m_pMap = nullptr;
52 m_pData = nullptr;
53 m_pHeader = nullptr;
54 m_ResizeNum = 0;
55
56 m_Port = port;
57
58 char obj_name[2048];
59 char temp_dir_path[1024];
60
61 if (ACE::get_temp_dir(temp_dir_path, 1024) == -1) {
62 yCError(SHMEMCARRIER, "ShmemHybridStream: no temp directory found.");
63 return false;
64 }
65
66#ifdef ACE_LACKS_SYSV_SHMEM
67
68 snprintf(obj_name, 2048, "%sSHMEM_FILE_%d_%d", temp_dir_path, port, 0);
69
70 m_pMap = new ACE_Shared_Memory_MM(obj_name, //const ACE_TCHAR *filename,
71 size + sizeof(ShmemHeader_t), //int len = -1,
72 O_RDWR, //int flags = O_RDWR | O_CREAT,
73 ACE_DEFAULT_FILE_PERMS, //int mode = ACE_DEFAULT_FILE_PERMS,
74 PROT_RDWR, //int prot = PROT_RDWR,
75 ACE_MAP_SHARED); //int share = ACE_MAP_PRIVATE,
76
77#else
78
79 m_pMap = new ACE_Shared_Memory_SV(port, size + sizeof(ShmemHeader_t));
80
81#endif
82
83 m_pHeader = (ShmemHeader_t*)m_pMap->malloc();
84 m_pData = (char*)(m_pHeader + 1);
85
86#ifdef _ACE_USE_SV_SEM
87 snprintf(obj_name, 2048, "%sSHMEM_ACCESS_MUTEX_%d", temp_dir_path, port);
88 m_pAccessMutex = new ACE_Mutex(USYNC_PROCESS, obj_name);
89 snprintf(obj_name, 2048, "%sSHMEM_WAITDATA_MUTEX_%d", temp_dir_path, port);
90 m_pWaitDataMutex = new ACE_Mutex(USYNC_PROCESS, obj_name);
91#else
92 snprintf(obj_name, 2048, "SHMEM_ACCESS_MUTEX_%d", port);
93 m_pAccessMutex = new ACE_Process_Mutex(obj_name);
94 snprintf(obj_name, 2048, "SHMEM_WAITDATA_MUTEX_%d", port);
95 m_pWaitDataMutex = new ACE_Process_Mutex(obj_name);
96#endif
97
98 m_pWaitDataMutex->acquire();
99
100 m_bOpen = true;
101
102 return true;
103}
104
106{
107 std::lock_guard<std::recursive_mutex> l_guard(m_generalMutex);
108
109 ++m_ResizeNum;
110
111 ACE_Shared_Memory* pNewMap;
112
113 yCDebug(SHMEMCARRIER, "input stream resize %d to %d", m_ResizeNum, m_pHeader->newsize);
114
115#ifdef ACE_LACKS_SYSV_SHMEM
116
117 char file_path[1024];
118
119 if (ACE::get_temp_dir(file_path, 1024) == -1) {
120 yCError(SHMEMCARRIER, "ShmemHybridStream: no temp directory found.");
121 return false;
122 }
123
124 char file_name[2048];
125 snprintf(file_name, 2048, "%sSHMEM_FILE_%d_%d", file_path, m_Port, m_ResizeNum);
126
127 pNewMap = new ACE_Shared_Memory_MM(file_name, //const ACE_TCHAR *filename,
128 m_pHeader->newsize + sizeof(ShmemHeader_t), //int len = -1,
129 O_RDWR, //int flags = O_RDWR | O_CREAT,
130 ACE_DEFAULT_FILE_PERMS, //int mode = ACE_DEFAULT_FILE_PERMS,
131 PROT_RDWR, //int prot = PROT_RDWR,
132 ACE_MAP_SHARED); //int share = ACE_MAP_PRIVATE,
133
134#else
135
136 int shmemkey = (m_ResizeNum << 16) + m_Port;
137
138 pNewMap = new ACE_Shared_Memory_SV(shmemkey, m_pHeader->size + sizeof(ShmemHeader_t));
139
140#endif
141
142 if (!pNewMap) {
143 yCError(SHMEMCARRIER, "ShmemOutputStream can't create shared memory");
144 return false;
145 }
146
147 auto* pNewHeader = (ShmemHeader_t*)pNewMap->malloc();
148 char* pNewData = (char*)(pNewHeader + 1);
149
150 m_pMap->close();
151 delete m_pMap;
152
153 m_pMap = pNewMap;
154 m_pHeader = pNewHeader;
155 m_pData = pNewData;
156
157 return true;
158}
159
160int ShmemInputStreamImpl::read(char* data, int len)
161{
162 std::lock_guard<std::recursive_mutex> l_guard(m_generalMutex);
163
164 m_pAccessMutex->acquire();
165
166 if (m_pHeader->close) {
167 m_pAccessMutex->release();
168 close();
169 return -1;
170 }
171
172 while (m_pHeader->resize) {
173 Resize();
174 }
175
176 if (m_pHeader->avail < len) {
178 m_pAccessMutex->release();
179 return 0;
180 }
181
182 if (m_pHeader->tail + len > m_pHeader->size) {
183 int first_block_size = m_pHeader->size - m_pHeader->tail;
184
185 memcpy((void*)data, (void*)(m_pData + m_pHeader->tail), first_block_size);
186 memcpy((void*)(data + first_block_size), (void*)m_pData, len - first_block_size);
187 } else {
188 memcpy((void*)data, (void*)(m_pData + m_pHeader->tail), len);
189 }
190
191 m_pHeader->avail -= len;
192 m_pHeader->tail += len;
194
195 m_pAccessMutex->release();
196
197 return len;
198}
199
201{
202 std::lock_guard<std::recursive_mutex> l_guard(m_generalMutex);
203
205
206 if (!m_bOpen) {
207 m_ReadSerializerMutex.unlock();
208 return -1;
209 }
210
211 char* data = b.get();
212 char* buf;
213 size_t len = b.length();
215
216 while (!(ret = read(data, (int)len))) {
217#ifdef _ACE_USE_SV_SEM
218 yarp::os::impl::YARP_timeval tv = ACE_OS::gettimeofday();
219 tv.sec(tv.sec() + 1);
220#else
222#endif
223
224 m_pWaitDataMutex->acquire(tv);
225
226 if (!m_pSock->recv(&buf, 1)) {
227 yCDebug(SHMEMCARRIER, "STREAM IS BROKEN");
228 close();
229 m_ReadSerializerMutex.unlock();
230 return -1;
231 }
232 }
233
234 m_ReadSerializerMutex.unlock();
235
236 return ret;
237}
238
240{
241 std::lock_guard<std::recursive_mutex> l_guard(m_generalMutex);
242
243 if (!m_bOpen) {
244 return;
245 }
246
247 m_bOpen = false;
248
249 m_pAccessMutex->acquire();
250 while (m_pHeader->waiting > 0) {
252 m_pWaitDataMutex->release();
253 }
254 m_pHeader->close = true;
255 m_pAccessMutex->release();
256
257 m_pAccessMutex->remove();
258 delete m_pAccessMutex;
259 m_pAccessMutex = nullptr;
260
261 m_pWaitDataMutex->remove();
262 delete m_pWaitDataMutex;
263 m_pWaitDataMutex = nullptr;
264
265 m_pMap->close();
266 delete m_pMap;
267 m_pMap = nullptr;
268}
bool ret
const yarp::os::LogComponent & SHMEMCARRIER()
ACE_SOCK_Stream * m_pSock
std::recursive_mutex m_generalMutex
yarp::conf::ssize_t read(yarp::os::Bytes &b)
ACE_Shared_Memory * m_pMap
ACE_Process_Mutex * m_pWaitDataMutex
ShmemHeader_t * m_pHeader
bool open(int port, ACE_SOCK_Stream *pSock, int size=4096)
std::mutex m_ReadSerializerMutex
ACE_Process_Mutex * m_pAccessMutex
A mini-server for performing network communication in the background.
A simple abstraction for a block of bytes.
Definition Bytes.h:24
size_t length() const
Definition Bytes.cpp:22
const char * get() const
Definition Bytes.cpp:27
#define yCError(component,...)
#define yCDebug(component,...)
::ssize_t ssize_t
Definition numeric.h:86