12 #define RUN_TIMEOUT 10.0
13 #define STOP_TIMEOUT 15.0
14 #define KILL_TIMEOUT 10.0
15 #define CONNECTION_TIMEOUT 2.0
17 #define WRITE_TO_PIPE 1
18 #define READ_FROM_PIPE 0
25 #include <sys/types.h>
32 #define PIPE_TIMEOUT 0
34 #define PIPE_SIGNALED 2
43 class LocalTerminateParams
46 LocalTerminateParams(DWORD
id) {
51 ~LocalTerminateParams(){}
56 BOOL CALLBACK LocalTerminateAppEnum(HWND hwnd, LPARAM lParam)
58 LocalTerminateParams* params=(LocalTerminateParams*)lParam;
60 GetWindowThreadProcessId(hwnd, &dwID);
61 if (dwID==params->dwID)
64 PostMessage(hwnd,WM_CLOSE,0,0);
69 volatile LONGLONG uniquePipeNumber = 0;
71 volatile LONG uniquePipeNumber = 0;
78 OUT LPHANDLE lpReadPipe,
79 OUT LPHANDLE lpWritePipe,
80 IN LPSECURITY_ATTRIBUTES lpPipeAttributes,
83 HANDLE ReadPipeHandle, WritePipeHandle;
85 char PipeNameBuffer[MAX_PATH];
86 nSize = (nSize ==0) ? 100*8096: nSize;
89 InterlockedIncrement64(&uniquePipeNumber);
91 InterlockedIncrement(&uniquePipeNumber);
94 sprintf( PipeNameBuffer,
95 "\\\\.\\Pipe\\RemoteExeAnon.%08x.%08x",
96 GetCurrentProcessId(),
100 ReadPipeHandle = CreateNamedPipeA(
101 (LPSTR)PipeNameBuffer,
102 PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
103 PIPE_TYPE_BYTE | PIPE_WAIT,
111 if (! ReadPipeHandle) {
115 WritePipeHandle = CreateFileA(
116 (LPSTR)PipeNameBuffer,
121 FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED,
125 if (INVALID_HANDLE_VALUE == WritePipeHandle)
127 dwError = GetLastError();
128 CloseHandle( ReadPipeHandle );
129 SetLastError(dwError);
133 *lpReadPipe = ReadPipeHandle;
134 *lpWritePipe = WritePipeHandle;
140 LocalBroker::LocalBroker()
142 bOnlyConnector = bInitialized =
false;
145 setWindowMode(WINDOW_HIDDEN);
149 LocalBroker::~LocalBroker()
154 void LocalBroker::fini()
161 bool LocalBroker::init()
171 bOnlyConnector =
true;
175 bool LocalBroker::init(
const char* szcmd,
const char* szparam,
176 const char* szhost,
const char* szstdio,
177 const char* szworkdir,
const char* szenv )
190 strError =
"command is not specified.";
194 if (szparam && strlen(szparam)) {
198 if (szhost && strlen(szhost)) {
201 if (szworkdir && strlen(szworkdir)) {
202 strWorkdir = szworkdir;
205 if(szstdio && strlen(szstdio))
207 if (szstdio[0] !=
'/') {
208 strStdio = std::string(
"/") + std::string(szstdio);
214 if (szenv && strlen(szenv)) {
237 struct sigaction new_action;
238 new_action.sa_handler = SIG_IGN;
239 sigemptyset (&new_action.sa_mask);
240 new_action.sa_flags = 0;
241 sigaction (SIGCHLD, &new_action,
nullptr);
249 bool LocalBroker::start()
254 if (bOnlyConnector) {
275 bool LocalBroker::stop()
280 if (bOnlyConnector) {
301 strError =
"Timeout! cannot stop ";
308 bool LocalBroker::kill()
313 if (bOnlyConnector) {
335 strError =
"Timeout! cannot kill ";
343 int LocalBroker::running()
348 if (bOnlyConnector) {
351 return (psCmd(ID))?1:0;
358 bool LocalBroker::connect(
const char* from,
const char* to,
359 const char* carrier,
bool persist)
364 strError =
"no source port is introduced.";
370 strError =
"no destination port is introduced.";
377 strError +=
" does not exist.";
384 strError +=
" does not exist.";
390 strError =
"cannot connect ";
392 strError +=
" to " + std::string(to);
398 bool LocalBroker::disconnect(
const char* from,
const char* to,
const char *carrier)
403 strError =
"no source port is introduced.";
409 strError =
"no destination port is introduced.";
416 strError +=
" does not exist.";
423 strError +=
" does not exist.";
427 if (!connected(from, to, carrier)) {
433 strError =
"cannot disconnect ";
435 strError +=
" from " + std::string(to);
442 bool LocalBroker::exists(
const char* port)
448 const char* LocalBroker::requestRpc(
const char* szport,
const char* request,
double timeout)
450 if ((szport ==
nullptr) || (request ==
nullptr)) {
454 if (!exists(szport)) {
461 if (!port.
open(
"...")) {
469 for(
int i=0; i<10; i++) {
495 bool LocalBroker::connected(
const char* from,
const char* to,
const char* carrier)
497 if (!exists(from) || !exists(to)) {
504 const char* LocalBroker::error()
506 return strError.c_str();
509 bool LocalBroker::attachStdout()
516 strError =
"Module is not running";
519 return startStdout();
522 void LocalBroker::detachStdout()
528 bool LocalBroker::timeout(
double base,
double timeout)
537 bool LocalBroker::threadInit()
543 void LocalBroker::run()
552 BOOL bRet = ReadFile(read_from_pipe_cmd_to_stdout,
553 buff, 1023, &dwRead,
nullptr);
556 buff[dwRead] = (CHAR)0;
557 if(eventSink && strlen(buff))
558 eventSink->onBrokerStdout(buff);
570 while (fgets(buff, 1024, fd_stdout)) {
571 strmsg += std::string(buff);
573 if (eventSink && strmsg.size()) {
574 eventSink->onBrokerStdout(strmsg.c_str());
584 void LocalBroker::threadRelease()
597 std::string LocalBroker::lastError2String()
599 int error=GetLastError();
601 FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM,
nullptr,error,0,buff,1024,
nullptr);
602 return std::string(buff);
605 bool LocalBroker::startStdout()
607 if (!CloseHandle(write_to_pipe_cmd_to_stdout))
613 void LocalBroker::stopStdout()
618 int LocalBroker::ExecuteCmd()
620 std::string strCmdLine = strCmd + std::string(
" ") + strParam;
622 PROCESS_INFORMATION cmd_process_info;
623 STARTUPINFO cmd_startup_info;
624 ZeroMemory(&cmd_process_info,
sizeof(PROCESS_INFORMATION));
625 ZeroMemory(&cmd_startup_info,
sizeof(STARTUPINFO));
626 cmd_startup_info.cb =
sizeof(STARTUPINFO);
629 std::string strDisplay=getDisplay();
631 DWORD dwCreationFlags;
644 if (windowMode==WINDOW_VISIBLE)
647 cmd_startup_info.dwFlags |= STARTF_USESHOWWINDOW;
648 cmd_startup_info.wShowWindow = SW_SHOWNA;
649 dwCreationFlags=CREATE_NEW_CONSOLE;
651 if (windowMode==WINDOW_HIDDEN)
654 SECURITY_ATTRIBUTES pipe_sec_attr;
655 pipe_sec_attr.nLength =
sizeof(SECURITY_ATTRIBUTES);
656 pipe_sec_attr.bInheritHandle = TRUE;
657 pipe_sec_attr.lpSecurityDescriptor =
nullptr;
658 CreatePipeAsync(&read_from_pipe_cmd_to_stdout,
659 &write_to_pipe_cmd_to_stdout,
662 cmd_startup_info.hStdError = write_to_pipe_cmd_to_stdout;
663 cmd_startup_info.hStdOutput = write_to_pipe_cmd_to_stdout;
665 cmd_startup_info.dwFlags |= STARTF_USESTDHANDLES;
667 dwCreationFlags=CREATE_NEW_PROCESS_GROUP;
675 TCHAR chNewEnv[32767];
678 LPTCH chOldEnv = GetEnvironmentStrings();
681 LPTSTR lpOld = (LPTSTR) chOldEnv;
682 LPTSTR lpNew = (LPTSTR) chNewEnv;
685 lstrcpy(lpNew, lpOld);
686 lpOld += lstrlen(lpOld) + 1;
687 lpNew += lstrlen(lpNew) + 1;
691 std::string cstrEnvName;
695 for (
const auto& s : ss) {
696 lstrcpy(lpNew, (LPTCH) s.c_str());
697 lpNew += lstrlen(lpNew) + 1;
704 bool bWorkdir=(strWorkdir.size()) ?
true :
false;
705 std::string strWorkdirOk = bWorkdir ? strWorkdir+std::string(
"\\") :
"";
707 BOOL bSuccess=CreateProcess(
nullptr,
708 (
char*)(strWorkdirOk+strCmdLine).c_str(),
714 bWorkdir?strWorkdirOk.c_str():
nullptr,
718 if (!bSuccess && bWorkdir)
720 bSuccess=CreateProcess(
nullptr,
721 (
char*)(strCmdLine.c_str()),
727 strWorkdirOk.c_str(),
733 FreeEnvironmentStrings(chOldEnv);
735 CloseHandle(cmd_process_info.hProcess);
736 CloseHandle(cmd_process_info.hThread);
740 strError = std::string(
"Can't execute command because ") + lastError2String();
744 return cmd_process_info.dwProcessId;
747 bool LocalBroker::psCmd(
int pid)
749 HANDLE hProc=OpenProcess(SYNCHRONIZE|PROCESS_QUERY_INFORMATION, FALSE, pid);
754 GetExitCodeProcess(hProc , &status);
756 return (status==STILL_ACTIVE);
759 bool LocalBroker::killCmd(
int pid)
761 HANDLE hProc=OpenProcess(SYNCHRONIZE|PROCESS_TERMINATE, FALSE, pid);
765 BOOL bRet = TerminateProcess(hProc, 0);
767 return bRet ? true :
false;
770 bool LocalBroker::stopCmd(
int pid)
772 HANDLE hProc=OpenProcess(SYNCHRONIZE|PROCESS_TERMINATE, FALSE, pid);
776 LocalTerminateParams params(pid);
777 EnumWindows((WNDENUMPROC)LocalTerminateAppEnum,(LPARAM)¶ms);
780 GenerateConsoleCtrlEvent(CTRL_C_EVENT, pid);
783 GenerateConsoleCtrlEvent(CTRL_BREAK_EVENT, pid);
791 bool LocalBroker::psCmd(
int pid)
796 return !::kill(pid, 0);
800 bool LocalBroker::killCmd(
int pid)
805 return !::kill(pid, SIGKILL);
809 bool LocalBroker::stopCmd(
int pid)
814 return !::kill(pid, SIGTERM);
817 int LocalBroker::waitPipe(
int pipe_fd)
819 struct timeval timeout;
824 timeout.tv_usec = 500000;
827 FD_SET(pipe_fd, &fd);
828 rc = select(pipe_fd + 1, &fd,
nullptr,
nullptr, &timeout);
833 int LocalBroker::waitPipeSignal(
int pipe_fd)
835 struct timespec timeout;
841 FD_SET(pipe_fd, &fd);
858 if (pselect(pipe_fd + 1, &fd,
nullptr,
nullptr, &timeout,
nullptr)) {
865 bool LocalBroker::startStdout()
870 strError =
"cannot open pipe. " + std::string(strerror(errno));
876 if(fcntl(pipe_to_stdout[
READ_FROM_PIPE], F_SETFL, oflags|O_NONBLOCK) == -1)
878 strError =
"cannot set flag on pipe: " + std::string(strerror(errno));
886 void LocalBroker::stopStdout()
897 int LocalBroker::ExecuteCmd()
899 int pipe_child_to_parent[2];
900 int ret = pipe(pipe_child_to_parent);
903 strError = std::string(
"Can't create child pipe because") + std::string(strerror(errno));
907 ret = pipe(pipe_to_stdout);
910 strError = std::string(
"Can't create stdout pipe because") + std::string(strerror(errno));
914 int pid_cmd =
fork();
916 if(IS_INVALID(pid_cmd))
918 strError = std::string(
"Can't fork command because ") + std::string(strerror(errno));
922 if (IS_NEW_PROCESS(pid_cmd))
928 if (fcntl(STDOUT_FILENO, F_SETFL, fcntl(STDOUT_FILENO, F_GETFL) | O_NONBLOCK) == -1) {
929 strError = std::string(
"Can't set flag on stdout: ") + std::string(strerror(errno));
932 if (fcntl(STDERR_FILENO, F_SETFL, fcntl(STDERR_FILENO, F_GETFL) | O_NONBLOCK) == -1) {
933 strError = std::string(
"Can't set flag on stderr: ") + std::string(strerror(errno));
940 strCmd = strCmd + std::string(
" ") + strParam;
941 char *szcmd =
new char[strCmd.size()+1];
942 strcpy(szcmd,strCmd.c_str());
946 szarg[nargs]=
nullptr;
950 for (
const auto& s : ss) {
951 char* szenv =
new char[s.size()+1];
952 strcpy(szenv, s.c_str());
958 if(strWorkdir.size())
960 int ret = chdir(strWorkdir.c_str());
963 strError = std::string(
"Can't set working directory because ") + std::string(strerror(errno));
964 FILE* out_to_parent = fdopen(pipe_child_to_parent[
WRITE_TO_PIPE],
"w");
965 fprintf(out_to_parent,
"%s", strError.c_str());
966 fflush(out_to_parent);
967 fclose(out_to_parent);
975 char currWorkDirBuff[1024];
976 char *currWorkDir =
getcwd(currWorkDirBuff,1024);
981 char **cwd_szarg=
new char*[nargs+1];
982 for (
int i = 1; i < nargs; ++i) {
983 cwd_szarg[i] = szarg[i];
985 cwd_szarg[nargs]=
nullptr;
986 cwd_szarg[0]=
new char[strlen(currWorkDir)+strlen(szarg[0])+16];
988 strcpy(cwd_szarg[0],currWorkDir);
989 strcat(cwd_szarg[0],
"/");
990 strcat(cwd_szarg[0],szarg[0]);
991 ret=execvp(cwd_szarg[0],cwd_szarg);
992 delete [] cwd_szarg[0];
998 ret=execvp(szarg[0],szarg);
1003 strError = std::string(
"Can't execute command because ") + std::string(strerror(errno));
1004 FILE* out_to_parent = fdopen(pipe_child_to_parent[
WRITE_TO_PIPE],
"w");
1005 fprintf(out_to_parent,
"%s", strError.c_str());
1006 fflush(out_to_parent);
1007 fclose(out_to_parent);
1015 if (IS_PARENT_OF(pid_cmd))
1018 FILE* in_from_child = fdopen(pipe_child_to_parent[
READ_FROM_PIPE],
"r");
1020 if (fcntl(pipe_child_to_parent[
READ_FROM_PIPE],F_SETFL,flags|O_NONBLOCK) == -1)
1022 strError = std::string(
"Can't set flag on pipe: ") + std::string(strerror(errno));
1023 fclose(in_from_child);
1027 std::string retError;
1030 for (
char buff[1024]; fgets(buff, 1024, in_from_child);) {
1031 retError += std::string(buff);
1033 fclose(in_from_child);
1037 strError = retError;
1055 char *pTmp = strchr(pLine,
' ');
1060 while ((*pTmp) && (*pTmp ==
' ')) {
1063 if (*pTmp ==
'\0') {
1077 char *pNext = io_pLine;
1081 size_t len = strlen(io_pLine);
1084 for(i = 0; i < len; i++) {
1085 if ((!quoted) && (
'"' == io_pLine[i])) {
1088 }
else if ((quoted) && (
'"' == io_pLine[i])) {
1091 }
else if ((quoted) && (
' ' == io_pLine[i])) {
1097 memset(o_pArgv, 0x00,
sizeof(
char*) *
C_MAXARGS);
1099 o_pArgv[0] = io_pLine;
1101 while ((
nullptr != pNext) && (*o_pArgc <
C_MAXARGS)) {
1103 pNext = o_pArgv[*o_pArgc];
1105 if (
nullptr != o_pArgv[*o_pArgc]) {
1110 for(j = 0; j < *o_pArgc; j++) {
1111 len = strlen(o_pArgv[j]);
1112 for(i = 0; i < len; i++) {
1113 if(
'\1' == o_pArgv[j][i]) {
1114 o_pArgv[j][i] =
' ';
void parseArguments(char *io_pLine, int *o_pArgc, char **o_pArgv)
Breaks up a line into multiple arguments.
void splitLine(char *pLine, char **pArgs)
Split a line into separate words.
A simple collection of objects that can be described and transmitted in a portable way.
void fromString(const std::string &text)
Initializes bottle from a string.
size_type size() const
Gets the number of elements in the bottle.
std::string toString() const override
Gives a human-readable textual representation of the bottle.
static bool connect(const std::string &src, const std::string &dest, const std::string &carrier="", bool quiet=true)
Request that an output port connect to an input port.
static bool exists(const std::string &port, bool quiet=true, bool checkVer=true)
Check for a port to be ready and responsive.
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
static bool isConnected(const std::string &src, const std::string &dest, bool quiet)
Check if a connection exists between two ports.
A mini-server for network communication.
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
bool setTimeout(float timeout)
Set a timeout on network operations.
void close() override
Stop port activity.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
static double nowSystem()
static void delaySystem(double seconds)
bool stop()
Stop the thread.
bool isStopping()
Returns true if the thread is stopping (Thread::stop has been called).
bool isRunning()
Returns true if the thread is running (Thread::start has been called successfully and the thread has ...
bool start()
Start the new thread running.
#define CONNECTION_TIMEOUT
ContainerT split(const typename ContainerT::value_type &s, std::basic_regex< typename ContainerT::value_type::value_type > regex)
Utility to split a string by a separator, into a vector of strings.
An interface to the operating system, including Port based communication.
char * getcwd(char *buf, size_t size)
Portable wrapper for the getcwd() function.
int fork()
Portable wrapper for the fork() function.