YARP
Yet Another Robot Platform
yarpbroker.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-License-Identifier: BSD-3-Clause
4  */
5 
7 
8 #include <csignal>
9 #include <cstring>
10 
11 #define YARPRUN_OK 0
12 #define YARPRUN_NORESPONSE 1
13 #define YARPRUN_NOCONNECTION 2
14 #define YARPRUN_CONNECTION_TIMOUT 3
15 #define YARPRUN_SEMAPHORE_PARAM 4
16 #define YARPRUN_UNDEF 5
17 
18 #define CONNECTION_TIMEOUT 2.0 //seconds
19 #define RUN_TIMEOUT 10.0 //seconds
20 #define STOP_TIMEOUT 15.0
21 #define KILL_TIMEOUT 10.0
22 #define EVENT_THREAD_PERIOD 0.5 //seconds
23 
24 #if defined(_WIN32)
25  #define SIGKILL 9
26 #endif
27 
// Human-readable suffixes appended to the broker's error strings.
// The table is indexed by the YARPRUN_* status codes defined above:
// 0 = YARPRUN_OK, 1 = YARPRUN_NORESPONSE, 2 = YARPRUN_NOCONNECTION,
// 3 = YARPRUN_CONNECTION_TIMOUT, 4 = YARPRUN_SEMAPHORE_PARAM, 5 = YARPRUN_UNDEF.
// Keep the order in sync with those macros.
const char* yarprun_err_msg[] = { " (Ok) ",
                                  " (Remote host does not respond) ",
                                  " (Remote host does not exist) ",
                                  " (Timeout while connecting to the remote host) ",
                                  " (Blocked in broker semaphore) ",
                                  " (Undefined message) " };
