YARP
Yet Another Robot Platform
ThreadImpl.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
8 
9 #include <yarp/os/NetType.h>
10 #include <yarp/os/Semaphore.h>
13 
14 #include <cstdlib>
15 #include <sstream>
16 #include <thread>
17 
18 #if defined(YARP_HAS_ACE)
19 # include <ace/Thread.h> // For using ACE_hthread_t as native_handle
20 // In one of the ACE headers there is a definition of "main" for WIN32
21 # ifdef main
22 # undef main
23 # endif
24 #endif
25 
26 #if defined(__linux__) // Use the POSIX syscalls for the gettid()
27 # include <sys/syscall.h>
28 # include <unistd.h>
29 #endif
30 
31 
32 using namespace yarp::os::impl;
33 
34 namespace {
35 YARP_OS_LOG_COMPONENT(THREADIMPL, "yarp.os.impl.ThreadImpl")
36 } // namespace
37 
38 static std::atomic<int> threadCount{0};
39 
40 void theExecutiveBranch(void* args)
41 {
42  // just for now -- rather deal with broken pipes through normal procedures
43  yarp::os::impl::signal(SIGPIPE, SIG_IGN);
44 
45 
46  /*
47  sigset_t set;
48  sigemptyset(&set);
49  sigaddset(&set, SIGHUP);
50  sigaddset(&set, SIGINT);
51  sigaddset(&set, SIGQUIT);
52  sigaddset(&set, SIGTERM);
53  sigaddset(&set, SIGUSR1);
54  sigaddset(&set, SIGCHLD);
55  ACE_OS::thr_sigsetmask(SIG_BLOCK, &set, nullptr);
56  fprintf(stderr, "Blocking signals\n");
57  */
58 
59  auto* thread = (ThreadImpl*)args;
60 
61  yCDebug(THREADIMPL, "Thread starting up");
62 
63  bool success = thread->threadInit();
64  thread->notify(success);
65  thread->notifyOpened(success);
66  thread->synchroPost();
67 
68  if (success) {
69  // the thread id must be set before calling run() to avoid a race
70  // condition in case the run() method checks it.
71  thread->id = std::this_thread::get_id();
72 #if defined(__linux__)
73  // Use the POSIX syscalls to get
74  // the real thread ID (gettid) on Linux machine
75  thread->tid = static_cast<long int>(syscall(SYS_gettid));
76 #else
77  thread->tid = static_cast<long int>(std::hash<std::thread::id>()(thread->id));
78 #endif
79 
80  thread->setPriority();
81  thread->run();
82  thread->threadRelease();
83  }
84 
85  --threadCount;
86  yCDebug(THREADIMPL, "Thread shutting down");
87 
88  thread->notify(false);
89  thread->synchroPost();
90 
91  return;
92 }
93 
94 
96 {
97  yCDebug(THREADIMPL, "Thread being deleted");
98  join();
99 }
100 
101 
103 {
104  return tid;
105 }
106 
107 
109 {
110 #if defined(__linux__)
111  // Use the POSIX syscalls to get
112  // the real thread ID (gettid) on Linux machine
113  return static_cast<long int>(syscall(SYS_gettid));
114 #else
115  return static_cast<long int>(std::hash<std::thread::id>()(std::this_thread::get_id()));
116 #endif
117 }
118 
119 
120 int ThreadImpl::join(double seconds)
121 {
122  closing = true;
123  if (needJoin) {
124  if (seconds > 0) {
125  if (!initWasSuccessful) {
126  // join called before start completed
127  yCError(THREADIMPL, "Tried to join a thread before starting it");
128  return -1;
129  }
130  synchro.waitWithTimeout(seconds);
131  if (active) {
132  return -1;
133  }
134  }
135 
136  int result = -1;
137  if (thread.joinable()) {
138  thread.join();
139  result = 0;
140  }
141 
142  needJoin = false;
143  active = false;
144  while (synchro.check()) {
145  }
146  return result;
147  }
148  return 0;
149 }
150 
152 {
153 }
154 
156 {
157  closing = true;
158  join(-1);
159 }
160 
161 // similar to close(), but does not join (does not block)
163 {
164  closing = true;
165 }
166 
168 {
169 }
170 
171 void ThreadImpl::afterStart(bool success)
172 {
173 }
174 
176 {
177  return true;
178 }
179 
181 {
182 }
183 
185 {
186  join();
187  closing = false;
188  initWasSuccessful = false;
189  beforeStart();
190  thread = std::thread(theExecutiveBranch, (void*)this);
191  int result = thread.joinable() ? 0 : 1;
192  if (result == 0) {
193  // we must, at some point in the future, join the thread
194  needJoin = true;
195 
196  // the thread started correctly, wait for the initialization
197  yCDebug(THREADIMPL, "Child thread initializing");
198  synchroWait();
199  initWasSuccessful = true;
200  if (opened) {
201  ++threadCount;
202  yCDebug(THREADIMPL, "Child thread initialized ok");
203  afterStart(true);
204  return true;
205  }
206  yCDebug(THREADIMPL, "Child thread did not initialize ok");
207  //wait for the thread to really exit
208  ThreadImpl::join(-1);
209  }
210  //the thread did not start, call afterStart() to warn the user
211  yCError(THREADIMPL, "A thread failed to start with error code: %d", result);
212  afterStart(false);
213  return false;
214 }
215 
217 {
218  synchro.wait();
219 }
220 
222 {
223  synchro.post();
224 }
225 
226 void ThreadImpl::notify(bool s)
227 {
228  active = s;
229 }
230 
232 {
233  return closing;
234 }
235 
237 {
238  return active;
239 }
240 
242 {
243  return threadCount;
244 }
245 
246 int ThreadImpl::setPriority(int priority, int policy)
247 {
248  if (priority == -1) {
249  priority = defaultPriority;
250  policy = defaultPolicy;
251  } else {
252  defaultPriority = priority;
253  defaultPolicy = policy;
254  }
255  if (active && priority != -1) {
256 #if defined(YARP_HAS_ACE)
257  if (std::is_same<std::thread::native_handle_type, ACE_hthread_t>::value) {
258  return ACE_Thread::setprio(thread.native_handle(), priority, policy);
259  }
260  yCError(THREADIMPL, "Cannot set priority without ACE");
261 #elif defined(__unix__)
262  if (std::is_same<std::thread::native_handle_type, pthread_t>::value) {
263  struct sched_param thread_param;
264  thread_param.sched_priority = priority;
265  int ret = pthread_setschedparam(thread.native_handle(), policy, &thread_param);
266  return (ret != 0) ? -1 : 0;
267  } else {
268  yCError(THREADIMPL, "Cannot set priority without ACE");
269  }
270 #else
271  yCError(THREADIMPL, "Cannot set priority without ACE");
272 #endif
273  }
274  return 0;
275 }
276 
278 {
279  int prio = defaultPriority;
280  if (active) {
281 #if defined(YARP_HAS_ACE)
282  if (std::is_same<std::thread::native_handle_type, ACE_hthread_t>::value) {
283  ACE_Thread::getprio(thread.native_handle(), prio);
284  } else {
285  yCError(THREADIMPL, "Cannot get priority without ACE");
286  }
287 #elif defined(__unix__)
288  if (std::is_same<std::thread::native_handle_type, pthread_t>::value) {
289  struct sched_param thread_param;
290  int policy;
291  if (pthread_getschedparam(thread.native_handle(), &policy, &thread_param) == 0) {
292  prio = thread_param.sched_priority;
293  } else {
294  yCError(THREADIMPL, "Cannot get priority without ACE");
295  }
296  }
297 #else
298  yCError(THREADIMPL, "Cannot get priority without ACE");
299 #endif
300  }
301  return prio;
302 }
303 
305 {
306  int policy = defaultPolicy;
307  if (active) {
308 #if defined(YARP_HAS_ACE)
309  if (std::is_same<std::thread::native_handle_type, ACE_hthread_t>::value) {
310  int prio;
311  ACE_Thread::getprio(thread.native_handle(), prio, policy);
312  } else {
313  yCError(THREADIMPL, "Cannot get scheduling policy without ACE");
314  }
315 #elif defined(__unix__)
316  if (std::is_same<std::thread::native_handle_type, pthread_t>::value) {
317  struct sched_param thread_param;
318  if (pthread_getschedparam(thread.native_handle(), &policy, &thread_param) != 0) {
319  policy = defaultPolicy;
320  }
321  } else {
322  yCError(THREADIMPL, "Cannot get scheduling policy without ACE");
323  }
324 #else
325  yCError(THREADIMPL, "Cannot get scheduling policy without ACE");
326 #endif
327  }
328  return policy;
329 }
330 
332 {
333  return tid;
334 }
335 
337 {
339 }
bool ret
static std::atomic< int > threadCount
Definition: ThreadImpl.cpp:38
void theExecutiveBranch(void *args)
Definition: ThreadImpl.cpp:40
bool waitWithTimeout(double timeoutInSeconds)
Try to decrement the counter, even if we must wait - but don't wait forever.
Definition: Semaphore.cpp:101
void wait()
Decrement the counter, even if we must wait to do that.
Definition: Semaphore.cpp:96
bool check()
Decrement the counter, unless that would require waiting.
Definition: Semaphore.cpp:106
void post()
Increment the counter.
Definition: Semaphore.cpp:111
An abstraction for a thread of execution.
Definition: ThreadImpl.h:23
virtual void threadRelease()
Definition: ThreadImpl.cpp:180
int join(double seconds=-1)
Definition: ThreadImpl.cpp:120
virtual void afterStart(bool success)
Definition: ThreadImpl.cpp:171
virtual void beforeStart()
Definition: ThreadImpl.cpp:167
static long int getKeyOfCaller()
Definition: ThreadImpl.cpp:108
int setPriority(int priority=-1, int policy=-1)
Definition: ThreadImpl.cpp:246
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCDebug(component,...)
Definition: LogComponent.h:109
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:35
void yield()
The calling thread releases its remaining quantum upon calling this function.
Definition: Time.cpp:138
The components from which ports and connections are built.