YARP
Yet Another Robot Platform
ShmemInputStream.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
7 #include "ShmemInputStream.h"
8 #include "ShmemLogComponent.h"
9 
10 #include <yarp/os/Bytes.h>
12 
13 #include <ace/Lib_Find.h>
14 // In one the ACE headers there is a definition of "main" for WIN32
15 # ifdef main
16 # undef main
17 # endif
18 
19 
20 
22  m_bOpen(false),
23  m_ResizeNum(0),
24  m_Port(-1),
25  m_pAccessMutex(nullptr),
26  m_pWaitDataMutex(nullptr),
27  m_pMap(nullptr),
28  m_pData(nullptr),
29  m_pHeader(nullptr),
30  m_pSock(nullptr)
31 {
32 }
33 
35 {
36  close();
37 }
38 
40 {
41  return m_bOpen;
42 }
43 
44 bool ShmemInputStreamImpl::open(int port, ACE_SOCK_Stream* pSock, int size)
45 {
46  m_pSock = pSock;
47 
48  m_pAccessMutex = m_pWaitDataMutex = nullptr;
49  m_pMap = nullptr;
50  m_pData = nullptr;
51  m_pHeader = nullptr;
52  m_ResizeNum = 0;
53 
54  m_Port = port;
55 
56  char obj_name[2048];
57  char temp_dir_path[1024];
58 
59  if (ACE::get_temp_dir(temp_dir_path, 1024) == -1) {
60  yCError(SHMEMCARRIER, "ShmemHybridStream: no temp directory found.");
61  return false;
62  }
63 
64 #ifdef ACE_LACKS_SYSV_SHMEM
65 
66  snprintf(obj_name, 2048, "%sSHMEM_FILE_%d_%d", temp_dir_path, port, 0);
67 
68  m_pMap = new ACE_Shared_Memory_MM(obj_name, //const ACE_TCHAR *filename,
69  size + sizeof(ShmemHeader_t), //int len = -1,
70  O_RDWR, //int flags = O_RDWR | O_CREAT,
71  ACE_DEFAULT_FILE_PERMS, //int mode = ACE_DEFAULT_FILE_PERMS,
72  PROT_RDWR, //int prot = PROT_RDWR,
73  ACE_MAP_SHARED); //int share = ACE_MAP_PRIVATE,
74 
75 #else
76 
77  m_pMap = new ACE_Shared_Memory_SV(port, size + sizeof(ShmemHeader_t));
78 
79 #endif
80 
81  m_pHeader = (ShmemHeader_t*)m_pMap->malloc();
82  m_pData = (char*)(m_pHeader + 1);
83 
84 #ifdef _ACE_USE_SV_SEM
85  snprintf(obj_name, 2048, "%sSHMEM_ACCESS_MUTEX_%d", temp_dir_path, port);
86  m_pAccessMutex = new ACE_Mutex(USYNC_PROCESS, obj_name);
87  snprintf(obj_name, 2048, "%sSHMEM_WAITDATA_MUTEX_%d", temp_dir_path, port);
88  m_pWaitDataMutex = new ACE_Mutex(USYNC_PROCESS, obj_name);
89 #else
90  snprintf(obj_name, 2048, "SHMEM_ACCESS_MUTEX_%d", port);
91  m_pAccessMutex = new ACE_Process_Mutex(obj_name);
92  snprintf(obj_name, 2048, "SHMEM_WAITDATA_MUTEX_%d", port);
93  m_pWaitDataMutex = new ACE_Process_Mutex(obj_name);
94 #endif
95 
96  m_pWaitDataMutex->acquire();
97 
98  m_bOpen = true;
99 
100  return true;
101 }
102 
104 {
105  ++m_ResizeNum;
106 
107  ACE_Shared_Memory* pNewMap;
108 
109  yCDebug(SHMEMCARRIER, "input stream resize %d to %d", m_ResizeNum, m_pHeader->newsize);
110 
111 #ifdef ACE_LACKS_SYSV_SHMEM
112 
113  char file_path[1024];
114 
115  if (ACE::get_temp_dir(file_path, 1024) == -1) {
116  yCError(SHMEMCARRIER, "ShmemHybridStream: no temp directory found.");
117  return false;
118  }
119 
120  char file_name[2048];
121  snprintf(file_name, 2048, "%sSHMEM_FILE_%d_%d", file_path, m_Port, m_ResizeNum);
122 
123  pNewMap = new ACE_Shared_Memory_MM(file_name, //const ACE_TCHAR *filename,
124  m_pHeader->newsize + sizeof(ShmemHeader_t), //int len = -1,
125  O_RDWR, //int flags = O_RDWR | O_CREAT,
126  ACE_DEFAULT_FILE_PERMS, //int mode = ACE_DEFAULT_FILE_PERMS,
127  PROT_RDWR, //int prot = PROT_RDWR,
128  ACE_MAP_SHARED); //int share = ACE_MAP_PRIVATE,
129 
130 #else
131 
132  int shmemkey = (m_ResizeNum << 16) + m_Port;
133 
134  pNewMap = new ACE_Shared_Memory_SV(shmemkey, m_pHeader->size + sizeof(ShmemHeader_t));
135 
136 #endif
137 
138  if (!pNewMap) {
139  yCError(SHMEMCARRIER, "ShmemOutputStream can't create shared memory");
140  return false;
141  }
142 
143  auto* pNewHeader = (ShmemHeader_t*)pNewMap->malloc();
144  char* pNewData = (char*)(pNewHeader + 1);
145 
146  m_pMap->close();
147  delete m_pMap;
148 
149  m_pMap = pNewMap;
150  m_pHeader = pNewHeader;
151  m_pData = pNewData;
152 
153  return true;
154 }
155 
156 int ShmemInputStreamImpl::read(char* data, int len)
157 {
158  m_pAccessMutex->acquire();
159 
160  if (m_pHeader->close) {
161  m_pAccessMutex->release();
162  close();
163  return -1;
164  }
165 
166  while (m_pHeader->resize) {
167  Resize();
168  }
169 
170  if (m_pHeader->avail < len) {
171  ++m_pHeader->waiting;
172  m_pAccessMutex->release();
173  return 0;
174  }
175 
176  if (m_pHeader->tail + len > m_pHeader->size) {
177  int first_block_size = m_pHeader->size - m_pHeader->tail;
178 
179  memcpy((void*)data, (void*)(m_pData + m_pHeader->tail), first_block_size);
180  memcpy((void*)(data + first_block_size), (void*)m_pData, len - first_block_size);
181  } else {
182  memcpy((void*)data, (void*)(m_pData + m_pHeader->tail), len);
183  }
184 
185  m_pHeader->avail -= len;
186  m_pHeader->tail += len;
188 
189  m_pAccessMutex->release();
190 
191  return len;
192 }
193 
195 {
196  m_ReadSerializerMutex.lock();
197 
198  if (!m_bOpen) {
199  m_ReadSerializerMutex.unlock();
200  return -1;
201  }
202 
203  char* data = b.get();
204  char* buf;
205  size_t len = b.length();
207 
208  while (!(ret = read(data, (int)len))) {
209 #ifdef _ACE_USE_SV_SEM
210  yarp::os::impl::YARP_timeval tv = ACE_OS::gettimeofday();
211  tv.sec(tv.sec() + 1);
212 #else
214 #endif
215 
216  m_pWaitDataMutex->acquire(tv);
217 
218  if (!m_pSock->recv(&buf, 1)) {
219  yCDebug(SHMEMCARRIER, "STREAM IS BROKEN");
220  close();
221  m_ReadSerializerMutex.unlock();
222  return -1;
223  }
224  }
225 
226  m_ReadSerializerMutex.unlock();
227 
228  return ret;
229 }
230 
232 {
233  if (!m_bOpen) {
234  return;
235  }
236 
237  m_bOpen = false;
238 
239  m_pAccessMutex->acquire();
240  while (m_pHeader->waiting > 0) {
241  --m_pHeader->waiting;
242  m_pWaitDataMutex->release();
243  }
244  m_pHeader->close = true;
245  m_pAccessMutex->release();
246 
247  m_pAccessMutex->remove();
248  delete m_pAccessMutex;
249  m_pAccessMutex = nullptr;
250 
251  m_pWaitDataMutex->remove();
252  delete m_pWaitDataMutex;
253  m_pWaitDataMutex = nullptr;
254 
255  m_pMap->close();
256  delete m_pMap;
257  m_pMap = nullptr;
258 }
bool ret
const yarp::os::LogComponent & SHMEMCARRIER()
ACE_SOCK_Stream * m_pSock
yarp::conf::ssize_t read(yarp::os::Bytes &b)
ACE_Shared_Memory * m_pMap
ACE_Process_Mutex * m_pWaitDataMutex
ShmemHeader_t * m_pHeader
bool open(int port, ACE_SOCK_Stream *pSock, int size=4096)
std::mutex m_ReadSerializerMutex
ACE_Process_Mutex * m_pAccessMutex
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCDebug(component,...)
Definition: LogComponent.h:109
::ssize_t ssize_t
Definition: numeric.h:86
struct timeval YARP_timeval
Definition: PlatformTime.h:32