12 #define YARPRUN_NORESPONSE 1
13 #define YARPRUN_NOCONNECTION 2
14 #define YARPRUN_CONNECTION_TIMOUT 3
15 #define YARPRUN_SEMAPHORE_PARAM 4
16 #define YARPRUN_UNDEF 5
18 #define CONNECTION_TIMEOUT 2.0
19 #define RUN_TIMEOUT 10.0
20 #define STOP_TIMEOUT 15.0
21 #define KILL_TIMEOUT 10.0
22 #define EVENT_THREAD_PERIOD 0.5
29 " (Remote host does not respond) ",
30 " (Remote host does no exist) ",
31 " (Timeout while connecting to the remote host) ",
32 " (Blocked in broker semaphor) ",
33 " (Undefined message) " };
42 bOnlyConnector = bInitialized =
false;
55 if (PeriodicThread::isRunning()) {
56 PeriodicThread::stop();
68 strError =
"YARP network server is not up.";
72 bOnlyConnector =
true;
86 const char* szhost,
const char* szstdio,
87 const char* szworkdir,
const char* szenv )
104 strError =
"command is not specified.";
111 strError =
"remote host port is not specified.";
116 if (szhost[0] !=
'/') {
117 strHost = std::string(
"/") + std::string(szhost);
123 if (strlen(szparam)) {
126 if (strlen(szworkdir)) {
127 strWorkdir = szworkdir;
132 if (szstdio[0] !=
'/') {
133 strStdio = std::string(
"/") + std::string(szstdio);
145 strTag = strHost + strCmd + strParam + strEnv + sstrID.str();
146 std::string::iterator itr;
147 for (itr = strTag.begin(); itr != strTag.end(); itr++) {
148 if (((*itr) ==
' ') || ((*itr) ==
'/')) {
153 __trace_message =
"(init) checking yarp network";
154 if(!NetworkBase::checkNetwork(5.0))
156 strError =
"YARP network server is not up.";
157 __trace_message.clear();
161 __trace_message = std::string(
"(init) checking existence of ") + strHost;
162 if(!
exists(strHost.c_str()))
165 strError +=
" does not exist. check yarprun is running as server.";
166 __trace_message.clear();
190 if (bOnlyConnector) {
195 int ret = requestServer(runProperty());
198 strError =
"cannot ask ";
200 strError +=
" to run ";
204 strError += std::string(
" due to " + __trace_message);
209 double base = SystemClock::nowSystem();
214 if(strStdioUUID.size())
216 if (PeriodicThread::isRunning()) {
217 PeriodicThread::stop();
219 PeriodicThread::start();
225 strError =
"cannot run ";
237 if (bOnlyConnector) {
255 strError =
"cannot ask ";
257 strError +=
" to stop ";
261 strError += std::string(
" due to " + __trace_message);
266 double base = SystemClock::nowSystem();
271 PeriodicThread::stop();
276 strError =
"Timeout! Cannot stop ";
280 PeriodicThread::stop();
289 if (bOnlyConnector) {
308 strError =
"cannot ask ";
310 strError +=
" to kill ";
314 strError += std::string(
" due to " + __trace_message);
319 double base = SystemClock::nowSystem();
324 PeriodicThread::stop();
329 strError =
"cannot kill ";
333 PeriodicThread::stop();
343 if (bOnlyConnector) {
360 int ret = SendMsg(msg, strHost, response, 3.0);
363 strError =
"cannot ask ";
365 strError +=
" to check for status of ";
369 strError += std::string(
" due to " + __trace_message);
373 return ((response.
get(0).
asString() ==
"running")?1:0);
390 std::string cmd = strCmd + std::string(
" ") + strParam;
391 command.
put(
"cmd", cmd);
392 command.
put(
"on", strHost);
393 command.
put(
"as", strTag);
394 if (!strWorkdir.empty()) {
395 command.
put(
"workdir", strWorkdir);
397 if (!strStdio.empty()) {
398 command.
put(
"stdio", strStdio);
400 if (!strEnv.empty()) {
401 command.
put(
"env", strEnv);
412 const char* carrier,
bool persist)
416 strError =
"no source port is introduced.";
422 strError =
"no destination port is introduced.";
437 std::string strCarrier = carrier;
438 bool needDisconnect = strCarrier.find(
"udp") == (
size_t)0;
439 needDisconnect |= strCarrier.find(
"mcast") == (
size_t)0;
440 if(needDisconnect ==
false) {
441 if (NetworkBase::isConnected(from, to, style)) {
446 NetworkBase::connect(from, to, style);
449 strError =
"cannot connect ";
451 strError +=
" to " + std::string(to);
457 std::string topic = std::string(
"topic:/") + std::string(from) + std::string(to);
458 NetworkBase::connect(from, topic, style);
459 NetworkBase::connect(topic, to, style);
462 strError =
"a persistent connection from ";
464 strError +=
" to " + std::string(to);
465 strError +=
" is created but not connected.";
479 strError =
"no source port is introduced.";
485 strError =
"no destination port is introduced.";
513 if(!NetworkBase::disconnect(from, to, style))
515 strError =
"cannot disconnect ";
517 strError +=
" from " + std::string(to);
529 return NetworkBase::exists(szport, style);
534 if ((szport ==
nullptr) || (request ==
nullptr)) {
545 if (!port.
open(
"...")) {
553 for(
int i=0; i<10; i++) {
554 ret = NetworkBase::connect(port.
getName(), szport, style);
558 SystemClock::delaySystem(1.0);
569 NetworkBase::disconnect(port.
getName(), szport);
588 return NetworkBase::isConnected(from, to, style);
593 if (!strlen(server)) {
596 if (!semParam.
check()) {
603 if(!port.
open(
"...")) {
604 __trace_message.clear();
620 __trace_message =
"(getSystemInfo) connecting to " + std::string(port.
getName());
625 strError = std::string(
"Cannot connect to ") + std::string(server);
626 __trace_message.clear();
631 __trace_message =
"(getSystemInfo) writing to " + std::string(port.
getName());
633 __trace_message =
"(getSystemInfo) disconnecting from " + std::string(port.
getName());
634 NetworkBase::disconnect(port.
getName(), server);
639 strError = std::string(server) + std::string(
" does not respond");
640 __trace_message.clear();
646 __trace_message.clear();
659 bool ret = NetworkBase::writeToNameServer(cmd, reply, style);
662 strError =
"Failed to reach name server\n";
671 const char* delm =
"registration name ";
673 while((pos1 = str.find(delm)) != std::string::npos)
675 str = str.substr(pos1+strlen(delm));
676 if ((pos2 = str.find(
' ')) != std::string::npos) {
677 ports.push_back(str.substr(0, pos2));
687 if (!strlen(server)) {
699 int ret = SendMsg(msg, server, response, 3.0);
702 for(
size_t i=0; i<response.
size(); i++)
716 processes.push_back(proc);
721 strError =
"cannot ask ";
723 strError +=
" to give the list of running processes.";
726 strError += std::string(
" due to " + __trace_message);
734 std::string topic = std::string(from) + std::string(to);
747 const char *qosFrom,
const char *qosTo) {
750 if (qosFrom && qosTo && !strlen(qosFrom) && !strlen(qosTo)) {
756 if(qosFrom !=
nullptr && strlen(qosFrom)) {
757 if(!getQosFromString(qosFrom, styleFrom)) {
758 strError =
"Error in parsing Qos properties of " + std::string(from);
762 if (qosTo !=
nullptr && strlen(qosTo)) {
763 if(!getQosFromString(qosTo, styleTo)) {
764 strError =
"Error in parsing Qos properties of " + std::string(to);
768 return NetworkBase::setConnectionQos(from, to, styleFrom, styleTo,
true);
772 std::string strQos(qos);
773 transform(strQos.begin(), strQos.end(), strQos.begin(),
774 (
int(*)(
int))toupper);
775 strQos.erase( std::remove_if( strQos.begin(), strQos.end(), ::isspace ), strQos.end() );
778 std::stringstream ss(strQos);
780 while(getline(ss, prop,
';')) {
781 size_t p = prop.find(
':');
782 if (p != prop.npos) {
783 std::string key = prop.substr(0, p);
784 std::string value = prop.substr(p+1);
785 if (key.length() > 0 && value.length() > 0) {
786 if (key ==
"LEVEL" || key==
"DSCP" || key ==
"TOS") {
791 else if (key ==
"PRIORITY") {
793 int prio = strtol(value.c_str(), &p, 10);
796 else if (key ==
"POLICY") {
798 int policy = strtol(value.c_str(), &p, 10);
809 return strError.c_str();
813 bool YarpBroker::timeout(
double base,
double timeout)
815 SystemClock::delaySystem(1.0);
816 if ((SystemClock::nowSystem() - base) > timeout) {
824 if (!strStdioUUID.size()) {
828 std::string strStdioPort = strStdioUUID +
"/stdout";
829 stdioPort.
open(
"...");
831 double base = SystemClock::nowSystem();
835 while(!timeout(base, 5.0))
837 if (NetworkBase::connect(strStdioPort, stdioPort.
getName(), style)) {
842 strError =
"Cannot connect to stdio port ";
843 strError += strStdioPort;
854 for (
size_t i = 0; i < input->
size(); i++) {
863 NetworkBase::disconnect(stdioPort.
getName(), strStdioUUID);
868 int YarpBroker::SendMsg(
Bottle& msg, std::string target,
Bottle& response,
float fTimeout)
870 if (!
exists(target.c_str())) {
874 if (!semParam.
check()) {
881 if(!port.
open(
"..."))
883 __trace_message.clear();
893 __trace_message =
"(SendMsg) connecting to " + std::string(target);
894 for(
int i=0; i<10; i++)
896 ret = NetworkBase::connect(port.
getName(), target, style);
900 SystemClock::delaySystem(1.0);
906 __trace_message.clear();
911 __trace_message =
"(SendMsg) writing to " + std::string(target);
913 __trace_message =
"(SendMsg) disconnecting from " + std::string(target);
914 NetworkBase::disconnect(port.
getName(),target);
915 __trace_message.clear();
929 int YarpBroker::requestServer(
Property& config)
937 if (config.
check(
"cmd") && config.
check(
"stdio"))
949 if (config.
check(
"workdir")) {
952 if (config.
check(
"geometry")) {
955 if (config.
check(
"hold")) {
958 if (config.
check(
"env")) {
969 if (response.
size() > 2) {
980 if (config.
check(
"cmd"))
989 if (config.
check(
"workdir")) {
992 if (config.
check(
"env")) {
virtual void onBrokerStdout(const char *msg)
BrokerEventSink * eventSink
unsigned int generateID()
void threadRelease() override
Release method.
const char * error() override
bool connected(const char *from, const char *to, const char *carrier) override
bool getAllPorts(std::vector< std::string > &stingList)
const char * requestRpc(const char *szport, const char *request, double timeout) override
bool getSystemInfo(const char *server, yarp::os::SystemInfoSerializer &info)
bool getAllProcesses(const char *server, ProcessContainer &processes)
void detachStdout() override
bool connect(const char *from, const char *to, const char *carrier, bool persist=false) override
connection broker
bool rmconnect(const char *from, const char *to)
bool threadInit() override
Initialization method.
bool setQos(const char *from, const char *to, const char *qosFrom, const char *qosTo)
bool disconnect(const char *from, const char *to, const char *carrier) override
void run() override
Loop function.
bool attachStdout() override
bool exists(const char *port) override
A simple collection of objects that can be described and transmitted in a portable way.
void fromString(const std::string &text)
Initializes bottle from a string.
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
size_type size() const
Gets the number of elements in the bottle.
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
void clear()
Empties the bottle of any objects it contains.
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
void addString(const char *str)
Places a string in the bottle, at the end of the list.
std::string toString() const override
Gives a human-readable textual representation of the bottle.
std::string getName() const override
Get name of port.
void close() override
Stop port activity.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
T * read(bool shouldWait=true) override
Read an available object from the port.
static bool connect(const std::string &src, const std::string &dest, const std::string &carrier="", bool quiet=true)
Request that an output port connect to an input port.
An abstraction for a periodic thread.
A mini-server for network communication.
bool write(const PortWriter &writer, const PortWriter *callback=nullptr) const override
Write an object to the port.
bool setTimeout(float timeout)
Set a timeout on network operations.
void close() override
Stop port activity.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
A class for storing options and configuration information.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
void clear()
Remove all associations.
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Preferences for the port's Quality of Service.
void setThreadPriority(int priority)
sets the communication thread priority level
bool setPacketPriority(const std::string &priority)
sets the packet priority from a string.
void setThreadPolicy(int policy)
sets the communication thread scheduling policy
void wait()
Decrement the counter, even if we must wait to do that.
bool check()
Decrement the counter, unless that would require waiting.
void post()
Increment the counter.
A helper class to pass the SystemInfo object around the YARP network.
virtual bool isString() const
Checks if value is a string.
virtual std::int32_t asInt32() const
Get 32-bit integer value.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
virtual std::string asString() const
Get string value.
std::stringstream OSTRINGSTREAM
std::vector< Process > ProcessContainer
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
bool write(const ImageOf< PixelRgb > &src, const std::string &dest, image_fileformat format=FORMAT_PPM)
#define CONNECTION_TIMEOUT
#define YARPRUN_CONNECTION_TIMOUT
#define YARPRUN_NORESPONSE
#define YARPRUN_NOCONNECTION
#define YARPRUN_SEMAPHORE_PARAM
const char * yarprun_err_msg[]
#define EVENT_THREAD_PERIOD