YARP
Yet Another Robot Platform
execstate.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-License-Identifier: BSD-3-Clause
4  */
5 
8 #include <yarp/os/Time.h>
9 
10 #include <iostream>
11 
12 using namespace FSM;
13 using namespace yarp::manager;
14 
15 
20 Event* EventFactory::startEvent = new Event("start");
21 Event* EventFactory::stopEvent = new Event("stop");
22 Event* EventFactory::killEvent = new Event("kill");
23 Event* EventFactory::failedEvent = new Event("failed");
24 Event* EventFactory::recoverEvent = new Event("recover");
25 Event* EventFactory::startModuleEventOk = new Event("startModule:ok");
26 Event* EventFactory::startModuleEventFailed = new Event("startModule:failed");
27 Event* EventFactory::stopModuleEventOk = new Event("stopModule:ok");
28 Event* EventFactory::stopModuleEventFailed = new Event("stopModule:failed");
29 Event* EventFactory::killModuleEventOk = new Event("killModule:ok");
30 Event* EventFactory::killModuleEventFailed = new Event("killModule:failed");
31 Event* EventFactory::connectAllPortsEventOk = new Event("connectAllPorts:ok");
32 Event* EventFactory::connectAllPortsEventFailed = new Event("connectAllPorts:failed");
33 Event* EventFactory::disconnectAllPortsEventOk = new Event("disconnectAllPorts:ok");
34 
35 
36 
40 Suspended::Suspended(Executable* pExecutable, FSM::IEventSink* pEventSink)
41  : StateBase(pEventSink, "SUSPENDED")
42 {
43  executable = pExecutable;
44 }
45 
46 
47 Suspended::~Suspended() = default;
48 
50 {
52 }
53 
55 {
57 }
58 
60 {
62 }
63 
64 void Suspended::moduleFailed() { /* do nothing*/ }
65 
66 
67 // refresh() from Suspended can be used for recovering from
68 // unexptected termination of manager.
70 {
72  int ret = executable->getBroker()->running();
73  if(ret == 1)
74  {
77  } else if (ret == -1) {
78  logger->addError(executable->getBroker()->error());
79  }
80 }
81 
82 
86 Ready::Ready(Executable* pExecutable, FSM::IEventSink* pEventSink)
87  : StateBase(pEventSink, "READY")
88 {
89  executable = pExecutable;
90  bAborted = false;
91 }
92 
93 
94 Ready::~Ready() = default;
95 
96 bool Ready::checkPriorityPorts()
97 {
98  CnnIterator itr;
99  for(itr=executable->getConnections().begin();
100  itr!=executable->getConnections().end(); itr++)
101  {
102  if ((*itr).withPriority()
103  && !executable->getBroker()->exists((*itr).from())) {
104  return false;
105  }
106  }
107  return true;
108 }
109 
110 bool Ready::checkResources(bool silent)
111 {
112  bool allOK = true;
113  ResourceIterator itr;
114  for(itr=executable->getResources().begin();
115  itr!=executable->getResources().end(); itr++)
116  {
117  if(!executable->getBroker()->exists((*itr).getPort())) {
118  allOK = false;
119  if (silent) {
120  return false;
121  } else {
122  OSTRINGSTREAM msg;
123  msg<<(*itr).getPort()<<" does not exist";
125  continue;
126  }
127  }
128  // check the rpc request/reply if required
129  if(strlen((*itr).getRequest()) != 0) {
130  const char* reply = executable->getBroker()->requestRpc((*itr).getPort(),
131  (*itr).getRequest(),
132  (*itr).getTimeout());
133  if(reply == nullptr) {
134  allOK = false;
135  OSTRINGSTREAM msg;
136  msg<<"cannot request resource "<<(*itr).getPort()<<" for "<<(*itr).getRequest();
138  if (silent) {
139  return false;
140  } else {
141  continue;
142  }
143  }
144 
145  if(!compareString(reply, (*itr).getReply())) {
146  allOK = false;
147  OSTRINGSTREAM msg;
148  msg<<"waiting for the expected reply from resource "<<(*itr).getPort();
149  msg<<" for request "<<(*itr).getRequest();
150  msg<<". (expected="<<(*itr).getReply()<<", received="<<reply<<")";
152  if (silent) {
153  return false;
154  } else {
155  continue;
156  }
157  }
158  }
159  }
160  return allOK;
161 }
162 
163 bool Ready::timeout(double base, double timeout)
164 {
166  if ((yarp::os::SystemClock::nowSystem() - base) > timeout) {
167  return true;
168  }
169  return false;
170 }
171 
173 {
174 
176 
177  // wait for priority ports if auto connecte is enabled
178  if(executable->autoConnect())
179  {
180  bAborted = false;
181  while(!checkPriorityPorts())
182  {
184  if (bAborted) {
185  return;
186  }
187  }
188  }
189 
190  // finding maximum resource-waiting timeout
191  ResourceIterator itr;
192  double maxTimeout = 0;
193  for(itr=executable->getResources().begin();
194  itr!=executable->getResources().end(); itr++)
195  {
196  if ((*itr).getTimeout() > maxTimeout) {
197  maxTimeout = (*itr).getTimeout();
198  }
199  }
200 
201  // waiting for resources
202  double base = yarp::os::SystemClock::nowSystem();
203  while(!checkResources()) {
204  if (bAborted) {
205  return;
206  }
207 
208  if(timeout(base, maxTimeout)) {
209  // give it the last try and collect the error messages
210  if(!checkResources(false)) {
211  OSTRINGSTREAM msg;
212  msg<<"cannot run "<<executable->getCommand()<<" on "<<executable->getHost();
213  msg<<" : Timeout while waiting for "<<executable->getHost();
214  logger->addError(msg);
215 
217  executable->getEvent()->onExecutableDied(executable);
218  return;
219  }
220  }
221  }
222 
223  if (executable->getPostExecWait() > 0)
224  {
226  }
227  executable->restoreOriginalPostExecWait();
228  if(!executable->getBroker()->start())
229  {
230  OSTRINGSTREAM msg;
231  msg<<"cannot run "<<executable->getCommand()<<" on "<<executable->getHost();
232  if (executable->getBroker()->error()) {
233  msg << " : " << executable->getBroker()->error();
234  }
235  logger->addError(msg);
236 
238  executable->getEvent()->onExecutableDied(executable);
239  }
240  else
241  {
243  executable->getEvent()->onExecutableStart(executable);
244  }
245 }
246 
247 
249 {
250  bAborted = true;
252 }
253 
254 void Ready::moduleFailed() { /* do nothing */ }
255 
256 
261  : StateBase(pEventSink, "CONNECTING"),
262  executable(pExecutable),
263  bAborted(false)
264 {}
265 
266 Connecting::~Connecting() = default;
267 
268 bool Connecting::checkNormalPorts()
269 {
270  CnnIterator itr;
271  for(itr=executable->getConnections().begin();
272  itr!=executable->getConnections().end(); itr++)
273  {
274  if (!executable->getBroker()->exists((*itr).to()) || !executable->getBroker()->exists((*itr).from())) {
275  return false;
276  }
277  }
278  return true;
279 }
280 
281 
283 {
285  if(executable->autoConnect())
286  {
290  bAborted = false;
291  while(!checkNormalPorts())
292  {
294  if (bAborted) {
295  return;
296  }
297  }
298 
299  CnnIterator itr;
300  for(itr=executable->getConnections().begin();
301  itr!=executable->getConnections().end(); itr++)
302  {
303  if( !executable->getBroker()->connect((*itr).from(), (*itr).to(),
304  (*itr).carrier()) )
305  {
306  OSTRINGSTREAM msg;
307  msg<<"cannot connect "<<(*itr).from() <<" to "<<(*itr).to();
308  if (executable->getBroker()->error()) {
309  msg << " : " << executable->getBroker()->error();
310  }
311  logger->addError(msg);
312  } else {
313  executable->getEvent()->onCnnStablished(&(*itr));
314  }
315  }
316  }
317 
319 }
320 
322 {
324  int ret = executable->getBroker()->running();
325  if (ret == 0) {
327  } else if (ret == -1) {
328  logger->addError(executable->getBroker()->error());
329  }
330 }
331 
333 {
334  bAborted = true;
336 }
337 
339 {
340  bAborted = true;
342  executable->getEvent()->onExecutableFailed(executable);
343 }
344 
345 
349 Running::Running(Executable* pExecutable, FSM::IEventSink* pEventSink)
350  : StateBase(pEventSink, "RUNNING")
351 {
352  executable = pExecutable;
353 }
354 
355 
356 Running::~Running() = default;
357 
359 {
361  int ret = executable->getBroker()->running();
362  if (ret == 0) {
364  } else if (ret == -1) {
365  logger->addError(executable->getBroker()->error());
366  }
367 }
368 
370 {
371  executable->getEvent()->onExecutableStart(executable);
372 }
373 
374 
376 {
378 }
379 
381 {
383 }
384 
386 {
388  executable->getEvent()->onExecutableFailed(executable);
389 }
390 
391 void Running::connectionFailed(void* which)
392 {
393  executable->getEvent()->onCnnFailed(which);
394 }
395 
396 
400 Dying::Dying(Executable* pExecutable, FSM::IEventSink* pEventSink)
401  : StateBase(pEventSink, "DYING")
402 {
403  executable = pExecutable;
404 }
405 
406 
407 Dying::~Dying() = default;
408 
410 {
412  if (executable->getPostStopWait() > 0)
413  {
415  }
416  executable->restoreOriginalPostStopWait();
417  if(!executable->getBroker()->stop())
418  {
419  OSTRINGSTREAM msg;
420  msg<<"cannot stop "<<executable->getCommand()<<" on "<<executable->getHost();
421  if (executable->getBroker()->error()) {
422  msg << " : " << executable->getBroker()->error();
423  }
424  logger->addError(msg);
425  executable->getEvent()->onError(executable);
427  }
428  else
429  {
431  executable->getEvent()->onExecutableStop(executable);
432  }
433 }
434 
436 {
438  if(!executable->getBroker()->kill())
439  {
440  OSTRINGSTREAM msg;
441  msg<<"cannot kill "<<executable->getCommand()<<" on "<<executable->getHost();
442  if (executable->getBroker()->error()) {
443  msg << " : " << executable->getBroker()->error();
444  }
445  logger->addError(msg);
446  executable->getEvent()->onError(executable);
448  }
449  else
450  {
452  executable->getEvent()->onExecutableDied(executable);
453  }
454 }
455 
456 
458 {
460  if(executable->autoConnect())
461  {
462  CnnIterator itr;
463  for(itr=executable->getConnections().begin();
464  itr!=executable->getConnections().end(); itr++)
465  {
466  if( !executable->getBroker()->disconnect((*itr).from(), (*itr).to(), (*itr).carrier()) )
467  {
468  OSTRINGSTREAM msg;
469  msg<<"cannot disconnect "<<(*itr).from() <<" to "<<(*itr).to();
470  if (executable->getBroker()->error()) {
471  msg << " : " << executable->getBroker()->error();
472  }
473  logger->addError(msg);
474  } else {
475  executable->getEvent()->onCnnReleased(&(*itr));
476  }
477  }
478  }
479  // We do not need to handle event disconnectAllPortsEventOk
480 }
481 
483 {
485  int ret = executable->getBroker()->running();
486  if (ret == 0) {
488  } else if (ret == -1) {
489  logger->addError(executable->getBroker()->error());
490  }
491 }
492 
493 void Dying::kill() { /* do nothing */ }
494 
496 {
497  // Notice that we should not call onExecutableFailed
498  // in DYING state!
500  executable->getEvent()->onExecutableDied(executable);
501 }
502 
503 
504 
505 
509 Dead::Dead(Executable* pExecutable, FSM::IEventSink* pEventSink)
510  : StateBase(pEventSink, "DEAD")
511 {
512  executable = pExecutable;
513 }
514 
515 
516 Dead::~Dead() = default;
517 
519 {
521 }
522 
524 {
525  executable->getEvent()->onExecutableStop(executable);
526 }
527 
528 
530 {
532 }
533 
534 // refresh() from Dead can be used for recovering from
535 // unexpect termination of manager.
537 {
539  int ret = executable->getBroker()->running();
540  if(ret == 1)
541  {
542  executable->getEvent()->onExecutableStart(executable);
544  } else if (ret == -1) {
545  logger->addError(executable->getBroker()->error());
546  }
547 }
548 
549 
550 void Dead::moduleFailed() { /* do nothing*/ }
551 
552 
553 
558 {
559  executable = pExecutable;
560  // creating states
561  suspended = new Suspended(executable, this);
562  ready = new Ready(executable, this);
563  connecting = new Connecting(executable, this);
564  running = new Running(executable, this);
565  dying = new Dying(executable, this);
566  dead = new Dead(executable, this);
567 
568  // setting initial state
569  setInitState(suspended);
570 
571  // transitions from suspended
572  addTransition(suspended, EventFactory::startEvent, ready);
573  addTransition(suspended, EventFactory::recoverEvent, running); //recovering
574  addTransition(suspended, EventFactory::killEvent, dying);
576  addTransition(suspended, EventFactory::failedEvent, suspended);
577 
578  // transitions from ready
581  addTransition(ready, EventFactory::killEvent, dying);
583 
584  // transitions from connecting
586  addTransition(connecting, EventFactory::failedEvent, dead);
587  addTransition(connecting, EventFactory::killEvent, dying);
589 
590  // transitions from running
591  addTransition(running, EventFactory::stopEvent, dying);
593  addTransition(running, EventFactory::killEvent, dying);
595 
596  // transitions from dying
604 
605  // transitions from dead
608  addTransition(dead, EventFactory::recoverEvent, running); // recovering
616 
617 }
618 
620 {
621  delete running;
622  delete suspended;
623  delete ready;
624  delete connecting;
625  delete dying;
626  delete dead;
627 }
628 
630 {
631  auto* tr = dynamic_cast<ITransition*>(currentState());
632  if (tr) {
633  tr->refresh();
634  }
635 }
636 
638 {
639  auto* tr = dynamic_cast<ITransition*>(currentState());
640  if (tr) {
641  tr->start();
642  }
643 }
644 
646 {
647  auto* tr = dynamic_cast<ITransition*>(currentState());
648  if (tr) {
649  tr->stop();
650  }
651 }
652 
654 {
655  auto* tr = dynamic_cast<ITransition*>(currentState());
656  if (tr) {
657  tr->kill();
658  }
659 }
660 
662 {
663  auto* tr = dynamic_cast<ITransition*>(currentState());
664  if (tr) {
665  tr->startModule();
666  }
667 }
668 
670 {
671  auto* tr = dynamic_cast<ITransition*>(currentState());
672  if (tr) {
673  tr->stopModule();
674  }
675 }
676 
678 {
679  auto* tr = dynamic_cast<ITransition*>(currentState());
680  if (tr) {
681  tr->killModule();
682  }
683 }
684 
686 {
687  auto* tr = dynamic_cast<ITransition*>(currentState());
688  if (tr) {
689  tr->connectAllPorts();
690  }
691 }
692 
694 {
695  auto* tr = dynamic_cast<ITransition*>(currentState());
696  if (tr) {
697  tr->disconnectAllPorts();
698  }
699 }
700 
702 {
703  auto* tr = dynamic_cast<ITransition*>(currentState());
704  if (tr) {
705  tr->moduleFailed();
706  }
707 }
708 
710 {
711  auto* tr = dynamic_cast<ITransition*>(currentState());
712  if (tr) {
713  tr->connectionFailed(which);
714  }
715 }
716 
717 // For debugging
719  Event* event, StateBase* current)
720 {
721  /*
722  std::cout<<executable->getID()<<": ";
723  std::cout<<"["<<previous->getName()<<"] ";
724  std::cout<<"--- ("<<event->getName()<<"/"<<event->getTimeStamp()<<") --> ";
725  std::cout<<"["<<current->getName()<<"]"<<endl;
726  */
727 }
bool ret
class IEventSink
Definition: fsm.h:29
class IEventSink
Definition: fsm.h:56
Class StateBase.
Definition: fsm.h:77
void castEvent(Event *event)
Definition: fsm.h:90
StateBase * currentState()
Definition: fsm.h:115
void setInitState(StateBase *pState)
Definition: fsm.h:128
void addTransition(StateBase *source, Event *event, StateBase *target)
Definition: fsm.h:132
virtual bool exists(const char *port)=0
virtual int running()=0
virtual const char * error()=0
virtual bool kill()=0
virtual bool connect(const char *from, const char *to, const char *carrier, bool persist=false)=0
virtual bool stop()=0
virtual bool start()=0
virtual bool disconnect(const char *from, const char *to, const char *carrier)=0
virtual const char * requestRpc(const char *szport, const char *request, double timeout=0.0)=0
class Connecting
Definition: execstate.h:109
Connecting(Executable *pExecutable, FSM::IEventSink *pEventSink)
Class Connecting.
Definition: execstate.cpp:260
void kill() override
Definition: execstate.cpp:332
void moduleFailed() override
Definition: execstate.cpp:338
void connectAllPorts() override
Definition: execstate.cpp:282
void refresh() override
Definition: execstate.cpp:321
Dead(Executable *pExecutable, FSM::IEventSink *pEventSink)
Class Dead.
Definition: execstate.cpp:509
void moduleFailed() override
Definition: execstate.cpp:550
void start() override
Definition: execstate.cpp:518
void stop() override
Definition: execstate.cpp:523
void kill() override
Definition: execstate.cpp:529
void refresh() override
Definition: execstate.cpp:536
Dying(Executable *pExecutable, FSM::IEventSink *pEventSink)
Class Dying.
Definition: execstate.cpp:400
void disconnectAllPorts() override
Definition: execstate.cpp:457
void refresh() override
Definition: execstate.cpp:482
void kill() override
Definition: execstate.cpp:493
void stopModule() override
Definition: execstate.cpp:409
void moduleFailed() override
Definition: execstate.cpp:495
void killModule() override
Definition: execstate.cpp:435
Singleton class ErrorLogger.
Definition: utility.h:57
void addError(const char *szError)
Definition: utility.cpp:118
void addWarning(const char *szWarning)
Definition: utility.cpp:104
static ErrorLogger * Instance()
Singleton class ErrorLogger.
Definition: utility.cpp:98
static FSM::Event * startModuleEventOk
Definition: execstate.h:48
static FSM::Event * startModuleEventFailed
Definition: execstate.h:49
static FSM::Event * stopModuleEventFailed
Definition: execstate.h:51
static FSM::Event * killModuleEventOk
Definition: execstate.h:52
static FSM::Event * startEvent
Initializing event factory.
Definition: execstate.h:43
static FSM::Event * killEvent
Definition: execstate.h:45
static FSM::Event * connectAllPortsEventOk
Definition: execstate.h:54
static FSM::Event * killModuleEventFailed
Definition: execstate.h:53
static FSM::Event * failedEvent
Definition: execstate.h:46
static FSM::Event * recoverEvent
Definition: execstate.h:47
static FSM::Event * stopModuleEventOk
Definition: execstate.h:50
static FSM::Event * stopEvent
Definition: execstate.h:44
ExecMachine(Executable *pExecutable)
Class ExecMachine.
Definition: execstate.cpp:557
void onTransition(FSM::StateBase *previous, FSM::Event *event, FSM::StateBase *current) override
Callback onTransition represents the change in the states.
Definition: execstate.cpp:718
void connectionFailed(void *which)
Definition: execstate.cpp:709
Class Executable.
Definition: executable.h:72
const char * getCommand()
Definition: executable.h:102
ResourceContainer & getResources()
Definition: executable.h:92
CnnContainer & getConnections()
Definition: executable.h:90
const char * getHost()
Definition: executable.h:104
all transitions are used in state machine
Definition: execstate.h:23
virtual void stopModule()
Definition: execstate.h:33
virtual void startModule()
Definition: execstate.h:32
virtual void connectionFailed(void *which)
Definition: execstate.h:29
virtual void disconnectAllPorts()
Definition: execstate.h:36
virtual void start()
Definition: execstate.h:30
virtual void connectAllPorts()
Definition: execstate.h:35
virtual void moduleFailed()=0
virtual void killModule()
Definition: execstate.h:34
virtual void refresh()
Definition: execstate.h:28
virtual void stop()
Definition: execstate.h:31
virtual void onCnnFailed(void *which)
Definition: executable.h:60
virtual void onExecutableFailed(void *which)
Definition: executable.h:56
virtual void onExecutableStop(void *which)
Definition: executable.h:54
virtual void onExecutableDied(void *which)
Definition: executable.h:55
virtual void onCnnStablished(void *which)
Definition: executable.h:58
virtual void onCnnReleased(void *which)
Definition: executable.h:59
virtual void onExecutableStart(void *which)
Definition: executable.h:53
virtual void onError(void *which)
Definition: executable.h:61
class Ready
Definition: execstate.h:85
void moduleFailed() override
Definition: execstate.cpp:254
Ready(Executable *pExecutable, FSM::IEventSink *pEventSink)
Class Ready.
Definition: execstate.cpp:86
void startModule() override
Definition: execstate.cpp:172
void kill() override
Definition: execstate.cpp:248
class Running
Definition: execstate.h:132
void connectionFailed(void *which) override
Definition: execstate.cpp:391
void refresh() override
Definition: execstate.cpp:358
void start() override
Definition: execstate.cpp:369
void moduleFailed() override
Definition: execstate.cpp:385
void stop() override
Definition: execstate.cpp:375
Running(Executable *pExecutable, FSM::IEventSink *pEventSink)
Class Running.
Definition: execstate.cpp:349
void kill() override
Definition: execstate.cpp:380
class Suspended
Definition: execstate.h:64
void kill() override
Definition: execstate.cpp:59
void stop() override
Definition: execstate.cpp:54
void refresh() override
Definition: execstate.cpp:69
Executable * executable
Definition: execstate.h:77
void moduleFailed() override
Definition: execstate.cpp:64
void start() override
Definition: execstate.cpp:49
static double nowSystem()
Definition: SystemClock.cpp:34
static void delaySystem(double seconds)
Definition: SystemClock.cpp:29
Definition: fsm.h:17
bool compareString(const char *szFirst, const char *szSecond)
Definition: utility.cpp:310
std::vector< ResYarpPort >::iterator ResourceIterator
Definition: application.h:154
std::vector< Connection >::iterator CnnIterator
Definition: application.h:151
std::stringstream OSTRINGSTREAM
Definition: utility.h:49