YARP
Yet Another Robot Platform
ThreadImpl.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
8
9#include <yarp/os/NetType.h>
10#include <yarp/os/Semaphore.h>
13
14#include <cstdlib>
15#include <sstream>
16#include <thread>
17
18#if defined(YARP_HAS_ACE)
19# include <ace/Thread.h> // For using ACE_hthread_t as native_handle
20// In one of the ACE headers there is a definition of "main" for WIN32
21# ifdef main
22# undef main
23# endif
24#endif
25
26#if defined(__linux__) // Use the POSIX syscalls for the gettid()
27# include <sys/syscall.h>
28# include <unistd.h>
29#endif
30
31
32using namespace yarp::os::impl;
33
34namespace {
35YARP_OS_LOG_COMPONENT(THREADIMPL, "yarp.os.impl.ThreadImpl")
36} // namespace
37
38static std::atomic<int> threadCount{0};
39
40void theExecutiveBranch(void* args)
41{
42 // just for now -- rather deal with broken pipes through normal procedures
43 yarp::os::impl::signal(SIGPIPE, SIG_IGN);
44
45
46 /*
47 sigset_t set;
48 sigemptyset(&set);
49 sigaddset(&set, SIGHUP);
50 sigaddset(&set, SIGINT);
51 sigaddset(&set, SIGQUIT);
52 sigaddset(&set, SIGTERM);
53 sigaddset(&set, SIGUSR1);
54 sigaddset(&set, SIGCHLD);
55 ACE_OS::thr_sigsetmask(SIG_BLOCK, &set, nullptr);
56 fprintf(stderr, "Blocking signals\n");
57 */
58
59 auto* thread = (ThreadImpl*)args;
60
61 yCDebug(THREADIMPL, "Thread starting up");
62
63 bool success = thread->threadInit();
64 thread->notify(success);
65 thread->notifyOpened(success);
66 thread->synchroPost();
67
68 if (success) {
69 // the thread id must be set before calling run() to avoid a race
70 // condition in case the run() method checks it.
71 thread->id = std::this_thread::get_id();
72#if defined(__linux__)
73 // Use the POSIX syscalls to get
74 // the real thread ID (gettid) on Linux machine
75 thread->tid = static_cast<long int>(syscall(SYS_gettid));
76#else
77 thread->tid = static_cast<long int>(std::hash<std::thread::id>()(thread->id));
78#endif
79
80 thread->setPriority();
81 thread->run();
82 thread->threadRelease();
83 }
84
86 yCDebug(THREADIMPL, "Thread shutting down");
87
88 thread->notify(false);
89 thread->synchroPost();
90
91 return;
92}
93
94
95ThreadImpl::~ThreadImpl()
96{
97 yCDebug(THREADIMPL, "Thread being deleted");
98 join();
99}
100
101
103{
104 return tid;
105}
106
107
109{
110#if defined(__linux__)
111 // Use the POSIX syscalls to get
112 // the real thread ID (gettid) on Linux machine
113 return static_cast<long int>(syscall(SYS_gettid));
114#else
115 return static_cast<long int>(std::hash<std::thread::id>()(std::this_thread::get_id()));
116#endif
117}
118
119
120int ThreadImpl::join(double seconds)
121{
122 closing = true;
123 if (needJoin) {
124 if (seconds > 0) {
125 if (!initWasSuccessful) {
126 // join called before start completed
127 yCError(THREADIMPL, "Tried to join a thread before starting it");
128 return -1;
129 }
130 synchro.waitWithTimeout(seconds);
131 if (active) {
132 return -1;
133 }
134 }
135
136 int result = -1;
137 if (thread.joinable()) {
138 thread.join();
139 result = 0;
140 }
141
142 needJoin = false;
143 active = false;
144 while (synchro.check()) {
145 }
146 return result;
147 }
148 return 0;
149}
150
152{
153}
154
156{
157 closing = true;
158 join(-1);
159}
160
161// similar to close(), but does not join (does not block)
163{
164 closing = true;
165}
166
168{
169}
170
171void ThreadImpl::afterStart(bool success)
172{
173}
174
176{
177 return true;
178}
179
181{
182}
183
185{
186 join();
187 closing = false;
188 initWasSuccessful = false;
189 beforeStart();
190 thread = std::thread(theExecutiveBranch, (void*)this);
191 int result = thread.joinable() ? 0 : 1;
192 if (result == 0) {
193 // we must, at some point in the future, join the thread
194 needJoin = true;
195
196 // the thread started correctly, wait for the initialization
197 yCDebug(THREADIMPL, "Child thread initializing");
198 synchroWait();
199 initWasSuccessful = true;
200 if (opened) {
201 ++threadCount;
202 yCDebug(THREADIMPL, "Child thread initialized ok");
203 afterStart(true);
204 return true;
205 }
206 yCDebug(THREADIMPL, "Child thread did not initialize ok");
207 //wait for the thread to really exit
209 }
210 //the thread did not start, call afterStart() to warn the user
211 yCError(THREADIMPL, "A thread failed to start with error code: %d", result);
212 afterStart(false);
213 return false;
214}
215
217{
218 synchro.wait();
219}
220
222{
223 synchro.post();
224}
225
227{
228 active = s;
229}
230
232{
233 return closing;
234}
235
237{
238 return active;
239}
240
242{
243 return threadCount;
244}
245
246int ThreadImpl::setPriority(int priority, int policy)
247{
248 if (priority == -1) {
249 priority = defaultPriority;
250 policy = defaultPolicy;
251 } else {
252 defaultPriority = priority;
253 defaultPolicy = policy;
254 }
255 if (active && priority != -1) {
256#if defined(YARP_HAS_ACE)
257 if (std::is_same<std::thread::native_handle_type, ACE_hthread_t>::value) {
258 return ACE_Thread::setprio(thread.native_handle(), priority, policy);
259 }
260 yCError(THREADIMPL, "Cannot set priority without ACE");
261#elif defined(__unix__)
262 if (std::is_same<std::thread::native_handle_type, pthread_t>::value) {
263 struct sched_param thread_param;
264 thread_param.sched_priority = priority;
265 int ret = pthread_setschedparam(thread.native_handle(), policy, &thread_param);
266 return (ret != 0) ? -1 : 0;
267 } else {
268 yCError(THREADIMPL, "Cannot set priority without ACE");
269 }
270#else
271 yCError(THREADIMPL, "Cannot set priority without ACE");
272#endif
273 }
274 return 0;
275}
276
278{
279 int prio = defaultPriority;
280 if (active) {
281#if defined(YARP_HAS_ACE)
282 if (std::is_same<std::thread::native_handle_type, ACE_hthread_t>::value) {
283 ACE_Thread::getprio(thread.native_handle(), prio);
284 } else {
285 yCError(THREADIMPL, "Cannot get priority without ACE");
286 }
287#elif defined(__unix__)
288 if (std::is_same<std::thread::native_handle_type, pthread_t>::value) {
289 struct sched_param thread_param;
290 int policy;
291 if (pthread_getschedparam(thread.native_handle(), &policy, &thread_param) == 0) {
292 prio = thread_param.sched_priority;
293 } else {
294 yCError(THREADIMPL, "Cannot get priority without ACE");
295 }
296 }
297#else
298 yCError(THREADIMPL, "Cannot get priority without ACE");
299#endif
300 }
301 return prio;
302}
303
305{
306 int policy = defaultPolicy;
307 if (active) {
308#if defined(YARP_HAS_ACE)
309 if (std::is_same<std::thread::native_handle_type, ACE_hthread_t>::value) {
310 int prio;
311 ACE_Thread::getprio(thread.native_handle(), prio, policy);
312 } else {
313 yCError(THREADIMPL, "Cannot get scheduling policy without ACE");
314 }
315#elif defined(__unix__)
316 if (std::is_same<std::thread::native_handle_type, pthread_t>::value) {
317 struct sched_param thread_param;
318 if (pthread_getschedparam(thread.native_handle(), &policy, &thread_param) != 0) {
319 policy = defaultPolicy;
320 }
321 } else {
322 yCError(THREADIMPL, "Cannot get scheduling policy without ACE");
323 }
324#else
325 yCError(THREADIMPL, "Cannot get scheduling policy without ACE");
326#endif
327 }
328 return policy;
329}
330
332{
333 return tid;
334}
335
337{
339}
bool ret
static std::atomic< int > threadCount
Definition: ThreadImpl.cpp:38
void theExecutiveBranch(void *args)
Definition: ThreadImpl.cpp:40
bool waitWithTimeout(double timeoutInSeconds)
Try to decrement the counter, even if we must wait - but don't wait forever.
Definition: Semaphore.cpp:101
void wait()
Decrement the counter, even if we must wait to do that.
Definition: Semaphore.cpp:96
bool check()
Decrement the counter, unless that would require waiting.
Definition: Semaphore.cpp:106
void post()
Increment the counter.
Definition: Semaphore.cpp:111
An abstraction for a thread of execution.
Definition: ThreadImpl.h:21
virtual void threadRelease()
Definition: ThreadImpl.cpp:180
int join(double seconds=-1)
Definition: ThreadImpl.cpp:120
virtual void afterStart(bool success)
Definition: ThreadImpl.cpp:171
virtual void beforeStart()
Definition: ThreadImpl.cpp:167
static long int getKeyOfCaller()
Definition: ThreadImpl.cpp:108
int setPriority(int priority=-1, int policy=-1)
Definition: ThreadImpl.cpp:246
#define yCError(component,...)
Definition: LogComponent.h:213
#define yCDebug(component,...)
Definition: LogComponent.h:128
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:29
void yield()
The calling thread releases its remaining quantum upon calling this function.
Definition: Time.cpp:138
The components from which ports and connections are built.