YARP
Yet Another Robot Platform
PeriodicThread.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
8 
9 #include <yarp/os/SystemClock.h>
11 
12 #include <cmath>
13 #include <algorithm> // std::max
14 #include <memory>
15 #include <mutex>
16 
17 using namespace yarp::os::impl;
18 using namespace yarp::os;
19 
20 
namespace
{
// Strategy used by the periodic loop to decide how long to sleep after each
// iteration. The base class stores the period currently in effect; concrete
// estimators implement computeDelay().
class DelayEstimatorBase
{
public:
    // Negative (and NaN) periods are stored as 0.
    explicit DelayEstimatorBase(double period) :
            adaptedPeriod(sanitizePeriod(period))
    {
    }

    virtual ~DelayEstimatorBase() = default;

    // Period currently in effect, in the same unit passed to the constructor.
    double getPeriod() const
    {
        return adaptedPeriod;
    }

    // Replace the period in effect. Negative (and NaN) values become 0.
    virtual void setPeriod(double period)
    {
        adaptedPeriod = sanitizePeriod(period);
    }

    // Hook invoked when the thread initializes. No-op by default.
    virtual void onInit() {}

    // Hook invoked at the beginning of every iteration. No-op by default.
    virtual void onSchedule(unsigned int /*count*/, double /*now*/) {}

    // Return how long the loop should sleep, given the iteration counter, the
    // current time and the time spent in the iteration that just finished.
    // The result may be negative when the iteration overran.
    virtual double computeDelay(unsigned int count, double now, double elapsed) const = 0;

    // Hook invoked to re-anchor the estimator. No-op by default.
    virtual void reset(unsigned int /*count*/, double /*now*/) {}

private:
    // `period > 0.0` is false for NaN, so NaN collapses to 0 here, whereas
    // std::max(NaN, 0.0) would hand the NaN back.
    static double sanitizePeriod(double period)
    {
        return period > 0.0 ? period : 0.0;
    }

    double adaptedPeriod;
};

// Schedules iterations against an absolute time grid:
// iteration k is due at refTime + period * (k - countOffset).
// A period change is only recorded by setPeriod() and becomes effective at the
// next onSchedule(), which also re-anchors the grid.
class AbsoluteDelayEstimator : public DelayEstimatorBase
{
public:
    explicit AbsoluteDelayEstimator(double period) :
            DelayEstimatorBase(period),
            scheduledPeriod(period)
    {
    }

    void onInit() override
    {
        scheduleAdapt = true;
    }

    // Only records the request; getPeriod() keeps returning the previous value
    // until the next onSchedule().
    void setPeriod(double period) override
    {
        scheduledPeriod = period;
        scheduleAdapt = true;
    }

    void onSchedule(unsigned int count, double now) override
    {
        if (scheduleAdapt) {
            DelayEstimatorBase::setPeriod(scheduledPeriod);
            reset(count, now);
        }
    }

    double computeDelay(unsigned int count, double now, double /*elapsed*/) const override
    {
        return refTime + getPeriod() * (count - countOffset) - now;
    }

    void reset(unsigned int count, double now) override
    {
        countOffset = count;
        refTime = now;
        scheduleAdapt = false;
    }

private:
    unsigned int countOffset {0}; // iteration to count from for delay calculation
    double refTime {0.0};         // absolute reference time for delay calculation
    double scheduledPeriod {0.0}; // new period, to be configured on schedule
    // Starts as true so that the very first onSchedule() anchors refTime even
    // when onInit() was never called; otherwise the delay would be computed
    // against refTime == 0.
    bool scheduleAdapt {true};
};

// Sleeps for whatever is left of the period after the iteration's own
// execution time, without reference to any absolute time grid.
class RelativeDelayEstimator : public DelayEstimatorBase
{
public:
    using DelayEstimatorBase::DelayEstimatorBase;

