YARP
Yet Another Robot Platform
manager.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-License-Identifier: BSD-3-Clause
4  */
5 
6 #include <cstring>
7 #include <yarp/manager/manager.h>
17 #include <yarp/os/LogStream.h>
18 
20 
21 
// Upper bounds, in seconds, used by this file when waiting on module state.
#define RUN_TIMEOUT     10      // Run timeout in seconds
#define STOP_TIMEOUT    30      // Stop timeout in seconds
#define KILL_TIMEOUT    10      // kill timeout in seconds

// Broker names.
#define BROKER_LOCAL    "local"
#define BROKER_YARPRUN  "yarprun"
#define BROKER_YARPDEV  "yarpdev"
29 
30 
31 using namespace yarp::manager;
32 
33 
38 Manager::Manager(bool withWatchDog) : MEvent()
39 {
40  logger = ErrorLogger::Instance();
41  bWithWatchDog = withWatchDog;
42  bAutoDependancy = false;
43  bAutoConnect = false;
44  bRestricted = false;
45  strDefBroker = BROKER_YARPRUN;
46  knowledge.createFrom(nullptr, nullptr, nullptr);
47  connector.init();
48 }
49 
50 Manager::Manager(const char* szModPath, const char* szAppPath,
51  const char* szResPath, bool withWatchDog)
52 {
53  logger = ErrorLogger::Instance();
54  bWithWatchDog = withWatchDog;
55  bAutoDependancy = false;
56  bAutoConnect = false;
57  bRestricted = false;
58  strDefBroker = BROKER_YARPRUN;
59 
60  XmlModLoader modload(szModPath, nullptr);
61  XmlModLoader* pModLoad = &modload;
62  if (!modload.init()) {
63  pModLoad = nullptr;
64  }
65 
66  XmlAppLoader appload(szAppPath, nullptr);
67  XmlAppLoader* pAppLoad = &appload;
68  if (!appload.init()) {
69  pAppLoad = nullptr;
70  }
71 
72  XmlResLoader resload(szResPath, nullptr);
73  XmlResLoader* pResLoad = &resload;
74  if (!resload.init()) {
75  pResLoad = nullptr;
76  }
77 
78  knowledge.createFrom(pModLoad, pAppLoad, pResLoad);
79  connector.init();
80 }
81 
82 
84 {
85  // untopic persistent connections
86  rmconnect();
87  clearExecutables();
88 }
89 
90 bool Manager::addApplication(const char* szFileName, char** szAppName_, bool modifyName)
91 {
92  if (find(listOfXml.begin(), listOfXml.end(), szFileName) == listOfXml.end()) {
93  listOfXml.emplace_back(szFileName);
94  } else {
95  return true; //it means that the app exist already so it is safe to return true
96  }
97  XmlAppLoader appload(szFileName);
98  if (!appload.init()) {
99  return false;
100  }
101  Application* application = appload.getNextApplication();
102  if (!application) {
103  return false;
104  }
105 
106  return knowledge.addApplication(application, szAppName_, modifyName);
107 }
108 
109 
110 bool Manager::addApplications(const char* szPath)
111 {
112  XmlAppLoader appload(szPath, nullptr);
113  if (!appload.init()) {
114  return false;
115  }
116  Application* application;
117  while((application = appload.getNextApplication()))
118  {
119  const char* currentFile = application->getXmlFile();
120  knowledge.addApplication(application);
121  listOfXml.emplace_back(currentFile);
122  }
123  return true;
124 }
125 
126 
127 bool Manager::addModule(const char* szFileName)
128 {
129  XmlModLoader modload(szFileName);
130  if (!modload.init()) {
131  return false;
132  }
133  Module* module = modload.getNextModule();
134  if (!module) {
135  return false;
136  }
137  return knowledge.addModule(module);
138 }
139 
140 
141 bool Manager::addModules(const char* szPath)
142 {
143  XmlModLoader modload(szPath, nullptr);
144  if (!modload.init()) {
145  return false;
146  }
147  Module* module;
148  while ((module = modload.getNextModule())) {
149  knowledge.addModule(module);
150  }
151  return true;
152 }
153 
154 
155 bool Manager::addResource(const char* szFileName)
156 {
157  XmlResLoader resload(szFileName);
158  if (!resload.init()) {
159  return false;
160  }
161  GenericResource* resource;
162  bool bloaded = false;
163  while ((resource = resload.getNextResource())) {
164  bloaded |= knowledge.addResource(resource);
165  }
166  return bloaded;
167 }
168 
169 
170 bool Manager::addResources(const char* szPath)
171 {
172  XmlResLoader resload(szPath, nullptr);
173  if (!resload.init()) {
174  return false;
175  }
176  GenericResource* resource;
177  while ((resource = resload.getNextResource())) {
178  knowledge.addResource(resource);
179  }
180  return true;
181 }
182 
183 
184 bool Manager::removeApplication(const char *szFileName, const char* szAppName)
185 {
186  //Note: use it with care. it is better we first check that no application
187  //is loaded.
188  if(!runnables.empty())
189  {
190  logger->addError("Application cannot be removed if there is a loaded application");
191  return false;
192  }
193  listOfXml.erase(std::remove(listOfXml.begin(), listOfXml.end(), szFileName), listOfXml.end());
194  Application* app = knowledge.getApplication(szAppName);
195  if (!app) {
196  return false;
197  }
198  return knowledge.removeApplication(app);
199 }
200 
201 
202 bool Manager::removeModule(const char* szModName)
203 {
204  //Note: use it with care. it is better we first check that no application
205  //is loaded.
206  if(!runnables.empty())
207  {
208  logger->addError("Module cannot be removed if there is a loaded application");
209  return false;
210  }
211 
212  Module* mod = knowledge.getModule(szModName);
213  if (!mod) {
214  return false;
215  }
216 
217  return knowledge.removeModule(mod);
218 }
219 
220 bool Manager::removeResource(const char* szResName)
221 {
222  //Note: use it with care. it is better we first check that no application
223  //is loaded.
224  if(!runnables.empty())
225  {
226  logger->addError("Resource cannot be removed if there is a loaded application");
227  return false;
228  }
229 
230  GenericResource* res = knowledge.getResource(szResName);
231  if (!res) {
232  return false;
233  }
234 
235  return knowledge.removeResource(res);
236 }
237 
238 
239 
240 bool Manager::loadApplication(const char* szAppName)
241 {
242  __CHECK_NULLPTR(szAppName);
243 
244  if(!allStopped())
245  {
246  logger->addError("Please stop current running application first.");
247  return false;
248  }
249 
250  strAppName = szAppName;
251 
252  // set all resources as unavailable
253  ResourcePContainer allresources = knowledge.getResources();
254  for(auto& allresource : allresources)
255  {
256  auto* comp = dynamic_cast<Computer*>(allresource);
257  if (comp) {
258  comp->setAvailability(false);
259  }
260  }
261 
262  return prepare(true);
263 }
264 
265 
266 bool Manager::saveApplication(const char* szAppName, const char* fileName)
267 {
268  Application* pApp = knowledge.getApplication();
269  __CHECK_NULLPTR(pApp);
270 
271  XmlAppSaver appsaver(fileName);
272  return knowledge.saveApplication(&appsaver, pApp);
273 }
274 
275 
277 {
278  updateResources();
279  bool ret = prepare(false);
280  return ret;
281 }
282 
284 {
285  if (id < runnables.size())
286  {
287  return runnables[id];
288  }
289  else
290  {
291  return nullptr;
292  }
293 }
294 
295 bool Manager::switchBroker(size_t id)
296 {
297  Executable* exe = getExecutableById(id);
298  if (removeBroker(exe))
299  {
300  modules[id]->setHost(exe->getHost());
301  Broker* broker = createBroker(modules[id]);
302  if ( broker == nullptr)
303  {
304  return false;
305  }
306  broker->setDisplay(modules[id]->getDisplay());
307  exe->setAndInitializeBroker(broker);
308  }
309  else
310  {
311  return false;
312  }
313  return true;
314 }
315 
316 
317 bool Manager::prepare(bool silent)
318 {
319  knowledge.reasolveDependency(strAppName.c_str(), bAutoDependancy, silent);
320 
321  clearExecutables();
322  connections.clear();
323  modules.clear();
324  resources.clear();
325  connections = knowledge.getSelConnection();
326  modules = knowledge.getSelModules();
327  resources = knowledge.getSelResources();
328 
337  ModulePIterator itr;
338  int id = 0;
339  for(itr=modules.begin(); itr!=modules.end(); itr++)
340  {
341  Broker* broker = createBroker(*itr);
342  broker->setDisplay((*itr)->getDisplay());
343  auto* exe = new Executable(broker, (MEvent*)this, *itr, bWithWatchDog);
344  exe->setID(id++);
345  exe->setCommand((*itr)->getName());
346  exe->setParam((*itr)->getParam());
347  exe->setHost((*itr)->getHost());
348  exe->setStdio((*itr)->getStdio());
349  exe->setWorkDir((*itr)->getWorkDir());
350  exe->setPostExecWait((*itr)->getPostExecWait());
351  exe->setPostStopWait((*itr)->getPostStopWait());
352  exe->setOriginalPostExecWait((*itr)->getPostExecWait());
353  exe->setOriginalPostStopWait((*itr)->getPostStopWait());
354  std::string env;
355  if ((*itr)->getPrefix() && strlen((*itr)->getPrefix())) {
356  env = std::string("YARP_PORT_PREFIX=") + std::string((*itr)->getPrefix());
357  }
358  if ((*itr)->getEnvironment() && strlen((*itr)->getEnvironment())) {
359  env += (env.length()) ? (std::string(";") + (*itr)->getEnvironment()) : (*itr)->getEnvironment();
360  }
361  exe->setEnv(env.c_str());
362 
368  //CnnIterator cnn;
369  //for(cnn=connections.begin(); cnn!=connections.end(); cnn++)
370  // if((*cnn).owner() == (*itr))
371  // exe->addConnection(*cnn);
372 
376  for(auto& resource : resources)
377  {
378  auto* res = dynamic_cast<ResYarpPort*>(resource);
379  if (res && (res->owner() == (*itr))) {
380  exe->addResource(*res);
381  }
382  }
383 
384  runnables.push_back(exe);
385  }
386 
387  return true;
388 }
389 
390 Broker* Manager::createBroker(Module* module)
391 {
392  if(strlen(module->getBroker()) == 0)
393  {
394  if (compareString(module->getHost(), "localhost")) {
395  return (new LocalBroker());
396  } else {
397  return (new YarpBroker());
398  }
399  }
400  else if(compareString(module->getBroker(), BROKER_YARPDEV))
401  {
402  if (compareString(module->getHost(), "localhost")) {
403  return (new YarpdevLocalBroker());
404  } else {
405  return (new YarpdevYarprunBroker());
406  }
407  } else if (compareString(module->getHost(), "localhost")) {
408  return (new ScriptLocalBroker(module->getBroker()));
409  }
410 
411  return (new ScriptYarprunBroker(module->getBroker()));
412 }
413 
414 bool Manager::removeBroker(Executable* exe)
415 {
416  if (exe == nullptr)
417  {
418  return false;
419  }
420  else if(exe->state() == RUNNING)
421  {
422  exe->stop();
423  exe->stopWatchDog();
424  }
425 
426  exe->removeBroker(); //TODO possible race condition in case watchdog enabled.
427  return true;
428 }
429 
430 bool Manager::updateExecutable(unsigned int id, const char* szparam,
431  const char* szhost, const char* szstdio,
432  const char* szworkdir, const char* szenv )
433 {
434  if(runnables.empty())
435  {
436  logger->addError("Application is not loaded.");
437  return false;
438  }
439 
440  if(id>=runnables.size())
441  {
442  logger->addError("Module id is out of range.");
443  return false;
444  }
445 
446  Executable* exe = runnables[id];
447  exe->setParam(szparam);
448  exe->setHost(szhost);
449  exe->setStdio(szstdio);
450  exe->setWorkDir(szworkdir);
451  exe->setEnv(szenv);
452  return true;
453 }
454 
455 
456 bool Manager::updateConnection(unsigned int id, const char* from,
457  const char* to, const char* carrier)
458 {
459  if(id>=connections.size())
460  {
461  logger->addError("Connection id is out of range.");
462  return false;
463  }
464 
465  /*
466  if(connections[id].owner())
467  {
468  OSTRINGSTREAM msg;
469  msg<<"Connection ["<<connections[id].from()<<" -> ";
470  msg<<connections[id].to()<<"] cannot be updated.";
471  logger->addWarning(msg);
472  return false;
473  }
474  */
475 
476  connections[id].setFrom(from);
477  connections[id].setTo(to);
478  connections[id].setCarrier(carrier);
479 
480  return true;
481 }
482 
483 Node* Manager::getNode(std::string appName)
484 {
485  Node* node = knowledge.getNode(appName);
486  return node;
487 }
488 
489 bool Manager::exist(unsigned int id)
490 {
491  if(id>=resources.size())
492  {
493  logger->addError("Resource id is out of range.");
494  return false;
495  }
496 
497  GenericResource* res = resources[id];
498  if (compareString(res->getName(), "localhost")) {
499  return true;
500  }
501 
502  if(dynamic_cast<Computer*>(res) || dynamic_cast<ResYarpPort*>(res))
503  {
504  if(res->getName())
505  {
506  //YarpBroker broker;
507  //broker.init();
508  std::string strPort = res->getName();
509  if (strPort[0] != '/') {
510  strPort = std::string("/") + strPort;
511  }
512  if(dynamic_cast<ResYarpPort*>(res))
513  {
514  res->setAvailability(connector.exists(strPort.c_str()));
515  }
516  else //if it is a computer I have to be sure that the port has been opened through yarp runner
517  {
518  yarp::os::Bottle cmd, reply;
519  cmd.addString("get");
520  cmd.addString(strPort);
521  cmd.addString("yarprun");
523  if(!ret)
524  {
525  yError()<<"Manager::Cannot contact the NameClient";
526  return false;
527  }
528  if(reply.size()==6)
529  {
530  if(reply.get(5).asBool())
531  {
532  res->setAvailability(true);
533  }
534  else
535  {
536  res->setAvailability(false);
537  }
538 
539  }
540  else
541  {
542  res->setAvailability(false);
543  }
544 
545  }
546 
547  }
548  }
549  return res->getAvailability();
550 }
551 
552 
554 {
555  YarpBroker broker;
556  broker.init();
557 
558  // finding all available yarp ports
559  std::vector<std::string> ports;
560  broker.getAllPorts(ports);
561 
562  ResourcePContainer allresources = knowledge.getResources();
563  for(auto& allresource : allresources)
564  {
565  auto* comp = dynamic_cast<Computer*>(allresource);
566  if(comp && updateResource(comp))
567  {
568  //set all as unavailable
569  for(int i=0; i<comp->peripheralCount(); i++)
570  {
571  auto* res = dynamic_cast<ResYarpPort*>(&comp->getPeripheralAt(i));
572  if (res) {
573  res->setAvailability(false);
574  }
575  }
576 
577  // adding all available yarp ports as peripherals
578  for(auto& port : ports)
579  {
580  ResYarpPort resport;
581  resport.setName(port.c_str());
582  resport.setPort(port.c_str());
583 
584  bool bfound = false;
585  for(int i=0; i<comp->peripheralCount(); i++)
586  {
587  auto* res = dynamic_cast<ResYarpPort*>(&comp->getPeripheralAt(i));
588  if(res && (std::string(res->getName()) == std::string(resport.getName())))
589  {
590  res->setAvailability(true);
591  bfound = true;
592  break;
593  }
594  }
595  if (!bfound) {
596  comp->addPeripheral(resport);
597  }
598  }
599  }
600  } // end of for
601 
602  return true;
603 }
604 
605 
606 bool Manager::updateResource(const char* szName)
607 {
608  GenericResource* res = knowledge.getResource(szName);
609  if (!res) {
610  return false;
611  }
612  return updateResource(res);
613 }
614 
616 {
617  YarpBroker broker;
618  broker.init();
619 
620  auto* comp = dynamic_cast<Computer*>(resource);
621  if (!comp || !strlen(comp->getName())) {
622  return false;
623  }
624 
625  if (compareString(comp->getName(), "localhost")) {
626  return false;
627  }
628 
630  std::string strServer = comp->getName();
631  if (strServer[0] != '/') {
632  strServer = std::string("/") + strServer;
633  }
634  if(!broker.getSystemInfo(strServer.c_str(), info))
635  {
636  logger->addError(broker.error());
637  comp->setAvailability(false);
638  }
639  else
640  {
641  comp->setAvailability(true);
642 
643  comp->getMemory().setTotalSpace(info.memory.totalSpace*1024);
644  comp->getMemory().setFreeSpace(info.memory.freeSpace*1024);
645 
646  comp->getStorage().setTotalSpace(info.storage.totalSpace*1024);
647  comp->getStorage().setFreeSpace(info.storage.freeSpace*1024);
648 
649  //comp->getNetwork().setIP4(info.network.ip4.c_str());
650  //comp->getNetwork().setIP6(info.network.ip6.c_str());
651  //comp->getNetwork().setMAC(info.network.mac.c_str());
652 
653 
654  comp->getProcessor().setArchitecture(info.processor.architecture.c_str());
655  comp->getProcessor().setCores(info.processor.cores);
656  comp->getProcessor().setSiblings(info.processor.siblings);
657  comp->getProcessor().setFrequency(info.processor.frequency);
658  comp->getProcessor().setModel(info.processor.model.c_str());
659  LoadAvg load;
660  load.loadAverageInstant = (double)info.load.cpuLoadInstant;
661  load.loadAverage1 = info.load.cpuLoad1;
662  load.loadAverage5 = info.load.cpuLoad5;
663  load.loadAverage15 = info.load.cpuLoad15;
664  comp->getProcessor().setCPULoad(load);
665 
666  comp->getPlatform().setName(info.platform.name.c_str());
667  comp->getPlatform().setDistribution(info.platform.distribution.c_str());
668  comp->getPlatform().setRelease(info.platform.release.c_str());
669  }
670  return true;
671 }
672 
673 bool Manager::waitingModuleRun(unsigned int id)
674 {
675  double base = yarp::os::Time::now();
676  double wait = runnables[id]->getPostExecWait() + RUN_TIMEOUT;
677  while (!timeout(base, wait)) {
678  if (running(id)) {
679  return true;
680  }
681  }
682 
683  OSTRINGSTREAM msg;
684  msg<<"Failed to run "<<runnables[id]->getCommand();
685  msg<<" on "<<runnables[id]->getHost();
686  msg<<". (State: "<<runnables[id]->state();
687  msg<<", parameter: "<<runnables[id]->getParam()<<")";
688  logger->addError(msg);
689  return false;
690 
691 }
692 
693 bool Manager::waitingModuleStop(unsigned int id)
694 {
695  double base = yarp::os::Time::now();
696  while (!timeout(base, STOP_TIMEOUT)) {
697  if (!running(id)) {
698  return true;
699  }
700  }
701 
702  OSTRINGSTREAM msg;
703  msg<<"Failed to stop "<<runnables[id]->getCommand();
704  msg<<" on "<<runnables[id]->getHost();
705  msg<<". (State: "<<runnables[id]->state();
706  msg<<", paramete: "<<runnables[id]->getParam()<<")";
707  logger->addError(msg);
708  return false;
709 }
710 
711 bool Manager::waitingModuleKill(unsigned int id)
712 {
713  double base = yarp::os::Time::now();
714  while (!timeout(base, KILL_TIMEOUT)) {
715  if (!running(id)) {
716  return true;
717  }
718  }
719 
720  OSTRINGSTREAM msg;
721  msg<<"Failed to kill "<<runnables[id]->getCommand();
722  msg<<" on "<<runnables[id]->getHost();
723  msg<<". (State: "<<runnables[id]->state();
724  msg<<", paramete: "<<runnables[id]->getParam()<<")";
725  logger->addError(msg);
726  return false;
727 
728 }
729 
730 
731 bool Manager::existPortFrom(unsigned int id)
732 {
733  if(id>=connections.size())
734  {
735  logger->addError("Connection id is out of range.");
736  return false;
737  }
738  std::string portName(connections[id].from());
739  if(portName.find(' ') != std::string::npos)
740  {
741  std::string message = "Port name \"" + portName + "\" contains spaces.";
742  logger->addError(message.c_str());
743  return false;
744  }
745 
746  bool exists = connector.exists(portName.c_str());
747  connections[id].setFromExists(exists);
748  return exists;
749 }
750 
751 
752 bool Manager::existPortTo(unsigned int id)
753 {
754  if(id>=connections.size())
755  {
756  logger->addError("Connection id is out of range.");
757  return false;
758  }
759  std::string portName(connections[id].to());
760  if(portName.find(' ') != std::string::npos)
761  {
762  std::string message = "Port name \"" + portName + "\" contains spaces.";
763  logger->addError(message.c_str());
764  return false;
765  }
766 
767  bool exists = connector.exists(portName.c_str());
768  connections[id].setToExists(exists);
769  return exists;
770 }
771 
772 
774 {
780  bool ret = true;
781  ResourcePIterator itrRes;
782  for(itrRes=resources.begin(); itrRes!=resources.end(); itrRes++)
783  {
784  if(!(*itrRes)->getAvailability())
785  {
786  ret = false;
787  OSTRINGSTREAM err;
788  err<<"Resource "<<(*itrRes)->getName()<<" is not available!";
789  logger->addError(err);
790  }
791  }
792 
793  return ret;
794 }
795 
796 
797 
798 bool Manager::run(unsigned int id, bool async)
799 {
800  if(runnables.empty())
801  {
802  logger->addError("Application is not loaded.");
803  return false;
804  }
805 
806  if(id>=runnables.size())
807  {
808  logger->addError("Module id is out of range.");
809  return false;
810  }
811 
812  if (runnables[id]->shouldChangeBroker())
813  {
814  if (!switchBroker(id))
815  {
816  logger->addError("Failing to switch broker");
817  return false;
818  }
819  }
820 
821  runnables[id]->disableAutoConnect();
822  runnables[id]->start();
823  if(bWithWatchDog) {
825  runnables[id]->startWatchDog();
826  }
827  if (async) {
828  return true;
829  }
830 
831  // waiting for running
832  return waitingModuleRun(id);
833 }
834 
836 {
837  if(runnables.empty())
838  {
839  logger->addError("Application is not loaded.");
840  return false;
841  }
842 
843  if(!checkDependency())
844  {
845  if(bRestricted)
846  {
847  logger->addError("Some of external ports dependency are not satisfied.");
848  return false;
849  } else {
850  logger->addWarning("Some of external ports dependency are not satisfied.");
851  }
852  }
853 
855  double wait = 0.0;
856  for(itr=runnables.begin(); itr!=runnables.end(); itr++)
857  {
858  if (bAutoConnect) {
859  (*itr)->enableAutoConnect();
860  } else {
861  (*itr)->disableAutoConnect();
862  }
863  (*itr)->start();
865  wait = (wait > (*itr)->getPostExecWait()) ? wait : (*itr)->getPostExecWait();
866  }
867 
868  // waiting for running
869  double base = yarp::os::SystemClock::nowSystem();
870  while (!timeout(base, wait + RUN_TIMEOUT)) {
871  if (allRunning()) {
872  break;
873  }
874  }
875 
876  // starting the watchdog if needed
877  if(bWithWatchDog) {
878  for (itr = runnables.begin(); itr != runnables.end(); itr++) {
879  (*itr)->startWatchDog();
880  }
881  }
882 
883  if(!allRunning())
884  {
886  for (itr = runnables.begin(); itr != runnables.end(); itr++) {
887  if((*itr)->state() != RUNNING)
888  {
889  OSTRINGSTREAM msg;
890  msg<<"Failed to run "<<(*itr)->getCommand();
891  msg<<" on "<<(*itr)->getHost();
892  msg<<". (State: "<<(*itr)->state();
893  msg<<", parameter: "<<(*itr)->getParam()<<")";
894  logger->addError(msg);
895  }
896  }
897 
898  if(bRestricted)
899  {
900  kill();
901  return false;
902  }
903  }
904 
905  /* connecting extra ports*/
906  if (bAutoConnect) {
907  if(!connectExtraPorts())
908  {
909  logger->addError("Failed to stablish some of connections.");
910  if (bRestricted) {
911  return false;
912  }
913  }
914  }
915 
916  return true;
917 }
918 
919 bool Manager::stop(unsigned int id, bool async)
920 {
921  if(runnables.empty())
922  {
923  logger->addError("Application is not loaded.");
924  return false;
925  }
926 
927  if(id>=runnables.size())
928  {
929  logger->addError("Module id is out of range.");
930  return false;
931  }
932 
933  runnables[id]->stop();
934 
935  if (async) {
936  return true;
937  }
938 
939  // waiting for stop
940  return waitingModuleStop(id);
941 }
942 
943 
945 {
946  if (runnables.empty()) {
947  return true;
948  }
949 
951  for(itr=runnables.begin(); itr!=runnables.end(); itr++)
952  {
953  (*itr)->stop();
955  }
956 
957  double base = yarp::os::SystemClock::nowSystem();
958  while (!timeout(base, STOP_TIMEOUT)) {
959  if (allStopped()) {
960  break;
961  }
962  }
963 
964  if(!allStopped())
965  {
967  for (itr = runnables.begin(); itr != runnables.end(); itr++) {
968  if( ((*itr)->state() != SUSPENDED) &&
969  ((*itr)->state() != DEAD))
970  {
971  OSTRINGSTREAM msg;
972  msg<<"Failed to stop "<<(*itr)->getCommand();
973  msg<<" on "<<(*itr)->getHost();
974  msg<<". (State: "<<(*itr)->state();
975  msg<<", paramete: "<<(*itr)->getParam()<<")";
976  logger->addError(msg);
977  }
978  }
979  return false;
980  }
981 
982  return true;
983 }
984 
985 bool Manager::kill(unsigned int id, bool async)
986 {
987  if(runnables.empty())
988  {
989  logger->addError("Application is not loaded.");
990  return false;
991  }
992 
993  if(id>=runnables.size())
994  {
995  logger->addError("Module id is out of range.");
996  return false;
997  }
998 
999  runnables[id]->kill();
1000 
1001  if (async) {
1002  return true;
1003  }
1004  return waitingModuleKill(id);
1005 }
1006 
1007 
1009 {
1010  if (runnables.empty()) {
1011  return true;
1012  }
1013 
1014  ExecutablePIterator itr;
1015  for(itr=runnables.begin(); itr!=runnables.end(); itr++)
1016  {
1017  (*itr)->kill();
1019  }
1020 
1021  double base = yarp::os::SystemClock::nowSystem();
1022  while (!timeout(base, KILL_TIMEOUT)) {
1023  if (allStopped()) {
1024  break;
1025  }
1026  }
1027 
1028  if(!allStopped())
1029  {
1030  ExecutablePIterator itr;
1031  for (itr = runnables.begin(); itr != runnables.end(); itr++) {
1032  if( ((*itr)->state() != SUSPENDED) &&
1033  ((*itr)->state() != DEAD))
1034  {
1035  OSTRINGSTREAM msg;
1036  msg<<"Failed to kill "<<(*itr)->getCommand();
1037  msg<<" on "<<(*itr)->getHost();
1038  msg<<". (State: "<<(*itr)->state();
1039  msg<<", paramete: "<<(*itr)->getParam()<<")";
1040  logger->addError(msg);
1041  }
1042  }
1043  return false;
1044  }
1045 
1046  return true;
1047 }
1048 
1049 
1050 void Manager::clearExecutables()
1051 {
1052  ExecutablePIterator itr;
1053  for(itr=runnables.begin(); itr!=runnables.end(); itr++)
1054  {
1055  // broker will be deleted by Executable
1056  delete (*itr);
1057  }
1058  runnables.clear();
1059 }
1060 
1061 
1062 bool Manager::connect(unsigned int id)
1063 {
1064  if(id>=connections.size())
1065  {
1066  logger->addError("Connection id is out of range.");
1067  return false;
1068  }
1069 
1070  //YarpBroker connector;
1071  //connector.init();
1072 
1073  if( !connector.connect(connections[id].from(),
1074  connections[id].to(),
1075  connections[id].carrier(),
1076  connections[id].isPersistent()) )
1077  {
1078  logger->addError(connector.error());
1079  //cout<<connector.error()<<endl;
1080  return false;
1081  }
1082 
1083  // setting the connection Qos if specified
1084  return connector.setQos(connections[id].from(),
1085  connections[id].to(),
1086  connections[id].qosFrom(),
1087  connections[id].qosTo());
1088 }
1089 
1091 {
1092  //YarpBroker connector;
1093  //connector.init();
1094  CnnIterator cnn;
1095  for(cnn=connections.begin(); cnn!=connections.end(); cnn++) {
1096  if( !(*cnn).getFromExists() ||
1097  !(*cnn).getToExists() ||
1098  !connector.connect((*cnn).from(), (*cnn).to(),
1099  (*cnn).carrier(), (*cnn).isPersistent()) )
1100  {
1101  logger->addError(connector.error());
1102  //cout<<connector.error()<<endl;
1103  if (bRestricted) {
1104  return false;
1105  }
1106  }
1107 
1108  // setting the connection Qos if specified
1109  if(! connector.setQos((*cnn).from(), (*cnn).to(),
1110  (*cnn).qosFrom(), (*cnn).qosTo())) {
1111  if (bRestricted) {
1112  return false;
1113  }
1114  }
1115  }
1116  return true;
1117 }
1118 
1119 bool Manager::disconnect(unsigned int id)
1120 {
1121  if(id>=connections.size())
1122  {
1123  logger->addError("Connection id is out of range.");
1124  return false;
1125  }
1126 
1127  //YarpBroker connector;
1128  //connector.init();
1129 
1130  if( !connector.disconnect(connections[id].from(),
1131  connections[id].to(),
1132  connections[id].carrier()) )
1133  {
1134  logger->addError(connector.error());
1135  //cout<<connector.error()<<endl;
1136  return false;
1137  }
1138 
1139  return true;
1140 }
1141 
1143 {
1144  //YarpBroker connector;
1145  //connector.init();
1146  CnnIterator cnn;
1147  for (cnn = connections.begin(); cnn != connections.end(); cnn++) {
1148  if( !connector.disconnect((*cnn).from(), (*cnn).to(), (*cnn).carrier()) )
1149  {
1150  logger->addError(connector.error());
1151  //cout<<connector.error()<<endl;
1152  return false;
1153  }
1154  }
1155  return true;
1156 }
1157 
1158 
1159 bool Manager::rmconnect(unsigned int id)
1160 {
1161  if(id>=connections.size())
1162  {
1163  logger->addError("Connection id is out of range.");
1164  return false;
1165  }
1166 
1167  if(!connector.rmconnect(connections[id].from(),
1168  connections[id].to()) )
1169  {
1170  logger->addError(connector.error());
1171  return false;
1172  }
1173 
1174  return true;
1175 }
1176 
1177 
1179 {
1180  CnnIterator cnn;
1181  for (cnn = connections.begin(); cnn != connections.end(); cnn++) {
1182  if( !connector.rmconnect((*cnn).from(), (*cnn).to()) )
1183  {
1184  logger->addError(connector.error());
1185  return false;
1186  }
1187  }
1188  return true;
1189 }
1190 
1191 
1192 bool Manager::connected(unsigned int id)
1193 {
1194  if(id>=connections.size())
1195  {
1196  logger->addError("Connection id is out of range.");
1197  return false;
1198  }
1199 
1200  return connections[id].getFromExists() &&
1201  connections[id].getToExists() &&
1202  connector.connected(connections[id].from(),
1203  connections[id].to(),
1204  connections[id].carrier());
1205 }
1206 
1207 
1209 {
1210  //YarpBroker connector;
1211  //connector.init();
1212  CnnIterator cnn;
1213  bool bConnected = true;
1214  for (cnn = connections.begin(); cnn != connections.end(); cnn++) {
1215  if (!(*cnn).getFromExists() || !(*cnn).getToExists() || !connector.connected((*cnn).from(), (*cnn).to(), (*cnn).carrier())) {
1216  bConnected = false;
1217  }
1218  }
1219  return bConnected;
1220 }
1221 
1222 bool Manager::checkPortsAvailable(Broker* broker)
1223 {
1224  CnnIterator itr;
1225  for(itr=connections.begin(); itr!=connections.end(); itr++)
1226  {
1227  //if(!(*itr).owner() )
1228  // {
1229  if (!broker->exists((*itr).to()) || !broker->exists((*itr).from())) {
1230  return false;
1231  }
1232  // }
1233  }
1234  return true;
1235 }
1236 
1237 
1238 bool Manager::connectExtraPorts()
1239 {
1240  //YarpBroker connector;
1241  //connector.init();
1242 
1243  double base = yarp::os::SystemClock::nowSystem();
1244  while (!timeout(base, 10.0)) {
1245  if (checkPortsAvailable(&connector)) {
1246  break;
1247  }
1248  }
1249 
1250  CnnIterator cnn;
1251  for(cnn=connections.begin(); cnn!=connections.end(); cnn++)
1252  {
1253  //if(!(*cnn).owner() )
1254  //{
1255  if( !connector.connect((*cnn).from(), (*cnn).to(),
1256  (*cnn).carrier()) )
1257  {
1258  logger->addError(connector.error());
1259  //cout<<connector.error()<<endl;
1260  return false;
1261  }
1262  //}
1263  }
1264  return true;
1265 }
1266 
1267 bool Manager::running(unsigned int id)
1268 {
1269  if(id>=runnables.size())
1270  {
1271  logger->addError("Module id is out of range.");
1272  return false;
1273  }
1274 
1275  RSTATE st = runnables[id]->state();
1276  if ((st == RUNNING) || (st == CONNECTING) || (st == DYING)) {
1277  return true;
1278  }
1279  return false;
1280 }
1281 
1282 
1283 bool Manager::allRunning()
1284 {
1285  if (!runnables.size()) {
1286  return false;
1287  }
1288  ExecutablePIterator itr;
1289  for(itr=runnables.begin(); itr!=runnables.end(); itr++)
1290  {
1291  RSTATE st = (*itr)->state();
1292  if ((st != RUNNING) && (st != CONNECTING) && (st != DYING)) {
1293  return false;
1294  }
1295  }
1296  return true;
1297 }
1298 
1299 
1300 bool Manager::suspended(unsigned int id)
1301 {
1302  if(id>=runnables.size())
1303  {
1304  logger->addError("Module id is out of range.");
1305  return false;
1306  }
1307  RSTATE st = runnables[id]->state();
1308  if ((st == SUSPENDED) || (st == DEAD)) {
1309  return true;
1310  }
1311  return false;
1312 }
1313 
1314 
1315 bool Manager::allStopped()
1316 {
1317  if (!runnables.size()) {
1318  return true;
1319  }
1320  ExecutablePIterator itr;
1321  for(itr=runnables.begin(); itr!=runnables.end(); itr++)
1322  {
1323  RSTATE st = (*itr)->state();
1324  if ((st != SUSPENDED) && (st != DEAD)) {
1325  return false;
1326  }
1327  }
1328  return true;
1329 }
1330 
1331 bool Manager::attachStdout(unsigned int id)
1332 {
1333  if(id>=runnables.size())
1334  {
1335  logger->addError("Module id is out of range.");
1336  return false;
1337  }
1338 
1339  if(!runnables[id]->getBroker()->attachStdout())
1340  {
1341  OSTRINGSTREAM msg;
1342  msg<<"Cannot attach to stdout of "<<runnables[id]->getCommand();
1343  msg<<" on "<<runnables[id]->getHost();
1344  msg<<". (State: "<<runnables[id]->state();
1345  msg<<", paramete: "<<runnables[id]->getParam()<<") ";
1346  msg<<"because "<<runnables[id]->getBroker()->error();
1347  logger->addError(msg);
1348  return false;
1349  }
1350  return true;
1351 }
1352 
1353 bool Manager::detachStdout(unsigned int id)
1354 {
1355  if(id>=runnables.size())
1356  {
1357  logger->addError("Module id is out of range.");
1358  return false;
1359  }
1360 
1361  runnables[id]->getBroker()->detachStdout();
1362  return true;
1363 }
1364 
1365 bool Manager::timeout(double base, double t)
1366 {
1368  if ((yarp::os::SystemClock::nowSystem() - base) > t) {
1369  return true;
1370  }
1371  return false;
1372 }
1373 
1374 
1375 void Manager::onExecutableStart(void* which) {}
1376 void Manager::onExecutableStop(void* which) {}
1377 void Manager::onCnnStablished(void* which) {}
1378 void Manager::onExecutableDied(void* which) {}
1379 void Manager::onExecutableFailed(void* which) {}
1380 void Manager::onExecutableStdout(void* which, const char* msg) {}
1381 void Manager::onCnnFailed(void* which) {}
1382 void Manager::onError(void* which) {}
1383 
1384 
1385 /*
1386 bool Manager::loadModule(const char* szModule, const char* szHost)
1387 {
1388  __CHECK_NULLPTR(szModule);
1389  strAppName = szModule;
1390 
1391  SingleAppLoader appLoader(szModule, szHost);
1392  if(!appLoader.init())
1393  {
1394  logger->addError("Error initializing SingleAppLoader.");
1395  return false;
1396  }
1397 
1398  if(!createKnowledgeBase(appLoader))
1399  {
1400  logger->addError("Cannot create knowledge base");
1401  return false;
1402  }
1403 
1404  return prepare();
1405 
1406 }
1407 */
float t
bool ret
#define yError(...)
Definition: Log.h:279
static RFModule * module
Definition: RFModule.cpp:231
Class Application.
Definition: application.h:289
const char * getXmlFile()
Definition: application.h:338
Class Broker.
Definition: broker.h:31
void setDisplay(const char *szDisplay)
Definition: broker.h:64
virtual bool exists(const char *port)=0
void addError(const char *szError)
Definition: utility.cpp:118
void addWarning(const char *szWarning)
Definition: utility.cpp:104
static ErrorLogger * Instance()
Singleton class ErrorLogger.
Definition: utility.cpp:98
Class Executable.
Definition: executable.h:72
void setParam(const char *val)
Definition: executable.h:83
void setWorkDir(const char *val)
Definition: executable.h:86
void setHost(const char *val)
Definition: executable.h:84
void setStdio(const char *val)
Definition: executable.h:85
const char * getHost()
Definition: executable.h:104
void setEnv(const char *val)
Definition: executable.h:87
void setAndInitializeBroker(Broker *_broker)
Definition: executable.cpp:210
void setAvailability(bool flag)
Definition: resource.h:23
void setName(const char *szName)
Definition: resource.h:28
GenericResource * getResource(const char *szName)
Definition: kbase.h:90
bool removeModule(Module *module)
Definition: kbase.cpp:177
bool removeResource(GenericResource *resource)
Definition: kbase.cpp:182
Module * getModule(const char *szName)
Definition: kbase.h:82
bool reasolveDependency(const char *szName, bool bAutoDependancy=false, bool bSilent=false)
Definition: kbase.cpp:894
bool addResource(GenericResource *resource)
Definition: kbase.cpp:144
Node * getNode(std::string appName)
Definition: kbase.cpp:1664
const ResourcePContainer & getSelResources()
Definition: kbase.h:70
bool addModule(Module *module)
Definition: kbase.cpp:120
bool removeApplication(Application *application)
Definition: kbase.cpp:172
Application * getApplication()
Definition: kbase.h:85
bool createFrom(ModuleLoader *_mloader, AppLoader *_apploader, ResourceLoader *_resloader)
Definition: kbase.cpp:19
const ResourcePContainer & getResources(Application *parent=nullptr)
Definition: kbase.cpp:261
const CnnContainer & getSelConnection()
Definition: kbase.h:69
bool addApplication(Application *application, char **szAppName_=nullptr, bool modifyName=false)
Definition: kbase.cpp:76
const ModulePContainer & getSelModules()
Definition: kbase.h:68
bool saveApplication(AppSaver *appSaver, Application *application)
Definition: kbase.cpp:1304
Class LocalBroker.
Definition: localbroker.h:34
void onExecutableStdout(void *which, const char *msg) override
Definition: manager.cpp:1380
void onExecutableDied(void *which) override
Definition: manager.cpp:1378
void onExecutableStop(void *which) override
Definition: manager.cpp:1376
Manager(bool withWatchDog=false)
Class Manager.
Definition: manager.cpp:38
bool removeApplication(const char *szFileName, const char *szAppName)
Definition: manager.cpp:184
bool updateResource(const char *szName)
Definition: manager.cpp:606
bool detachStdout(unsigned int id)
Definition: manager.cpp:1353
void onCnnFailed(void *which) override
Definition: manager.cpp:1381
bool addModule(const char *szFileName)
Definition: manager.cpp:127
bool updateConnection(unsigned int id, const char *from, const char *to, const char *carrier)
Definition: manager.cpp:456
bool attachStdout(unsigned int id)
Definition: manager.cpp:1331
bool removeModule(const char *szModName)
Definition: manager.cpp:202
bool existPortFrom(unsigned int id)
Definition: manager.cpp:731
bool addModules(const char *szPath)
Definition: manager.cpp:141
bool addResources(const char *szPath)
Definition: manager.cpp:170
Node * getNode(std::string appName)
Definition: manager.cpp:483
bool addApplication(const char *szFileName, char **szAppName_=nullptr, bool modifyName=false)
Definition: manager.cpp:90
bool exist(unsigned int id)
Definition: manager.cpp:489
bool existPortTo(unsigned int id)
Definition: manager.cpp:752
void onError(void *which) override
Definition: manager.cpp:1382
bool switchBroker(size_t id)
Definition: manager.cpp:295
bool addResource(const char *szFileName)
Definition: manager.cpp:155
bool updateExecutable(unsigned int id, const char *szparam, const char *szhost, const char *szstdio, const char *szworkdir, const char *szenv)
Definition: manager.cpp:430
bool waitingModuleKill(unsigned int id)
Definition: manager.cpp:711
bool waitingModuleStop(unsigned int id)
Definition: manager.cpp:693
bool removeResource(const char *szResName)
Definition: manager.cpp:220
bool addApplications(const char *szPath)
Definition: manager.cpp:110
void onExecutableFailed(void *which) override
Definition: manager.cpp:1379
~Manager() override
Definition: manager.cpp:83
bool loadApplication(const char *szAppName)
Definition: manager.cpp:240
bool saveApplication(const char *szAppName, const char *fileName=nullptr)
Definition: manager.cpp:266
void onCnnStablished(void *which) override
Definition: manager.cpp:1377
bool waitingModuleRun(unsigned int id)
Definition: manager.cpp:673
Executable * getExecutableById(size_t id)
Definition: manager.cpp:283
void onExecutableStart(void *which) override
Definition: manager.cpp:1375
Class Module.
Definition: module.h:100
a Node of a Graph
Definition: node.h:65
void setPort(const char *szPort)
Definition: logicresource.h:49
Class XmlAppLoader.
Definition: xmlapploader.h:21
Application * getNextApplication() override
Class XmlAppSaver.
Definition: xmlappsaver.h:20
Class XmlModLoader.
Definition: xmlmodloader.h:21
Module * getNextModule() override
Class XmlResLoader.
Definition: xmlresloader.h:22
GenericResource * getNextResource() override
const char * error() override
Definition: yarpbroker.cpp:807
bool connected(const char *from, const char *to, const char *carrier) override
Definition: yarpbroker.cpp:579
bool getAllPorts(std::vector< std::string > &stingList)
Definition: yarpbroker.cpp:651
bool init() override
Definition: yarpbroker.cpp:61
bool getSystemInfo(const char *server, yarp::os::SystemInfoSerializer &info)
Definition: yarpbroker.cpp:591
bool connect(const char *from, const char *to, const char *carrier, bool persist=false) override
connection broker
Definition: yarpbroker.cpp:411
bool rmconnect(const char *from, const char *to)
Definition: yarpbroker.cpp:732
bool setQos(const char *from, const char *to, const char *qosFrom, const char *qosTo)
Definition: yarpbroker.cpp:746
bool disconnect(const char *from, const char *to, const char *carrier) override
Definition: yarpbroker.cpp:474
bool exists(const char *port) override
Definition: yarpbroker.cpp:524
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:74
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:251
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:246
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:170
static double nowSystem()
Definition: SystemClock.cpp:34
static void delaySystem(double seconds)
Definition: SystemClock.cpp:29
A helper class to pass the SystemInfo object around the YARP network.
yarp::os::SystemInfo::LoadInfo load
current cpu load information
yarp::os::SystemInfo::StorageInfo storage
system storage information
yarp::os::SystemInfo::PlatformInfo platform
operating system information
yarp::os::SystemInfo::MemoryInfo memory
system memory information
yarp::os::SystemInfo::ProcessorInfo processor
system processor type information
virtual bool asBool() const
Get boolean value.
Definition: Value.cpp:186
static NameClient & getNameClient()
Get an instance of the name client.
Definition: NameClient.cpp:125
std::string send(const std::string &cmd, bool multi=true, const ContactStyle &style=ContactStyle())
Send a text message to the nameserver, and return the result.
Definition: NameClient.cpp:299
#define KILL_TIMEOUT
Definition: manager.cpp:24
#define BROKER_YARPDEV
Definition: manager.cpp:28
#define RUN_TIMEOUT
Definition: manager.cpp:22
#define STOP_TIMEOUT
Definition: manager.cpp:23
#define BROKER_YARPRUN
Definition: manager.cpp:27
bool compareString(const char *szFirst, const char *szSecond)
Definition: utility.cpp:310
std::vector< Executable * >::iterator ExecutablePIterator
Definition: executable.h:168
enum yarp::manager::__RSTATE RSTATE
std::vector< GenericResource * >::iterator ResourcePIterator
Definition: resource.h:60
std::vector< Connection >::iterator CnnIterator
Definition: application.h:151
std::stringstream OSTRINGSTREAM
Definition: utility.h:49
std::vector< GenericResource * > ResourcePContainer
Definition: resource.h:59
std::vector< Module * >::iterator ModulePIterator
Definition: module.h:232
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition: Time.cpp:121
#define __CHECK_NULLPTR(_ptr)
Definition: ymm-types.h:80