YARP
Yet Another Robot Platform
ShmemInputStream.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#include "ShmemInputStream.h"
8#include "ShmemLogComponent.h"
9
10#include <yarp/os/Bytes.h>
12
13#include <ace/Lib_Find.h>
14// In one the ACE headers there is a definition of "main" for WIN32
15# ifdef main
16# undef main
17# endif
18
19
20
22 m_bOpen(false),
23 m_ResizeNum(0),
24 m_Port(-1),
25 m_pAccessMutex(nullptr),
26 m_pWaitDataMutex(nullptr),
27 m_pMap(nullptr),
28 m_pData(nullptr),
29 m_pHeader(nullptr),
30 m_pSock(nullptr)
31{
32}
33
35{
36 close();
37}
38
40{
41 return m_bOpen;
42}
43
44bool ShmemInputStreamImpl::open(int port, ACE_SOCK_Stream* pSock, int size)
45{
46 m_pSock = pSock;
47
49 m_pMap = nullptr;
50 m_pData = nullptr;
51 m_pHeader = nullptr;
52 m_ResizeNum = 0;
53
54 m_Port = port;
55
56 char obj_name[2048];
57 char temp_dir_path[1024];
58
59 if (ACE::get_temp_dir(temp_dir_path, 1024) == -1) {
60 yCError(SHMEMCARRIER, "ShmemHybridStream: no temp directory found.");
61 return false;
62 }
63
64#ifdef ACE_LACKS_SYSV_SHMEM
65
66 snprintf(obj_name, 2048, "%sSHMEM_FILE_%d_%d", temp_dir_path, port, 0);
67
68 m_pMap = new ACE_Shared_Memory_MM(obj_name, //const ACE_TCHAR *filename,
69 size + sizeof(ShmemHeader_t), //int len = -1,
70 O_RDWR, //int flags = O_RDWR | O_CREAT,
71 ACE_DEFAULT_FILE_PERMS, //int mode = ACE_DEFAULT_FILE_PERMS,
72 PROT_RDWR, //int prot = PROT_RDWR,
73 ACE_MAP_SHARED); //int share = ACE_MAP_PRIVATE,
74
75#else
76
77 m_pMap = new ACE_Shared_Memory_SV(port, size + sizeof(ShmemHeader_t));
78
79#endif
80
81 m_pHeader = (ShmemHeader_t*)m_pMap->malloc();
82 m_pData = (char*)(m_pHeader + 1);
83
84#ifdef _ACE_USE_SV_SEM
85 snprintf(obj_name, 2048, "%sSHMEM_ACCESS_MUTEX_%d", temp_dir_path, port);
86 m_pAccessMutex = new ACE_Mutex(USYNC_PROCESS, obj_name);
87 snprintf(obj_name, 2048, "%sSHMEM_WAITDATA_MUTEX_%d", temp_dir_path, port);
88 m_pWaitDataMutex = new ACE_Mutex(USYNC_PROCESS, obj_name);
89#else
90 snprintf(obj_name, 2048, "SHMEM_ACCESS_MUTEX_%d", port);
91 m_pAccessMutex = new ACE_Process_Mutex(obj_name);
92 snprintf(obj_name, 2048, "SHMEM_WAITDATA_MUTEX_%d", port);
93 m_pWaitDataMutex = new ACE_Process_Mutex(obj_name);
94#endif
95
96 m_pWaitDataMutex->acquire();
97
98 m_bOpen = true;
99
100 return true;
101}
102
104{
105 ++m_ResizeNum;
106
107 ACE_Shared_Memory* pNewMap;
108
109 yCDebug(SHMEMCARRIER, "input stream resize %d to %d", m_ResizeNum, m_pHeader->newsize);
110
111#ifdef ACE_LACKS_SYSV_SHMEM
112
113 char file_path[1024];
114
115 if (ACE::get_temp_dir(file_path, 1024) == -1) {
116 yCError(SHMEMCARRIER, "ShmemHybridStream: no temp directory found.");
117 return false;
118 }
119
120 char file_name[2048];
121 snprintf(file_name, 2048, "%sSHMEM_FILE_%d_%d", file_path, m_Port, m_ResizeNum);
122
123 pNewMap = new ACE_Shared_Memory_MM(file_name, //const ACE_TCHAR *filename,
124 m_pHeader->newsize + sizeof(ShmemHeader_t), //int len = -1,
125 O_RDWR, //int flags = O_RDWR | O_CREAT,
126 ACE_DEFAULT_FILE_PERMS, //int mode = ACE_DEFAULT_FILE_PERMS,
127 PROT_RDWR, //int prot = PROT_RDWR,
128 ACE_MAP_SHARED); //int share = ACE_MAP_PRIVATE,
129
130#else
131
132 int shmemkey = (m_ResizeNum << 16) + m_Port;
133
134 pNewMap = new ACE_Shared_Memory_SV(shmemkey, m_pHeader->size + sizeof(ShmemHeader_t));
135
136#endif
137
138 if (!pNewMap) {
139 yCError(SHMEMCARRIER, "ShmemOutputStream can't create shared memory");
140 return false;
141 }
142
143 auto* pNewHeader = (ShmemHeader_t*)pNewMap->malloc();
144 char* pNewData = (char*)(pNewHeader + 1);
145
146 m_pMap->close();
147 delete m_pMap;
148
149 m_pMap = pNewMap;
150 m_pHeader = pNewHeader;
151 m_pData = pNewData;
152
153 return true;
154}
155
156int ShmemInputStreamImpl::read(char* data, int len)
157{
158 m_pAccessMutex->acquire();
159
160 if (m_pHeader->close) {
161 m_pAccessMutex->release();
162 close();
163 return -1;
164 }
165
166 while (m_pHeader->resize) {
167 Resize();
168 }
169
170 if (m_pHeader->avail < len) {
172 m_pAccessMutex->release();
173 return 0;
174 }
175
176 if (m_pHeader->tail + len > m_pHeader->size) {
177 int first_block_size = m_pHeader->size - m_pHeader->tail;
178
179 memcpy((void*)data, (void*)(m_pData + m_pHeader->tail), first_block_size);
180 memcpy((void*)(data + first_block_size), (void*)m_pData, len - first_block_size);
181 } else {
182 memcpy((void*)data, (void*)(m_pData + m_pHeader->tail), len);
183 }
184
185 m_pHeader->avail -= len;
186 m_pHeader->tail += len;
188
189 m_pAccessMutex->release();
190
191 return len;
192}
193
195{
197
198 if (!m_bOpen) {
199 m_ReadSerializerMutex.unlock();
200 return -1;
201 }
202
203 char* data = b.get();
204 char* buf;
205 size_t len = b.length();
207
208 while (!(ret = read(data, (int)len))) {
209#ifdef _ACE_USE_SV_SEM
210 yarp::os::impl::YARP_timeval tv = ACE_OS::gettimeofday();
211 tv.sec(tv.sec() + 1);
212#else
214#endif
215
216 m_pWaitDataMutex->acquire(tv);
217
218 if (!m_pSock->recv(&buf, 1)) {
219 yCDebug(SHMEMCARRIER, "STREAM IS BROKEN");
220 close();
221 m_ReadSerializerMutex.unlock();
222 return -1;
223 }
224 }
225
226 m_ReadSerializerMutex.unlock();
227
228 return ret;
229}
230
232{
233 if (!m_bOpen) {
234 return;
235 }
236
237 m_bOpen = false;
238
239 m_pAccessMutex->acquire();
240 while (m_pHeader->waiting > 0) {
242 m_pWaitDataMutex->release();
243 }
244 m_pHeader->close = true;
245 m_pAccessMutex->release();
246
247 m_pAccessMutex->remove();
248 delete m_pAccessMutex;
249 m_pAccessMutex = nullptr;
250
251 m_pWaitDataMutex->remove();
252 delete m_pWaitDataMutex;
253 m_pWaitDataMutex = nullptr;
254
255 m_pMap->close();
256 delete m_pMap;
257 m_pMap = nullptr;
258}
bool ret
const yarp::os::LogComponent & SHMEMCARRIER()
ACE_SOCK_Stream * m_pSock
yarp::conf::ssize_t read(yarp::os::Bytes &b)
ACE_Shared_Memory * m_pMap
ACE_Process_Mutex * m_pWaitDataMutex
ShmemHeader_t * m_pHeader
bool open(int port, ACE_SOCK_Stream *pSock, int size=4096)
std::mutex m_ReadSerializerMutex
ACE_Process_Mutex * m_pAccessMutex
A simple abstraction for a block of bytes.
Definition: Bytes.h:24
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
#define yCError(component,...)
Definition: LogComponent.h:213
#define yCDebug(component,...)
Definition: LogComponent.h:128
::ssize_t ssize_t
Definition: numeric.h:86
struct timeval YARP_timeval
Definition: PlatformTime.h:30