34 
35 using namespace yarp::os;
36 using namespace yarp::os::impl;
37 using namespace yarp::manager;
38 
39 
40 YarpBroker::YarpBroker() : PeriodicThread(EVENT_THREAD_PERIOD)
41 {
42  bOnlyConnector = bInitialized = false;
43  ID = generateID();
44  strStdioUUID.clear();
45 }
46 
47 
49 {
50  fini();
51 }
52 
54 {
55  if (PeriodicThread::isRunning()) {
56  PeriodicThread::stop();
57  }
58  //port.close();
59 }
60 
62 {
63  //if(bInitialized)
64  // return true;
65 
66  if(!NetworkBase::checkNetwork(CONNECTION_TIMEOUT))
67  {
68  strError = "YARP network server is not up.";
69  return false;
70  }
71  bInitialized = true;
72  bOnlyConnector = true;
73 
74  /*
75  semParam.wait();
76  __trace_message = "(init) opening port ...";
77  port.setTimeout(CONNECTION_TIMEOUT);
78  port.open("...");
79  __trace_message.clear();
80  semParam.post();
81  */
82  return true;
83 }
84 
85 bool YarpBroker::init(const char* szcmd, const char* szparam,
86  const char* szhost, const char* szstdio,
87  const char* szworkdir, const char* szenv )
88 {
89  //if(bInitialized)
90  // return true;
91 
92  semParam.wait();
93 
94  strCmd.clear();
95  strParam.clear();
96  strHost.clear();
97  strStdio.clear();
98  strWorkdir.clear();
99  strTag.clear();
100  strEnv.clear();
101 
102  if(!szcmd)
103  {
104  strError = "command is not specified.";
105  semParam.post();
106  return false;
107  }
108 
109  if(!szhost)
110  {
111  strError = "remote host port is not specified.";
112  semParam.post();
113  return false;
114  }
115 
116  if (szhost[0] != '/') {
117  strHost = std::string("/") + std::string(szhost);
118  } else {
119  strHost = szhost;
120  }
121 
122  strCmd = szcmd;
123  if (strlen(szparam)) {
124  strParam = szparam;
125  }
126  if (strlen(szworkdir)) {
127  strWorkdir = szworkdir;
128  }
129 
130  if(strlen(szstdio))
131  {
132  if (szstdio[0] != '/') {
133  strStdio = std::string("/") + std::string(szstdio);
134  } else {
135  strStdio = szstdio;
136  }
137  }
138 
139  if (strlen(szenv)) {
140  strEnv = szenv;
141  }
142 
143  OSTRINGSTREAM sstrID;
144  sstrID<<(int)ID;
145  strTag = strHost + strCmd + strParam + strEnv + sstrID.str();
146  std::string::iterator itr;
147  for (itr = strTag.begin(); itr != strTag.end(); itr++) {
148  if (((*itr) == ' ') || ((*itr) == '/')) {
149  (*itr) = ':';
150  }
151  }
152 
153  __trace_message = "(init) checking yarp network";
154  if(!NetworkBase::checkNetwork(5.0))
155  {
156  strError = "YARP network server is not up.";
157  __trace_message.clear();
158  semParam.post();
159  return false;
160  }
161  __trace_message = std::string("(init) checking existence of ") + strHost;
162  if(!exists(strHost.c_str()))
163  {
164  strError = szhost;
165  strError += " does not exist. check yarprun is running as server.";
166  __trace_message.clear();
167  semParam.post();
168  return false;
169  }
170 
171  /*
172  port.setTimeout(CONNECTION_TIMEOUT);
173  __trace_message = "(init) opening port ...";
174  port.open("...");
175  __trace_message.clear();
176  */
177 
178  bInitialized = true;
179  semParam.post();
180 
181  return true;
182 }
183 
184 
186 {
187  if (!bInitialized) {
188  return false;
189  }
190  if (bOnlyConnector) {
191  return false;
192  }
193 
194  strError.clear();
195  int ret = requestServer(runProperty());
196  if(ret != YARPRUN_OK)
197  {
198  strError = "cannot ask ";
199  strError += strHost;
200  strError += " to run ";
201  strError += strCmd;
202  strError += yarprun_err_msg[ret];
203  if (ret == YARPRUN_SEMAPHORE_PARAM) {
204  strError += std::string(" due to " + __trace_message);
205  }
206  return false;
207  }
208 
209  double base = SystemClock::nowSystem();
210  while(!timeout(base, RUN_TIMEOUT))
211  {
212  if(running() == 1)
213  {
214  if(strStdioUUID.size())
215  {
216  if (PeriodicThread::isRunning()) {
217  PeriodicThread::stop();
218  }
219  PeriodicThread::start();
220  }
221  return true;
222  }
223  }
224 
225  strError = "cannot run ";
226  strError += strCmd;
227  strError += " on ";
228  strError += strHost;
229  return false;
230 }
231 
233 {
234  if (!bInitialized) {
235  return true;
236  }
237  if (bOnlyConnector) {
238  return false;
239  }
240 
241  strError.clear();
242  yarp::os::Bottle msg,grp,response;
243 
244  grp.clear();
245  grp.addString("on");
246  grp.addString(strHost.c_str());
247  msg.addList()=grp;
248  grp.clear();
249  grp.addString("sigterm");
250  grp.addString(strTag.c_str());
251  msg.addList()=grp;
252  int ret = SendMsg(msg, strHost, response, CONNECTION_TIMEOUT);
253  if(ret != YARPRUN_OK)
254  {
255  strError = "cannot ask ";
256  strError += strHost;
257  strError += " to stop ";
258  strError += strCmd;
259  strError += yarprun_err_msg[ret];
260  if (ret == YARPRUN_SEMAPHORE_PARAM) {
261  strError += std::string(" due to " + __trace_message);
262  }
263  return false;
264  }
265 
266  double base = SystemClock::nowSystem();
267  while(!timeout(base, STOP_TIMEOUT))
268  {
269  if(running() == 0)
270  {
271  PeriodicThread::stop();
272  return true;
273  }
274  }
275 
276  strError = "Timeout! Cannot stop ";
277  strError += strCmd;
278  strError += " on ";
279  strError += strHost;
280  PeriodicThread::stop();
281  return false;
282 }
283 
285 {
286  if (!bInitialized) {
287  return true;
288  }
289  if (bOnlyConnector) {
290  return false;
291  }
292 
293  strError.clear();
294 
295  yarp::os::Bottle msg,grp,response;
296  grp.clear();
297  grp.addString("on");
298  grp.addString(strHost.c_str());
299  msg.addList() = grp;
300  grp.clear();
301  grp.addString("kill");
302  grp.addString(strTag.c_str());
303  grp.addInt32(SIGKILL);
304  msg.addList() = grp;
305  int ret = SendMsg(msg, strHost, response, CONNECTION_TIMEOUT);
306  if(ret != YARPRUN_OK)
307  {
308  strError = "cannot ask ";
309  strError += strHost;
310  strError += " to kill ";
311  strError += strCmd;
312  strError += yarprun_err_msg[ret];
313  if (ret == YARPRUN_SEMAPHORE_PARAM) {
314  strError += std::string(" due to " + __trace_message);
315  }
316  return false;
317  }
318 
319  double base = SystemClock::nowSystem();
320  while(!timeout(base, KILL_TIMEOUT))
321  {
322  if(running() == 0)
323  {
324  PeriodicThread::stop();
325  return true;
326  }
327  }
328 
329  strError = "cannot kill ";
330  strError += strCmd;
331  strError += " on ";
332  strError += strHost;
333  PeriodicThread::stop();
334  return false;
335 }
336 
337 
339 {
340  if (!bInitialized) {
341  return -1;
342  }
343  if (bOnlyConnector) {
344  return -1;
345  }
346 
347  strError.clear();
348  yarp::os::Bottle msg,grp,response;
349 
350  grp.clear();
351  grp.addString("on");
352  grp.addString(strHost.c_str());
353  msg.addList()=grp;
354 
355  grp.clear();
356  grp.addString("isrunning");
357  grp.addString(strTag.c_str());
358  msg.addList()=grp;
359 
360  int ret = SendMsg(msg, strHost, response, 3.0);
361  if(ret != YARPRUN_OK)
362  {
363  strError = "cannot ask ";
364  strError += strHost;
365  strError += " to check for status of ";
366  strError += strCmd;
367  strError += yarprun_err_msg[ret];
368  if (ret == YARPRUN_SEMAPHORE_PARAM) {
369  strError += std::string(" due to " + __trace_message);
370  }
371  return -1;
372  }
373  return ((response.get(0).asString() == "running")?1:0);
374 }
375 
376 
378 {
379  return true;
380 }
381 
383 {
384 }
385 
386 
387 Property& YarpBroker::runProperty()
388 {
389  command.clear();
390  std::string cmd = strCmd + std::string(" ") + strParam;
391  command.put("cmd", cmd);
392  command.put("on", strHost);
393  command.put("as", strTag);
394  if (!strWorkdir.empty()) {
395  command.put("workdir", strWorkdir);
396  }
397  if (!strStdio.empty()) {
398  command.put("stdio", strStdio);
399  }
400  if (!strEnv.empty()) {
401  command.put("env", strEnv);
402  }
403  //command.put("hold", "hold");
404  return command;
405 }
406 
407 
411 bool YarpBroker::connect(const char* from, const char* to,
412  const char* carrier, bool persist)
413 {
414  if(!from)
415  {
416  strError = "no source port is introduced.";
417  return false;
418  }
419 
420  if(!to)
421  {
422  strError = "no destination port is introduced.";
423  return false;
424  }
425 
426  ContactStyle style;
427  style.quiet = true;
428  style.timeout = CONNECTION_TIMEOUT;
429  style.carrier = carrier;
430 
431  if(!persist)
432  {
433  /*
434  * TODO: this check should be removed and
435  * the necessary modification should be done inside NetworkBase::isConnected!!!
436  */
437  std::string strCarrier = carrier;
438  bool needDisconnect = strCarrier.find("udp") == (size_t)0;
439  needDisconnect |= strCarrier.find("mcast") == (size_t)0;
440  if(needDisconnect == false) {
441  if (NetworkBase::isConnected(from, to, style)) {
442  return true;
443  }
444  }
445 
446  NetworkBase::connect(from, to, style);
447  if(!connected(from, to, carrier))
448  {
449  strError = "cannot connect ";
450  strError +=from;
451  strError += " to " + std::string(to);
452  return false;
453  }
454  }
455  else
456  {
457  std::string topic = std::string("topic:/") + std::string(from) + std::string(to);
458  NetworkBase::connect(from, topic, style);
459  NetworkBase::connect(topic, to, style);
460  if(!connected(from, to, carrier))
461  {
462  strError = "a persistent connection from ";
463  strError +=from;
464  strError += " to " + std::string(to);
465  strError += " is created but not connected.";
466  return false;
467  }
468 
469  }
470 
471  return true;
472 }
473 
474 bool YarpBroker::disconnect(const char* from, const char* to, const char* carrier)
475 {
476 
477  if(!from)
478  {
479  strError = "no source port is introduced.";
480  return false;
481  }
482 
483  if(!to)
484  {
485  strError = "no destination port is introduced.";
486  return false;
487  }
488 
489  /*
490  if(!exists(from))
491  {
492  strError = from;
493  strError += " does not exist.";
494  return true;
495  }
496 
497  if(!exists(to))
498  {
499  strError = to;
500  strError += " does not exist.";
501  return true;
502  }
503  */
504 
505  if (!connected(from, to, carrier)) {
506  return true;
507  }
508 
509  ContactStyle style;
510  style.quiet = true;
511  style.timeout = CONNECTION_TIMEOUT;
512  style.carrier = carrier;
513  if(!NetworkBase::disconnect(from, to, style))
514  {
515  strError = "cannot disconnect ";
516  strError +=from;
517  strError += " from " + std::string(to);
518  return false;
519  }
520  return true;
521 
522 }
523 
524 bool YarpBroker::exists(const char* szport)
525 {
526  ContactStyle style;
527  style.quiet = true;
528  style.timeout = CONNECTION_TIMEOUT;
529  return NetworkBase::exists(szport, style);
530 }
531 
532 const char* YarpBroker::requestRpc(const char* szport, const char* request, double timeout)
533 {
534  if ((szport == nullptr) || (request == nullptr)) {
535  return nullptr;
536  }
537 
538  if (!exists(szport)) {
539  return nullptr;
540  }
541 
542  // opening the port
543  yarp::os::Port port;
544  port.setTimeout((float)((timeout>0.0) ? timeout : CONNECTION_TIMEOUT));
545  if (!port.open("...")) {
546  return nullptr;
547  }
548 
549  ContactStyle style;
550  style.quiet = true;
551  style.timeout = (timeout>0.0) ? timeout : CONNECTION_TIMEOUT;
552  bool ret;
553  for(int i=0; i<10; i++) {
554  ret = NetworkBase::connect(port.getName(), szport, style);
555  if (ret) {
556  break;
557  }
558  SystemClock::delaySystem(1.0);
559  }
560 
561  if(!ret) {
562  port.close();
563  return nullptr;
564  }
565 
566  Bottle msg, response;
567  msg.fromString(request);
568  ret = port.write(msg, response);
569  NetworkBase::disconnect(port.getName(), szport);
570  if(!response.size() || !ret) {
571  port.close();
572  return nullptr;
573  }
574 
575  port.close();
576  return response.toString().c_str();
577 }
578 
579 bool YarpBroker::connected(const char* from, const char* to, const char* carrier)
580 {
581  if (!exists(from) || !exists(to)) {
582  return false;
583  }
584  ContactStyle style;
585  style.quiet = true;
586  style.timeout = CONNECTION_TIMEOUT;
587  style.carrier = carrier;
588  return NetworkBase::isConnected(from, to, style);
589 }
590 
591 bool YarpBroker::getSystemInfo(const char* server, SystemInfoSerializer& info)
592 {
593  if (!strlen(server)) {
594  return false;
595  }
596  if (!semParam.check()) {
597  return false;
598  }
599 
600  yarp::os::Port port;
601  // opening the port
603  if(!port.open("...")) {
604  __trace_message.clear();
605  semParam.post();
606  return false;
607  }
608 
609  yarp::os::Bottle msg, grp;
610  grp.clear();
611  grp.addString("sysinfo");
612  msg.addList() = grp;
613 
614  ContactStyle style;
615  style.quiet = true;
616  style.timeout = CONNECTION_TIMEOUT;
617  //style.carrier = carrier;
618 
619 
620  __trace_message = "(getSystemInfo) connecting to " + std::string(port.getName());
621  bool connected = yarp::os::NetworkBase::connect(port.getName(), server, style);
622  if(!connected)
623  {
624  port.close();
625  strError = std::string("Cannot connect to ") + std::string(server);
626  __trace_message.clear();
627  semParam.post();
628  return false;
629  }
630 
631  __trace_message = "(getSystemInfo) writing to " + std::string(port.getName());
632  bool ret = port.write(msg, info);
633  __trace_message = "(getSystemInfo) disconnecting from " + std::string(port.getName());
634  NetworkBase::disconnect(port.getName(), server);
635 
636  if(!ret)
637  {
638  port.close();
639  strError = std::string(server) + std::string(" does not respond");
640  __trace_message.clear();
641  semParam.post();
642  return false;
643  }
644 
645  port.close();
646  __trace_message.clear();
647  semParam.post();
648  return true;
649 }
650 
651 bool YarpBroker::getAllPorts(std::vector<std::string> &ports)
652 {
653  ContactStyle style;
654  style.quiet = true;
655  style.timeout = CONNECTION_TIMEOUT;
656  Bottle cmd, reply;
657  cmd.addString("list");
658 
659  bool ret = NetworkBase::writeToNameServer(cmd, reply, style);
660  if (!ret)
661  {
662  strError = "Failed to reach name server\n";
663  return false;
664  }
665 
666  if ((reply.size() != 1) || (!reply.get(0).isString())) {
667  return false;
668  }
669 
670  std::string str = reply.get(0).asString();
671  const char* delm = "registration name ";
672  size_t pos1, pos2;
673  while((pos1 = str.find(delm)) != std::string::npos)
674  {
675  str = str.substr(pos1+strlen(delm));
676  if ((pos2 = str.find(' ')) != std::string::npos) {
677  ports.push_back(str.substr(0, pos2));
678  }
679  }
680 
681  return true;
682 }
683 
684 bool YarpBroker::getAllProcesses(const char* server,
685  ProcessContainer& processes)
686 {
687  if (!strlen(server)) {
688  return false;
689  }
690 
691  processes.clear();
692  strError.clear();
693  yarp::os::Bottle msg,grp,response;
694 
695  grp.clear();
696  grp.addString("ps");
697  msg.addList()=grp;
698 
699  int ret = SendMsg(msg, server, response, 3.0);
700  if((ret == YARPRUN_OK) || (ret == YARPRUN_NORESPONSE))
701  {
702  for(size_t i=0; i<response.size(); i++)
703  {
704  Process proc;
705  std::string sprc;
706  if (response.get(i).check("pid")) {
707  proc.pid = response.get(i).find("pid").asInt32();
708  }
709  if (response.get(i).check("cmd")) {
710  sprc = response.get(i).find("cmd").asString();
711  }
712  if (response.get(i).check("env") && response.get(i).find("env").asString().length()) {
713  sprc.append("; ").append(response.get(i).find("env").asString());
714  }
715  proc.command = sprc;
716  processes.push_back(proc);
717  }
718  return true;
719  }
720 
721  strError = "cannot ask ";
722  strError += server;
723  strError += " to give the list of running processes.";
724  strError += yarprun_err_msg[ret];
725  if (ret == YARPRUN_SEMAPHORE_PARAM) {
726  strError += std::string(" due to " + __trace_message);
727  }
728  return false;
729 }
730 
731 
732 bool YarpBroker::rmconnect(const char* from, const char* to)
733 {
734  std::string topic = std::string(from) + std::string(to);
735  Bottle cmd, reply;
736  cmd.addString("untopic");
737  cmd.addString(topic.c_str());
738  return NetworkBase::write(NetworkBase::getNameServerContact(),
739  cmd,
740  reply,
741  false,
742  true,
744 }
745 
746 bool YarpBroker::setQos(const char* from, const char *to,
747  const char *qosFrom, const char *qosTo) {
748  strError.clear();
749 
750  if (qosFrom && qosTo && !strlen(qosFrom) && !strlen(qosTo)) {
751  return true;
752  }
753 
754  QosStyle styleFrom;
755  QosStyle styleTo;
756  if(qosFrom != nullptr && strlen(qosFrom)) {
757  if(!getQosFromString(qosFrom, styleFrom)) {
758  strError = "Error in parsing Qos properties of " + std::string(from);
759  return false;
760  }
761  }
762  if (qosTo != nullptr && strlen(qosTo)) {
763  if(!getQosFromString(qosTo, styleTo)) {
764  strError = "Error in parsing Qos properties of " + std::string(to);
765  return false;
766  }
767  }
768  return NetworkBase::setConnectionQos(from, to, styleFrom, styleTo, true);
769 }
770 
771 bool YarpBroker::getQosFromString(const char* qos, yarp::os::QosStyle& style) {
772  std::string strQos(qos);
773  transform(strQos.begin(), strQos.end(), strQos.begin(),
774  (int(*)(int))toupper);
775  strQos.erase( std::remove_if( strQos.begin(), strQos.end(), ::isspace ), strQos.end() );
776 
777  //level:high; priority:10; policy:1
778  std::stringstream ss(strQos); // Turn the string into a stream.
779  std::string prop;
780  while(getline(ss, prop, ';')) {
781  size_t p = prop.find(':');
782  if (p != prop.npos) {
783  std::string key = prop.substr(0, p);
784  std::string value = prop.substr(p+1);
785  if (key.length() > 0 && value.length() > 0) {
786  if (key == "LEVEL" || key=="DSCP" || key == "TOS") {
787  if (!style.setPacketPriority(prop)) {
788  return false;
789  }
790  }
791  else if (key == "PRIORITY") {
792  char* p;
793  int prio = strtol(value.c_str(), &p, 10);
794  style.setThreadPriority(prio);
795  }
796  else if (key == "POLICY") {
797  char* p;
798  int policy = strtol(value.c_str(), &p, 10);
799  style.setThreadPolicy(policy);
800  }
801  }
802  }
803  }
804  return true;
805 }
806 
807 const char* YarpBroker::error()
808 {
809  return strError.c_str();
810 }
811 
812 
813 bool YarpBroker::timeout(double base, double timeout)
814 {
815  SystemClock::delaySystem(1.0);
816  if ((SystemClock::nowSystem() - base) > timeout) {
817  return true;
818  }
819  return false;
820 }
821 
823 {
824  if (!strStdioUUID.size()) {
825  return false;
826  }
827 
828  std::string strStdioPort = strStdioUUID + "/stdout";
829  stdioPort.open("...");
830 
831  double base = SystemClock::nowSystem();
832  ContactStyle style;
833  style.quiet = true;
834  style.timeout = CONNECTION_TIMEOUT;
835  while(!timeout(base, 5.0))
836  {
837  if (NetworkBase::connect(strStdioPort, stdioPort.getName(), style)) {
838  return true;
839  }
840  }
841 
842  strError = "Cannot connect to stdio port ";
843  strError += strStdioPort;
844  stdioPort.close();
845  return false;
846 }
847 
848 
850 {
851  Bottle *input;
852  if( (input=stdioPort.read(false)) && eventSink)
853  {
854  for (size_t i = 0; i < input->size(); i++) {
855  eventSink->onBrokerStdout(input->get(i).asString().c_str());
856  }
857  }
858 }
859 
860 
862 {
863  NetworkBase::disconnect(stdioPort.getName(), strStdioUUID);
864  stdioPort.close();
865 }
866 
867 
868 int YarpBroker::SendMsg(Bottle& msg, std::string target, Bottle& response, float fTimeout)
869 {
870  if (!exists(target.c_str())) {
871  return YARPRUN_NOCONNECTION;
872  }
873 
874  if (!semParam.check()) {
876  }
877 
878  // opening the port
879  yarp::os::Port port;
880  port.setTimeout(fTimeout);
881  if(!port.open("..."))
882  {
883  __trace_message.clear();
884  semParam.post();
886  }
887 
888  ContactStyle style;
889  style.quiet = true;
890  style.timeout = CONNECTION_TIMEOUT;
891 
892  bool ret;
893  __trace_message = "(SendMsg) connecting to " + std::string(target);
894  for(int i=0; i<10; i++)
895  {
896  ret = NetworkBase::connect(port.getName(), target, style);
897  if (ret) {
898  break;
899  }
900  SystemClock::delaySystem(1.0);
901  }
902 
903  if(!ret)
904  {
905  port.close();
906  __trace_message.clear();
907  semParam.post();
909  }
910 
911  __trace_message = "(SendMsg) writing to " + std::string(target);
912  ret = port.write(msg, response);
913  __trace_message = "(SendMsg) disconnecting from " + std::string(target);
914  NetworkBase::disconnect(port.getName(),target);
915  __trace_message.clear();
916  semParam.post();
917 
918  if(!response.size() || !ret) {
919  port.close();
920  return YARPRUN_NORESPONSE;
921  }
922 
923  port.close();
924 
925  return YARPRUN_OK;
926 }
927 
928 
929 int YarpBroker::requestServer(Property& config)
930 {
931  yarp::os::Bottle msg;
932 
933  // USE A YARP RUN SERVER TO MANAGE STDIO
934  //
935  // client -> stdio server -> cmd server
936  //
937  if (config.check("cmd") && config.check("stdio"))
938  {
939  if (config.find("stdio").asString()=="") {return YARPRUN_UNDEF; }
940  if (config.find("cmd").asString()=="") {return YARPRUN_UNDEF; }
941  if (!config.check("as") || config.find("as").asString()=="") { return YARPRUN_UNDEF; }
942  if (!config.check("on") || config.find("on").asString()=="") { return YARPRUN_UNDEF; }
943 
944  msg.addList()=config.findGroup("stdio");
945  msg.addList()=config.findGroup("cmd");
946  msg.addList()=config.findGroup("as");
947  msg.addList()=config.findGroup("on");
948 
949  if (config.check("workdir")) {
950  msg.addList() = config.findGroup("workdir");
951  }
952  if (config.check("geometry")) {
953  msg.addList() = config.findGroup("geometry");
954  }
955  if (config.check("hold")) {
956  msg.addList() = config.findGroup("hold");
957  }
958  if (config.check("env")) {
959  msg.addList() = config.findGroup("env");
960  }
961 
962  Bottle response;
963  int ret = SendMsg(msg, config.find("stdio").asString(),
964  response, CONNECTION_TIMEOUT);
965  if (ret != YARPRUN_OK) {
966  return ret;
967  }
968 
969  if (response.size() > 2) {
970  strStdioUUID = response.get(2).asString();
971  }
972 
973  return ((response.get(0).asInt32()>0)?YARPRUN_OK:YARPRUN_UNDEF);
974  }
975 
976  // DON'T USE A RUN SERVER TO MANAGE STDIO
977  //
978  // client -> cmd server
979  //
980  if (config.check("cmd"))
981  {
982  if (config.find("cmd").asString()=="") { return YARPRUN_UNDEF; }
983  if (!config.check("as") || config.find("as").asString()=="") {return YARPRUN_UNDEF; }
984  if (!config.check("on") || config.find("on").asString()=="") {return YARPRUN_UNDEF; }
985 
986  msg.addList()=config.findGroup("cmd");
987  msg.addList()=config.findGroup("as");
988 
989  if (config.check("workdir")) {
990  msg.addList() = config.findGroup("workdir");
991  }
992  if (config.check("env")) {
993  msg.addList() = config.findGroup("env");
994  }
995 
996  Bottle response;
997  int ret = SendMsg(msg, config.find("on").asString(),
998  response, CONNECTION_TIMEOUT);
999  if (ret != YARPRUN_OK) {
1000  return ret;
1001  }
1002 
1003  return ((response.get(0).asInt32()>0)?YARPRUN_OK:YARPRUN_UNDEF);
1004  }
1005 
1006  return YARPRUN_UNDEF;
1007 }
bool ret
virtual void onBrokerStdout(const char *msg)
Definition: broker.h:23
BrokerEventSink * eventSink
Definition: broker.h:69
unsigned int generateID()
Definition: broker.cpp:26
void threadRelease() override
Release method.
Definition: yarpbroker.cpp:861
const char * error() override
Definition: yarpbroker.cpp:807
bool connected(const char *from, const char *to, const char *carrier) override
Definition: yarpbroker.cpp:579
bool getAllPorts(std::vector< std::string > &stingList)
Definition: yarpbroker.cpp:651
const char * requestRpc(const char *szport, const char *request, double timeout) override
Definition: yarpbroker.cpp:532
bool init() override
Definition: yarpbroker.cpp:61
bool getSystemInfo(const char *server, yarp::os::SystemInfoSerializer &info)
Definition: yarpbroker.cpp:591
bool getAllProcesses(const char *server, ProcessContainer &processes)
Definition: yarpbroker.cpp:684
void detachStdout() override
Definition: yarpbroker.cpp:382
bool connect(const char *from, const char *to, const char *carrier, bool persist=false) override
connection broker
Definition: yarpbroker.cpp:411
void fini() override
Definition: yarpbroker.cpp:53
bool rmconnect(const char *from, const char *to)
Definition: yarpbroker.cpp:732
bool threadInit() override
Initialization method.
Definition: yarpbroker.cpp:822
bool setQos(const char *from, const char *to, const char *qosFrom, const char *qosTo)
Definition: yarpbroker.cpp:746
bool disconnect(const char *from, const char *to, const char *carrier) override
Definition: yarpbroker.cpp:474
void run() override
Loop function.
Definition: yarpbroker.cpp:849
bool attachStdout() override
Definition: yarpbroker.cpp:377
bool exists(const char *port) override
Definition: yarpbroker.cpp:524
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:74
void fromString(const std::string &text)
Initializes bottle from a string.
Definition: Bottle.cpp:204
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
Definition: Bottle.cpp:182
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:251
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:246
void clear()
Empties the bottle of any objects it contains.
Definition: Bottle.cpp:121
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
Definition: Bottle.cpp:140
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:170
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:211
std::string getName() const override
Get name of port.
void close() override
Stop port activity.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
T * read(bool shouldWait=true) override
Read an available object from the port.
Preferences for how to communicate with a contact.
Definition: ContactStyle.h:24
double timeout
Set a timeout for communication (in units of seconds, fractional seconds allowed).
Definition: ContactStyle.h:47
bool quiet
Suppress all outputs and warnings.
Definition: ContactStyle.h:36
std::string carrier
Request that communication be made using a particular carrier.
Definition: ContactStyle.h:53
virtual std::string getName() const
Get name of port.
Definition: Contactable.cpp:14
static bool connect(const std::string &src, const std::string &dest, const std::string &carrier="", bool quiet=true)
Request that an output port connect to an input port.
Definition: Network.cpp:682
An abstraction for a periodic thread.
A mini-server for network communication.
Definition: Port.h:47
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
Definition: Port.cpp:427
bool setTimeout(float timeout)
Set a timeout on network operations.
Definition: Port.cpp:625
void close() override
Stop port activity.
Definition: Port.cpp:354
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
Definition: Port.cpp:79
A class for storing options and configuration information.
Definition: Property.h:34
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
Definition: Property.cpp:1051
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
Definition: Property.cpp:1015
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Property.cpp:1041
void clear()
Remove all associations.
Definition: Property.cpp:1057
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Definition: Property.cpp:1142
Preferences for the port's Quality of Service.
Definition: QosStyle.h:24
void setThreadPriority(int priority)
sets the communication thread priority level
Definition: QosStyle.h:128
bool setPacketPriority(const std::string &priority)
sets the packet priority from a string.
Definition: QosStyle.cpp:39
void setThreadPolicy(int policy)
sets the communication thread scheduling policy
Definition: QosStyle.h:138
void wait()
Decrement the counter, even if we must wait to do that.
Definition: Semaphore.cpp:96
bool check()
Decrement the counter, unless that would require waiting.
Definition: Semaphore.cpp:106
void post()
Increment the counter.
Definition: Semaphore.cpp:111
A helper class to pass the SystemInfo object around the YARP network.
virtual bool isString() const
Checks if value is a string.
Definition: Value.cpp:156
virtual std::int32_t asInt32() const
Get 32-bit integer value.
Definition: Value.cpp:204
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
Definition: Value.cpp:327
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Value.cpp:321
virtual std::string asString() const
Get string value.
Definition: Value.cpp:234
std::stringstream OSTRINGSTREAM
Definition: utility.h:49
std::vector< Process > ProcessContainer
Definition: primresource.h:169
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
bool write(const ImageOf< PixelRgb > &src, const std::string &dest, image_fileformat format=FORMAT_PPM)
Definition: ImageFile.cpp:1098
#define KILL_TIMEOUT
Definition: yarpbroker.cpp:21
#define CONNECTION_TIMEOUT
Definition: yarpbroker.cpp:18
#define YARPRUN_UNDEF
Definition: yarpbroker.cpp:16
#define YARPRUN_CONNECTION_TIMOUT
Definition: yarpbroker.cpp:14
#define YARPRUN_NORESPONSE
Definition: yarpbroker.cpp:12
#define YARPRUN_NOCONNECTION
Definition: yarpbroker.cpp:13
#define RUN_TIMEOUT
Definition: yarpbroker.cpp:19
#define YARPRUN_SEMAPHORE_PARAM
Definition: yarpbroker.cpp:15
#define STOP_TIMEOUT
Definition: yarpbroker.cpp:20
const char * yarprun_err_msg[]
Definition: yarpbroker.cpp:28
#define YARPRUN_OK
Definition: yarpbroker.cpp:11
#define EVENT_THREAD_PERIOD
Definition: yarpbroker.cpp:22