    double computeDelay(unsigned int /*count*/, double /*now*/, double elapsed) const override
    {
        return getPeriod() - elapsed;
    }
};
} // namespace
93 
94 
96 {
97 private:
98  PeriodicThread* owner;
99  mutable std::mutex mutex;
100 
101  bool suspended;
102  double totalUsed; //total time taken iterations
103  unsigned int count; //number of iterations from last reset
104  unsigned int estPIt; //number of useful iterations for period estimation
105  double totalT; //time bw run, accumulated
106  double sumTSq; //cumulative sum sq of estimated period dT
107  double sumUsedSq; //cumulative sum sq of estimated thread tun
108  double previousRun; //time when last iteration started
109  bool scheduleReset;
110 
111  std::unique_ptr<DelayEstimatorBase> delayEstimator;
112 
113  using NowFuncPtr = double (*)();
114  using DelayFuncPtr = void (*)(double);
115  const NowFuncPtr nowFunc;
116  const DelayFuncPtr delayFunc;
117 
118  void _resetStat()
119  {
120  totalUsed = 0;
121  count = 0;
122  estPIt = 0;
123  totalT = 0;
124  sumUsedSq = 0;
125  sumTSq = 0;
126  scheduleReset = false;
127  }
128 
129 public:
131  owner(owner),
132  suspended(false),
133  totalUsed(0),
134  count(0),
135  estPIt(0),
136  totalT(0),
137  sumTSq(0),
138  sumUsedSq(0),
139  previousRun(0),
140  scheduleReset(false),
141  nowFunc(useSystemClock == ShouldUseSystemClock::Yes ? SystemClock::nowSystem : yarp::os::Time::now),
142  delayFunc(useSystemClock == ShouldUseSystemClock::Yes ? SystemClock::delaySystem : yarp::os::Time::delay)
143  {
144  if (clockAccuracy == PeriodicThreadClock::Relative) {
145  delayEstimator = std::make_unique<RelativeDelayEstimator>(p);
146  } else {
147  delayEstimator = std::make_unique<AbsoluteDelayEstimator>(p);
148  }
149  }
150 
151  void resetStat()
152  {
153  scheduleReset = true;
154  }
155 
156  double getEstimatedPeriod() const
157  {
158  double ret;
159  lock();
160  if (estPIt == 0) {
161  ret = 0;
162  } else {
163  ret = (totalT / estPIt);
164  }
165  unlock();
166  return ret;
167  }
168 
169  void getEstimatedPeriod(double& av, double& std) const
170  {
171  lock();
172  if (estPIt == 0) {
173  av = 0;
174  std = 0;
175  } else {
176  av = totalT / estPIt;
177  if (estPIt > 1) {
178  std = sqrt(((1.0 / (estPIt - 1)) * (sumTSq - estPIt * av * av)));
179  } else {
180  std = 0;
181  }
182  }
183  unlock();
184  }
185 
186  unsigned int getIterations() const
187  {
188  lock();
189  unsigned int ret = count;
190  unlock();
191  return ret;
192  }
193 
194  double getEstimatedUsed() const
195  {
196  double ret;
197  lock();
198  if (count < 1) {
199  ret = 0.0;
200  } else {
201  ret = totalUsed / count;
202  }
203  unlock();
204  return ret;
205  }
206 
207  void getEstimatedUsed(double& av, double& std) const
208  {
209  lock();
210  if (count < 1) {
211  av = 0;
212  std = 0;
213  } else {
214  av = totalUsed / count;
215  if (count > 1) {
216  std = sqrt((1.0 / (count - 1)) * (sumUsedSq - count * av * av));
217  } else {
218  std = 0;
219  }
220  }
221  unlock();
222  }
223 
224  void step()
225  {
226  lock();
227  double currentRun = nowFunc();
228  delayEstimator->onSchedule(count, currentRun);
229 
230  if (scheduleReset) {
231  _resetStat();
232  delayEstimator->reset(count, currentRun);
233  }
234 
235  if (count > 0) {
236  double dT = currentRun - previousRun;
237  sumTSq += dT * dT;
238  totalT += dT;
239  estPIt++;
240  }
241 
242  previousRun = currentRun;
243  unlock();
244 
245  if (!suspended) {
246  owner->run();
247  }
248 
249  // At the end of each run of updateModule function, the thread is supposed
250  // to be suspended and release CPU to other threads.
251  // Calling a yield here will help the threads to alternate in the execution.
252  // Note: call yield BEFORE computing elapsed time, so that any time spent due to
253  // yield is taken into account and the sleep time is correct.
254  yield();
255 
256  lock();
257  count++;
258  double now = nowFunc();
259  double elapsed = now - currentRun;
260  double sleepPeriod = delayEstimator->computeDelay(count, now, elapsed);
261  //save last
262  totalUsed += elapsed;
263  sumUsedSq += elapsed * elapsed;
264  unlock();
265 
266  delayFunc(sleepPeriod);
267  }
268 
269  void run() override
270  {
271  while (!isClosing()) {
272  step();
273  }
274  }
275 
276  bool threadInit() override
277  {
278  delayEstimator->onInit();
279  return owner->threadInit();
280  }
281 
282  void threadRelease() override
283  {
284  owner->threadRelease();
285  }
286 
287  bool setPeriod(double period)
288  {
289  lock();
290  delayEstimator->setPeriod(period);
291  unlock();
292  return true;
293  }
294 
295  double getPeriod() const
296  {
297  return delayEstimator->getPeriod();
298  }
299 
300  bool isSuspended() const
301  {
302  return suspended;
303  }
304 
305  void suspend()
306  {
307  suspended = true;
308  }
309 
310  void resume()
311  {
312  suspended = false;
313  }
314 
315  void afterStart(bool s) override
316  {
317  owner->afterStart(s);
318  }
319 
320  void beforeStart() override
321  {
322  owner->beforeStart();
323  }
324 
325  void lock() const
326  {
327  mutex.lock();
328  }
329 
330  void unlock() const
331  {
332  mutex.unlock();
333  }
334 };
335 
336 
338  mPriv(new Private(this, period, useSystemClock, clockAccuracy))
339 {
340 }
341 
343  mPriv(new Private(this, period, ShouldUseSystemClock::No, clockAccuracy))
344 {
345 }
346 
348 {
349  delete mPriv;
350 }
351 
352 bool PeriodicThread::setPeriod(double period)
353 {
354  return mPriv->setPeriod(period);
355 }
356 
358 {
359  return mPriv->getPeriod();
360 }
361 
363 {
364  return mPriv->isSuspended();
365 }
366 
368 {
369  mPriv->close();
370 }
371 
373 {
374  mPriv->askToClose();
375 }
376 
378 {
379  mPriv->step();
380 }
381 
383 {
384  return mPriv->start();
385 }
386 
388 {
389  return mPriv->isRunning();
390 }
391 
393 {
394  mPriv->suspend();
395 }
396 
398 {
399  mPriv->resume();
400 }
401 
402 unsigned int PeriodicThread::getIterations() const
403 {
404  return mPriv->getIterations();
405 }
406 
408 {
409  return mPriv->getEstimatedPeriod();
410 }
411 
413 {
414  return mPriv->getEstimatedUsed();
415 }
416 
417 void PeriodicThread::getEstimatedPeriod(double& av, double& std) const
418 {
419  mPriv->getEstimatedPeriod(av, std);
420 }
421 
422 void PeriodicThread::getEstimatedUsed(double& av, double& std) const
423 {
424  mPriv->getEstimatedUsed(av, std);
425 }
426 
428 {
429  mPriv->resetStat();
430 }
431 
433 {
434  return true;
435 }
436 
438 {
439 }
440 
442 {
443 }
444 
445 void PeriodicThread::afterStart(bool success)
446 {
447  YARP_UNUSED(success);
448 }
449 
450 int PeriodicThread::setPriority(int priority, int policy)
451 {
452  return mPriv->setPriority(priority, policy);
453 }
454 
456 {
457  return mPriv->getPriority();
458 }
459 
461 {
462  return mPriv->getPolicy();
463 }
bool ret
void getEstimatedUsed(double &av, double &std) const
Private(PeriodicThread *owner, double p, ShouldUseSystemClock useSystemClock, PeriodicThreadClock clockAccuracy)
void getEstimatedPeriod(double &av, double &std) const
An abstraction for a periodic thread.
void resetStat()
Reset thread statistics.
bool setPeriod(double period)
Set the (new) period of the thread.
int getPriority() const
Query the current priority of the thread, if the OS supports that.
PeriodicThread(double period, ShouldUseSystemClock useSystemClock=ShouldUseSystemClock::No, PeriodicThreadClock clockAccuracy=PeriodicThreadClock::Relative)
Constructor.
virtual void run()=0
Loop function.
virtual void beforeStart()
Called just before a new thread starts.
unsigned int getIterations() const
Return the number of iterations performed since last reset.
bool isRunning() const
Returns true when the thread is started, false otherwise.
int getPolicy() const
Query the current scheduling policy of the thread, if the OS supports that.
bool isSuspended() const
Returns true when the thread is suspended, false otherwise.
virtual bool threadInit()
Initialization method.
void resume()
Resume the thread if previously suspended.
virtual void afterStart(bool success)
Called just after a new thread starts (or fails to start), this is executed by the same thread that c...
void suspend()
Suspend the thread; the thread keeps running but the loop function run() is never executed.
int setPriority(int priority, int policy=-1)
Set the priority and scheduling policy of the thread, if the OS supports that.
void askToStop()
Stop the thread.
double getEstimatedUsed() const
Return the estimated duration of the run() function since last reset.
double getEstimatedPeriod() const
Return estimated period since last reset.
bool start()
Call this to start the thread.
void step()
Call this to "step" the thread rather than starting it.
double getPeriod() const
Return the current period of the thread.
void stop()
Call this to stop the thread, this call blocks until the thread is terminated (and releaseThread() ca...
virtual void threadRelease()
Release method.
An abstraction for a thread of execution.
Definition: ThreadImpl.h:23
yarp::rosmsg::std_msgs::Time Time
Definition: Time.h:21
void useSystemClock()
Configure YARP to use system time (this is the default).
Definition: Time.cpp:144
void yield()
The calling thread releases its remaining quantum upon calling this function.
Definition: Time.cpp:138
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition: Time.cpp:121
void delay(double seconds)
Wait for a certain number of seconds.
Definition: Time.cpp:111
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
ShouldUseSystemClock
Definition: Time.h:20
The main, catch-all namespace for YARP.
Definition: dirs.h:16
#define YARP_UNUSED(var)
Definition: api.h:162