// Non-zero status codes for requests sent to a yarprun server.
// NOTE(review): the message strings that follow these macros are listed in the
// same order, so the codes presumably index yarprun_err_msg[] — confirm against
// the full array definition.
15 #define YARPRUN_NORESPONSE 1
16 #define YARPRUN_NOCONNECTION 2
// NOTE(review): "TIMOUT" is how this identifier is spelled; keep it as-is unless
// every user of the macro is renamed at the same time.
17 #define YARPRUN_CONNECTION_TIMOUT 3
18 #define YARPRUN_SEMAPHORE_PARAM 4
19 #define YARPRUN_UNDEF 5
// Timing constants, written as floating-point literals.
// NOTE(review): presumably seconds, to match the SystemClock::nowSystem()
// differences compared in YarpBroker::timeout() — confirm at each use site.
21 #define CONNECTION_TIMEOUT 2.0
22 #define RUN_TIMEOUT 10.0
23 #define STOP_TIMEOUT 15.0
24 #define KILL_TIMEOUT 10.0
// NOTE(review): presumably the period of the broker's PeriodicThread — confirm
// where the thread is constructed.
25 #define EVENT_THREAD_PERIOD 0.5
32 " (Remote host does not respond) ",
33 " (Remote host does no exist) ",
34 " (Timeout while connecting to the remote host) ",
35 " (Blocked in broker semaphor) ",
36 " (Undefined message) " };
46 bOnlyConnector = bInitialized =
false;
59 if(PeriodicThread::isRunning())
60 PeriodicThread::stop();
71 strError =
"Yarp network server is not up.";
75 bOnlyConnector =
true;
89 const char* szhost,
const char* szstdio,
90 const char* szworkdir,
const char* szenv )
107 strError =
"command is not specified.";
114 strError =
"remote host port is not specified.";
120 strHost = string(
"/") + string(szhost);
127 if(strlen(szworkdir))
128 strWorkdir = szworkdir;
132 if(szstdio[0] !=
'/')
133 strStdio = string(
"/") + string(szstdio);
143 strTag = strHost + strCmd + strParam + strEnv + sstrID.str();
144 string::iterator itr;
145 for(itr=strTag.begin(); itr!=strTag.end(); itr++)
146 if(((*itr) ==
' ') || ((*itr) ==
'/') )
149 __trace_message =
"(init) checking yarp network";
150 if(!NetworkBase::checkNetwork(5.0))
152 strError =
"Yarp network server is not up.";
153 __trace_message.clear();
157 __trace_message = string(
"(init) checking existence of ") + strHost;
158 if(!
exists(strHost.c_str()))
161 strError +=
" does not exist. check yarprun is running as server.";
162 __trace_message.clear();
183 if(!bInitialized)
return false;
184 if(bOnlyConnector)
return false;
187 int ret = requestServer(runProperty());
190 strError =
"cannot ask ";
192 strError +=
" to run ";
196 strError += string(
" due to " + __trace_message);
200 double base = SystemClock::nowSystem();
205 if(strStdioUUID.size())
207 if(PeriodicThread::isRunning())
208 PeriodicThread::stop();
209 PeriodicThread::start();
215 strError =
"cannot run ";
224 if(!bInitialized)
return true;
225 if(bOnlyConnector)
return false;
241 strError =
"cannot ask ";
243 strError +=
" to stop ";
247 strError += string(
" due to " + __trace_message);
251 double base = SystemClock::nowSystem();
256 PeriodicThread::stop();
261 strError =
"Timeout! Cannot stop ";
265 PeriodicThread::stop();
271 if(!bInitialized)
return true;
272 if(bOnlyConnector)
return false;
289 strError =
"cannot ask ";
291 strError +=
" to kill ";
295 strError += string(
" due to " + __trace_message);
299 double base = SystemClock::nowSystem();
304 PeriodicThread::stop();
309 strError =
"cannot kill ";
313 PeriodicThread::stop();
320 if(!bInitialized)
return -1;
321 if(bOnlyConnector)
return -1;
336 int ret = SendMsg(msg, strHost, response, 3.0);
339 strError =
"cannot ask ";
341 strError +=
" to check for status of ";
345 strError += string(
" due to " + __trace_message);
348 return ((response.
get(0).
asString() ==
"running")?1:0);
365 string cmd = strCmd + string(
" ") + strParam;
366 command.
put(
"cmd", cmd);
367 command.
put(
"on", strHost);
368 command.
put(
"as", strTag);
369 if(!strWorkdir.empty())
370 command.
put(
"workdir", strWorkdir);
371 if(!strStdio.empty())
372 command.
put(
"stdio", strStdio);
374 command.
put(
"env", strEnv);
384 const char* carrier,
bool persist)
388 strError =
"no source port is introduced.";
394 strError =
"no destination port is introduced.";
409 string strCarrier = carrier;
410 bool needDisconnect = strCarrier.find(
"udp") == (size_t)0;
411 needDisconnect |= strCarrier.find(
"mcast") == (size_t)0;
412 if(needDisconnect ==
false) {
413 if(NetworkBase::isConnected(from, to, style))
417 NetworkBase::connect(from, to, style);
420 strError =
"cannot connect ";
422 strError +=
" to " + string(to);
428 string topic = string(
"topic:/") + string(from) + string(to);
429 NetworkBase::connect(from, topic, style);
430 NetworkBase::connect(topic, to, style);
433 strError =
"a persistent connection from ";
435 strError +=
" to " + string(to);
436 strError +=
" is created but not connected.";
450 strError =
"no source port is introduced.";
456 strError =
"no destination port is introduced.";
483 if(!NetworkBase::disconnect(from, to, style))
485 strError =
"cannot disconnect ";
487 strError +=
" from " + string(to);
499 return NetworkBase::exists(szport, style);
504 if((szport==
nullptr) || (request==
nullptr))
513 if(!port.
open(
"..."))
520 for(
int i=0; i<10; i++) {
521 ret = NetworkBase::connect(port.
getName(), szport, style);
523 SystemClock::delaySystem(1.0);
534 NetworkBase::disconnect(port.
getName(), szport);
552 return NetworkBase::isConnected(from, to, style);
559 if(!semParam.
check())
565 if(!port.
open(
"...")) {
566 __trace_message.clear();
582 __trace_message =
"(getSystemInfo) connecting to " + string(port.
getName());
587 strError = string(
"Cannot connect to ") + string(server);
588 __trace_message.clear();
593 __trace_message =
"(getSystemInfo) writing to " + string(port.
getName());
595 __trace_message =
"(getSystemInfo) disconnecting from " + string(port.
getName());
596 NetworkBase::disconnect(port.
getName(), server);
601 strError = string(server) + string(
" does not respond");
602 __trace_message.clear();
608 __trace_message.clear();
621 bool ret = NetworkBase::writeToNameServer(cmd, reply, style);
624 strError =
"Failed to reach name server\n";
632 const char* delm =
"registration name ";
634 while((pos1 = str.find(delm)) != std::string::npos)
636 str = str.substr(pos1+strlen(delm));
637 if((pos2 = str.find(
' ')) != std::string::npos)
638 ports.push_back(str.substr(0, pos2));
658 int ret = SendMsg(msg, server, response, 3.0);
661 for(
size_t i=0; i<response.
size(); i++)
673 processes.push_back(proc);
678 strError =
"cannot ask ";
680 strError +=
" to give the list of running processes.";
683 strError += string(
" due to " + __trace_message);
690 string topic = string(from) + string(to);
703 const char *qosFrom,
const char *qosTo) {
706 if(qosFrom && qosTo && !strlen(qosFrom) && !strlen(qosTo))
711 if(qosFrom !=
nullptr && strlen(qosFrom)) {
712 if(!getQosFromString(qosFrom, styleFrom)) {
713 strError =
"Error in parsing Qos properties of " + string(from);
717 if(qosTo !=
nullptr && strlen(qosTo))
718 if(!getQosFromString(qosTo, styleTo)) {
719 strError =
"Error in parsing Qos properties of " + string(to);
722 return NetworkBase::setConnectionQos(from, to, styleFrom, styleTo,
true);
727 transform(strQos.begin(), strQos.end(), strQos.begin(),
728 (
int(*)(
int))toupper);
729 strQos.erase( std::remove_if( strQos.begin(), strQos.end(), ::isspace ), strQos.end() );
732 stringstream ss(strQos);
734 while(getline(ss, prop,
';')) {
735 size_t p = prop.find(
':');
736 if (p != prop.npos) {
737 string key = prop.substr(0, p);
738 string value = prop.substr(p+1);
739 if (key.length() > 0 && value.length() > 0) {
740 if (key ==
"LEVEL" || key==
"DSCP" || key ==
"TOS") {
744 else if (key ==
"PRIORITY") {
746 int prio = strtol(value.c_str(), &p, 10);
749 else if (key ==
"POLICY") {
751 int policy = strtol(value.c_str(), &p, 10);
762 return strError.c_str();
766 bool YarpBroker::timeout(
double base,
double timeout)
768 SystemClock::delaySystem(1.0);
769 if((SystemClock::nowSystem()-base) > timeout)
776 if(!strStdioUUID.size())
779 string strStdioPort = strStdioUUID +
"/stdout";
780 stdioPort.
open(
"...");
782 double base = SystemClock::nowSystem();
786 while(!timeout(base, 5.0))
788 if(NetworkBase::connect(strStdioPort, stdioPort.
getName(), style))
792 strError =
"Cannot connect to stdio port ";
793 strError += strStdioPort;
804 for (
size_t i=0; i<input->
size(); i++)
812 NetworkBase::disconnect(stdioPort.
getName(), strStdioUUID);
817 int YarpBroker::SendMsg(
Bottle& msg, std::string target,
Bottle& response,
float fTimeout)
819 if(!
exists(target.c_str()))
822 if(!semParam.
check())
828 if(!port.
open(
"..."))
830 __trace_message.clear();
840 __trace_message =
"(SendMsg) connecting to " + string(target);
841 for(
int i=0; i<10; i++)
843 ret = NetworkBase::connect(port.
getName(), target, style);
845 SystemClock::delaySystem(1.0);
851 __trace_message.clear();
856 __trace_message =
"(SendMsg) writing to " + string(target);
858 __trace_message =
"(SendMsg) disconnecting from " + string(target);
859 NetworkBase::disconnect(port.
getName(),target);
860 __trace_message.clear();
874 int YarpBroker::requestServer(
Property& config)
882 if (config.
check(
"cmd") && config.
check(
"stdio"))
905 if(response.
size() > 2)
915 if (config.
check(
"cmd"))
virtual void onBrokerStdout(const char *msg)
BrokerEventSink * eventSink
unsigned int generateID()
void threadRelease() override
Release method.
const char * error() override
bool connected(const char *from, const char *to, const char *carrier) override
bool getAllPorts(std::vector< std::string > &stingList)
const char * requestRpc(const char *szport, const char *request, double timeout) override
bool getSystemInfo(const char *server, yarp::os::SystemInfoSerializer &info)
bool getAllProcesses(const char *server, ProcessContainer &processes)
void detachStdout() override
bool connect(const char *from, const char *to, const char *carrier, bool persist=false) override
connection broker
bool rmconnect(const char *from, const char *to)
bool threadInit() override
Initialization method.
bool setQos(const char *from, const char *to, const char *qosFrom, const char *qosTo)
bool disconnect(const char *from, const char *to, const char *carrier) override
void run() override
Loop function.
bool attachStdout() override
bool exists(const char *port) override
A simple collection of objects that can be described and transmitted in a portable way.
void fromString(const std::string &text)
Initializes bottle from a string.
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
size_type size() const
Gets the number of elements in the bottle.
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
void clear()
Empties the bottle of any objects it contains.
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
void addString(const char *str)
Places a string in the bottle, at the end of the list.
std::string toString() const override
Gives a human-readable textual representation of the bottle.
std::string getName() const override
Get name of port.
void close() override
Stop port activity.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
T * read(bool shouldWait=true) override
Read an available object from the port.
static bool connect(const std::string &src, const std::string &dest, const std::string &carrier="", bool quiet=true)
Request that an output port connect to an input port.
An abstraction for a periodic thread.
A mini-server for network communication.
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
bool setTimeout(float timeout)
Set a timeout on network operations.
void close() override
Stop port activity.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
A class for storing options and configuration information.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
void clear()
Remove all associations.
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Preferences for the port's Quality of Service.
void setThreadPriority(int priority)
Sets the communication thread priority level.
bool setPacketPriority(const std::string &priority)
Sets the packet priority from a string.
void setThreadPolicy(int policy)
Sets the communication thread scheduling policy.
void wait()
Decrement the counter, even if we must wait to do that.
bool check()
Decrement the counter, unless that would require waiting.
void post()
Increment the counter.
A helper class to pass the SystemInfo object around the YARP network.
virtual bool isString() const
Checks if value is a string.
virtual std::int32_t asInt32() const
Get 32-bit integer value.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
virtual std::string asString() const
Get string value.
std::stringstream OSTRINGSTREAM
std::vector< Process > ProcessContainer
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
bool write(const ImageOf< PixelRgb > &src, const std::string &dest, image_fileformat format=FORMAT_PPM)
#define CONNECTION_TIMEOUT
#define YARPRUN_CONNECTION_TIMOUT
#define YARPRUN_NORESPONSE
#define YARPRUN_NOCONNECTION
#define YARPRUN_SEMAPHORE_PARAM
const char * yarprun_err_msg[]
#define EVENT_THREAD_PERIOD