YARP
Yet Another Robot Platform
localbroker.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-License-Identifier: BSD-3-Clause
4 */
5
7#include <yarp/conf/string.h>
8
9#include <csignal>
10#include <cstring>
11
12#define RUN_TIMEOUT 10.0 //seconds
13#define STOP_TIMEOUT 15.0
14#define KILL_TIMEOUT 10.0
15#define CONNECTION_TIMEOUT 2.0
16
17#define WRITE_TO_PIPE 1
18#define READ_FROM_PIPE 0
19
20#if defined(_WIN32)
21 #include<Windows.h>
22 #define SIGKILL 9
23#else
24 #include <cstdlib>
25 #include <sys/types.h>
26 #include <sys/stat.h>
27 #include <fcntl.h>
28 #include <cerrno>
29 #include <unistd.h>
30 #include <cstring>
31
32 #define PIPE_TIMEOUT 0
33 #define PIPE_EVENT 1
34 #define PIPE_SIGNALED 2
35 #define C_MAXARGS 128 // max number of the command parameters
36#endif
37
38using namespace yarp::os;
39using namespace yarp::manager;
40
41
42#if defined(_WIN32)
43class LocalTerminateParams
44{
45public:
46 LocalTerminateParams(DWORD id) {
47 nWin = 0;
48 dwID = id;
49 }
50
51 ~LocalTerminateParams(){}
52 int nWin;
53 DWORD dwID;
54};
55
56BOOL CALLBACK LocalTerminateAppEnum(HWND hwnd, LPARAM lParam)
57{
58 LocalTerminateParams* params=(LocalTerminateParams*)lParam;
59 DWORD dwID;
60 GetWindowThreadProcessId(hwnd, &dwID);
61 if (dwID==params->dwID)
62 {
63 params->nWin++;
64 PostMessage(hwnd,WM_CLOSE,0,0);
65 }
66 return TRUE ;
67}
68#if defined(_WIN64)
69volatile LONGLONG uniquePipeNumber = 0;
70#else
71volatile LONG uniquePipeNumber = 0;
72#endif
73
74/*
75* TODO: check deeply for asyn PIPE
76*/
77BOOL CreatePipeAsync(
78 OUT LPHANDLE lpReadPipe,
79 OUT LPHANDLE lpWritePipe,
80 IN LPSECURITY_ATTRIBUTES lpPipeAttributes,
81 IN DWORD nSize)
82{
83 HANDLE ReadPipeHandle, WritePipeHandle;
84 DWORD dwError;
85 char PipeNameBuffer[MAX_PATH];
86 nSize = (nSize ==0) ? 100*8096: nSize;
87
88#if defined(_WIN64)
89 InterlockedIncrement64(&uniquePipeNumber);
90#else
91 InterlockedIncrement(&uniquePipeNumber);
92#endif
93
94 sprintf( PipeNameBuffer,
95 "\\\\.\\Pipe\\RemoteExeAnon.%08x.%08x",
96 GetCurrentProcessId(),
97 uniquePipeNumber
98 );
99
100 ReadPipeHandle = CreateNamedPipeA(
101 (LPSTR)PipeNameBuffer,
102 PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
103 PIPE_TYPE_BYTE | PIPE_WAIT, //PIPE_NOWAIT,
104 1, // Number of pipes
105 nSize, // Out buffer size
106 nSize, // In buffer size
107 120 * 1000, // Timeout in ms
108 lpPipeAttributes
109 );
110
111 if (! ReadPipeHandle) {
112 return FALSE;
113 }
114
115 WritePipeHandle = CreateFileA(
116 (LPSTR)PipeNameBuffer,
117 GENERIC_WRITE,
118 0, // No sharing
119 lpPipeAttributes,
120 OPEN_EXISTING,
121 FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
122 nullptr // Template file
123 );
124
125 if (INVALID_HANDLE_VALUE == WritePipeHandle)
126 {
127 dwError = GetLastError();
128 CloseHandle( ReadPipeHandle );
129 SetLastError(dwError);
130 return FALSE;
131 }
132
133 *lpReadPipe = ReadPipeHandle;
134 *lpWritePipe = WritePipeHandle;
135 return( TRUE );
136}
137
138#endif
139
140LocalBroker::LocalBroker()
141{
142 bOnlyConnector = bInitialized = false;
143 ID = 0;
144 fd_stdout = nullptr;
146}
147
148
150{
151 fini();
152}
153
155{
156 if (Thread::isRunning()) {
157 Thread::stop();
158 }
159}
160
162{
163 /*
164 if(!NetworkBase::checkNetwork(5.0))
165 {
166 strError = "YARP network server is not up.";
167 return false;
168 }
169 */
170 bInitialized = true;
171 bOnlyConnector = true;
172 return true;
173}
174
175bool LocalBroker::init(const char* szcmd, const char* szparam,
176 const char* szhost, const char* szstdio,
177 const char* szworkdir, const char* szenv )
178{
179
180 strCmd.clear();
181 strParam.clear();
182 strHost.clear();
183 strStdio.clear();
184 strWorkdir.clear();
185 strTag.clear();
186 strEnv.clear();
187
188 if(!szcmd)
189 {
190 strError = "command is not specified.";
191 return false;
192 }
193 strCmd = szcmd;
194 if (szparam && strlen(szparam)) {
195 strParam = szparam;
196 }
197
198 if (szhost && strlen(szhost)) {
199 strHost = szhost;
200 }
201 if (szworkdir && strlen(szworkdir)) {
202 strWorkdir = szworkdir;
203 }
204
205 if(szstdio && strlen(szstdio))
206 {
207 if (szstdio[0] != '/') {
208 strStdio = std::string("/") + std::string(szstdio);
209 } else {
210 strStdio = szstdio;
211 }
212 }
213
214 if (szenv && strlen(szenv)) {
215 strEnv = szenv;
216 }
217
218 /*
219 OSTRINGSTREAM sstrID;
220 sstrID<<ID;
221 strTag = strHost + strCmd + sstrID.str();
222
223 if(!NetworkBase::checkNetwork(5.0))
224 {
225 strError = "YARP network server is not up.";
226 semParam.post();
227 return false;
228 }
229 */
230
231#if defined(_WIN32)
232 // do nothing
233 bInitialized = true;
234 return true;
235#else
236 /* avoiding zombie */
237 struct sigaction new_action;
238 new_action.sa_handler = SIG_IGN;
239 sigemptyset (&new_action.sa_mask);
240 new_action.sa_flags = 0;
241 sigaction (SIGCHLD, &new_action, nullptr);
242 bInitialized = true;
243 return true;
244#endif
245
246}
247
248
250{
251 if (!bInitialized) {
252 return false;
253 }
254 if (bOnlyConnector) {
255 return false;
256 }
257
258 if (running()) {
259 return true;
260 }
261
262 strError.clear();
263 ID = ExecuteCmd();
264 if (!ID) {
265 return false;
266 }
267
268 if(running())
269 {
270 return true;
271 }
272 return false;
273}
274
276{
277 if (!bInitialized) {
278 return true;
279 }
280 if (bOnlyConnector) {
281 return false;
282 }
283
284 strError.clear();
285#if defined(_WIN32)
286 stopCmd(ID);
287 stopStdout();
288#else
289 stopStdout();
290 stopCmd(ID);
291#endif
292
293 double base = SystemClock::nowSystem();
294 while(!timeout(base, STOP_TIMEOUT))
295 {
296 if (!running()) {
297 return true;
298 }
299 }
300
301 strError = "Timeout! cannot stop ";
302 strError += strCmd;
303 strError += " on ";
304 strError += strHost;
305 return false;
306}
307
309{
310 if (!bInitialized) {
311 return true;
312 }
313 if (bOnlyConnector) {
314 return false;
315 }
316
317 strError.clear();
318
319#if defined(_WIN32)
320 killCmd(ID);
321 stopStdout();
322#else
323 stopStdout();
324 stopCmd(ID);
325#endif
326
327 double base = SystemClock::nowSystem();
328 while(!timeout(base, KILL_TIMEOUT))
329 {
330 if (!running()) {
331 return true;
332 }
333 }
334
335 strError = "Timeout! cannot kill ";
336 strError += strCmd;
337 strError += " on ";
338 strError += strHost;
339 return false;
340}
341
342
344{
345 if (!bInitialized) {
346 return 0;
347 }
348 if (bOnlyConnector) {
349 return 0;
350 }
351 return (psCmd(ID))?1:0;
352}
353
354
358bool LocalBroker::connect(const char* from, const char* to,
359 const char* carrier, bool persist)
360{
361
362 if(!from)
363 {
364 strError = "no source port is introduced.";
365 return false;
366 }
367
368 if(!to)
369 {
370 strError = "no destination port is introduced.";
371 return false;
372 }
373
374 if(!exists(from))
375 {
376 strError = from;
377 strError += " does not exist.";
378 return false;
379 }
380
381 if(!exists(to))
382 {
383 strError = to;
384 strError += " does not exist.";
385 return false;
386 }
387
388 if(!NetworkBase::connect(from, to, carrier) || !connected(from, to, carrier))
389 {
390 strError = "cannot connect ";
391 strError +=from;
392 strError += " to " + std::string(to);
393 return false;
394 }
395 return true;
396}
397
398bool LocalBroker::disconnect(const char* from, const char* to, const char *carrier)
399{
400
401 if(!from)
402 {
403 strError = "no source port is introduced.";
404 return false;
405 }
406
407 if(!to)
408 {
409 strError = "no destination port is introduced.";
410 return false;
411 }
412
413 if(!exists(from))
414 {
415 strError = from;
416 strError += " does not exist.";
417 return true;
418 }
419
420 if(!exists(to))
421 {
422 strError = to;
423 strError += " does not exist.";
424 return true;
425 }
426
427 if (!connected(from, to, carrier)) {
428 return true;
429 }
430
431 if(!NetworkBase::disconnect(from, to))
432 {
433 strError = "cannot disconnect ";
434 strError +=from;
435 strError += " from " + std::string(to);
436 return false;
437 }
438 return true;
439
440}
441
442bool LocalBroker::exists(const char* port)
443{
444 return NetworkBase::exists(port);
445}
446
447
448const char* LocalBroker::requestRpc(const char* szport, const char* request, double timeout)
449{
450 if ((szport == nullptr) || (request == nullptr)) {
451 return nullptr;
452 }
453
454 if (!exists(szport)) {
455 return nullptr;
456 }
457
458 // opening the port
459 yarp::os::Port port;
460 port.setTimeout((float)((timeout>0.0) ? timeout : CONNECTION_TIMEOUT));
461 if (!port.open("...")) {
462 return nullptr;
463 }
464
465 ContactStyle style;
466 style.quiet = true;
467 style.timeout = (timeout>0.0) ? timeout : CONNECTION_TIMEOUT;
468 bool ret;
469 for(int i=0; i<10; i++) {
470 ret = NetworkBase::connect(port.getName(), szport, style);
471 if (ret) {
472 break;
473 }
474 SystemClock::delaySystem(1.0);
475 }
476
477 if(!ret) {
478 port.close();
479 return nullptr;
480 }
481
482 Bottle msg, response;
483 msg.fromString(request);
484 ret = port.write(msg, response);
485 NetworkBase::disconnect(port.getName(), szport);
486 if(!response.size() || !ret) {
487 port.close();
488 return nullptr;
489 }
490
491 port.close();
492 return response.toString().c_str();
493}
494
495bool LocalBroker::connected(const char* from, const char* to, const char* carrier)
496{
497 if (!exists(from) || !exists(to)) {
498 return false;
499 }
500 return NetworkBase::isConnected(from, to);
501}
502
503
505{
506 return strError.c_str();
507}
508
510{
511 if (Thread::isRunning()) {
512 return true;
513 }
514 if(!running())
515 {
516 strError = "Module is not running";
517 return false;
518 }
519 return startStdout();
520}
521
523{
524 stopStdout();
525}
526
527
528bool LocalBroker::timeout(double base, double timeout)
529{
530 SystemClock::delaySystem(1.0);
531 if ((SystemClock::nowSystem() - base) > timeout) {
532 return true;
533 }
534 return false;
535}
536
538{
539 return true;
540}
541
542
544{
545
546#if defined(_WIN32)
547 //windows implementation
548 DWORD dwRead;
549 CHAR buff[1024];
550 while(!Thread::isStopping())
551 {
552 BOOL bRet = ReadFile(read_from_pipe_cmd_to_stdout,
553 buff, 1023, &dwRead, nullptr);
554 if(!bRet)
555 break;
556 buff[dwRead] = (CHAR)0;
557 if(eventSink && strlen(buff))
559 yarp::os::SystemClock::delaySystem(0.5); // this prevents event flooding
560 }
561#else
562 while(!Thread::isStopping())
563 {
564 if(waitPipeSignal(pipe_to_stdout[READ_FROM_PIPE]) == PIPE_EVENT)
565 {
566 if(fd_stdout)
567 {
568 std::string strmsg;
569 char buff[1024];
570 while (fgets(buff, 1024, fd_stdout)) {
571 strmsg += std::string(buff);
572 }
573 if (eventSink && strmsg.size()) {
574 eventSink->onBrokerStdout(strmsg.c_str());
575 }
576 yarp::os::SystemClock::delaySystem(0.5); // this prevents event flooding
577 }
578 }
579 }
580#endif
581}
582
583
585{
586}
587
588
590{
591 windowMode=m;
592}
593
594
595#if defined(_WIN32)
596
597std::string LocalBroker::lastError2String()
598{
599 int error=GetLastError();
600 char buff[1024];
601 FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,nullptr,error,0,buff,1024,nullptr);
602 return std::string(buff);
603}
604
605bool LocalBroker::startStdout()
606{
607 if (!CloseHandle(write_to_pipe_cmd_to_stdout))
608 return false;
609 Thread::start();
610 return true;
611}
612
613void LocalBroker::stopStdout()
614{
615 Thread::stop();
616}
617
618int LocalBroker::ExecuteCmd()
619{
620 std::string strCmdLine = strCmd + std::string(" ") + strParam;
621
622 PROCESS_INFORMATION cmd_process_info;
623 STARTUPINFO cmd_startup_info;
624 ZeroMemory(&cmd_process_info,sizeof(PROCESS_INFORMATION));
625 ZeroMemory(&cmd_startup_info,sizeof(STARTUPINFO));
626 cmd_startup_info.cb = sizeof(STARTUPINFO);
627
628
629 std::string strDisplay=getDisplay();
630
631 DWORD dwCreationFlags;
632
633 //these come from xml
634 /*
635 // These are not supported until we find a way to send break signals to
636 // consoles that are not inherited
637 if (strDisplay=="--visible_na")
638 windowMode=WINDOW_VISIBLE;
639 if (strDisplay=="--hidden")
640 windowMode=WINDOW_HIDDEN;
641 */
642
643 // this is for "attach to stoud only"
644 if (windowMode==WINDOW_VISIBLE)
645 {
646 //this is common to all processes
647 cmd_startup_info.dwFlags |= STARTF_USESHOWWINDOW;
648 cmd_startup_info.wShowWindow = SW_SHOWNA;
649 dwCreationFlags=CREATE_NEW_CONSOLE;
650 }
651 if (windowMode==WINDOW_HIDDEN)
652 {
653 // Setting up child process and pipe for stdout (useful for attaching stdout)
654 SECURITY_ATTRIBUTES pipe_sec_attr;
655 pipe_sec_attr.nLength = sizeof(SECURITY_ATTRIBUTES);
656 pipe_sec_attr.bInheritHandle = TRUE;
657 pipe_sec_attr.lpSecurityDescriptor = nullptr;
658 CreatePipeAsync(&read_from_pipe_cmd_to_stdout,
659 &write_to_pipe_cmd_to_stdout,
660 &pipe_sec_attr, 0);
661
662 cmd_startup_info.hStdError = write_to_pipe_cmd_to_stdout;
663 cmd_startup_info.hStdOutput = write_to_pipe_cmd_to_stdout;
664
665 cmd_startup_info.dwFlags |= STARTF_USESTDHANDLES;
666
667 dwCreationFlags=CREATE_NEW_PROCESS_GROUP; //CREATE_NEW_CONSOLE|CREATE_NEW_PROCESS_GROUP,
668 }
669
670
671
672 /*
673 * setting environment variable for child process
674 */
675 TCHAR chNewEnv[32767];
676
677 // Get a pointer to the env block.
678 LPTCH chOldEnv = GetEnvironmentStrings();
679
680 // copying parent env variables
681 LPTSTR lpOld = (LPTSTR) chOldEnv;
682 LPTSTR lpNew = (LPTSTR) chNewEnv;
683 while (*lpOld)
684 {
685 lstrcpy(lpNew, lpOld);
686 lpOld += lstrlen(lpOld) + 1;
687 lpNew += lstrlen(lpNew) + 1;
688 }
689
690 // adding new env variables
691 std::string cstrEnvName;
692 if(strEnv.size())
693 {
694 auto ss = yarp::conf::string::split(strEnv, ';');
695 for (const auto& s : ss) {
696 lstrcpy(lpNew, (LPTCH) s.c_str());
697 lpNew += lstrlen(lpNew) + 1;
698 }
699 }
700
701 // closing env block
702 *lpNew = (TCHAR)0;
703
704 bool bWorkdir=(strWorkdir.size()) ? true : false;
705 std::string strWorkdirOk = bWorkdir ? strWorkdir+std::string("\\") : "";
706
707 BOOL bSuccess=CreateProcess(nullptr, // command name
708 (char*)(strWorkdirOk+strCmdLine).c_str(), // command line
709 nullptr, // process security attributes
710 nullptr, // primary thread security attributes
711 TRUE, // handles are inherited
712 dwCreationFlags,
713 (LPVOID) chNewEnv, // use new environment
714 bWorkdir?strWorkdirOk.c_str():nullptr, // working directory
715 &cmd_startup_info, // STARTUPINFO pointer
716 &cmd_process_info); // receives PROCESS_INFORMATION
717
718 if (!bSuccess && bWorkdir)
719 {
720 bSuccess=CreateProcess(nullptr, // command name
721 (char*)(strCmdLine.c_str()), // command line
722 nullptr, // process security attributes
723 nullptr, // primary thread security attributes
724 TRUE, // handles are inherited
725 dwCreationFlags,
726 (LPVOID) chNewEnv, // use new environment
727 strWorkdirOk.c_str(), // working directory
728 &cmd_startup_info, // STARTUPINFO pointer
729 &cmd_process_info); // receives PROCESS_INFORMATION
730 }
731
732 // deleting old environment variable
733 FreeEnvironmentStrings(chOldEnv);
734
735 CloseHandle(cmd_process_info.hProcess);
736 CloseHandle(cmd_process_info.hThread);
737
738 if (!bSuccess)
739 {
740 strError = std::string("Can't execute command because ") + lastError2String();
741 return 0;
742 }
743
744 return cmd_process_info.dwProcessId;
745}
746
747bool LocalBroker::psCmd(int pid)
748{
749 HANDLE hProc=OpenProcess(SYNCHRONIZE|PROCESS_QUERY_INFORMATION, FALSE, pid);
750 if (hProc==nullptr)
751 return false;
752
753 DWORD status;
754 GetExitCodeProcess(hProc , &status);
755 CloseHandle(hProc);
756 return (status==STILL_ACTIVE);
757}
758
759bool LocalBroker::killCmd(int pid)
760{
761 HANDLE hProc=OpenProcess(SYNCHRONIZE|PROCESS_TERMINATE, FALSE, pid);
762 if (hProc==nullptr)
763 return false;
764
765 BOOL bRet = TerminateProcess(hProc, 0);
766 CloseHandle(hProc);
767 return bRet ? true : false;
768}
769
770bool LocalBroker::stopCmd(int pid)
771{
772 HANDLE hProc=OpenProcess(SYNCHRONIZE|PROCESS_TERMINATE, FALSE, pid);
773 if (hProc==nullptr)
774 return false;
775
776 LocalTerminateParams params(pid);
777 EnumWindows((WNDENUMPROC)LocalTerminateAppEnum,(LPARAM)&params);
778
779 // I believe we do not need this. It is ignored by console applications created with CREATE_NEW_PROCESS_GROUP
780 GenerateConsoleCtrlEvent(CTRL_C_EVENT, pid);
781
782 //send BREAK_EVENT because we created the process with CREATE_NEW_PROCESS_GROUP
783 GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, pid);
784
785 CloseHandle(hProc);
786 return true;
787}
788
789#else //for UNIX
790
791bool LocalBroker::psCmd(int pid)
792{
793 if (!pid) {
794 return false;
795 }
796 return !::kill(pid, 0);
797}
798
799
800bool LocalBroker::killCmd(int pid)
801{
802 if (!pid) {
803 return false;
804 }
805 return !::kill(pid, SIGKILL);
806}
807
808
809bool LocalBroker::stopCmd(int pid)
810{
811 if (!pid) {
812 return false;
813 }
814 return !::kill(pid, SIGTERM);
815}
816
817int LocalBroker::waitPipe(int pipe_fd)
818{
819 struct timeval timeout;
820 int rc;
821 fd_set fd;
822
823 timeout.tv_sec = 0;
824 timeout.tv_usec = 500000;
825
826 FD_ZERO(&fd);
827 FD_SET(pipe_fd, &fd);
828 rc = select(pipe_fd + 1, &fd, nullptr, nullptr, &timeout);
829 return rc;
830}
831
832
833int LocalBroker::waitPipeSignal(int pipe_fd)
834{
835 struct timespec timeout;
836 fd_set fd;
837
838 timeout.tv_sec = 2;
839 timeout.tv_nsec = 0;
840 FD_ZERO(&fd);
841 FD_SET(pipe_fd, &fd);
842
843 /*
844#if (_POSIX_C_SOURCE >= 200112L) || (_XOPEN_SOURCE >= 600)
845 struct sigaction new_action;
846 new_action.sa_handler = SIG_IGN;
847 sigemptyset (&new_action.sa_mask);
848 new_action.sa_flags = 0;
849 sigaction (SIGUSR1, &new_action, nullptr);
850 sigset_t sset, orgmask;
851 sigemptyset(&sset);
852 sigaddset(&sset, SIGUSR1);
853 pthread_sigmask(SIG_BLOCK, &sset, &orgmask);
854 if(pselect(pipe_fd + 1, &fd, nullptr, nullptr, &timeout, &orgmask))
855 return PIPE_EVENT;
856#endif
857*/
858 if (pselect(pipe_fd + 1, &fd, nullptr, nullptr, &timeout, nullptr)) {
859 return PIPE_EVENT;
860 }
861 return PIPE_TIMEOUT;
862}
863
864
865bool LocalBroker::startStdout()
866{
867 fd_stdout = fdopen(pipe_to_stdout[READ_FROM_PIPE], "r");
868 if(!fd_stdout)
869 {
870 strError = "cannot open pipe. " + std::string(strerror(errno));
871 //close(pipe_to_stdout[READ_FROM_PIPE]);
872 return false;
873 }
874
875 int oflags = fcntl(pipe_to_stdout[READ_FROM_PIPE], F_GETFL);
876 if(fcntl(pipe_to_stdout[READ_FROM_PIPE], F_SETFL, oflags|O_NONBLOCK) == -1)
877 {
878 strError = "cannot set flag on pipe: " + std::string(strerror(errno));
879 //close(pipe_to_stdout[READ_FROM_PIPE]);
880 return false;
881 }
882 Thread::start();
883 return true;
884}
885
886void LocalBroker::stopStdout()
887{
888 Thread::stop();
889 if (fd_stdout) {
890 fclose(fd_stdout);
891 }
892 fd_stdout = nullptr;
893}
894
895
896
897int LocalBroker::ExecuteCmd()
898{
899 int pipe_child_to_parent[2];
900 int ret = pipe(pipe_child_to_parent);
901 if (ret!=0)
902 {
903 strError = std::string("Can't create child pipe because") + std::string(strerror(errno));
904 return 0;
905 }
906
907 ret = pipe(pipe_to_stdout);
908 if (ret!=0)
909 {
910 strError = std::string("Can't create stdout pipe because") + std::string(strerror(errno));
911 return 0;
912 }
913
914 int pid_cmd = fork();
915
916 if(IS_INVALID(pid_cmd))
917 {
918 strError = std::string("Can't fork command because ") + std::string(strerror(errno));
919 return 0;
920 }
921
922 if (IS_NEW_PROCESS(pid_cmd)) // RUN COMMAND HERE
923 {
924 close(pipe_child_to_parent[READ_FROM_PIPE]);
925 //int saved_stderr = dup(STDERR_FILENO);
926 dup2(pipe_to_stdout[WRITE_TO_PIPE], STDOUT_FILENO);
927 dup2(pipe_to_stdout[WRITE_TO_PIPE], STDERR_FILENO);
928 if (fcntl(STDOUT_FILENO, F_SETFL, fcntl(STDOUT_FILENO, F_GETFL) | O_NONBLOCK) == -1) {
929 strError = std::string("Can't set flag on stdout: ") + std::string(strerror(errno));
930 return 0;
931 }
932 if (fcntl(STDERR_FILENO, F_SETFL, fcntl(STDERR_FILENO, F_GETFL) | O_NONBLOCK) == -1) {
933 strError = std::string("Can't set flag on stderr: ") + std::string(strerror(errno));
934 return 0;
935 }
936
937 close(pipe_to_stdout[WRITE_TO_PIPE]);
938 close(pipe_to_stdout[READ_FROM_PIPE]);
939
940 strCmd = strCmd + std::string(" ") + strParam;
941 char *szcmd = new char[strCmd.size()+1];
942 strcpy(szcmd,strCmd.c_str());
943 int nargs = 0;
944 char **szarg = new char*[C_MAXARGS + 1];
945 parseArguments(szcmd, &nargs, szarg);
946 szarg[nargs]=nullptr;
947 if(strEnv.size())
948 {
949 auto ss = yarp::conf::string::split(strEnv, ';');
950 for (const auto& s : ss) {
951 char* szenv = new char[s.size()+1];
952 strcpy(szenv, s.c_str());
953 putenv(szenv); // putenv doesn't make copy of the string
954 }
955 //delete szenv;
956 }
957
958 if(strWorkdir.size())
959 {
960 int ret = chdir(strWorkdir.c_str());
961 if (ret!=0)
962 {
963 strError = std::string("Can't set working directory because ") + std::string(strerror(errno));
964 FILE* out_to_parent = fdopen(pipe_child_to_parent[WRITE_TO_PIPE],"w");
965 fprintf(out_to_parent,"%s", strError.c_str());
966 fflush(out_to_parent);
967 fclose(out_to_parent);
968 close(pipe_child_to_parent[WRITE_TO_PIPE]);
969 delete [] szcmd;
970 delete [] szarg;
971 std::exit(ret);
972 }
973 }
974
975 char currWorkDirBuff[1024];
976 char *currWorkDir = getcwd(currWorkDirBuff,1024);
977
978 ret = 0;
979 if (currWorkDir)
980 {
981 char **cwd_szarg=new char*[nargs+1];
982 for (int i = 1; i < nargs; ++i) {
983 cwd_szarg[i] = szarg[i];
984 }
985 cwd_szarg[nargs]=nullptr;
986 cwd_szarg[0]=new char[strlen(currWorkDir)+strlen(szarg[0])+16];
987
988 strcpy(cwd_szarg[0],currWorkDir);
989 strcat(cwd_szarg[0],"/");
990 strcat(cwd_szarg[0],szarg[0]);
991 ret=execvp(cwd_szarg[0],cwd_szarg);
992 delete [] cwd_szarg[0];
993 delete [] cwd_szarg;
994 }
995
996 if (ret==-1)
997 {
998 ret=execvp(szarg[0],szarg);
999 }
1000
1001 if (ret==-1)
1002 {
1003 strError = std::string("Can't execute command because ") + std::string(strerror(errno));
1004 FILE* out_to_parent = fdopen(pipe_child_to_parent[WRITE_TO_PIPE],"w");
1005 fprintf(out_to_parent,"%s", strError.c_str());
1006 fflush(out_to_parent);
1007 fclose(out_to_parent);
1008 }
1009 close(pipe_child_to_parent[WRITE_TO_PIPE]);
1010 delete [] szcmd;
1011 delete [] szarg;
1012 ::exit(ret);
1013 }
1014
1015 if (IS_PARENT_OF(pid_cmd))
1016 {
1017 close(pipe_child_to_parent[WRITE_TO_PIPE]);
1018 FILE* in_from_child = fdopen(pipe_child_to_parent[READ_FROM_PIPE],"r");
1019 int flags=fcntl(pipe_child_to_parent[READ_FROM_PIPE],F_GETFL,0);
1020 if (fcntl(pipe_child_to_parent[READ_FROM_PIPE],F_SETFL,flags|O_NONBLOCK) == -1)
1021 {
1022 strError = std::string("Can't set flag on pipe: ") + std::string(strerror(errno));
1023 fclose(in_from_child);
1024 return 0;
1025 }
1026
1027 std::string retError;
1028 waitPipe(pipe_child_to_parent[READ_FROM_PIPE]);
1029
1030 for (char buff[1024]; fgets(buff, 1024, in_from_child);) {
1031 retError += std::string(buff);
1032 }
1033 fclose(in_from_child);
1034
1035 if(retError.size())
1036 {
1037 strError = retError;
1038 close(pipe_child_to_parent[READ_FROM_PIPE]);
1039 return 0;
1040 }
1041
1042 close(pipe_to_stdout[WRITE_TO_PIPE]);
1043 close(pipe_child_to_parent[READ_FROM_PIPE]);
1044 return pid_cmd;
1045 }
1046
1047 return 0;
1048}
1049
1053void LocalBroker::splitLine(char *pLine, char **pArgs)
1054{
1055 char *pTmp = strchr(pLine, ' ');
1056
1057 if (pTmp) {
1058 *pTmp = '\0';
1059 pTmp++;
1060 while ((*pTmp) && (*pTmp == ' ')) {
1061 pTmp++;
1062 }
1063 if (*pTmp == '\0') {
1064 pTmp = nullptr;
1065 }
1066 }
1067 *pArgs = pTmp;
1068}
1069
1070
1071
1075void LocalBroker::parseArguments(char *io_pLine, int *o_pArgc, char **o_pArgv)
1076{
1077 char *pNext = io_pLine;
1078 size_t i;
1079 int j;
1080 int quoted = 0;
1081 size_t len = strlen(io_pLine);
1082
1083 // Protect spaces inside quotes, but lose the quotes
1084 for(i = 0; i < len; i++) {
1085 if ((!quoted) && ('"' == io_pLine[i])) {
1086 quoted = 1;
1087 io_pLine[i] = ' ';
1088 } else if ((quoted) && ('"' == io_pLine[i])) {
1089 quoted = 0;
1090 io_pLine[i] = ' ';
1091 } else if ((quoted) && (' ' == io_pLine[i])) {
1092 io_pLine[i] = '\1';
1093 }
1094 }
1095
1096 // init
1097 memset(o_pArgv, 0x00, sizeof(char*) * C_MAXARGS);
1098 *o_pArgc = 1;
1099 o_pArgv[0] = io_pLine;
1100
1101 while ((nullptr != pNext) && (*o_pArgc < C_MAXARGS)) {
1102 splitLine(pNext, &(o_pArgv[*o_pArgc]));
1103 pNext = o_pArgv[*o_pArgc];
1104
1105 if (nullptr != o_pArgv[*o_pArgc]) {
1106 *o_pArgc += 1;
1107 }
1108 }
1109
1110 for(j = 0; j < *o_pArgc; j++) {
1111 len = strlen(o_pArgv[j]);
1112 for(i = 0; i < len; i++) {
1113 if('\1' == o_pArgv[j][i]) {
1114 o_pArgv[j][i] = ' ';
1115 }
1116 }
1117 }
1118}
1119#endif
bool ret
void * HANDLE
void parseArguments(char *io_pLine, int *o_pArgc, char **o_pArgv)
Breaks up a line into multiple arguments.
Definition: Run.cpp:2422
void splitLine(char *pLine, char **pArgs)
Split a line into separate words.
Definition: Run.cpp:2402
virtual void onBrokerStdout(const char *msg)
Definition: broker.h:22
std::string strDisplay
Definition: broker.h:70
const char * getDisplay() const
Definition: broker.h:65
BrokerEventSink * eventSink
Definition: broker.h:68
const char * requestRpc(const char *szport, const char *request, double timeout) override
bool exists(const char *port) override
bool connected(const char *from, const char *to, const char *carrier) override
void run() override
Main body of the new thread.
void setWindowMode(WindowMode m)
Define if the application will be visible or not.
bool threadInit() override
Initialization method.
bool disconnect(const char *from, const char *to, const char *carrier) override
void threadRelease() override
Release method.
bool connect(const char *from, const char *to, const char *carrier, bool persist=false) override
connection broker
void detachStdout() override
const char * error() override
bool attachStdout() override
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:64
void fromString(const std::string &text)
Initializes bottle from a string.
Definition: Bottle.cpp:204
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:251
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:211
Preferences for how to communicate with a contact.
Definition: ContactStyle.h:23
double timeout
Set a timeout for communication (in units of seconds, fractional seconds allowed).
Definition: ContactStyle.h:46
bool quiet
Suppress all outputs and warnings.
Definition: ContactStyle.h:35
virtual std::string getName() const
Get name of port.
Definition: Contactable.cpp:14
A mini-server for network communication.
Definition: Port.h:46
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
Definition: Port.cpp:436
bool setTimeout(float timeout)
Set a timeout on network operations.
Definition: Port.cpp:634
void close() override
Stop port activity.
Definition: Port.cpp:363
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
Definition: Port.cpp:79
static void delaySystem(double seconds)
Definition: SystemClock.cpp:29
#define KILL_TIMEOUT
Definition: localbroker.cpp:14
#define CONNECTION_TIMEOUT
Definition: localbroker.cpp:15
#define PIPE_TIMEOUT
Definition: localbroker.cpp:32
#define WRITE_TO_PIPE
Definition: localbroker.cpp:17
#define PIPE_EVENT
Definition: localbroker.cpp:33
#define READ_FROM_PIPE
Definition: localbroker.cpp:18
#define STOP_TIMEOUT
Definition: localbroker.cpp:13
#define C_MAXARGS
Definition: localbroker.cpp:35
ContainerT split(const typename ContainerT::value_type &s, std::basic_regex< typename ContainerT::value_type::value_type > regex)
Utility to split a string by a separator, into a vector of strings.
Definition: string.h:26
An interface to the operating system, including Port based communication.
char * getcwd(char *buf, size_t size)
Portable wrapper for the getcwd() function.
Definition: Os.cpp:112
int fork()
Portable wrapper for the fork() function.
Definition: Os.cpp:159