YARP
Yet Another Robot Platform
yarpbroker.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-License-Identifier: BSD-3-Clause
4 */
5
7
8#include <csignal>
9#include <cstring>
10
11#define YARPRUN_OK 0
12#define YARPRUN_NORESPONSE 1
13#define YARPRUN_NOCONNECTION 2
14#define YARPRUN_CONNECTION_TIMOUT 3
15#define YARPRUN_SEMAPHORE_PARAM 4
16#define YARPRUN_UNDEF 5
17
18#define CONNECTION_TIMEOUT 2.0 //seconds
19#define RUN_TIMEOUT 10.0 //seconds
20#define STOP_TIMEOUT 15.0
21#define KILL_TIMEOUT 10.0
22#define EVENT_THREAD_PERIOD 0.5 //seconds
23
24#if defined(_WIN32)
25 #define SIGKILL 9
26#endif
27
28const char* yarprun_err_msg[] = { " (Ok) ",
29 " (Remote host does not respond) ",
30 " (Remote host does no exist) ",
31 " (Timeout while connecting to the remote host) ",
32 " (Blocked in broker semaphor) ",
33 " (Undefined message) " };
34
35using namespace yarp::os;
36using namespace yarp::os::impl;
37using namespace yarp::manager;
40YarpBroker::YarpBroker() : PeriodicThread(EVENT_THREAD_PERIOD)
41{
42 bOnlyConnector = bInitialized = false;
43 ID = generateID();
44 strStdioUUID.clear();
45}
46
47
49{
50 fini();
51}
52
54{
55 if (PeriodicThread::isRunning()) {
56 PeriodicThread::stop();
57 }
58 //port.close();
59}
60
62{
63 //if(bInitialized)
64 // return true;
65
66 if(!NetworkBase::checkNetwork(CONNECTION_TIMEOUT))
67 {
68 strError = "YARP network server is not up.";
69 return false;
70 }
71 bInitialized = true;
72 bOnlyConnector = true;
73
74 /*
75 semParam.wait();
76 __trace_message = "(init) opening port ...";
77 port.setTimeout(CONNECTION_TIMEOUT);
78 port.open("...");
79 __trace_message.clear();
80 semParam.post();
81 */
82 return true;
83}
84
85bool YarpBroker::init(const char* szcmd, const char* szparam,
86 const char* szhost, const char* szstdio,
87 const char* szworkdir, const char* szenv )
88{
89 //if(bInitialized)
90 // return true;
91
92 semParam.wait();
93
94 strCmd.clear();
95 strParam.clear();
96 strHost.clear();
97 strStdio.clear();
98 strWorkdir.clear();
99 strTag.clear();
100 strEnv.clear();
101
102 if(!szcmd)
103 {
104 strError = "command is not specified.";
105 semParam.post();
106 return false;
107 }
108
109 if(!szhost)
110 {
111 strError = "remote host port is not specified.";
112 semParam.post();
113 return false;
114 }
115
116 if (szhost[0] != '/') {
117 strHost = std::string("/") + std::string(szhost);
118 } else {
119 strHost = szhost;
120 }
121
122 strCmd = szcmd;
123 if (strlen(szparam)) {
124 strParam = szparam;
125 }
126 if (strlen(szworkdir)) {
127 strWorkdir = szworkdir;
128 }
129
130 if(strlen(szstdio))
131 {
132 if (szstdio[0] != '/') {
133 strStdio = std::string("/") + std::string(szstdio);
134 } else {
135 strStdio = szstdio;
136 }
137 }
138
139 if (strlen(szenv)) {
140 strEnv = szenv;
141 }
142
143 OSTRINGSTREAM sstrID;
144 sstrID<<(int)ID;
145 strTag = strHost + strCmd + strParam + strEnv + sstrID.str();
146 std::string::iterator itr;
147 for (itr = strTag.begin(); itr != strTag.end(); itr++) {
148 if (((*itr) == ' ') || ((*itr) == '/')) {
149 (*itr) = ':';
150 }
151 }
152
153 __trace_message = "(init) checking yarp network";
154 if(!NetworkBase::checkNetwork(5.0))
155 {
156 strError = "YARP network server is not up.";
157 __trace_message.clear();
158 semParam.post();
159 return false;
160 }
161 __trace_message = std::string("(init) checking existence of ") + strHost;
162 if(!exists(strHost.c_str()))
163 {
164 strError = szhost;
165 strError += " does not exist. check yarprun is running as server.";
166 __trace_message.clear();
167 semParam.post();
168 return false;
169 }
170
171 /*
172 port.setTimeout(CONNECTION_TIMEOUT);
173 __trace_message = "(init) opening port ...";
174 port.open("...");
175 __trace_message.clear();
176 */
177
178 bInitialized = true;
179 semParam.post();
180
181 return true;
182}
183
184
186{
187 if (!bInitialized) {
188 return false;
189 }
190 if (bOnlyConnector) {
191 return false;
192 }
193
194 strError.clear();
195 int ret = requestServer(runProperty());
196 if(ret != YARPRUN_OK)
197 {
198 strError = "cannot ask ";
199 strError += strHost;
200 strError += " to run ";
201 strError += strCmd;
202 strError += yarprun_err_msg[ret];
204 strError += std::string(" due to " + __trace_message);
205 }
206 return false;
207 }
208
209 double base = SystemClock::nowSystem();
210 while(!timeout(base, RUN_TIMEOUT))
211 {
212 if(running() == 1)
213 {
214 if(strStdioUUID.size())
215 {
216 if (PeriodicThread::isRunning()) {
217 PeriodicThread::stop();
218 }
219 PeriodicThread::start();
220 }
221 return true;
222 }
223 }
224
225 strError = "cannot run ";
226 strError += strCmd;
227 strError += " on ";
228 strError += strHost;
229 return false;
230}
231
233{
234 if (!bInitialized) {
235 return true;
236 }
237 if (bOnlyConnector) {
238 return false;
239 }
240
241 strError.clear();
242 yarp::os::Bottle msg,grp,response;
243
244 grp.clear();
245 grp.addString("on");
246 grp.addString(strHost.c_str());
247 msg.addList()=grp;
248 grp.clear();
249 grp.addString("sigterm");
250 grp.addString(strTag.c_str());
251 msg.addList()=grp;
252 int ret = SendMsg(msg, strHost, response, CONNECTION_TIMEOUT);
253 if(ret != YARPRUN_OK)
254 {
255 strError = "cannot ask ";
256 strError += strHost;
257 strError += " to stop ";
258 strError += strCmd;
259 strError += yarprun_err_msg[ret];
261 strError += std::string(" due to " + __trace_message);
262 }
263 return false;
264 }
265
266 double base = SystemClock::nowSystem();
267 while(!timeout(base, STOP_TIMEOUT))
268 {
269 if(running() == 0)
270 {
271 PeriodicThread::stop();
272 return true;
273 }
274 }
275
276 strError = "Timeout! Cannot stop ";
277 strError += strCmd;
278 strError += " on ";
279 strError += strHost;
280 PeriodicThread::stop();
281 return false;
282}
283
285{
286 if (!bInitialized) {
287 return true;
288 }
289 if (bOnlyConnector) {
290 return false;
291 }
292
293 strError.clear();
294
295 yarp::os::Bottle msg,grp,response;
296 grp.clear();
297 grp.addString("on");
298 grp.addString(strHost.c_str());
299 msg.addList() = grp;
300 grp.clear();
301 grp.addString("kill");
302 grp.addString(strTag.c_str());
303 grp.addInt32(SIGKILL);
304 msg.addList() = grp;
305 int ret = SendMsg(msg, strHost, response, CONNECTION_TIMEOUT);
306 if(ret != YARPRUN_OK)
307 {
308 strError = "cannot ask ";
309 strError += strHost;
310 strError += " to kill ";
311 strError += strCmd;
312 strError += yarprun_err_msg[ret];
314 strError += std::string(" due to " + __trace_message);
315 }
316 return false;
317 }
318
319 double base = SystemClock::nowSystem();
320 while(!timeout(base, KILL_TIMEOUT))
321 {
322 if(running() == 0)
323 {
324 PeriodicThread::stop();
325 return true;
326 }
327 }
328
329 strError = "cannot kill ";
330 strError += strCmd;
331 strError += " on ";
332 strError += strHost;
333 PeriodicThread::stop();
334 return false;
335}
336
337
339{
340 if (!bInitialized) {
341 return -1;
342 }
343 if (bOnlyConnector) {
344 return -1;
345 }
346
347 strError.clear();
348 yarp::os::Bottle msg,grp,response;
349
350 grp.clear();
351 grp.addString("on");
352 grp.addString(strHost.c_str());
353 msg.addList()=grp;
354
355 grp.clear();
356 grp.addString("isrunning");
357 grp.addString(strTag.c_str());
358 msg.addList()=grp;
359
360 int ret = SendMsg(msg, strHost, response, 3.0);
361 if(ret != YARPRUN_OK)
362 {
363 strError = "cannot ask ";
364 strError += strHost;
365 strError += " to check for status of ";
366 strError += strCmd;
367 strError += yarprun_err_msg[ret];
369 strError += std::string(" due to " + __trace_message);
370 }
371 return -1;
372 }
373 return ((response.get(0).asString() == "running")?1:0);
374}
375
376
378{
379 return true;
380}
381
383{
384}
385
386
387Property& YarpBroker::runProperty()
388{
389 command.clear();
390 std::string cmd = strCmd + std::string(" ") + strParam;
391 command.put("cmd", cmd);
392 command.put("on", strHost);
393 command.put("as", strTag);
394 if (!strWorkdir.empty()) {
395 command.put("workdir", strWorkdir);
396 }
397 if (!strStdio.empty()) {
398 command.put("stdio", strStdio);
399 }
400 if (!strEnv.empty()) {
401 command.put("env", strEnv);
402 }
403 //command.put("hold", "hold");
404 return command;
405}
406
407
411bool YarpBroker::connect(const char* from, const char* to,
412 const char* carrier, bool persist)
413{
414 if(!from)
415 {
416 strError = "no source port is introduced.";
417 return false;
418 }
419
420 if(!to)
421 {
422 strError = "no destination port is introduced.";
423 return false;
424 }
425
426 ContactStyle style;
427 style.quiet = true;
429 style.carrier = carrier;
430
431 if(!persist)
432 {
433 /*
434 * TODO: this check should be removed and
435 * the necessary modification should be done inside NetworkBase::isConnected!!!
436 */
437 std::string strCarrier = carrier;
438 bool needDisconnect = strCarrier.find("udp") == (size_t)0;
439 needDisconnect |= strCarrier.find("mcast") == (size_t)0;
440 if(needDisconnect == false) {
441 if (NetworkBase::isConnected(from, to, style)) {
442 return true;
443 }
444 }
445
446 NetworkBase::connect(from, to, style);
447 if(!connected(from, to, carrier))
448 {
449 strError = "cannot connect ";
450 strError +=from;
451 strError += " to " + std::string(to);
452 return false;
453 }
454 }
455 else
456 {
457 std::string topic = std::string("topic:/") + std::string(from) + std::string(to);
458 NetworkBase::connect(from, topic, style);
459 NetworkBase::connect(topic, to, style);
460 if(!connected(from, to, carrier))
461 {
462 strError = "a persistent connection from ";
463 strError +=from;
464 strError += " to " + std::string(to);
465 strError += " is created but not connected.";
466 return false;
467 }
468
469 }
470
471 return true;
472}
473
474bool YarpBroker::disconnect(const char* from, const char* to, const char* carrier)
475{
476
477 if(!from)
478 {
479 strError = "no source port is introduced.";
480 return false;
481 }
482
483 if(!to)
484 {
485 strError = "no destination port is introduced.";
486 return false;
487 }
488
489 /*
490 if(!exists(from))
491 {
492 strError = from;
493 strError += " does not exist.";
494 return true;
495 }
496
497 if(!exists(to))
498 {
499 strError = to;
500 strError += " does not exist.";
501 return true;
502 }
503 */
504
505 if (!connected(from, to, carrier)) {
506 return true;
507 }
508
509 ContactStyle style;
510 style.quiet = true;
512 style.carrier = carrier;
513 if(!NetworkBase::disconnect(from, to, style))
514 {
515 strError = "cannot disconnect ";
516 strError +=from;
517 strError += " from " + std::string(to);
518 return false;
519 }
520 return true;
521
522}
523
524bool YarpBroker::exists(const char* szport)
525{
526 ContactStyle style;
527 style.quiet = true;
529 return NetworkBase::exists(szport, style);
530}
531
532const char* YarpBroker::requestRpc(const char* szport, const char* request, double timeout)
533{
534 if ((szport == nullptr) || (request == nullptr)) {
535 return nullptr;
536 }
537
538 if (!exists(szport)) {
539 return nullptr;
540 }
541
542 // opening the port
543 yarp::os::Port port;
544 port.setTimeout((float)((timeout>0.0) ? timeout : CONNECTION_TIMEOUT));
545 if (!port.open("...")) {
546 return nullptr;
547 }
548
549 ContactStyle style;
550 style.quiet = true;
551 style.timeout = (timeout>0.0) ? timeout : CONNECTION_TIMEOUT;
552 bool ret;
553 for(int i=0; i<10; i++) {
554 ret = NetworkBase::connect(port.getName(), szport, style);
555 if (ret) {
556 break;
557 }
558 SystemClock::delaySystem(1.0);
559 }
560
561 if(!ret) {
562 port.close();
563 return nullptr;
564 }
565
566 Bottle msg, response;
567 msg.fromString(request);
568 ret = port.write(msg, response);
569 NetworkBase::disconnect(port.getName(), szport);
570 if(!response.size() || !ret) {
571 port.close();
572 return nullptr;
573 }
574
575 port.close();
576 return response.toString().c_str();
577}
578
579bool YarpBroker::connected(const char* from, const char* to, const char* carrier)
580{
581 if (!exists(from) || !exists(to)) {
582 return false;
583 }
584 ContactStyle style;
585 style.quiet = true;
587 style.carrier = carrier;
588 return NetworkBase::isConnected(from, to, style);
589}
590
591bool YarpBroker::getSystemInfo(const char* server, SystemInfoSerializer& info)
592{
593 if (!strlen(server)) {
594 return false;
595 }
596 if (!semParam.check()) {
597 return false;
598 }
599
600 yarp::os::Port port;
601 // opening the port
603 if(!port.open("...")) {
604 __trace_message.clear();
605 semParam.post();
606 return false;
607 }
608
609 yarp::os::Bottle msg, grp;
610 grp.clear();
611 grp.addString("sysinfo");
612 msg.addList() = grp;
613
614 ContactStyle style;
615 style.quiet = true;
617 //style.carrier = carrier;
618
619
620 __trace_message = "(getSystemInfo) connecting to " + std::string(port.getName());
621 bool connected = yarp::os::NetworkBase::connect(port.getName(), server, style);
622 if(!connected)
623 {
624 port.close();
625 strError = std::string("Cannot connect to ") + std::string(server);
626 __trace_message.clear();
627 semParam.post();
628 return false;
629 }
630
631 __trace_message = "(getSystemInfo) writing to " + std::string(port.getName());
632 bool ret = port.write(msg, info);
633 __trace_message = "(getSystemInfo) disconnecting from " + std::string(port.getName());
634 NetworkBase::disconnect(port.getName(), server);
635
636 if(!ret)
637 {
638 port.close();
639 strError = std::string(server) + std::string(" does not respond");
640 __trace_message.clear();
641 semParam.post();
642 return false;
643 }
644
645 port.close();
646 __trace_message.clear();
647 semParam.post();
648 return true;
649}
650
651bool YarpBroker::getAllPorts(std::vector<std::string> &ports)
652{
653 ContactStyle style;
654 style.quiet = true;
656 Bottle cmd, reply;
657 cmd.addString("list");
658
659 bool ret = NetworkBase::writeToNameServer(cmd, reply, style);
660 if (!ret)
661 {
662 strError = "Failed to reach name server\n";
663 return false;
664 }
665
666 if ((reply.size() != 1) || (!reply.get(0).isString())) {
667 return false;
668 }
669
670 std::string str = reply.get(0).asString();
671 const char* delm = "registration name ";
672 size_t pos1, pos2;
673 while((pos1 = str.find(delm)) != std::string::npos)
674 {
675 str = str.substr(pos1+strlen(delm));
676 if ((pos2 = str.find(' ')) != std::string::npos) {
677 ports.push_back(str.substr(0, pos2));
678 }
679 }
680
681 return true;
682}
683
684bool YarpBroker::getAllProcesses(const char* server,
685 ProcessContainer& processes)
686{
687 if (!strlen(server)) {
688 return false;
689 }
690
691 processes.clear();
692 strError.clear();
693 yarp::os::Bottle msg,grp,response;
694
695 grp.clear();
696 grp.addString("ps");
697 msg.addList()=grp;
698
699 int ret = SendMsg(msg, server, response, 3.0);
700 if((ret == YARPRUN_OK) || (ret == YARPRUN_NORESPONSE))
701 {
702 for(size_t i=0; i<response.size(); i++)
703 {
704 Process proc;
705 std::string sprc;
706 if (response.get(i).check("pid")) {
707 proc.pid = response.get(i).find("pid").asInt32();
708 }
709 if (response.get(i).check("cmd")) {
710 sprc = response.get(i).find("cmd").asString();
711 }
712 if (response.get(i).check("env") && response.get(i).find("env").asString().length()) {
713 sprc.append("; ").append(response.get(i).find("env").asString());
714 }
715 proc.command = sprc;
716 processes.push_back(proc);
717 }
718 return true;
719 }
720
721 strError = "cannot ask ";
722 strError += server;
723 strError += " to give the list of running processes.";
724 strError += yarprun_err_msg[ret];
726 strError += std::string(" due to " + __trace_message);
727 }
728 return false;
729}
730
731
732bool YarpBroker::rmconnect(const char* from, const char* to)
733{
734 std::string topic = std::string(from) + std::string(to);
735 Bottle cmd, reply;
736 cmd.addString("untopic");
737 cmd.addString(topic.c_str());
738 return NetworkBase::write(NetworkBase::getNameServerContact(),
739 cmd,
740 reply,
741 false,
742 true,
744}
745
746bool YarpBroker::setQos(const char* from, const char *to,
747 const char *qosFrom, const char *qosTo) {
748 strError.clear();
749
750 if (qosFrom && qosTo && !strlen(qosFrom) && !strlen(qosTo)) {
751 return true;
752 }
753
754 QosStyle styleFrom;
755 QosStyle styleTo;
756 if(qosFrom != nullptr && strlen(qosFrom)) {
757 if(!getQosFromString(qosFrom, styleFrom)) {
758 strError = "Error in parsing Qos properties of " + std::string(from);
759 return false;
760 }
761 }
762 if (qosTo != nullptr && strlen(qosTo)) {
763 if(!getQosFromString(qosTo, styleTo)) {
764 strError = "Error in parsing Qos properties of " + std::string(to);
765 return false;
766 }
767 }
768 return NetworkBase::setConnectionQos(from, to, styleFrom, styleTo, true);
769}
770
771bool YarpBroker::getQosFromString(const char* qos, yarp::os::QosStyle& style) {
772 std::string strQos(qos);
773 transform(strQos.begin(), strQos.end(), strQos.begin(),
774 (int(*)(int))toupper);
775 strQos.erase( std::remove_if( strQos.begin(), strQos.end(), ::isspace ), strQos.end() );
776
777 //level:high; priority:10; policy:1
778 std::stringstream ss(strQos); // Turn the string into a stream.
779 std::string prop;
780 while(getline(ss, prop, ';')) {
781 size_t p = prop.find(':');
782 if (p != prop.npos) {
783 std::string key = prop.substr(0, p);
784 std::string value = prop.substr(p+1);
785 if (key.length() > 0 && value.length() > 0) {
786 if (key == "LEVEL" || key=="DSCP" || key == "TOS") {
787 if (!style.setPacketPriority(prop)) {
788 return false;
789 }
790 }
791 else if (key == "PRIORITY") {
792 char* p;
793 int prio = strtol(value.c_str(), &p, 10);
794 style.setThreadPriority(prio);
795 }
796 else if (key == "POLICY") {
797 char* p;
798 int policy = strtol(value.c_str(), &p, 10);
799 style.setThreadPolicy(policy);
800 }
801 }
802 }
803 }
804 return true;
805}
806
807const char* YarpBroker::error()
808{
809 return strError.c_str();
810}
811
812
813bool YarpBroker::timeout(double base, double timeout)
814{
815 SystemClock::delaySystem(1.0);
816 if ((SystemClock::nowSystem() - base) > timeout) {
817 return true;
818 }
819 return false;
820}
821
823{
824 if (!strStdioUUID.size()) {
825 return false;
826 }
827
828 std::string strStdioPort = strStdioUUID + "/stdout";
829 stdioPort.open("...");
830
831 double base = SystemClock::nowSystem();
832 ContactStyle style;
833 style.quiet = true;
835 while(!timeout(base, 5.0))
836 {
837 if (NetworkBase::connect(strStdioPort, stdioPort.getName(), style)) {
838 return true;
839 }
840 }
841
842 strError = "Cannot connect to stdio port ";
843 strError += strStdioPort;
844 stdioPort.close();
845 return false;
846}
847
848
850{
851 Bottle *input;
852 if( (input=stdioPort.read(false)) && eventSink)
853 {
854 for (size_t i = 0; i < input->size(); i++) {
855 eventSink->onBrokerStdout(input->get(i).asString().c_str());
856 }
857 }
858}
859
860
862{
863 NetworkBase::disconnect(stdioPort.getName(), strStdioUUID);
864 stdioPort.close();
865}
866
867
868int YarpBroker::SendMsg(Bottle& msg, std::string target, Bottle& response, float fTimeout)
869{
870 if (!exists(target.c_str())) {
872 }
873
874 if (!semParam.check()) {
876 }
877
878 // opening the port
879 yarp::os::Port port;
880 port.setTimeout(fTimeout);
881 if(!port.open("..."))
882 {
883 __trace_message.clear();
884 semParam.post();
886 }
887
888 ContactStyle style;
889 style.quiet = true;
891
892 bool ret;
893 __trace_message = "(SendMsg) connecting to " + std::string(target);
894 for(int i=0; i<10; i++)
895 {
896 ret = NetworkBase::connect(port.getName(), target, style);
897 if (ret) {
898 break;
899 }
900 SystemClock::delaySystem(1.0);
901 }
902
903 if(!ret)
904 {
905 port.close();
906 __trace_message.clear();
907 semParam.post();
909 }
910
911 __trace_message = "(SendMsg) writing to " + std::string(target);
912 ret = port.write(msg, response);
913 __trace_message = "(SendMsg) disconnecting from " + std::string(target);
914 NetworkBase::disconnect(port.getName(),target);
915 __trace_message.clear();
916 semParam.post();
917
918 if(!response.size() || !ret) {
919 port.close();
920 return YARPRUN_NORESPONSE;
921 }
922
923 port.close();
924
925 return YARPRUN_OK;
926}
927
928
929int YarpBroker::requestServer(Property& config)
930{
932
933 // USE A YARP RUN SERVER TO MANAGE STDIO
934 //
935 // client -> stdio server -> cmd server
936 //
937 if (config.check("cmd") && config.check("stdio"))
938 {
939 if (config.find("stdio").asString()=="") {return YARPRUN_UNDEF; }
940 if (config.find("cmd").asString()=="") {return YARPRUN_UNDEF; }
941 if (!config.check("as") || config.find("as").asString()=="") { return YARPRUN_UNDEF; }
942 if (!config.check("on") || config.find("on").asString()=="") { return YARPRUN_UNDEF; }
943
944 msg.addList()=config.findGroup("stdio");
945 msg.addList()=config.findGroup("cmd");
946 msg.addList()=config.findGroup("as");
947 msg.addList()=config.findGroup("on");
948
949 if (config.check("workdir")) {
950 msg.addList() = config.findGroup("workdir");
951 }
952 if (config.check("geometry")) {
953 msg.addList() = config.findGroup("geometry");
954 }
955 if (config.check("hold")) {
956 msg.addList() = config.findGroup("hold");
957 }
958 if (config.check("env")) {
959 msg.addList() = config.findGroup("env");
960 }
961
962 Bottle response;
963 int ret = SendMsg(msg, config.find("stdio").asString(),
964 response, CONNECTION_TIMEOUT);
965 if (ret != YARPRUN_OK) {
966 return ret;
967 }
968
969 if (response.size() > 2) {
970 strStdioUUID = response.get(2).asString();
971 }
972
973 return ((response.get(0).asInt32()>0)?YARPRUN_OK:YARPRUN_UNDEF);
974 }
975
976 // DON'T USE A RUN SERVER TO MANAGE STDIO
977 //
978 // client -> cmd server
979 //
980 if (config.check("cmd"))
981 {
982 if (config.find("cmd").asString()=="") { return YARPRUN_UNDEF; }
983 if (!config.check("as") || config.find("as").asString()=="") {return YARPRUN_UNDEF; }
984 if (!config.check("on") || config.find("on").asString()=="") {return YARPRUN_UNDEF; }
985
986 msg.addList()=config.findGroup("cmd");
987 msg.addList()=config.findGroup("as");
988
989 if (config.check("workdir")) {
990 msg.addList() = config.findGroup("workdir");
991 }
992 if (config.check("env")) {
993 msg.addList() = config.findGroup("env");
994 }
995
996 Bottle response;
997 int ret = SendMsg(msg, config.find("on").asString(),
998 response, CONNECTION_TIMEOUT);
999 if (ret != YARPRUN_OK) {
1000 return ret;
1001 }
1002
1003 return ((response.get(0).asInt32()>0)?YARPRUN_OK:YARPRUN_UNDEF);
1004 }
1005
1006 return YARPRUN_UNDEF;
1007}
bool ret
virtual void onBrokerStdout(const char *msg)
Definition: broker.h:22
BrokerEventSink * eventSink
Definition: broker.h:68
unsigned int generateID()
Definition: broker.cpp:26
void threadRelease() override
Release method.
Definition: yarpbroker.cpp:861
const char * error() override
Definition: yarpbroker.cpp:807
bool connected(const char *from, const char *to, const char *carrier) override
Definition: yarpbroker.cpp:579
bool getAllPorts(std::vector< std::string > &stingList)
Definition: yarpbroker.cpp:651
const char * requestRpc(const char *szport, const char *request, double timeout) override
Definition: yarpbroker.cpp:532
bool init() override
Definition: yarpbroker.cpp:61
bool getSystemInfo(const char *server, yarp::os::SystemInfoSerializer &info)
Definition: yarpbroker.cpp:591
bool getAllProcesses(const char *server, ProcessContainer &processes)
Definition: yarpbroker.cpp:684
void detachStdout() override
Definition: yarpbroker.cpp:382
bool connect(const char *from, const char *to, const char *carrier, bool persist=false) override
connection broker
Definition: yarpbroker.cpp:411
void fini() override
Definition: yarpbroker.cpp:53
bool rmconnect(const char *from, const char *to)
Definition: yarpbroker.cpp:732
bool threadInit() override
Initialization method.
Definition: yarpbroker.cpp:822
bool setQos(const char *from, const char *to, const char *qosFrom, const char *qosTo)
Definition: yarpbroker.cpp:746
bool disconnect(const char *from, const char *to, const char *carrier) override
Definition: yarpbroker.cpp:474
void run() override
Loop function.
Definition: yarpbroker.cpp:849
bool attachStdout() override
Definition: yarpbroker.cpp:377
bool exists(const char *port) override
Definition: yarpbroker.cpp:524
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:64
void fromString(const std::string &text)
Initializes bottle from a string.
Definition: Bottle.cpp:204
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
Definition: Bottle.cpp:182
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:251
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:246
void clear()
Empties the bottle of any objects it contains.
Definition: Bottle.cpp:121
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
Definition: Bottle.cpp:140
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:170
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:211
std::string getName() const override
Get name of port.
void close() override
Stop port activity.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
T * read(bool shouldWait=true) override
Read an available object from the port.
Preferences for how to communicate with a contact.
Definition: ContactStyle.h:23
double timeout
Set a timeout for communication (in units of seconds, fractional seconds allowed).
Definition: ContactStyle.h:46
bool quiet
Suppress all outputs and warnings.
Definition: ContactStyle.h:35
std::string carrier
Request that communication be made using a particular carrier.
Definition: ContactStyle.h:52
virtual std::string getName() const
Get name of port.
Definition: Contactable.cpp:14
static bool connect(const std::string &src, const std::string &dest, const std::string &carrier="", bool quiet=true)
Request that an output port connect to an input port.
Definition: Network.cpp:682
An abstraction for a periodic thread.
A mini-server for network communication.
Definition: Port.h:46
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
Definition: Port.cpp:436
bool setTimeout(float timeout)
Set a timeout on network operations.
Definition: Port.cpp:634
void close() override
Stop port activity.
Definition: Port.cpp:363
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
Definition: Port.cpp:79
A class for storing options and configuration information.
Definition: Property.h:33
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
Definition: Property.cpp:1051
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
Definition: Property.cpp:1015
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Property.cpp:1041
void clear()
Remove all associations.
Definition: Property.cpp:1057
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Definition: Property.cpp:1142
Preferences for the port's Quality of Service.
Definition: QosStyle.h:23
void setThreadPriority(int priority)
sets the communication thread priority level
Definition: QosStyle.h:127
bool setPacketPriority(const std::string &priority)
sets the packet priority from a string.
Definition: QosStyle.cpp:39
void setThreadPolicy(int policy)
sets the communication thread scheduling policy
Definition: QosStyle.h:137
void wait()
Decrement the counter, even if we must wait to do that.
Definition: Semaphore.cpp:96
bool check()
Decrement the counter, unless that would require waiting.
Definition: Semaphore.cpp:106
void post()
Increment the counter.
Definition: Semaphore.cpp:111
A helper class to pass the SystemInfo object around the YARP network.
virtual bool isString() const
Checks if value is a string.
Definition: Value.cpp:156
virtual std::int32_t asInt32() const
Get 32-bit integer value.
Definition: Value.cpp:204
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
Definition: Value.cpp:327
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Value.cpp:321
virtual std::string asString() const
Get string value.
Definition: Value.cpp:234
std::stringstream OSTRINGSTREAM
Definition: utility.h:48
std::vector< Process > ProcessContainer
Definition: primresource.h:168
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
bool write(const ImageOf< PixelRgb > &src, const std::string &dest, image_fileformat format=FORMAT_PPM)
Definition: ImageFile.cpp:1091
#define KILL_TIMEOUT
Definition: yarpbroker.cpp:21
#define CONNECTION_TIMEOUT
Definition: yarpbroker.cpp:18
#define YARPRUN_UNDEF
Definition: yarpbroker.cpp:16
#define YARPRUN_CONNECTION_TIMOUT
Definition: yarpbroker.cpp:14
#define YARPRUN_NORESPONSE
Definition: yarpbroker.cpp:12
#define YARPRUN_NOCONNECTION
Definition: yarpbroker.cpp:13
#define RUN_TIMEOUT
Definition: yarpbroker.cpp:19
#define YARPRUN_SEMAPHORE_PARAM
Definition: yarpbroker.cpp:15
#define STOP_TIMEOUT
Definition: yarpbroker.cpp:20
const char * yarprun_err_msg[]
Definition: yarpbroker.cpp:28
#define YARPRUN_OK
Definition: yarpbroker.cpp:11
#define EVENT_THREAD_PERIOD
Definition: yarpbroker.cpp:22