YARP
Yet Another Robot Platform
localbroker.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-License-Identifier: BSD-3-Clause
4  */
5 
7 #include <yarp/conf/string.h>
8 
9 #include <csignal>
10 #include <cstring>
11 
12 #define RUN_TIMEOUT 10.0 //seconds
13 #define STOP_TIMEOUT 15.0
14 #define KILL_TIMEOUT 10.0
15 #define CONNECTION_TIMEOUT 2.0
16 
17 #define WRITE_TO_PIPE 1
18 #define READ_FROM_PIPE 0
19 
20 #if defined(_WIN32)
21  #include<Windows.h>
22  #define SIGKILL 9
23 #else
24  #include <cstdlib>
25  #include <sys/types.h>
26  #include <sys/stat.h>
27  #include <fcntl.h>
28  #include <cerrno>
29  #include <unistd.h>
30  #include <cstring>
31 
32  #define PIPE_TIMEOUT 0
33  #define PIPE_EVENT 1
34  #define PIPE_SIGNALED 2
35  #define C_MAXARGS 128 // max number of the command parameters
36 #endif
37 
38 using namespace yarp::os;
39 using namespace yarp::manager;
40 
41 
42 #if defined(_WIN32)
43 class LocalTerminateParams
44 {
45 public:
46  LocalTerminateParams(DWORD id) {
47  nWin = 0;
48  dwID = id;
49  }
50 
51  ~LocalTerminateParams(){}
52  int nWin;
53  DWORD dwID;
54 };
55 
56 BOOL CALLBACK LocalTerminateAppEnum(HWND hwnd, LPARAM lParam)
57 {
58  LocalTerminateParams* params=(LocalTerminateParams*)lParam;
59  DWORD dwID;
60  GetWindowThreadProcessId(hwnd, &dwID);
61  if (dwID==params->dwID)
62  {
63  params->nWin++;
64  PostMessage(hwnd,WM_CLOSE,0,0);
65  }
66  return TRUE ;
67 }
68 #if defined(_WIN64)
69 volatile LONGLONG uniquePipeNumber = 0;
70 #else
71 volatile LONG uniquePipeNumber = 0;
72 #endif
73 
74 /*
75 * TODO: check deeply for asyn PIPE
76 */
77 BOOL CreatePipeAsync(
78  OUT LPHANDLE lpReadPipe,
79  OUT LPHANDLE lpWritePipe,
80  IN LPSECURITY_ATTRIBUTES lpPipeAttributes,
81  IN DWORD nSize)
82 {
83  HANDLE ReadPipeHandle, WritePipeHandle;
84  DWORD dwError;
85  char PipeNameBuffer[MAX_PATH];
86  nSize = (nSize ==0) ? 100*8096: nSize;
87 
88 #if defined(_WIN64)
89  InterlockedIncrement64(&uniquePipeNumber);
90 #else
91  InterlockedIncrement(&uniquePipeNumber);
92 #endif
93 
94  sprintf( PipeNameBuffer,
95  "\\\\.\\Pipe\\RemoteExeAnon.%08x.%08x",
96  GetCurrentProcessId(),
97  uniquePipeNumber
98  );
99 
100  ReadPipeHandle = CreateNamedPipeA(
101  (LPSTR)PipeNameBuffer,
102  PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
103  PIPE_TYPE_BYTE | PIPE_WAIT, //PIPE_NOWAIT,
104  1, // Number of pipes
105  nSize, // Out buffer size
106  nSize, // In buffer size
107  120 * 1000, // Timeout in ms
108  lpPipeAttributes
109  );
110 
111  if (! ReadPipeHandle) {
112  return FALSE;
113  }
114 
115  WritePipeHandle = CreateFileA(
116  (LPSTR)PipeNameBuffer,
117  GENERIC_WRITE,
118  0, // No sharing
119  lpPipeAttributes,
120  OPEN_EXISTING,
121  FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
122  nullptr // Template file
123  );
124 
125  if (INVALID_HANDLE_VALUE == WritePipeHandle)
126  {
127  dwError = GetLastError();
128  CloseHandle( ReadPipeHandle );
129  SetLastError(dwError);
130  return FALSE;
131  }
132 
133  *lpReadPipe = ReadPipeHandle;
134  *lpWritePipe = WritePipeHandle;
135  return( TRUE );
136 }
137 
138 #endif
139 
140 LocalBroker::LocalBroker()
141 {
142  bOnlyConnector = bInitialized = false;
143  ID = 0;
144  fd_stdout = nullptr;
145  setWindowMode(WINDOW_HIDDEN);
146 }
147 
148 
149 LocalBroker::~LocalBroker()
150 {
151  fini();
152 }
153 
154 void LocalBroker::fini()
155 {
156  if (Thread::isRunning()) {
157  Thread::stop();
158  }
159 }
160 
161 bool LocalBroker::init()
162 {
163  /*
164  if(!NetworkBase::checkNetwork(5.0))
165  {
166  strError = "YARP network server is not up.";
167  return false;
168  }
169  */
170  bInitialized = true;
171  bOnlyConnector = true;
172  return true;
173 }
174 
175 bool LocalBroker::init(const char* szcmd, const char* szparam,
176  const char* szhost, const char* szstdio,
177  const char* szworkdir, const char* szenv )
178 {
179 
180  strCmd.clear();
181  strParam.clear();
182  strHost.clear();
183  strStdio.clear();
184  strWorkdir.clear();
185  strTag.clear();
186  strEnv.clear();
187 
188  if(!szcmd)
189  {
190  strError = "command is not specified.";
191  return false;
192  }
193  strCmd = szcmd;
194  if (szparam && strlen(szparam)) {
195  strParam = szparam;
196  }
197 
198  if (szhost && strlen(szhost)) {
199  strHost = szhost;
200  }
201  if (szworkdir && strlen(szworkdir)) {
202  strWorkdir = szworkdir;
203  }
204 
205  if(szstdio && strlen(szstdio))
206  {
207  if (szstdio[0] != '/') {
208  strStdio = std::string("/") + std::string(szstdio);
209  } else {
210  strStdio = szstdio;
211  }
212  }
213 
214  if (szenv && strlen(szenv)) {
215  strEnv = szenv;
216  }
217 
218  /*
219  OSTRINGSTREAM sstrID;
220  sstrID<<ID;
221  strTag = strHost + strCmd + sstrID.str();
222 
223  if(!NetworkBase::checkNetwork(5.0))
224  {
225  strError = "YARP network server is not up.";
226  semParam.post();
227  return false;
228  }
229  */
230 
231 #if defined(_WIN32)
232  // do nothing
233  bInitialized = true;
234  return true;
235 #else
236  /* avoiding zombie */
237  struct sigaction new_action;
238  new_action.sa_handler = SIG_IGN;
239  sigemptyset (&new_action.sa_mask);
240  new_action.sa_flags = 0;
241  sigaction (SIGCHLD, &new_action, nullptr);
242  bInitialized = true;
243  return true;
244 #endif
245 
246 }
247 
248 
249 bool LocalBroker::start()
250 {
251  if (!bInitialized) {
252  return false;
253  }
254  if (bOnlyConnector) {
255  return false;
256  }
257 
258  if (running()) {
259  return true;
260  }
261 
262  strError.clear();
263  ID = ExecuteCmd();
264  if (!ID) {
265  return false;
266  }
267 
268  if(running())
269  {
270  return true;
271  }
272  return false;
273 }
274 
275 bool LocalBroker::stop()
276 {
277  if (!bInitialized) {
278  return true;
279  }
280  if (bOnlyConnector) {
281  return false;
282  }
283 
284  strError.clear();
285 #if defined(_WIN32)
286  stopCmd(ID);
287  stopStdout();
288 #else
289  stopStdout();
290  stopCmd(ID);
291 #endif
292 
293  double base = SystemClock::nowSystem();
294  while(!timeout(base, STOP_TIMEOUT))
295  {
296  if (!running()) {
297  return true;
298  }
299  }
300 
301  strError = "Timeout! cannot stop ";
302  strError += strCmd;
303  strError += " on ";
304  strError += strHost;
305  return false;
306 }
307 
308 bool LocalBroker::kill()
309 {
310  if (!bInitialized) {
311  return true;
312  }
313  if (bOnlyConnector) {
314  return false;
315  }
316 
317  strError.clear();
318 
319 #if defined(_WIN32)
320  killCmd(ID);
321  stopStdout();
322 #else
323  stopStdout();
324  stopCmd(ID);
325 #endif
326 
327  double base = SystemClock::nowSystem();
328  while(!timeout(base, KILL_TIMEOUT))
329  {
330  if (!running()) {
331  return true;
332  }
333  }
334 
335  strError = "Timeout! cannot kill ";
336  strError += strCmd;
337  strError += " on ";
338  strError += strHost;
339  return false;
340 }
341 
342 
343 int LocalBroker::running()
344 {
345  if (!bInitialized) {
346  return 0;
347  }
348  if (bOnlyConnector) {
349  return 0;
350  }
351  return (psCmd(ID))?1:0;
352 }
353 
354 
358 bool LocalBroker::connect(const char* from, const char* to,
359  const char* carrier, bool persist)
360 {
361 
362  if(!from)
363  {
364  strError = "no source port is introduced.";
365  return false;
366  }
367 
368  if(!to)
369  {
370  strError = "no destination port is introduced.";
371  return false;
372  }
373 
374  if(!exists(from))
375  {
376  strError = from;
377  strError += " does not exist.";
378  return false;
379  }
380 
381  if(!exists(to))
382  {
383  strError = to;
384  strError += " does not exist.";
385  return false;
386  }
387 
388  if(!NetworkBase::connect(from, to, carrier) || !connected(from, to, carrier))
389  {
390  strError = "cannot connect ";
391  strError +=from;
392  strError += " to " + std::string(to);
393  return false;
394  }
395  return true;
396 }
397 
398 bool LocalBroker::disconnect(const char* from, const char* to, const char *carrier)
399 {
400 
401  if(!from)
402  {
403  strError = "no source port is introduced.";
404  return false;
405  }
406 
407  if(!to)
408  {
409  strError = "no destination port is introduced.";
410  return false;
411  }
412 
413  if(!exists(from))
414  {
415  strError = from;
416  strError += " does not exist.";
417  return true;
418  }
419 
420  if(!exists(to))
421  {
422  strError = to;
423  strError += " does not exist.";
424  return true;
425  }
426 
427  if (!connected(from, to, carrier)) {
428  return true;
429  }
430 
431  if(!NetworkBase::disconnect(from, to))
432  {
433  strError = "cannot disconnect ";
434  strError +=from;
435  strError += " from " + std::string(to);
436  return false;
437  }
438  return true;
439 
440 }
441 
442 bool LocalBroker::exists(const char* port)
443 {
444  return NetworkBase::exists(port);
445 }
446 
447 
448 const char* LocalBroker::requestRpc(const char* szport, const char* request, double timeout)
449 {
450  if ((szport == nullptr) || (request == nullptr)) {
451  return nullptr;
452  }
453 
454  if (!exists(szport)) {
455  return nullptr;
456  }
457 
458  // opening the port
459  yarp::os::Port port;
460  port.setTimeout((float)((timeout>0.0) ? timeout : CONNECTION_TIMEOUT));
461  if (!port.open("...")) {
462  return nullptr;
463  }
464 
465  ContactStyle style;
466  style.quiet = true;
467  style.timeout = (timeout>0.0) ? timeout : CONNECTION_TIMEOUT;
468  bool ret;
469  for(int i=0; i<10; i++) {
470  ret = NetworkBase::connect(port.getName(), szport, style);
471  if (ret) {
472  break;
473  }
475  }
476 
477  if(!ret) {
478  port.close();
479  return nullptr;
480  }
481 
482  Bottle msg, response;
483  msg.fromString(request);
484  ret = port.write(msg, response);
485  NetworkBase::disconnect(port.getName(), szport);
486  if(!response.size() || !ret) {
487  port.close();
488  return nullptr;
489  }
490 
491  port.close();
492  return response.toString().c_str();
493 }
494 
495 bool LocalBroker::connected(const char* from, const char* to, const char* carrier)
496 {
497  if (!exists(from) || !exists(to)) {
498  return false;
499  }
500  return NetworkBase::isConnected(from, to);
501 }
502 
503 
504 const char* LocalBroker::error()
505 {
506  return strError.c_str();
507 }
508 
509 bool LocalBroker::attachStdout()
510 {
511  if (Thread::isRunning()) {
512  return true;
513  }
514  if(!running())
515  {
516  strError = "Module is not running";
517  return false;
518  }
519  return startStdout();
520 }
521 
522 void LocalBroker::detachStdout()
523 {
524  stopStdout();
525 }
526 
527 
528 bool LocalBroker::timeout(double base, double timeout)
529 {
531  if ((SystemClock::nowSystem() - base) > timeout) {
532  return true;
533  }
534  return false;
535 }
536 
537 bool LocalBroker::threadInit()
538 {
539  return true;
540 }
541 
542 
543 void LocalBroker::run()
544 {
545 
546 #if defined(_WIN32)
547  //windows implementation
548  DWORD dwRead;
549  CHAR buff[1024];
550  while(!Thread::isStopping())
551  {
552  BOOL bRet = ReadFile(read_from_pipe_cmd_to_stdout,
553  buff, 1023, &dwRead, nullptr);
554  if(!bRet)
555  break;
556  buff[dwRead] = (CHAR)0;
557  if(eventSink && strlen(buff))
558  eventSink->onBrokerStdout(buff);
559  yarp::os::SystemClock::delaySystem(0.5); // this prevents event flooding
560  }
561 #else
562  while(!Thread::isStopping())
563  {
564  if(waitPipeSignal(pipe_to_stdout[READ_FROM_PIPE]) == PIPE_EVENT)
565  {
566  if(fd_stdout)
567  {
568  std::string strmsg;
569  char buff[1024];
570  while (fgets(buff, 1024, fd_stdout)) {
571  strmsg += std::string(buff);
572  }
573  if (eventSink && strmsg.size()) {
574  eventSink->onBrokerStdout(strmsg.c_str());
575  }
576  yarp::os::SystemClock::delaySystem(0.5); // this prevents event flooding
577  }
578  }
579  }
580 #endif
581 }
582 
583 
584 void LocalBroker::threadRelease()
585 {
586 }
587 
588 
589 void LocalBroker::setWindowMode(WindowMode m)
590 {
591  windowMode=m;
592 }
593 
594 
595 #if defined(_WIN32)
596 
597 std::string LocalBroker::lastError2String()
598 {
599  int error=GetLastError();
600  char buff[1024];
601  FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,nullptr,error,0,buff,1024,nullptr);
602  return std::string(buff);
603 }
604 
605 bool LocalBroker::startStdout()
606 {
607  if (!CloseHandle(write_to_pipe_cmd_to_stdout))
608  return false;
609  Thread::start();
610  return true;
611 }
612 
613 void LocalBroker::stopStdout()
614 {
615  Thread::stop();
616 }
617 
618 int LocalBroker::ExecuteCmd()
619 {
620  std::string strCmdLine = strCmd + std::string(" ") + strParam;
621 
622  PROCESS_INFORMATION cmd_process_info;
623  STARTUPINFO cmd_startup_info;
624  ZeroMemory(&cmd_process_info,sizeof(PROCESS_INFORMATION));
625  ZeroMemory(&cmd_startup_info,sizeof(STARTUPINFO));
626  cmd_startup_info.cb = sizeof(STARTUPINFO);
627 
628 
629  std::string strDisplay=getDisplay();
630 
631  DWORD dwCreationFlags;
632 
633  //these come from xml
634  /*
635  // These are not supported until we find a way to send break signals to
636  // consoles that are not inherited
637  if (strDisplay=="--visible_na")
638  windowMode=WINDOW_VISIBLE;
639  if (strDisplay=="--hidden")
640  windowMode=WINDOW_HIDDEN;
641  */
642 
643  // this is for "attach to stoud only"
644  if (windowMode==WINDOW_VISIBLE)
645  {
646  //this is common to all processes
647  cmd_startup_info.dwFlags |= STARTF_USESHOWWINDOW;
648  cmd_startup_info.wShowWindow = SW_SHOWNA;
649  dwCreationFlags=CREATE_NEW_CONSOLE;
650  }
651  if (windowMode==WINDOW_HIDDEN)
652  {
653  // Setting up child process and pipe for stdout (useful for attaching stdout)
654  SECURITY_ATTRIBUTES pipe_sec_attr;
655  pipe_sec_attr.nLength = sizeof(SECURITY_ATTRIBUTES);
656  pipe_sec_attr.bInheritHandle = TRUE;
657  pipe_sec_attr.lpSecurityDescriptor = nullptr;
658  CreatePipeAsync(&read_from_pipe_cmd_to_stdout,
659  &write_to_pipe_cmd_to_stdout,
660  &pipe_sec_attr, 0);
661 
662  cmd_startup_info.hStdError = write_to_pipe_cmd_to_stdout;
663  cmd_startup_info.hStdOutput = write_to_pipe_cmd_to_stdout;
664 
665  cmd_startup_info.dwFlags |= STARTF_USESTDHANDLES;
666 
667  dwCreationFlags=CREATE_NEW_PROCESS_GROUP; //CREATE_NEW_CONSOLE|CREATE_NEW_PROCESS_GROUP,
668  }
669 
670 
671 
672  /*
673  * setting environment variable for child process
674  */
675  TCHAR chNewEnv[32767];
676 
677  // Get a pointer to the env block.
678  LPTCH chOldEnv = GetEnvironmentStrings();
679 
680  // copying parent env variables
681  LPTSTR lpOld = (LPTSTR) chOldEnv;
682  LPTSTR lpNew = (LPTSTR) chNewEnv;
683  while (*lpOld)
684  {
685  lstrcpy(lpNew, lpOld);
686  lpOld += lstrlen(lpOld) + 1;
687  lpNew += lstrlen(lpNew) + 1;
688  }
689 
690  // adding new env variables
691  std::string cstrEnvName;
692  if(strEnv.size())
693  {
694  auto ss = yarp::conf::string::split(strEnv, ';');
695  for (const auto& s : ss) {
696  lstrcpy(lpNew, (LPTCH) s.c_str());
697  lpNew += lstrlen(lpNew) + 1;
698  }
699  }
700 
701  // closing env block
702  *lpNew = (TCHAR)0;
703 
704  bool bWorkdir=(strWorkdir.size()) ? true : false;
705  std::string strWorkdirOk = bWorkdir ? strWorkdir+std::string("\\") : "";
706 
707  BOOL bSuccess=CreateProcess(nullptr, // command name
708  (char*)(strWorkdirOk+strCmdLine).c_str(), // command line
709  nullptr, // process security attributes
710  nullptr, // primary thread security attributes
711  TRUE, // handles are inherited
712  dwCreationFlags,
713  (LPVOID) chNewEnv, // use new environment
714  bWorkdir?strWorkdirOk.c_str():nullptr, // working directory
715  &cmd_startup_info, // STARTUPINFO pointer
716  &cmd_process_info); // receives PROCESS_INFORMATION
717 
718  if (!bSuccess && bWorkdir)
719  {
720  bSuccess=CreateProcess(nullptr, // command name
721  (char*)(strCmdLine.c_str()), // command line
722  nullptr, // process security attributes
723  nullptr, // primary thread security attributes
724  TRUE, // handles are inherited
725  dwCreationFlags,
726  (LPVOID) chNewEnv, // use new environment
727  strWorkdirOk.c_str(), // working directory
728  &cmd_startup_info, // STARTUPINFO pointer
729  &cmd_process_info); // receives PROCESS_INFORMATION
730  }
731 
732  // deleting old environment variable
733  FreeEnvironmentStrings(chOldEnv);
734 
735  CloseHandle(cmd_process_info.hProcess);
736  CloseHandle(cmd_process_info.hThread);
737 
738  if (!bSuccess)
739  {
740  strError = std::string("Can't execute command because ") + lastError2String();
741  return 0;
742  }
743 
744  return cmd_process_info.dwProcessId;
745 }
746 
747 bool LocalBroker::psCmd(int pid)
748 {
749  HANDLE hProc=OpenProcess(SYNCHRONIZE|PROCESS_QUERY_INFORMATION, FALSE, pid);
750  if (hProc==nullptr)
751  return false;
752 
753  DWORD status;
754  GetExitCodeProcess(hProc , &status);
755  CloseHandle(hProc);
756  return (status==STILL_ACTIVE);
757 }
758 
759 bool LocalBroker::killCmd(int pid)
760 {
761  HANDLE hProc=OpenProcess(SYNCHRONIZE|PROCESS_TERMINATE, FALSE, pid);
762  if (hProc==nullptr)
763  return false;
764 
765  BOOL bRet = TerminateProcess(hProc, 0);
766  CloseHandle(hProc);
767  return bRet ? true : false;
768 }
769 
770 bool LocalBroker::stopCmd(int pid)
771 {
772  HANDLE hProc=OpenProcess(SYNCHRONIZE|PROCESS_TERMINATE, FALSE, pid);
773  if (hProc==nullptr)
774  return false;
775 
776  LocalTerminateParams params(pid);
777  EnumWindows((WNDENUMPROC)LocalTerminateAppEnum,(LPARAM)&params);
778 
779  // I believe we do not need this. It is ignored by console applications created with CREATE_NEW_PROCESS_GROUP
780  GenerateConsoleCtrlEvent(CTRL_C_EVENT, pid);
781 
782  //send BREAK_EVENT because we created the process with CREATE_NEW_PROCESS_GROUP
783  GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, pid);
784 
785  CloseHandle(hProc);
786  return true;
787 }
788 
789 #else //for UNIX
790 
791 bool LocalBroker::psCmd(int pid)
792 {
793  if (!pid) {
794  return false;
795  }
796  return !::kill(pid, 0);
797 }
798 
799 
800 bool LocalBroker::killCmd(int pid)
801 {
802  if (!pid) {
803  return false;
804  }
805  return !::kill(pid, SIGKILL);
806 }
807 
808 
809 bool LocalBroker::stopCmd(int pid)
810 {
811  if (!pid) {
812  return false;
813  }
814  return !::kill(pid, SIGTERM);
815 }
816 
817 int LocalBroker::waitPipe(int pipe_fd)
818 {
819  struct timeval timeout;
820  int rc;
821  fd_set fd;
822 
823  timeout.tv_sec = 0;
824  timeout.tv_usec = 500000;
825 
826  FD_ZERO(&fd);
827  FD_SET(pipe_fd, &fd);
828  rc = select(pipe_fd + 1, &fd, nullptr, nullptr, &timeout);
829  return rc;
830 }
831 
832 
833 int LocalBroker::waitPipeSignal(int pipe_fd)
834 {
835  struct timespec timeout;
836  fd_set fd;
837 
838  timeout.tv_sec = 2;
839  timeout.tv_nsec = 0;
840  FD_ZERO(&fd);
841  FD_SET(pipe_fd, &fd);
842 
843  /*
844 #if (_POSIX_C_SOURCE >= 200112L) || (_XOPEN_SOURCE >= 600)
845  struct sigaction new_action;
846  new_action.sa_handler = SIG_IGN;
847  sigemptyset (&new_action.sa_mask);
848  new_action.sa_flags = 0;
849  sigaction (SIGUSR1, &new_action, nullptr);
850  sigset_t sset, orgmask;
851  sigemptyset(&sset);
852  sigaddset(&sset, SIGUSR1);
853  pthread_sigmask(SIG_BLOCK, &sset, &orgmask);
854  if(pselect(pipe_fd + 1, &fd, nullptr, nullptr, &timeout, &orgmask))
855  return PIPE_EVENT;
856 #endif
857 */
858  if (pselect(pipe_fd + 1, &fd, nullptr, nullptr, &timeout, nullptr)) {
859  return PIPE_EVENT;
860  }
861  return PIPE_TIMEOUT;
862 }
863 
864 
865 bool LocalBroker::startStdout()
866 {
867  fd_stdout = fdopen(pipe_to_stdout[READ_FROM_PIPE], "r");
868  if(!fd_stdout)
869  {
870  strError = "cannot open pipe. " + std::string(strerror(errno));
871  //close(pipe_to_stdout[READ_FROM_PIPE]);
872  return false;
873  }
874 
875  int oflags = fcntl(pipe_to_stdout[READ_FROM_PIPE], F_GETFL);
876  if(fcntl(pipe_to_stdout[READ_FROM_PIPE], F_SETFL, oflags|O_NONBLOCK) == -1)
877  {
878  strError = "cannot set flag on pipe: " + std::string(strerror(errno));
879  //close(pipe_to_stdout[READ_FROM_PIPE]);
880  return false;
881  }
882  Thread::start();
883  return true;
884 }
885 
886 void LocalBroker::stopStdout()
887 {
888  Thread::stop();
889  if (fd_stdout) {
890  fclose(fd_stdout);
891  }
892  fd_stdout = nullptr;
893 }
894 
895 
896 
897 int LocalBroker::ExecuteCmd()
898 {
899  int pipe_child_to_parent[2];
900  int ret = pipe(pipe_child_to_parent);
901  if (ret!=0)
902  {
903  strError = std::string("Can't create child pipe because") + std::string(strerror(errno));
904  return 0;
905  }
906 
907  ret = pipe(pipe_to_stdout);
908  if (ret!=0)
909  {
910  strError = std::string("Can't create stdout pipe because") + std::string(strerror(errno));
911  return 0;
912  }
913 
914  int pid_cmd = fork();
915 
916  if(IS_INVALID(pid_cmd))
917  {
918  strError = std::string("Can't fork command because ") + std::string(strerror(errno));
919  return 0;
920  }
921 
922  if (IS_NEW_PROCESS(pid_cmd)) // RUN COMMAND HERE
923  {
924  close(pipe_child_to_parent[READ_FROM_PIPE]);
925  //int saved_stderr = dup(STDERR_FILENO);
926  dup2(pipe_to_stdout[WRITE_TO_PIPE], STDOUT_FILENO);
927  dup2(pipe_to_stdout[WRITE_TO_PIPE], STDERR_FILENO);
928  if (fcntl(STDOUT_FILENO, F_SETFL, fcntl(STDOUT_FILENO, F_GETFL) | O_NONBLOCK) == -1) {
929  strError = std::string("Can't set flag on stdout: ") + std::string(strerror(errno));
930  return 0;
931  }
932  if (fcntl(STDERR_FILENO, F_SETFL, fcntl(STDERR_FILENO, F_GETFL) | O_NONBLOCK) == -1) {
933  strError = std::string("Can't set flag on stderr: ") + std::string(strerror(errno));
934  return 0;
935  }
936 
937  close(pipe_to_stdout[WRITE_TO_PIPE]);
938  close(pipe_to_stdout[READ_FROM_PIPE]);
939 
940  strCmd = strCmd + std::string(" ") + strParam;
941  char *szcmd = new char[strCmd.size()+1];
942  strcpy(szcmd,strCmd.c_str());
943  int nargs = 0;
944  char **szarg = new char*[C_MAXARGS + 1];
945  parseArguments(szcmd, &nargs, szarg);
946  szarg[nargs]=nullptr;
947  if(strEnv.size())
948  {
949  auto ss = yarp::conf::string::split(strEnv, ';');
950  for (const auto& s : ss) {
951  char* szenv = new char[s.size()+1];
952  strcpy(szenv, s.c_str());
953  putenv(szenv); // putenv doesn't make copy of the string
954  }
955  //delete szenv;
956  }
957 
958  if(strWorkdir.size())
959  {
960  int ret = chdir(strWorkdir.c_str());
961  if (ret!=0)
962  {
963  strError = std::string("Can't set working directory because ") + std::string(strerror(errno));
964  FILE* out_to_parent = fdopen(pipe_child_to_parent[WRITE_TO_PIPE],"w");
965  fprintf(out_to_parent,"%s", strError.c_str());
966  fflush(out_to_parent);
967  fclose(out_to_parent);
968  close(pipe_child_to_parent[WRITE_TO_PIPE]);
969  delete [] szcmd;
970  delete [] szarg;
971  std::exit(ret);
972  }
973  }
974 
975  char currWorkDirBuff[1024];
976  char *currWorkDir = getcwd(currWorkDirBuff,1024);
977 
978  ret = 0;
979  if (currWorkDir)
980  {
981  char **cwd_szarg=new char*[nargs+1];
982  for (int i = 1; i < nargs; ++i) {
983  cwd_szarg[i] = szarg[i];
984  }
985  cwd_szarg[nargs]=nullptr;
986  cwd_szarg[0]=new char[strlen(currWorkDir)+strlen(szarg[0])+16];
987 
988  strcpy(cwd_szarg[0],currWorkDir);
989  strcat(cwd_szarg[0],"/");
990  strcat(cwd_szarg[0],szarg[0]);
991  ret=execvp(cwd_szarg[0],cwd_szarg);
992  delete [] cwd_szarg[0];
993  delete [] cwd_szarg;
994  }
995 
996  if (ret==-1)
997  {
998  ret=execvp(szarg[0],szarg);
999  }
1000 
1001  if (ret==-1)
1002  {
1003  strError = std::string("Can't execute command because ") + std::string(strerror(errno));
1004  FILE* out_to_parent = fdopen(pipe_child_to_parent[WRITE_TO_PIPE],"w");
1005  fprintf(out_to_parent,"%s", strError.c_str());
1006  fflush(out_to_parent);
1007  fclose(out_to_parent);
1008  }
1009  close(pipe_child_to_parent[WRITE_TO_PIPE]);
1010  delete [] szcmd;
1011  delete [] szarg;
1012  ::exit(ret);
1013  }
1014 
1015  if (IS_PARENT_OF(pid_cmd))
1016  {
1017  close(pipe_child_to_parent[WRITE_TO_PIPE]);
1018  FILE* in_from_child = fdopen(pipe_child_to_parent[READ_FROM_PIPE],"r");
1019  int flags=fcntl(pipe_child_to_parent[READ_FROM_PIPE],F_GETFL,0);
1020  if (fcntl(pipe_child_to_parent[READ_FROM_PIPE],F_SETFL,flags|O_NONBLOCK) == -1)
1021  {
1022  strError = std::string("Can't set flag on pipe: ") + std::string(strerror(errno));
1023  fclose(in_from_child);
1024  return 0;
1025  }
1026 
1027  std::string retError;
1028  waitPipe(pipe_child_to_parent[READ_FROM_PIPE]);
1029 
1030  for (char buff[1024]; fgets(buff, 1024, in_from_child);) {
1031  retError += std::string(buff);
1032  }
1033  fclose(in_from_child);
1034 
1035  if(retError.size())
1036  {
1037  strError = retError;
1038  close(pipe_child_to_parent[READ_FROM_PIPE]);
1039  return 0;
1040  }
1041 
1042  close(pipe_to_stdout[WRITE_TO_PIPE]);
1043  close(pipe_child_to_parent[READ_FROM_PIPE]);
1044  return pid_cmd;
1045  }
1046 
1047  return 0;
1048 }
1049 
1053 void LocalBroker::splitLine(char *pLine, char **pArgs)
1054 {
1055  char *pTmp = strchr(pLine, ' ');
1056 
1057  if (pTmp) {
1058  *pTmp = '\0';
1059  pTmp++;
1060  while ((*pTmp) && (*pTmp == ' ')) {
1061  pTmp++;
1062  }
1063  if (*pTmp == '\0') {
1064  pTmp = nullptr;
1065  }
1066  }
1067  *pArgs = pTmp;
1068 }
1069 
1070 
1071 
1075 void LocalBroker::parseArguments(char *io_pLine, int *o_pArgc, char **o_pArgv)
1076 {
1077  char *pNext = io_pLine;
1078  size_t i;
1079  int j;
1080  int quoted = 0;
1081  size_t len = strlen(io_pLine);
1082 
1083  // Protect spaces inside quotes, but lose the quotes
1084  for(i = 0; i < len; i++) {
1085  if ((!quoted) && ('"' == io_pLine[i])) {
1086  quoted = 1;
1087  io_pLine[i] = ' ';
1088  } else if ((quoted) && ('"' == io_pLine[i])) {
1089  quoted = 0;
1090  io_pLine[i] = ' ';
1091  } else if ((quoted) && (' ' == io_pLine[i])) {
1092  io_pLine[i] = '\1';
1093  }
1094  }
1095 
1096  // init
1097  memset(o_pArgv, 0x00, sizeof(char*) * C_MAXARGS);
1098  *o_pArgc = 1;
1099  o_pArgv[0] = io_pLine;
1100 
1101  while ((nullptr != pNext) && (*o_pArgc < C_MAXARGS)) {
1102  splitLine(pNext, &(o_pArgv[*o_pArgc]));
1103  pNext = o_pArgv[*o_pArgc];
1104 
1105  if (nullptr != o_pArgv[*o_pArgc]) {
1106  *o_pArgc += 1;
1107  }
1108  }
1109 
1110  for(j = 0; j < *o_pArgc; j++) {
1111  len = strlen(o_pArgv[j]);
1112  for(i = 0; i < len; i++) {
1113  if('\1' == o_pArgv[j][i]) {
1114  o_pArgv[j][i] = ' ';
1115  }
1116  }
1117  }
1118 }
1119 #endif
bool ret
void * HANDLE
void parseArguments(char *io_pLine, int *o_pArgc, char **o_pArgv)
Breaks up a line into multiple arguments.
Definition: Run.cpp:2406
void splitLine(char *pLine, char **pArgs)
Split a line into separate words.
Definition: Run.cpp:2386
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:74
void fromString(const std::string &text)
Initializes bottle from a string.
Definition: Bottle.cpp:204
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:251
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:211
Preferences for how to communicate with a contact.
Definition: ContactStyle.h:24
double timeout
Set a timeout for communication (in units of seconds, fractional seconds allowed).
Definition: ContactStyle.h:47
bool quiet
Suppress all outputs and warnings.
Definition: ContactStyle.h:36
virtual std::string getName() const
Get name of port.
Definition: Contactable.cpp:14
static bool connect(const std::string &src, const std::string &dest, const std::string &carrier="", bool quiet=true)
Request that an output port connect to an input port.
Definition: Network.cpp:682
static bool exists(const std::string &port, bool quiet=true, bool checkVer=true)
Check for a port to be ready and responsive.
Definition: Network.cpp:746
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
Definition: Network.cpp:700
static bool isConnected(const std::string &src, const std::string &dest, bool quiet)
Check if a connection exists between two ports.
Definition: Network.cpp:727
A mini-server for network communication.
Definition: Port.h:47
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
Definition: Port.cpp:427
bool setTimeout(float timeout)
Set a timeout on network operations.
Definition: Port.cpp:625
void close() override
Stop port activity.
Definition: Port.cpp:354
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
Definition: Port.cpp:79
static double nowSystem()
Definition: SystemClock.cpp:34
static void delaySystem(double seconds)
Definition: SystemClock.cpp:29
bool stop()
Stop the thread.
Definition: Thread.cpp:81
bool isStopping()
Returns true if the thread is stopping (Thread::stop has been called).
Definition: Thread.cpp:99
bool isRunning()
Returns true if the thread is running (Thread::start has been called successfully and the thread has ...
Definition: Thread.cpp:105
bool start()
Start the new thread running.
Definition: Thread.cpp:93
#define KILL_TIMEOUT
Definition: localbroker.cpp:14
#define CONNECTION_TIMEOUT
Definition: localbroker.cpp:15
#define PIPE_TIMEOUT
Definition: localbroker.cpp:32
#define WRITE_TO_PIPE
Definition: localbroker.cpp:17
#define PIPE_EVENT
Definition: localbroker.cpp:33
#define READ_FROM_PIPE
Definition: localbroker.cpp:18
#define STOP_TIMEOUT
Definition: localbroker.cpp:13
#define C_MAXARGS
Definition: localbroker.cpp:35
ContainerT split(const typename ContainerT::value_type &s, std::basic_regex< typename ContainerT::value_type::value_type > regex)
Utility to split a string by a separator, into a vector of strings.
Definition: string.h:27
An interface to the operating system, including Port based communication.
char * getcwd(char *buf, size_t size)
Portable wrapper for the getcwd() function.
Definition: Os.cpp:112
int fork()
Portable wrapper for the fork() function.
Definition: Os.cpp:159