YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
ShmemOutputStream.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#include "ShmemOutputStream.h"
8#include "ShmemLogComponent.h"
9
10#include <yarp/conf/numeric.h>
11#include <yarp/os/Bytes.h>
12#include <ace/Lib_Find.h>
13// In one the ACE headers there is a definition of "main" for WIN32
14# ifdef main
15# undef main
16# endif
17
18
19
21 m_bOpen(false),
22 m_ResizeNum(0),
23 m_Port(0),
24 m_pAccessMutex(nullptr),
25 m_pWaitDataMutex(nullptr),
26 m_pMap(nullptr),
27 m_pData(nullptr),
28 m_pHeader(nullptr)
29{
30}
31
36
38{
39 return m_bOpen;
40}
41
42bool ShmemOutputStreamImpl::open(int port, int size)
43{
45 m_pMap = nullptr;
46 m_pData = nullptr;
47 m_pHeader = nullptr;
48 m_ResizeNum = 0;
49 m_Port = port;
50
51 char obj_name[2048];
52 char temp_dir_path[1024];
53
54 if (ACE::get_temp_dir(temp_dir_path, 1024) == -1) {
55 yCError(SHMEMCARRIER, "ShmemHybridStream: no temp directory found.");
56 return false;
57 }
58
59#ifdef ACE_LACKS_SYSV_SHMEM
60
61 snprintf(obj_name, 2048, "%sSHMEM_FILE_%d_0", temp_dir_path, port);
62
63 m_pMap = new ACE_Shared_Memory_MM(obj_name, //const ACE_TCHAR *filename,
64 size + sizeof(ShmemHeader_t), //int len = -1,
65 O_RDWR | O_CREAT, //int flags = O_RDWR | O_CREAT,
66 ACE_DEFAULT_FILE_PERMS, //int mode = ACE_DEFAULT_FILE_PERMS,
67 PROT_RDWR, //int prot = PROT_RDWR,
68 ACE_MAP_SHARED); //int share = ACE_MAP_PRIVATE,
69
70#else
71
72 m_pMap = new ACE_Shared_Memory_SV(port, size + sizeof(ShmemHeader_t), ACE_Shared_Memory_SV::ACE_CREATE);
73
74#endif
75
76 m_pHeader = (ShmemHeader_t*)m_pMap->malloc();
77 m_pData = (char*)(m_pHeader + 1);
78
79#ifdef _ACE_USE_SV_SEM
80 snprintf(obj_name, 2048, "%sSHMEM_ACCESS_MUTEX_%d", temp_dir_path, port);
81 m_pAccessMutex = new ACE_Mutex(USYNC_PROCESS, obj_name);
82 snprintf(obj_name, 2048, "%sSHMEM_WAITDATA_MUTEX_%d", temp_dir_path, port);
83 m_pWaitDataMutex = new ACE_Mutex(USYNC_PROCESS, obj_name);
84#else
85 snprintf(obj_name, 2048, "SHMEM_ACCESS_MUTEX_%d", port);
86 m_pAccessMutex = new ACE_Process_Mutex(obj_name);
87 snprintf(obj_name, 2048, "SHMEM_WAITDATA_MUTEX_%d", port);
88 m_pWaitDataMutex = new ACE_Process_Mutex(obj_name);
89#endif
90
91 m_pAccessMutex->acquire();
92
93 m_pHeader->resize = false;
94 m_pHeader->close = false;
95
96 m_pHeader->avail = 0;
97 m_pHeader->head = 0;
98 m_pHeader->size = size;
99 m_pHeader->tail = 0;
100 m_pHeader->waiting = 0;
101
102 m_pAccessMutex->release();
103
104 m_bOpen = true;
105
106 return true;
107}
108
110{
111 ++m_ResizeNum;
112
113 yCDebug(SHMEMCARRIER, "output stream resize %d to %d\n", m_ResizeNum, newsize);
114
115 ACE_Shared_Memory* pNewMap;
116
117 m_pHeader->resize = true;
118 m_pHeader->newsize = newsize;
119
120#ifdef ACE_LACKS_SYSV_SHMEM
121
122 char file_path[1024];
123
124 if (ACE::get_temp_dir(file_path, 1024) == -1) {
125 yCError(SHMEMCARRIER, "ShmemHybridStream: no temp directory found.");
126 return false;
127 }
128
129 char file_name[2048];
130 snprintf(file_name, 2048, "%sSHMEM_FILE_%d_%d", file_path, m_Port, m_ResizeNum);
131
132 pNewMap = new ACE_Shared_Memory_MM(file_name, //const ACE_TCHAR *filename,
133 newsize + sizeof(ShmemHeader_t), //int len = -1,
134 O_RDWR | O_CREAT, //int flags = O_RDWR | O_CREAT,
135 ACE_DEFAULT_FILE_PERMS, //int mode = ACE_DEFAULT_FILE_PERMS,
136 PROT_RDWR, //int prot = PROT_RDWR,
137 ACE_MAP_SHARED); //int share = ACE_MAP_PRIVATE,
138
139#else
140
141 int shmemkey = (m_ResizeNum << 16) + m_Port;
142
143 pNewMap = new ACE_Shared_Memory_SV(shmemkey, newsize + sizeof(ShmemHeader_t), ACE_Shared_Memory_SV::ACE_CREATE);
144
145#endif
146
147 if (!pNewMap) {
148 yCError(SHMEMCARRIER, "ShmemOutputStream can't create shared memory");
149 return false;
150 }
151
152 auto* pNewHeader = (ShmemHeader_t*)pNewMap->malloc();
153 char* pNewData = (char*)(pNewHeader + 1);
154
155 pNewHeader->size = newsize;
156 pNewHeader->resize = false;
157 pNewHeader->close = m_pHeader->close;
158
159 pNewHeader->tail = 0;
160 pNewHeader->head = pNewHeader->avail = m_pHeader->avail;
161 pNewHeader->waiting = m_pHeader->waiting;
162
163 if (m_pHeader->avail) {
164 // one or two blocks in circular queue?
165 if (m_pHeader->tail < m_pHeader->head) {
166 memcpy(pNewData, m_pData + m_pHeader->tail, m_pHeader->avail);
167 } else {
168 int firstchunk = m_pHeader->size - m_pHeader->tail;
169 memcpy(pNewData, m_pData + m_pHeader->tail, firstchunk);
170 memcpy(pNewData + firstchunk, m_pData, m_pHeader->head);
171 }
172 }
173
174 m_pMap->close();
175 delete m_pMap;
176 m_pMap = pNewMap;
177
178 m_pHeader = pNewHeader;
179 m_pData = pNewData;
180
181 return true;
182}
183
185{
186 if (!m_bOpen) {
187 return false;
188 }
189
190 m_pAccessMutex->acquire();
191
192 if (!m_bOpen) {
193 return false;
194 }
195
196 if (m_pHeader->close) {
197 m_pAccessMutex->release();
198 close();
199 return false;
200 }
201
202 if ((int)m_pHeader->size - (int)m_pHeader->avail < (int)b.length()) {
203 yarp::conf::ssize_t required = m_pHeader->size + 2 * b.length();
204 Resize((int)required);
205 }
206
207 if ((int)m_pHeader->head + (int)b.length() <= (int)m_pHeader->size) {
208 memcpy(m_pData + m_pHeader->head, b.get(), b.length());
209 } else {
210 int first_block_size = m_pHeader->size - m_pHeader->head;
211 memcpy(m_pData + m_pHeader->head, b.get(), first_block_size);
212 memcpy(m_pData, b.get() + first_block_size, b.length() - first_block_size);
213 }
214
215 m_pHeader->avail += (int)b.length();
216 m_pHeader->head += (int)b.length();
218
219 while (m_pHeader->waiting > 0) {
221 m_pWaitDataMutex->release();
222 }
223
224 m_pAccessMutex->release();
225
226 return true;
227}
228
230{
231 if (!m_bOpen) {
232 return;
233 }
234
235 m_bOpen = false;
236
237 m_pAccessMutex->acquire();
238 while (m_pHeader->waiting > 0) {
240 m_pWaitDataMutex->release();
241 }
242 m_pHeader->close = true;
243 m_pAccessMutex->release();
244
245 m_pAccessMutex->remove();
246 delete m_pAccessMutex;
247 m_pAccessMutex = nullptr;
248
249 m_pWaitDataMutex->remove();
250 delete m_pWaitDataMutex;
251 m_pWaitDataMutex = nullptr;
252
253 m_pMap->close();
254 delete m_pMap;
255 m_pMap = nullptr;
256}
const yarp::os::LogComponent & SHMEMCARRIER()
bool write(const yarp::os::Bytes &b)
bool open(int port, int size=4096)
ACE_Process_Mutex * m_pWaitDataMutex
ACE_Process_Mutex * m_pAccessMutex
ShmemHeader_t * m_pHeader
ACE_Shared_Memory * m_pMap
A simple abstraction for a block of bytes.
Definition Bytes.h:24
size_t length() const
Definition Bytes.cpp:22
const char * get() const
Definition Bytes.cpp:27
#define yCError(component,...)
#define yCDebug(component,...)
::ssize_t ssize_t
Definition numeric.h:86