YARP
Yet Another Robot Platform
ShmemOutputStream.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
7 #include "ShmemOutputStream.h"
8 #include "ShmemLogComponent.h"
9 
10 #include <yarp/conf/numeric.h>
11 #include <yarp/os/Bytes.h>
12 #include <ace/Lib_Find.h>
13 // In one of the ACE headers there is a definition of "main" for WIN32
14 # ifdef main
15 # undef main
16 # endif
17 
18 
19 
// Member initializers: the object starts in the "not open" state, with both
// counters at zero and no mutex, shared-memory map, data pointer or header
// attached. The body is empty; no other work is visible at construction.
// NOTE(review): the line carrying the constructor signature is missing from
// this listing.
21  m_bOpen(false),
22  m_ResizeNum(0),
23  m_Port(0),
24  m_pAccessMutex(nullptr),
25  m_pWaitDataMutex(nullptr),
26  m_pMap(nullptr),
27  m_pData(nullptr),
28  m_pHeader(nullptr)
29 {
30 }
31 
33 {
 // The only action here is to delegate to close().
 // NOTE(review): the signature line is missing from this listing; this is
 // presumably the destructor body.
34  close();
35 }
36 
38 {
 // Reports the current value of the m_bOpen flag, with no further checks.
 // NOTE(review): the signature line is missing from this listing.
39  return m_bOpen;
40 }
41 
42 bool ShmemOutputStreamImpl::open(int port, int size)
43 {
44  m_pAccessMutex = m_pWaitDataMutex = nullptr;
45  m_pMap = nullptr;
46  m_pData = nullptr;
47  m_pHeader = nullptr;
48  m_ResizeNum = 0;
49  m_Port = port;
50 
51  char obj_name[2048];
52  char temp_dir_path[1024];
53 
54  if (ACE::get_temp_dir(temp_dir_path, 1024) == -1) {
55  yCError(SHMEMCARRIER, "ShmemHybridStream: no temp directory found.");
56  return false;
57  }
58 
59 #ifdef ACE_LACKS_SYSV_SHMEM
60 
61  snprintf(obj_name, 2048, "%sSHMEM_FILE_%d_0", temp_dir_path, port);
62 
63  m_pMap = new ACE_Shared_Memory_MM(obj_name, //const ACE_TCHAR *filename,
64  size + sizeof(ShmemHeader_t), //int len = -1,
65  O_RDWR | O_CREAT, //int flags = O_RDWR | O_CREAT,
66  ACE_DEFAULT_FILE_PERMS, //int mode = ACE_DEFAULT_FILE_PERMS,
67  PROT_RDWR, //int prot = PROT_RDWR,
68  ACE_MAP_SHARED); //int share = ACE_MAP_PRIVATE,
69 
70 #else
71 
72  m_pMap = new ACE_Shared_Memory_SV(port, size + sizeof(ShmemHeader_t), ACE_Shared_Memory_SV::ACE_CREATE);
73 
74 #endif
75 
76  m_pHeader = (ShmemHeader_t*)m_pMap->malloc();
77  m_pData = (char*)(m_pHeader + 1);
78 
79 #ifdef _ACE_USE_SV_SEM
80  snprintf(obj_name, 2048, "%sSHMEM_ACCESS_MUTEX_%d", temp_dir_path, port);
81  m_pAccessMutex = new ACE_Mutex(USYNC_PROCESS, obj_name);
82  snprintf(obj_name, 2048, "%sSHMEM_WAITDATA_MUTEX_%d", temp_dir_path, port);
83  m_pWaitDataMutex = new ACE_Mutex(USYNC_PROCESS, obj_name);
84 #else
85  snprintf(obj_name, 2048, "SHMEM_ACCESS_MUTEX_%d", port);
86  m_pAccessMutex = new ACE_Process_Mutex(obj_name);
87  snprintf(obj_name, 2048, "SHMEM_WAITDATA_MUTEX_%d", port);
88  m_pWaitDataMutex = new ACE_Process_Mutex(obj_name);
89 #endif
90 
91  m_pAccessMutex->acquire();
92 
93  m_pHeader->resize = false;
94  m_pHeader->close = false;
95 
96  m_pHeader->avail = 0;
97  m_pHeader->head = 0;
98  m_pHeader->size = size;
99  m_pHeader->tail = 0;
100  m_pHeader->waiting = 0;
101 
102  m_pAccessMutex->release();
103 
104  m_bOpen = true;
105 
106  return true;
107 }
108 
110 {
111  ++m_ResizeNum;
112 
113  yCDebug(SHMEMCARRIER, "output stream resize %d to %d\n", m_ResizeNum, newsize);
114 
115  ACE_Shared_Memory* pNewMap;
116 
117  m_pHeader->resize = true;
118  m_pHeader->newsize = newsize;
119 
120 #ifdef ACE_LACKS_SYSV_SHMEM
121 
122  char file_path[1024];
123 
124  if (ACE::get_temp_dir(file_path, 1024) == -1) {
125  yCError(SHMEMCARRIER, "ShmemHybridStream: no temp directory found.");
126  return false;
127  }
128 
129  char file_name[2048];
130  snprintf(file_name, 2048, "%sSHMEM_FILE_%d_%d", file_path, m_Port, m_ResizeNum);
131 
132  pNewMap = new ACE_Shared_Memory_MM(file_name, //const ACE_TCHAR *filename,
133  newsize + sizeof(ShmemHeader_t), //int len = -1,
134  O_RDWR | O_CREAT, //int flags = O_RDWR | O_CREAT,
135  ACE_DEFAULT_FILE_PERMS, //int mode = ACE_DEFAULT_FILE_PERMS,
136  PROT_RDWR, //int prot = PROT_RDWR,
137  ACE_MAP_SHARED); //int share = ACE_MAP_PRIVATE,
138 
139 #else
140 
141  int shmemkey = (m_ResizeNum << 16) + m_Port;
142 
143  pNewMap = new ACE_Shared_Memory_SV(shmemkey, newsize + sizeof(ShmemHeader_t), ACE_Shared_Memory_SV::ACE_CREATE);
144 
145 #endif
146 
147  if (!pNewMap) {
148  yCError(SHMEMCARRIER, "ShmemOutputStream can't create shared memory");
149  return false;
150  }
151 
152  auto* pNewHeader = (ShmemHeader_t*)pNewMap->malloc();
153  char* pNewData = (char*)(pNewHeader + 1);
154 
155  pNewHeader->size = newsize;
156  pNewHeader->resize = false;
157  pNewHeader->close = m_pHeader->close;
158 
159  pNewHeader->tail = 0;
160  pNewHeader->head = pNewHeader->avail = m_pHeader->avail;
161  pNewHeader->waiting = m_pHeader->waiting;
162 
163  if (m_pHeader->avail) {
164  // one or two blocks in circular queue?
165  if (m_pHeader->tail < m_pHeader->head) {
166  memcpy(pNewData, m_pData + m_pHeader->tail, m_pHeader->avail);
167  } else {
168  int firstchunk = m_pHeader->size - m_pHeader->tail;
169  memcpy(pNewData, m_pData + m_pHeader->tail, firstchunk);
170  memcpy(pNewData + firstchunk, m_pData, m_pHeader->head);
171  }
172  }
173 
174  m_pMap->close();
175  delete m_pMap;
176  m_pMap = pNewMap;
177 
178  m_pHeader = pNewHeader;
179  m_pData = pNewData;
180 
181  return true;
182 }
183 
185 {
186  if (!m_bOpen) {
187  return false;
188  }
189 
190  m_pAccessMutex->acquire();
191 
192  if (!m_bOpen) {
193  return false;
194  }
195 
196  if (m_pHeader->close) {
197  m_pAccessMutex->release();
198  close();
199  return false;
200  }
201 
202  if ((int)m_pHeader->size - (int)m_pHeader->avail < (int)b.length()) {
203  yarp::conf::ssize_t required = m_pHeader->size + 2 * b.length();
204  Resize((int)required);
205  }
206 
207  if ((int)m_pHeader->head + (int)b.length() <= (int)m_pHeader->size) {
208  memcpy(m_pData + m_pHeader->head, b.get(), b.length());
209  } else {
210  int first_block_size = m_pHeader->size - m_pHeader->head;
211  memcpy(m_pData + m_pHeader->head, b.get(), first_block_size);
212  memcpy(m_pData, b.get() + first_block_size, b.length() - first_block_size);
213  }
214 
215  m_pHeader->avail += (int)b.length();
216  m_pHeader->head += (int)b.length();
218 
219  while (m_pHeader->waiting > 0) {
220  --m_pHeader->waiting;
221  m_pWaitDataMutex->release();
222  }
223 
224  m_pAccessMutex->release();
225 
226  return true;
227 }
228 
230 {
 // Shut the stream down and release everything open() created.
 // Does nothing when the stream is not open.
 // NOTE(review): the signature line is missing from this listing.
231  if (!m_bOpen) {
232  return;
233  }
234 
 // Flag the stream as closed before touching the shared state.
235  m_bOpen = false;
236 
 // Under the access mutex: release the wait-data mutex once per waiter
 // recorded in the header, then raise the header's close flag.
237  m_pAccessMutex->acquire();
238  while (m_pHeader->waiting > 0) {
239  --m_pHeader->waiting;
240  m_pWaitDataMutex->release();
241  }
242  m_pHeader->close = true;
243  m_pAccessMutex->release();
244 
 // Tear down both mutexes: remove() is called on each before it is deleted.
245  m_pAccessMutex->remove();
246  delete m_pAccessMutex;
247  m_pAccessMutex = nullptr;
248 
249  m_pWaitDataMutex->remove();
250  delete m_pWaitDataMutex;
251  m_pWaitDataMutex = nullptr;
252 
 // Close and delete the shared-memory map.
 // NOTE(review): m_pHeader and m_pData are not reset here, so they keep
 // their old values after the map is deleted.
253  m_pMap->close();
254  delete m_pMap;
255  m_pMap = nullptr;
256 }
const yarp::os::LogComponent & SHMEMCARRIER()
bool write(const yarp::os::Bytes &b)
bool open(int port, int size=4096)
bool Resize(int newsize)
ACE_Process_Mutex * m_pWaitDataMutex
ACE_Process_Mutex * m_pAccessMutex
ShmemHeader_t * m_pHeader
ACE_Shared_Memory * m_pMap
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCDebug(component,...)
Definition: LogComponent.h:109
::ssize_t ssize_t
Definition: numeric.h:86