YARP
Yet Another Robot Platform
PeriodicThread.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
8
11
12#include <cmath>
13#include <algorithm> // std::max
14#include <memory>
15#include <mutex>
16
17using namespace yarp::os::impl;
18using namespace yarp::os;
19
20
21namespace
22{
23 class DelayEstimatorBase
24 {
25 public:
26 DelayEstimatorBase(double period) : adaptedPeriod(std::max(period, 0.0)) {}
27 virtual ~DelayEstimatorBase() = default;
28 double getPeriod() const { return adaptedPeriod; }
29 virtual void setPeriod(double period) { adaptedPeriod = std::max(period, 0.0); }
30 virtual void onInit() {};
31 virtual void onSchedule(unsigned int count, double now) {};
32 virtual double computeDelay(unsigned int count, double now, double elapsed) const = 0;
33 virtual void reset(unsigned int count, double now) {};
34
35 private:
36 double adaptedPeriod;
37 };
38
39 class AbsoluteDelayEstimator : public DelayEstimatorBase
40 {
41 public:
42 AbsoluteDelayEstimator(double period) : DelayEstimatorBase(period), scheduledPeriod(period) {}
43
44 void onInit() override
45 {
46 scheduleAdapt = true;
47 }
48
49 void setPeriod(double period) override
50 {
51 scheduledPeriod = period;
52 scheduleAdapt = true;
53 }
54
55 void onSchedule(unsigned int count, double now) override
56 {
57 if (scheduleAdapt) {
58 DelayEstimatorBase::setPeriod(scheduledPeriod);
59 reset(count, now);
60 }
61 }
62
63 double computeDelay(unsigned int count, double now, double elapsed) const override
64 {
65 return refTime + getPeriod() * (count - countOffset) - now;
66 }
67
68 void reset(unsigned int count, double now) override
69 {
70 countOffset = count;
71 refTime = now;
72 scheduleAdapt = false;
73 }
74
75 private:
76 unsigned int countOffset {0}; // iteration to count from for delay calculation
77 double refTime {0.0}; // absolute reference time for delay calculation
78 double scheduledPeriod {0.0}; // new period, to be configured on schedule
79 bool scheduleAdapt {false};
80 };
81
82 class RelativeDelayEstimator : public DelayEstimatorBase
83 {
84 public:
85 using DelayEstimatorBase::DelayEstimatorBase;
86
87 double computeDelay(unsigned int count, double now, double elapsed) const override
88 {
89 return getPeriod() - elapsed;
90 }
91 };
92 } // namespace
93
94
96{
97private:
98 PeriodicThread* owner;
99 mutable std::mutex mutex;
100
101 bool suspended;
102 double totalUsed; //total time taken iterations
103 unsigned int count; //number of iterations from last reset
104 unsigned int estPIt; //number of useful iterations for period estimation
105 double totalT; //time bw run, accumulated
106 double sumTSq; //cumulative sum sq of estimated period dT
107 double sumUsedSq; //cumulative sum sq of estimated thread tun
108 double previousRun; //time when last iteration started
109 bool scheduleReset;
110
111 std::unique_ptr<DelayEstimatorBase> delayEstimator;
112
113 using NowFuncPtr = double (*)();
114 using DelayFuncPtr = void (*)(double);
115 const NowFuncPtr nowFunc;
116 const DelayFuncPtr delayFunc;
117
118 void _resetStat()
119 {
120 totalUsed = 0;
121 count = 0;
122 estPIt = 0;
123 totalT = 0;
124 sumUsedSq = 0;
125 sumTSq = 0;
126 scheduleReset = false;
127 }
128
129public:
131 owner(owner),
132 suspended(false),
133 totalUsed(0),
134 count(0),
135 estPIt(0),
136 totalT(0),
137 sumTSq(0),
138 sumUsedSq(0),
139 previousRun(0),
140 scheduleReset(false),
141 nowFunc(useSystemClock == ShouldUseSystemClock::Yes ? SystemClock::nowSystem : yarp::os::Time::now),
142 delayFunc(useSystemClock == ShouldUseSystemClock::Yes ? SystemClock::delaySystem : yarp::os::Time::delay)
143 {
144 if (clockAccuracy == PeriodicThreadClock::Relative) {
145 delayEstimator = std::make_unique<RelativeDelayEstimator>(p);
146 } else {
147 delayEstimator = std::make_unique<AbsoluteDelayEstimator>(p);
148 }
149 }
150
152 {
153 scheduleReset = true;
154 }
155
156 double getEstimatedPeriod() const
157 {
158 double ret;
159 lock();
160 if (estPIt == 0) {
161 ret = 0;
162 } else {
163 ret = (totalT / estPIt);
164 }
165 unlock();
166 return ret;
167 }
168
169 void getEstimatedPeriod(double& av, double& std) const
170 {
171 lock();
172 if (estPIt == 0) {
173 av = 0;
174 std = 0;
175 } else {
176 av = totalT / estPIt;
177 if (estPIt > 1) {
178 std = sqrt(((1.0 / (estPIt - 1)) * (sumTSq - estPIt * av * av)));
179 } else {
180 std = 0;
181 }
182 }
183 unlock();
184 }
185
186 unsigned int getIterations() const
187 {
188 lock();
189 unsigned int ret = count;
190 unlock();
191 return ret;
192 }
193
194 double getEstimatedUsed() const
195 {
196 double ret;
197 lock();
198 if (count < 1) {
199 ret = 0.0;
200 } else {
201 ret = totalUsed / count;
202 }
203 unlock();
204 return ret;
205 }
206
207 void getEstimatedUsed(double& av, double& std) const
208 {
209 lock();
210 if (count < 1) {
211 av = 0;
212 std = 0;
213 } else {
214 av = totalUsed / count;
215 if (count > 1) {
216 std = sqrt((1.0 / (count - 1)) * (sumUsedSq - count * av * av));
217 } else {
218 std = 0;
219 }
220 }
221 unlock();
222 }
223
224 void step()
225 {
226 lock();
227 double currentRun = nowFunc();
228 delayEstimator->onSchedule(count, currentRun);
229
230 if (scheduleReset) {
231 _resetStat();
232 delayEstimator->reset(count, currentRun);
233 }
234
235 if (count > 0) {
236 double dT = currentRun - previousRun;
237 sumTSq += dT * dT;
238 totalT += dT;
239 estPIt++;
240 }
241
242 previousRun = currentRun;
243 unlock();
244
245 if (!suspended) {
246 owner->run();
247 }
248
249 // At the end of each run of updateModule function, the thread is supposed
250 // to be suspended and release CPU to other threads.
251 // Calling a yield here will help the threads to alternate in the execution.
252 // Note: call yield BEFORE computing elapsed time, so that any time spent due to
253 // yield is taken into account and the sleep time is correct.
254 yield();
255
256 lock();
257 count++;
258 double now = nowFunc();
259 double elapsed = now - currentRun;
260 double sleepPeriod = delayEstimator->computeDelay(count, now, elapsed);
261 //save last
262 totalUsed += elapsed;
263 sumUsedSq += elapsed * elapsed;
264 unlock();
265
266 delayFunc(sleepPeriod);
267 }
268
269 void run() override
270 {
271 while (!isClosing()) {
272 step();
273 }
274 }
275
276 bool threadInit() override
277 {
278 delayEstimator->onInit();
279 return owner->threadInit();
280 }
281
282 void threadRelease() override
283 {
284 owner->threadRelease();
285 }
286
287 bool setPeriod(double period)
288 {
289 lock();
290 delayEstimator->setPeriod(period);
291 unlock();
292 return true;
293 }
294
295 double getPeriod() const
296 {
297 return delayEstimator->getPeriod();
298 }
299
300 bool isSuspended() const
301 {
302 return suspended;
303 }
304
305 void suspend()
306 {
307 suspended = true;
308 }
309
310 void resume()
311 {
312 suspended = false;
313 }
314
315 void afterStart(bool s) override
316 {
317 owner->afterStart(s);
318 }
319
320 void beforeStart() override
321 {
322 owner->beforeStart();
323 }
324
325 void lock() const
326 {
327 mutex.lock();
328 }
329
330 void unlock() const
331 {
332 mutex.unlock();
333 }
334};
335
336
337PeriodicThread::PeriodicThread(double period, ShouldUseSystemClock useSystemClock, PeriodicThreadClock clockAccuracy) :
338 mPriv(new Private(this, period, useSystemClock, clockAccuracy))
339{
340}
341
343 mPriv(new Private(this, period, ShouldUseSystemClock::No, clockAccuracy))
344{
345}
346
348{
349 delete mPriv;
350}
351
352bool PeriodicThread::setPeriod(double period)
353{
354 return mPriv->setPeriod(period);
355}
356
358{
359 return mPriv->getPeriod();
360}
361
363{
364 return mPriv->isSuspended();
365}
366
368{
369 mPriv->close();
370}
371
373{
374 mPriv->askToClose();
375}
376
378{
379 mPriv->step();
380}
381
383{
384 return mPriv->start();
385}
386
388{
389 return mPriv->isRunning();
390}
391
393{
394 mPriv->suspend();
395}
396
398{
399 mPriv->resume();
400}
401
403{
404 return mPriv->getIterations();
405}
406
408{
409 return mPriv->getEstimatedPeriod();
410}
411
413{
414 return mPriv->getEstimatedUsed();
415}
416
417void PeriodicThread::getEstimatedPeriod(double& av, double& std) const
418{
419 mPriv->getEstimatedPeriod(av, std);
420}
421
422void PeriodicThread::getEstimatedUsed(double& av, double& std) const
423{
424 mPriv->getEstimatedUsed(av, std);
425}
426
428{
429 mPriv->resetStat();
430}
431
433{
434 return true;
435}
436
438{
439}
440
442{
443}
444
446{
447 YARP_UNUSED(success);
448}
449
450int PeriodicThread::setPriority(int priority, int policy)
451{
452 return mPriv->setPriority(priority, policy);
453}
454
456{
457 return mPriv->getPriority();
458}
459
461{
462 return mPriv->getPolicy();
463}
bool ret
void getEstimatedUsed(double &av, double &std) const
Private(PeriodicThread *owner, double p, ShouldUseSystemClock useSystemClock, PeriodicThreadClock clockAccuracy)
void getEstimatedPeriod(double &av, double &std) const
An abstraction for a periodic thread.
void resetStat()
Reset thread statistics.
bool setPeriod(double period)
Set the (new) period of the thread.
int getPriority() const
Query the current priority of the thread, if the OS supports that.
PeriodicThread(double period, ShouldUseSystemClock useSystemClock=ShouldUseSystemClock::No, PeriodicThreadClock clockAccuracy=PeriodicThreadClock::Relative)
Constructor.
virtual void run()=0
Loop function.
virtual void beforeStart()
Called just before a new thread starts.
unsigned int getIterations() const
Return the number of iterations performed since last reset.
bool isRunning() const
Returns true when the thread is started, false otherwise.
int getPolicy() const
Query the current scheduling policy of the thread, if the OS supports that.
bool isSuspended() const
Returns true when the thread is suspended, false otherwise.
virtual bool threadInit()
Initialization method.
void resume()
Resume the thread if previously suspended.
virtual void afterStart(bool success)
Called just after a new thread starts (or fails to start), this is executed by the same thread that c...
void suspend()
Suspend the thread, the thread keeps running by doLoop is never executed.
int setPriority(int priority, int policy=-1)
Set the priority and scheduling policy of the thread, if the OS supports that.
void askToStop()
Stop the thread.
double getEstimatedUsed() const
Return the estimated duration of the run() function since last reset.
double getEstimatedPeriod() const
Return estimated period since last reset.
bool start()
Call this to start the thread.
void step()
Call this to "step" the thread rather than starting it.
double getPeriod() const
Return the current period of the thread.
void stop()
Call this to stop the thread, this call blocks until the thread is terminated (and releaseThread() ca...
virtual void threadRelease()
Release method.
An abstraction for a thread of execution.
Definition: ThreadImpl.h:21
yarp::rosmsg::std_msgs::Time Time
Definition: Time.h:21
STL namespace.
void useSystemClock()
Configure YARP to use system time (this is the default).
Definition: Time.cpp:144
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition: Time.cpp:121
void delay(double seconds)
Wait for a certain number of seconds.
Definition: Time.cpp:111
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
ShouldUseSystemClock
Definition: Time.h:19
The main, catch-all namespace for YARP.
Definition: dirs.h:16
#define YARP_UNUSED(var)
Definition: api.h:162