YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
safe_manager.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-License-Identifier: LGPL-2.1-or-later
4 */
5
6#include "safe_manager.h"
7
8using namespace yarp::manager;
9
10#define WAIT_SEMAPHOR() waitSemaphore();
11
12#define POST_SEMAPHOR() postSemaphore();
13
14
16 m_pConfig(nullptr),
17 action(MNOTHING),
18 eventReceiver(nullptr),
19 busyAction(false)
20{}
21
23
27 eventReceiver = nullptr;
29}
30
32 yarp::os::Property* pConfig, ApplicationEvent* event)
33{
34
35 eventReceiver = event;
36 m_pConfig = pConfig;
37
38 if (pConfig->find("watchdog").asString() == "yes") {
40 } else {
42 }
43
44 if (pConfig->find("auto_dependency").asString() == "yes") {
46 } else {
48 }
49
50 if (pConfig->find("auto_connect").asString() == "yes") {
52 } else {
54 }
55
56 // making manager from lazy manager
57 KnowledgeBase* lazy_kb = lazy->getKnowledgeBase();
58
59 ModulePContainer mods = lazy_kb->getModules();
60 for (auto& mod : mods) {
62 }
63
64 ResourcePContainer res = lazy_kb->getResources();
65 for (auto& re : res) {
67 }
68
69 ApplicaitonPContainer apps = lazy_kb->getApplications();
70 for (auto& app : apps) {
72 }
73
74 return true;
75}
76
77
78
80 return true;
81}
82
86
88 bool ret;
90 ret = busyAction;
92 return ret;
93}
94
96{
98 ThreadAction local_action = action;
99 std::vector<int> local_modIds = modIds;
100 std::vector<int> local_conIds = conIds;
101 std::vector<int> local_resIds = resIds;
103
104 switch(local_action){
105 case MRUN:{
106 std::vector<double> waitVec;
107 for (int local_modId : local_modIds)
108 {
109 Executable * exec = Manager::getExecutableById(local_modId);
110 if (exec)
111 {
112 waitVec.push_back(exec->getPostExecWait());
113 }
114 }
115 double minWait=*std::min_element(waitVec.begin(), waitVec.end());
116 for (int local_modId : local_modIds)
117 {
118 Executable * exec = Manager::getExecutableById(local_modId);
119 if (exec)
120 {
121 exec->setPostExecWait(exec->getPostExecWait() - minWait);
122 }
123 Manager::run(local_modId, true);
124 }
125
126 /*
127 for(unsigned int i=0; i<local_modIds.size(); i++)
128 Manager::waitingModuleRun(local_modIds[i]);
129
130 for(unsigned int i=0; i<local_conIds.size(); i++)
131 {
132 if(Manager::connected(local_conIds[i]))
133 {
134 if(eventReceiver) eventReceiver->onConConnect(local_conIds[i]);
135 }
136 else
137 {
138 if(eventReceiver) eventReceiver->onConDisconnect(local_conIds[i]);
139 }
140 refreshPortStatus(local_conIds[i]);
141 }
142 for(unsigned int i=0; i<local_resIds.size(); i++)
143 {
144 if(Manager::exist(local_resIds[i]))
145 {
146 if(eventReceiver) eventReceiver->onResAvailable(local_resIds[i]);
147 }
148 else
149 {
150 if(eventReceiver) eventReceiver->onResUnAvailable(local_resIds[i]);
151 }
152 }*/
153 break;
154 }
155 case MSTOP:{
156 std::vector<double> waitVec;
157 for (int local_modId : local_modIds)
158 {
159 Executable * exec = Manager::getExecutableById(local_modId);
160 if (exec)
161 {
162 waitVec.push_back(exec->getPostStopWait());
163 }
164 }
165 double minWait=*std::min_element(waitVec.begin(), waitVec.end());
166 for (int local_modId : local_modIds)
167 {
168 Executable * exec = Manager::getExecutableById(local_modId);
169 if (exec)
170 {
171 exec->setPostStopWait(exec->getPostStopWait() - minWait);
172 }
173 Manager::stop(local_modId, true);
174 }
175 /*for(unsigned int i=0; i<local_modIds.size(); i++)
176 Manager::waitingModuleStop(local_modIds[i]);
177
178 for(unsigned int i=0; i<local_conIds.size(); i++)
179 {
180 if(Manager::connected(local_conIds[i]))
181 {
182 if(eventReceiver) eventReceiver->onConConnect(local_conIds[i]);
183 }
184 else
185 {
186 if(eventReceiver) eventReceiver->onConDisconnect(local_conIds[i]);
187 }
188 refreshPortStatus(local_conIds[i]);
189 }
190 for(unsigned int i=0; i<local_resIds.size(); i++)
191 {
192 if(Manager::exist(local_resIds[i]))
193 {
194 if(eventReceiver) eventReceiver->onResAvailable(local_resIds[i]);
195 }
196 else
197 {
198 if(eventReceiver) eventReceiver->onResUnAvailable(local_resIds[i]);
199 }
200 }*/
201 break;
202 }
203 case MKILL:{
204 for (int local_modId : local_modIds) {
205 Manager::kill(local_modId, true);
206 }
207 /*for(unsigned int i=0; i<local_modIds.size(); i++)
208 Manager::waitingModuleKill(local_modIds[i]);
209
210 for(unsigned int i=0; i<local_conIds.size(); i++)
211 {
212 if(Manager::connected(local_conIds[i]))
213 {
214 if(eventReceiver) eventReceiver->onConConnect(local_conIds[i]);
215 }
216 else
217 {
218 if(eventReceiver) eventReceiver->onConDisconnect(local_conIds[i]);
219 }
220 refreshPortStatus(local_conIds[i]);
221 }
222 for(unsigned int i=0; i<local_resIds.size(); i++)
223 {
224 if(Manager::exist(local_resIds[i]))
225 {
226 if(eventReceiver) eventReceiver->onResAvailable(local_resIds[i]);
227 }
228 else
229 {
230 if(eventReceiver) eventReceiver->onResUnAvailable(local_resIds[i]);
231 }
232 }*/
233 break;
234 }
235 case MCONNECT:{
236 for(int local_conId : local_conIds)
237 {
238 refreshPortStatus(local_conId);
239 if(Manager::connect(local_conId))
240 {
241 if (eventReceiver) {
242 eventReceiver->onConConnect(local_conId);
243 }
244 }
245 else
246 {
247 if (eventReceiver) {
248 eventReceiver->onConDisconnect(local_conId);
249 }
250 }
251 }
252 break;
253 }
254 case MDISCONNECT:{
255 for(int local_conId : local_conIds)
256 {
257 refreshPortStatus(local_conId);
258 if(Manager::disconnect(local_conId))
259 {
260 if (eventReceiver) {
261 eventReceiver->onConDisconnect(local_conId);
262 }
263 }
264 else
265 {
266 if (eventReceiver) {
267 eventReceiver->onConConnect(local_conId);
268 }
269 }
270 }
271 break;
272 }
273
274 case MREFRESH:{
275 busyAction = true;
276
277 for(int local_modId : local_modIds)
278 {
279 if(Manager::running(local_modId))
280 {
281 if (eventReceiver) {
282 eventReceiver->onModStart(local_modId);
283 }
284 }
285 else //if(Manager::suspended(local_modIds[i]))
286 {
287 if (eventReceiver) {
288 eventReceiver->onModStop(local_modId);
289 }
290 }
291 }
292
293 for(int local_conId : local_conIds)
294 {
295 refreshPortStatus(local_conId);
296 if(Manager::connected(local_conId))
297 {
298 if (eventReceiver) {
299 eventReceiver->onConConnect(local_conId);
300 }
301 }
302 else
303 {
304 if (eventReceiver) {
305 eventReceiver->onConDisconnect(local_conId);
306 }
307 }
308 }
309
310 for(int local_resId : local_resIds)
311 {
312 if(Manager::exist(local_resId))
313 {
314 if (eventReceiver) {
315 eventReceiver->onResAvailable(local_resId);
316 }
317 }
318 else
319 {
320 if (eventReceiver) {
321 eventReceiver->onResUnAvailable(local_resId);
322 }
323 }
324 }
325 busyAction = false;
326 break;
327 }
328
329 case MREFRESH_CNN:{
330 for(int local_conId : local_conIds)
331 {
332 refreshPortStatus(local_conId);
333 if(Manager::connected(local_conId))
334 {
335 if (eventReceiver) {
336 eventReceiver->onConConnect(local_conId);
337 }
338 }
339 else
340 {
341 if (eventReceiver) {
342 eventReceiver->onConDisconnect(local_conId);
343 }
344 }
345 }
346 break;
347 }
348
349 case MATTACHSTDOUT:{
350 for (int local_modId : local_modIds) {
351 Manager::attachStdout(local_modId);
352 }
353 break;
354 }
355
356 case MDETACHSTDOUT:{
357 for (int local_modId : local_modIds) {
358 Manager::detachStdout(local_modId);
359 }
360 break;
361 }
362
363 case MLOADBALANCE:{
364 busyAction = true;
366 if (eventReceiver) {
367 eventReceiver->onLoadBalance();
368 }
369 busyAction = false;
370 break;
371 }
372
373
374 default:
375 break;
376 };
377
378 if (eventReceiver) {
379 eventReceiver->onError();
380 }
381}
382
383void SafeManager::safeRun(std::vector<int>& MIDs, std::vector<int>& CIDs, std::vector<int> &RIDs)
384{
385 if (busy()) {
386 return;
387 }
388
390 modIds = MIDs;
391 conIds = CIDs;
392 resIds = RIDs;
393 action = MRUN;
397 }
398}
399
400void SafeManager::safeStop(std::vector<int>& MIDs, std::vector<int>& CIDs, std::vector<int> &RIDs)
401{
402 if (busy()) {
403 return;
404 }
405
407 modIds = MIDs;
408 conIds = CIDs;
409 resIds = RIDs;
410 action = MSTOP;
414 }
415}
416
417void SafeManager::safeKill(std::vector<int>& MIDs, std::vector<int> &CIDs, std::vector<int> &RIDs)
418{
419 if (busy()) {
420 return;
421 }
422
424 modIds = MIDs;
425 conIds = CIDs;
426 resIds = RIDs;
427 action = MKILL;
431 }
432}
433
434
435void SafeManager::safeConnect(std::vector<int>& CIDs)
436{
437 if (busy()) {
438 return;
439 }
440
442 conIds = CIDs;
443 action = MCONNECT;
447 }
448}
449
450
451void SafeManager::safeDisconnect(std::vector<int>& CIDs)
452{
453 if (busy()) {
454 return;
455 }
456
458 conIds = CIDs;
459 action = MDISCONNECT;
463 }
464}
465
466
467void SafeManager::safeRefresh(std::vector<int>& MIDs,
468 std::vector<int>& CIDs,
469 std::vector<int>& RIDs)
470{
471 if (busy()) {
472 return;
473 }
474
476 modIds = MIDs;
477 conIds = CIDs;
478 resIds = RIDs;
479 action = MREFRESH;
483 }
484}
485
486
487void SafeManager::safeAttachStdout(std::vector<int>& MIDs)
488{
489 if (busy()) {
490 return;
491 }
493 modIds = MIDs;
494 action = MATTACHSTDOUT;
498 }
499}
500
501void SafeManager::safeDetachStdout(std::vector<int>& MIDs)
502{
503 if (busy()) {
504 return;
505 }
506
508 modIds = MIDs;
509 action = MDETACHSTDOUT;
513 }
514}
515
517{
518 if (busy()) {
519 return;
520 }
521
523 action = MLOADBALANCE;
527 }
528}
529
531{
533 auto* exe = static_cast<Executable*>(which);
534 if (eventReceiver && exe) {
535 eventReceiver->onModStart(exe->getID());
536 }
538}
539
541{
543 auto* exe = static_cast<Executable*>(which);
544 if (eventReceiver && exe) {
545 eventReceiver->onModStop(exe->getID());
546 }
548 // Experimental:
549 // do auto refresh on connections whenever a module stops
550 /*
551 if(checkSemaphore())
552 {
553 if(!isRunning())
554 {
555 conIds.clear();
556 for(int i=0; i<getConnections().size(); i++)
557 conIds.push_back(i);
558 action = MREFRESH_CNN;
559 yarp::os::Thread::start();
560 }
561 POST_SEMAPHOR();
562 }
563 */
564}
565
567{
569 auto* exe = static_cast<Executable*>(which);
570 if (eventReceiver && exe) {
571 eventReceiver->onModStop(exe->getID());
572 }
574}
575
576
578{
581 auto* exe = static_cast<Executable*>(which);
582 if(exe)
583 {
584 if(m_pConfig->find("module_failure").asString() == "prompt")
585 {
586 OSTRINGSTREAM err;
587 err<<exe->getCommand()<<" from "<<exe->getHost()<<" is failed! [id:"<<exe->getID()<<"]";
588 logger->addError(err);
589 if (eventReceiver && exe) {
590 eventReceiver->onModStop(exe->getID());
591 }
592 }
593
594 if(m_pConfig->find("module_failure").asString() == "recover")
595 {
596 OSTRINGSTREAM err;
597 err<<exe->getCommand()<<" from "<<exe->getHost()<<" is failed! [id:"<<exe->getID()<<"] (restarting...)";
598 logger->addError(err);
599 exe->start();
600 }
601
602 if(m_pConfig->find("module_failure").asString() == "terminate")
603 {
604 OSTRINGSTREAM err;
605 err<<exe->getCommand()<<" from "<<exe->getHost()<<" is failed! [id:"<<exe->getID()<<"] (terminating...)";
606 logger->addError(err);
608 }
609 }
610
611 if (eventReceiver) {
612 eventReceiver->onError();
613 }
615}
616
617
618void SafeManager::onCnnStablished(void* which) { }
619
620
622{
625 auto* cnn = static_cast<Connection*>(which);
626 if(cnn)
627 {
628 if( m_pConfig->find("connection_failure").asString() == "prompt")
629 {
630 OSTRINGSTREAM err;
631 err<<"connection failed between "<<cnn->from()<<" and "<<cnn->to();
632 logger->addError(err);
633 }
634
635 if(m_pConfig->find("connection_failure").asString() == "terminate")
636 {
637 OSTRINGSTREAM err;
638 err<<"connection failed between "<<cnn->from()<<" and "<<cnn->to()<<" (terminating...)";
639 logger->addError(err);
641 }
642 }
643
644 if (eventReceiver) {
645 eventReceiver->onError();
646 }
648}
649
650
651void SafeManager::onExecutableStdout(void* which, const char* msg)
652{
654 auto* exe = static_cast<Executable*>(which);
655 if (eventReceiver) {
656 eventReceiver->onModStdout(exe->getID(), msg);
657 }
659}
660
661void SafeManager::onError(void* which)
662{
664 if (eventReceiver) {
665 eventReceiver->onError();
666 }
668}
669
670void SafeManager::refreshPortStatus(int id)
671{
672 // refreshing ports status
674 {
675 if (eventReceiver) {
676 eventReceiver->onConAvailable(id, -1);
677 }
678 }
679 else
680 {
681 if (eventReceiver) {
682 eventReceiver->onConUnAvailable(id, -1);
683 }
684 }
685
687 {
688 if (eventReceiver) {
689 eventReceiver->onConAvailable(-1, id);
690 }
691 }
692 else
693 {
694 if (eventReceiver) {
695 eventReceiver->onConUnAvailable(-1, id);
696 }
697 }
698}
bool ret
virtual void onResUnAvailable(int which)
virtual void onConDisconnect(int which)
virtual void onConUnAvailable(int from, int to)
virtual void onLoadBalance()
virtual void onConConnect(int which)
virtual void onModStart(int which)
virtual void onConAvailable(int from, int to)
virtual void onModStdout(int which, const char *msg)
virtual void onModStop(int which)
virtual void onError()
virtual void onResAvailable(int which)
~SafeManager() override
void safeConnect(std::vector< int > &CIDs)
void onCnnStablished(void *which) override
void safeStop(std::vector< int > &MIDs, std::vector< int > &CIDs, std::vector< int > &RIDs)
void safeRun(std::vector< int > &MIDs, std::vector< int > &CIDs, std::vector< int > &RIDs)
void onCnnFailed(void *which) override
void safeKill(std::vector< int > &MIDs, std::vector< int > &CIDs, std::vector< int > &RIDs)
void safeRefresh(std::vector< int > &MIDs, std::vector< int > &CIDs, std::vector< int > &RIDs)
void safeLoadBalance()
void onExecutableStdout(void *which, const char *msg) override
void run() override
Main body of the new thread.
void onExecutableDied(void *which) override
void onExecutableStart(void *which) override
bool threadInit() override
Initialization method.
void safeDetachStdout(std::vector< int > &MIDs)
void onError(void *which) override
void onExecutableStop(void *which) override
void threadRelease() override
Release method.
void safeAttachStdout(std::vector< int > &MIDs)
bool prepare(yarp::manager::Manager *lazy, yarp::os::Property *config, ApplicationEvent *event=nullptr)
void safeDisconnect(std::vector< int > &CDs)
void onExecutableFailed(void *which) override
Class Connection.
Definition application.h:56
Singleton class ErrorLogger.
Definition utility.h:58
void addError(const char *szError)
Definition utility.cpp:126
static ErrorLogger * Instance()
Singleton class ErrorLogger.
Definition utility.cpp:98
Class Executable.
Definition executable.h:71
void setPostStopWait(double t)
Definition executable.h:112
void setPostExecWait(double t)
Definition executable.h:110
const char * getCommand()
Definition executable.h:101
Class KnowledgeBase.
Definition kbase.h:32
const ModulePContainer & getModules(Application *parent=nullptr)
Definition kbase.cpp:217
bool addResource(GenericResource *resource)
Definition kbase.cpp:144
bool addModule(Module *module)
Definition kbase.cpp:120
const ApplicaitonPContainer & getApplications(Application *parent=nullptr)
Definition kbase.cpp:188
const ResourcePContainer & getResources(Application *parent=nullptr)
Definition kbase.cpp:261
bool addApplication(Application *application, char **szAppName_=nullptr, bool modifyName=false)
Definition kbase.cpp:76
Class Manager.
Definition manager.h:20
bool detachStdout(unsigned int id)
Definition manager.cpp:1353
bool attachStdout(unsigned int id)
Definition manager.cpp:1331
bool existPortFrom(unsigned int id)
Definition manager.cpp:731
bool exist(unsigned int id)
Definition manager.cpp:489
bool existPortTo(unsigned int id)
Definition manager.cpp:752
void disableAutoDependency()
Definition manager.h:96
void enableAutoDependency()
Definition manager.h:95
KnowledgeBase * getKnowledgeBase()
Definition manager.h:103
void disableAutoConnect()
Definition manager.h:94
Executable * getExecutableById(size_t id)
Definition manager.cpp:283
A class for storing options and configuration information.
Definition Property.h:33
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
bool stop()
Stop the thread.
Definition Thread.cpp:81
bool isRunning()
Returns true if the thread is running (Thread::start has been called successfully and the thread has ...
Definition Thread.cpp:105
bool start()
Start the new thread running.
Definition Thread.cpp:93
virtual std::string asString() const
Get string value.
Definition Value.cpp:234
std::vector< Module * > ModulePContainer
Definition module.h:230
std::stringstream OSTRINGSTREAM
Definition utility.h:50
std::vector< GenericResource * > ResourcePContainer
Definition resource.h:58
std::vector< Application * > ApplicaitonPContainer
#define WAIT_SEMAPHOR()
#define POST_SEMAPHOR()
@ MLOADBALANCE
@ MNOTHING
@ MATTACHSTDOUT
@ MDISCONNECT
@ MREFRESH
@ MDETACHSTDOUT
@ MREFRESH_CNN
@ MKILL
@ MRUN
@ MSTOP
@ MCONNECT
enum __ThreadAction ThreadAction