39 # include <ace/INET_Addr.h>
40 # include <ace/Sched_Params.h>
47 #if defined(__linux__)
49 # include <sys/types.h>
73 yCDebug(PORTCORE,
"Starting listening on %s", address.
toURI().c_str());
77 yCError(PORTCORE,
"YARP not initialized; create a yarp::os::Network object before using ports");
83 m_stateSemaphore.wait();
93 yCAssert(PORTCORE, m_face ==
nullptr);
101 m_address.setTimeout(m_timeout);
106 if (m_face ==
nullptr) {
107 m_stateSemaphore.post();
112 if (m_address.getPort() <= 0) {
113 m_address = m_face->getLocalAddress();
114 if (m_address.getRegName() ==
"...") {
115 m_address.setName(std::string(
"/") + m_address.getHost() +
"_" +
NetType::toString(m_address.getPort()));
116 setName(m_address.getRegName());
122 m_stateSemaphore.post();
125 if (shouldAnnounce) {
146 yCAssert(PORTCORE, m_reader ==
nullptr);
154 yCAssert(PORTCORE, m_adminReader ==
nullptr);
155 m_adminReader = &reader;
162 yCAssert(PORTCORE, m_readableCreator ==
nullptr);
163 m_readableCreator = &creator;
191 m_stateSemaphore.post();
193 yCTrace(PORTCORE,
"run running");
199 bool shouldStop =
false;
200 while (!shouldStop) {
205 m_stateSemaphore.wait();
210 yCDebug(PORTCORE,
"received something");
217 shouldStop |= m_closing;
222 m_stateSemaphore.post();
231 yCDebug(PORTCORE,
"spun off a connection");
248 m_stateSemaphore.wait();
249 for (
int i = 0; i < m_connectionListeners; i++) {
250 m_connectionChangeSemaphore.post();
252 m_connectionListeners = 0;
253 m_stateSemaphore.post();
256 yCTrace(PORTCORE,
"run closing");
259 m_stateSemaphore.wait();
260 for (
int i = 0; i < m_connectionListeners; i++) {
261 m_connectionChangeSemaphore.post();
263 m_connectionListeners = 0;
265 m_stateSemaphore.post();
273 if (m_prop !=
nullptr) {
277 m_modifier.releaseOutModifier();
278 m_modifier.releaseInModifier();
287 m_stateSemaphore.wait();
301 m_stateSemaphore.post();
304 m_stateSemaphore.wait();
308 m_stateSemaphore.post();
316 yCTrace(PORTCORE,
"manualStart");
324 m_interruptable =
false;
334 m_interrupted =
false;
339 yCTrace(PORTCORE,
"interrupt");
347 m_interrupted =
true;
352 if (!m_interruptable) {
360 m_stateSemaphore.wait();
361 if (m_reader !=
nullptr) {
362 yCDebug(PORTCORE,
"sending update-state message to listener");
368 m_stateSemaphore.post();
372 void PortCore::closeMain()
374 yCTrace(PORTCORE,
"closeMain");
376 m_stateSemaphore.wait();
379 if (m_finishing || !(m_running || m_manual)) {
380 yCTrace(PORTCORE,
"closeMain - nothing to do");
381 m_stateSemaphore.post();
385 yCTrace(PORTCORE,
"closeMain - Central");
389 yCDebug(PORTCORE,
"now preparing to shut down port");
390 m_stateSemaphore.post();
399 std::string prevName;
402 std::string removeName;
403 m_stateSemaphore.wait();
404 for (
auto* unit : m_units) {
405 if ((unit !=
nullptr) && unit->isInput() && !unit->isDoomed()) {
406 Route r = unit->getRoute();
408 if (s.length() >= 1 && s[0] ==
'/' && s != getName() && s != prevName) {
415 m_stateSemaphore.post();
417 yCDebug(PORTCORE,
"requesting removal of connection from %s", removeName.c_str());
426 prevName = removeName;
436 m_stateSemaphore.wait();
437 for (
auto* unit : m_units) {
438 if ((unit !=
nullptr) && unit->isOutput() && !unit->isFinished()) {
439 removeRoute = unit->getRoute();
446 m_stateSemaphore.post();
448 removeUnit(removeRoute,
true);
452 m_stateSemaphore.wait();
453 bool stopRunning = m_running;
454 m_stateSemaphore.post();
459 m_stateSemaphore.wait();
461 m_stateSemaphore.post();
475 m_stateSemaphore.wait();
477 m_stateSemaphore.post();
484 m_stateSemaphore.wait();
488 m_stateSemaphore.post();
494 yCAssert(PORTCORE, m_face !=
nullptr);
504 if (m_reader !=
nullptr) {
505 yCDebug(PORTCORE,
"sending end-of-port message to listener");
513 std::string name = getName();
514 if (name != std::string(
"")) {
515 if (m_controlRegistration) {
531 yCAssert(PORTCORE, m_face ==
nullptr);
538 m_stateSemaphore.wait();
540 m_stateSemaphore.post();
545 void PortCore::closeUnits()
549 m_stateSemaphore.wait();
551 m_stateSemaphore.post();
555 for (
auto& i : m_units) {
557 if (unit !=
nullptr) {
558 yCDebug(PORTCORE,
"closing a unit");
560 yCDebug(PORTCORE,
"joining a unit");
563 yCDebug(PORTCORE,
"deleting a unit");
572 void PortCore::reapUnits()
576 m_stateSemaphore.wait();
578 for (
auto* unit : m_units) {
581 yCDebug(PORTCORE,
"Informing connection %s that it is doomed", s.c_str());
583 yCDebug(PORTCORE,
"Closed connection %s", s.c_str());
585 yCDebug(PORTCORE,
"Joined thread of connection %s", s.c_str());
589 m_stateSemaphore.post();
593 void PortCore::cleanUnits(
bool blocking)
601 m_stateSemaphore.wait();
603 blocking = m_stateSemaphore.check();
610 int updatedInputCount = 0;
611 int updatedOutputCount = 0;
612 int updatedDataOutputCount = 0;
613 yCDebug(PORTCORE,
"/ routine check of connections to this port begins");
617 for (
auto& i : m_units) {
619 if (unit !=
nullptr) {
623 yCDebug(PORTCORE,
"| removing connection %s", con.c_str());
628 yCDebug(PORTCORE,
"| removed connection %s", con.c_str());
633 updatedOutputCount++;
635 updatedDataOutputCount++;
652 for (
size_t i2 = 0; i2 < m_units.size(); i2++) {
653 if (m_units[i2] !=
nullptr) {
655 m_units[rem] = m_units[i2];
656 m_units[i2] =
nullptr;
663 for (
size_t i3 = 0; i3 < m_units.size() - rem; i3++) {
669 m_dataOutputCount = updatedDataOutputCount;
670 m_stateSemaphore.post();
671 m_packetMutex.lock();
672 m_inputCount = updatedInputCount;
673 m_outputCount = updatedOutputCount;
674 m_packetMutex.unlock();
675 yCDebug(PORTCORE,
"\\ routine check of connections to this port ends");
689 m_stateSemaphore.wait();
694 yCAssert(PORTCORE, unit !=
nullptr);
696 m_units.push_back(unit);
697 yCTrace(PORTCORE,
"there are now %zu units", m_units.size());
698 m_stateSemaphore.post();
704 yCTrace(PORTCORE,
"addOutput");
714 m_stateSemaphore.wait();
717 yCAssert(PORTCORE, unit !=
nullptr);
719 m_units.push_back(unit);
721 m_stateSemaphore.post();
725 bool PortCore::isUnit(
const Route& route,
int index)
728 bool needReap =
false;
730 for (
auto* unit : m_units) {
731 if (unit !=
nullptr) {
733 std::string wild =
"*";
736 ok = ok && (unit->
getIndex() == index);
760 bool PortCore::removeUnit(
const Route& route,
bool synch,
bool* except)
765 if (except !=
nullptr) {
766 yCDebug(PORTCORE,
"asked to remove connection in the way of %s", route.
toString().c_str());
769 yCDebug(PORTCORE,
"asked to remove connection %s", route.
toString().c_str());
774 std::vector<int> removals;
775 m_stateSemaphore.wait();
776 bool needReap =
false;
778 for (
auto* unit : m_units) {
779 if (unit !=
nullptr) {
781 std::string wild =
"*";
790 if (except ==
nullptr) {
802 removals.push_back(unit->
getIndex());
809 m_stateSemaphore.post();
816 yCDebug(PORTCORE,
"one or more connections need prodding to die");
828 yCDebug(PORTCORE,
"sent message to prod connection death");
832 yCDebug(PORTCORE,
"synchronizing with connection death");
835 m_stateSemaphore.wait();
836 for (
int removal : removals) {
837 cont = isUnit(route, removal);
843 m_connectionListeners++;
845 m_stateSemaphore.post();
847 m_connectionChangeSemaphore.wait();
863 yCDebug(PORTCORE,
"asked to add output to %s", dest.c_str());
876 bw.
appendLine(std::string(
"Do not know how to connect to ") + dest);
893 yCDebug(PORTCORE,
"output already present to %s", dest.c_str());
894 bw.
appendLine(std::string(
"Desired connection already present from ") + getName() +
" to " + dest);
908 aname = address.
toURI(
false);
920 unsigned int f = getFlags();
925 bool is_log = (!mode.empty());
928 err =
"Logger configured as log." + mode +
", but only log.in is supported";
931 append =
"; " + r.
getFromName() +
" will forward messages and replies (if any) to " + r.
getToName();
942 err =
"Outputs not allowed";
947 if (m_dataOutputCount >= 1 && !is_log) {
948 err =
"RPC output already connected";
974 bool ok = op->
open(r);
976 yCDebug(PORTCORE,
"open route error");
984 bw.
appendLine(std::string(
"Cannot connect to ") + dest);
1005 m_stateSemaphore.wait();
1011 yCAssert(PORTCORE, unit !=
nullptr);
1013 m_units.push_back(unit);
1015 m_stateSemaphore.post();
1019 bw.
appendLine(std::string(
"Added connection from ") + getName() +
" to " + dest + append);
1020 if (os !=
nullptr) {
1031 yCDebug(PORTCORE,
"asked to remove output to %s", dest.c_str());
1035 if (removeUnit(
Route(
"*", dest,
"*"),
true)) {
1036 bw.
appendLine(std::string(
"Removed connection from ") + getName() +
" to " + dest);
1038 bw.
appendLine(std::string(
"Could not find an outgoing connection to ") + dest);
1040 if (os !=
nullptr) {
1049 yCDebug(PORTCORE,
"asked to remove input to %s", src.c_str());
1053 if (removeUnit(
Route(src,
"*",
"*"),
true)) {
1054 bw.
appendLine(std::string(
"Removing connection from ") + src +
" to " + getName());
1056 bw.
appendLine(std::string(
"Could not find an incoming connection from ") + src);
1058 if (os !=
nullptr) {
1073 m_stateSemaphore.wait();
1076 bw.
appendLine(std::string(
"This is ") + m_address.getRegName() +
" at " + m_address.toURI());
1080 for (
auto* unit : m_units) {
1089 bw.
appendLine(
"There are no outgoing connections");
1094 for (
auto* unit : m_units) {
1105 bw.
appendLine(
"There are no incoming connections");
1108 m_stateSemaphore.post();
1111 if (os !=
nullptr) {
1116 printf(
"%s\n", sos.
toString().c_str());
1125 m_stateSemaphore.wait();
1130 std::string portName = m_address.getRegName();
1131 baseInfo.
message = std::string(
"This is ") + portName +
" at " + m_address.toURI();
1132 reporter.
report(baseInfo);
1136 for (
auto* unit : m_units) {
1155 info.
message =
"There are no outgoing connections";
1161 for (
auto* unit : m_units) {
1180 info.
message =
"There are no incoming connections";
1184 m_stateSemaphore.post();
1190 m_stateSemaphore.wait();
1191 if (reporter !=
nullptr) {
1192 m_eventReporter = reporter;
1194 m_stateSemaphore.post();
1199 m_stateSemaphore.wait();
1200 m_eventReporter =
nullptr;
1201 m_stateSemaphore.post();
1212 if (m_eventReporter !=
nullptr) {
1213 m_eventReporter->report(info);
1230 if (m_reader !=
nullptr && !m_interrupted) {
1231 m_interruptable =
false;
1233 bool haveOutputs = (m_outputCount != 0);
1235 if (m_logNeeded && haveOutputs) {
1242 recorder.
init(&reader);
1244 result = m_reader->read(recorder);
1252 result = m_reader->read(reader);
1256 m_interruptable =
true;
1259 yCDebug(PORTCORE,
"data received, no reader for it");
1261 result = b.
read(reader);
1274 m_modifier.outputMutex.lock();
1275 if (m_modifier.outputModifier !=
nullptr) {
1276 if (!m_modifier.outputModifier->acceptOutgoingData(writer)) {
1277 m_modifier.outputMutex.unlock();
1280 m_modifier.outputModifier->modifyOutgoingData(writer);
1282 m_modifier.outputMutex.unlock();
1296 if (m_interrupted || m_finishing) {
1301 bool gotReply =
false;
1303 std::string envelopeString = m_envelope;
1313 yCTrace(PORTCORE,
"------- send in real");
1325 m_stateSemaphore.wait();
1329 m_stateSemaphore.post();
1333 yCTrace(PORTCORE,
"------- send in");
1336 m_packetMutex.lock();
1338 yCAssert(PORTCORE, packet !=
nullptr);
1339 packet->
setContent(&writer,
false, callback);
1340 m_packetMutex.unlock();
1343 for (
auto* unit : m_units) {
1345 bool log = (!unit->
getMode().empty());
1355 yCTrace(PORTCORE,
"------- -- inc");
1356 m_packetMutex.lock();
1358 m_packetMutex.unlock();
1359 yCTrace(PORTCORE,
"------- -- pre-send");
1360 bool gotReplyOne =
false;
1362 void* out = unit->
send(writer,
1364 (callback !=
nullptr) ? callback : (&writer),
1365 reinterpret_cast<void*
>(packet),
1370 gotReply = gotReply || gotReplyOne;
1371 yCTrace(PORTCORE,
"------- -- send");
1372 if (out !=
nullptr) {
1374 m_packetMutex.lock();
1377 m_packetMutex.unlock();
1384 yCTrace(PORTCORE,
"------- -- dec");
1387 yCTrace(PORTCORE,
"------- pack check");
1388 m_packetMutex.lock();
1395 m_packets.checkPacket(packet);
1396 m_packetMutex.unlock();
1397 yCTrace(PORTCORE,
"------- packed");
1398 yCTrace(PORTCORE,
"------- send out");
1400 if (logCount == 0) {
1401 m_logNeeded =
false;
1404 m_stateSemaphore.post();
1405 yCTrace(PORTCORE,
"------- send out real");
1407 if (m_waitAfterSend && reader !=
nullptr) {
1408 all_ok = all_ok && gotReply;
1417 bool writing =
false;
1419 m_stateSemaphore.wait();
1424 for (
auto* unit : m_units) {
1431 m_stateSemaphore.post();
1440 m_packetMutex.lock();
1441 int result = m_inputCount;
1442 m_packetMutex.unlock();
1449 m_packetMutex.lock();
1450 int result = m_outputCount;
1451 m_packetMutex.unlock();
1458 yCTrace(PORTCORE,
"starting notifyCompletion");
1459 m_packetMutex.lock();
1460 if (tracker !=
nullptr) {
1464 m_packetMutex.unlock();
1465 yCTrace(PORTCORE,
"stopping notifyCompletion");
1471 m_envelopeWriter.restart();
1472 bool ok = envelope.
write(m_envelopeWriter);
1474 setEnvelope(m_envelopeWriter.toString());
1482 m_envelope = envelope;
1483 for (
size_t i = 0; i < m_envelope.length(); i++) {
1486 if (m_envelope[i] < 32) {
1487 m_envelope = m_envelope.substr(0, i);
1491 yCDebug(PORTCORE,
"set envelope to %s", m_envelope.c_str());
1502 sis.
add(m_envelope);
1506 sbr.
reset(sis,
nullptr, route, 0,
true);
1507 return envelope.
read(sbr);
1515 const char* carrier,
1521 style.
quiet = !verbose;
1538 result = addr.set(c.
getPort(),
"127.0.0.1");
1542 result = addr.set(c.
getPort(),
"127.0.1.1");
1591 if (cmd ==
"publisherUpdate") {
1592 return PortCoreCommand::RosPublisherUpdate;
1594 if (cmd ==
"requestTopic") {
1595 return PortCoreCommand::RosRequestTopic;
1597 if (cmd ==
"getPid") {
1598 return PortCoreCommand::RosGetPid;
1600 if (cmd ==
"getBusInfo") {
1601 return PortCoreCommand::RosGetBusInfo;
1605 auto cmd =
static_cast<PortCoreCommand
>(v.
asVocab());
1607 case PortCoreCommand::Help:
1608 case PortCoreCommand::Ver:
1609 case PortCoreCommand::Pray:
1610 case PortCoreCommand::Add:
1611 case PortCoreCommand::Del:
1612 case PortCoreCommand::Atch:
1613 case PortCoreCommand::Dtch:
1614 case PortCoreCommand::List:
1615 case PortCoreCommand::Set:
1616 case PortCoreCommand::Get:
1617 case PortCoreCommand::Prop:
1618 case PortCoreCommand::RosPublisherUpdate:
1619 case PortCoreCommand::RosRequestTopic:
1620 case PortCoreCommand::RosGetPid:
1621 case PortCoreCommand::RosGetBusInfo:
1624 return PortCoreCommand::Unknown;
1628 PortCoreConnectionDirection parseConnectionDirection(
yarp::conf::vocab32_t v,
bool errorIsOut =
false)
1630 auto dir =
static_cast<PortCoreConnectionDirection
>(v);
1632 case PortCoreConnectionDirection::In:
1633 case PortCoreConnectionDirection::Out:
1636 return errorIsOut ? PortCoreConnectionDirection::Out : PortCoreConnectionDirection::Error;
1642 auto action =
static_cast<PortCorePropertyAction
>(v);
1644 case PortCorePropertyAction::Get:
1645 case PortCorePropertyAction::Set:
1648 return PortCorePropertyAction::Error;
1652 void describeRoute(
const Route& route,
Bottle& result)
1669 bconnectionless.
addString(
"connectionless");
1672 if (!carrier->
isPush()) {
1694 yCDebug(PORTCORE,
"Port %s received command %s", getName().c_str(), cmd.
toString().c_str());
1696 auto handleAdminHelpCmd = []() {
1700 result.
addString(
"[help] # give this help");
1701 result.
addString(
"[ver] # report protocol version information");
1702 result.
addString(
"[add] $portname # add an output connection");
1703 result.
addString(
"[add] $portname $car # add an output with a given protocol");
1704 result.
addString(
"[del] $portname # remove an input or output connection");
1705 result.
addString(
"[list] [in] # list input connections");
1706 result.
addString(
"[list] [out] # list output connections");
1707 result.
addString(
"[list] [in] $portname # give details for input");
1708 result.
addString(
"[list] [out] $portname # give details for output");
1709 result.
addString(
"[prop] [get] # get all user-defined port properties");
1710 result.
addString(
"[prop] [get] $prop # get a user-defined port property (prop, val)");
1711 result.
addString(
"[prop] [set] $prop $val # set a user-defined port property (prop, val)");
1712 result.
addString(
"[prop] [get] $portname # get Qos properties of a connection to/from a port");
1713 result.
addString(
"[prop] [set] $portname # set Qos properties of a connection to/from a port");
1714 result.
addString(
"[prop] [get] $cur_port # get information about current process (e.g., scheduling priority, pid)");
1715 result.
addString(
"[prop] [set] $cur_port # set properties of the current process (e.g., scheduling priority, pid)");
1716 result.
addString(
"[atch] [out] $prop # attach a portmonitor plug-in to the port's output");
1717 result.
addString(
"[atch] [in] $prop # attach a portmonitor plug-in to the port's input");
1718 result.
addString(
"[dtch] [out] # detach portmonitor plug-in from the port's output");
1719 result.
addString(
"[dtch] [in] # detach portmonitor plug-in from the port's input");
1725 auto handleAdminVerCmd = []() {
1736 auto handleAdminPrayCmd = [
this]() {
1748 while (name[0] ==
'/') {
1749 name = name.substr(1);
1752 auto i = name.find(
'/');
1753 if (i != std::string::npos) {
1754 name = name.substr(0, i);
1758 std::random_device rd;
1759 std::mt19937 mt(rd());
1760 std::uniform_int_distribution<int> dist2(0,1);
1761 auto d2 = std::bind(dist2, mt);
1763 result.
addString(
"You begin praying to " + name +
".");
1764 result.
addString(
"You finish your prayer.");
1766 static const char* godvoices[] = {
1772 std::uniform_int_distribution<int> godvoices_dist(0, (
sizeof(godvoices) /
sizeof(godvoices[0])) - 1);
1773 auto godvoice = [&]() {
1774 return std::string(godvoices[godvoices_dist(mt)]);
1777 static const char* creatures[] = {
1782 std::uniform_int_distribution<int> creatures_dist(0, (
sizeof(creatures) /
sizeof(creatures[0])) - 1);
1783 auto creature = [&]() {
1784 return std::string(creatures[creatures_dist(mt)]);
1787 static const char* auras[] = {
1795 std::uniform_int_distribution<int> auras_dist(0, (
sizeof(auras) /
sizeof(auras[0])) - 1);
1797 return std::string(auras[auras_dist(mt)]);
1800 static const char* items[] = {
1810 std::uniform_int_distribution<int> items_dist(0, (
sizeof(items) /
sizeof(items[0])) - 1);
1812 return std::string(items[items_dist(mt)]);
1815 static const char* blessings[] = {
1816 "You feel more limber.",
1817 "The slime disappears.",
1818 "Your amulet vanishes! You can breathe again.",
1819 "You can breathe again.",
1820 "You are back on solid ground.",
1821 "Your stomach feels content.",
1823 "You feel much better.",
1824 "Your surroundings change.",
1825 "Your shape becomes uncertain.",
1826 "Your chain disappears.",
1827 "There's a tiger in your tank.",
1828 "You feel in good health again.",
1829 "Your eye feels better.",
1830 "Your eyes feel better.",
1831 "Looks like you are back in Kansas.",
1832 "Your <ITEM> softly glows <AURA>.",
1834 std::uniform_int_distribution<int> blessings_dist(0, (
sizeof(blessings) /
sizeof(blessings[0])) - 1);
1835 auto blessing = [&](){
1836 auto blessing = std::string(blessings[blessings_dist(mt)]);
1837 blessing = std::regex_replace(blessing, std::regex(
"<ITEM>"), item());
1838 blessing = std::regex_replace(blessing, std::regex(
"<AURA>"), aura());
1842 std::uniform_int_distribution<int> dist13(0,12);
1843 switch(dist13(mt)) {
1846 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"bummed" :
"displeased") +
".");
1850 result.
addString(
"The voice of " + name +
" " + godvoice() +
1851 ": \"Thou " + (d2() ?
"hast strayed from the path" :
"art arrogant") +
1852 ", " + creature() +
". Thou must relearn thy lessons!\"");
1856 result.
addString(
"The voice of " + name +
" " + godvoice() +
1857 ": \"Thou hast angered me.\"");
1858 result.
addString(
"A black glow surrounds you.");
1861 result.
addString(
"The voice of " + name +
" " + godvoice() +
1862 ": \"Thou hast angered me.\"");
1866 result.
addString(
"The voice of " + name +
" " + godvoice() +
1867 ": \"Thou durst " + (d2() ?
"scorn" :
"call upon") +
1868 " me? Then die, " + creature() +
"!\"");
1871 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"pleased as punch" :
"well-pleased") +
".");
1875 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"ticklish" :
"pleased") +
".");
1879 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"full" :
"satisfied") +
".");
1883 result.
addString(
"The voice of " + name +
" " + godvoice() +
1884 ": \"Thou hast angered me.\"");
1885 result.
addString(
"Suddenly, a bolt of lightning strikes you!");
1886 result.
addString(
"You fry to a crisp!");
1893 auto handleAdminAddCmd = [
this, id](std::string output,
1894 const std::string& carrier) {
1898 if (!carrier.empty()) {
1899 output = carrier +
":/" + output;
1901 addOutput(output,
id, &cache,
false);
1903 int v = (r[0] ==
'A') ? 0 : -1;
1909 auto handleAdminDelCmd = [
this, id](
const std::string& dest) {
1913 removeOutput(dest,
id, &cache);
1916 removeInput(dest,
id, &cache);
1918 int v = (r1[0] ==
'R' || r2[0] ==
'R') ? 0 : -1;
1920 if (r1[0] ==
'R' && r2[0] !=
'R') {
1922 }
else if (r1[0] !=
'R' && r2[0] ==
'R') {
1930 auto handleAdminAtchCmd = [
this](PortCoreConnectionDirection direction,
1933 switch (direction) {
1934 case PortCoreConnectionDirection::Out: {
1936 if (!attachPortMonitor(prop,
true, errMsg)) {
1943 case PortCoreConnectionDirection::In: {
1945 if (!attachPortMonitor(prop,
false, errMsg)) {
1952 case PortCoreConnectionDirection::Error:
1954 result.
addString(
"attach command must be followed by [out] or [in]");
1959 auto handleAdminDtchCmd = [
this](PortCoreConnectionDirection direction) {
1961 switch (direction) {
1962 case PortCoreConnectionDirection::Out: {
1963 if (detachPortMonitor(
true)) {
1969 case PortCoreConnectionDirection::In: {
1970 if (detachPortMonitor(
false)) {
1976 case PortCoreConnectionDirection::Error:
1978 result.
addString(
"detach command must be followed by [out] or [in]");
1983 auto handleAdminListCmd = [
this](
const PortCoreConnectionDirection direction,
1984 const std::string& target) {
1986 switch (direction) {
1987 case PortCoreConnectionDirection::In: {
1989 m_stateSemaphore.wait();
1990 for (
auto* unit : m_units) {
1993 if (target.empty()) {
1995 if (!name.empty()) {
1999 describeRoute(route, result);
2003 m_stateSemaphore.post();
2005 case PortCoreConnectionDirection::Out: {
2007 m_stateSemaphore.wait();
2008 for (
auto* unit : m_units) {
2011 if (target.empty()) {
2013 }
else if (route.
getToName() == target) {
2014 describeRoute(route, result);
2018 m_stateSemaphore.post();
2020 case PortCoreConnectionDirection::Error:
2028 auto handleAdminSetInCmd = [
this](
const std::string& target,
2032 m_stateSemaphore.wait();
2033 if (target.empty()) {
2035 result.
addString(
"target port is not specified.\r\n");
2037 if (target == getName()) {
2039 if (!setParamPortMonitor(property,
false, errMsg)) {
2046 for (
auto* unit : m_units) {
2052 std::string msg =
"Configured connection from ";
2061 if (result.
size() == 0) {
2063 std::string msg =
"Could not find an incoming connection from ";
2069 m_stateSemaphore.post();
2073 auto handleAdminSetOutCmd = [
this](
const std::string& target,
2077 m_stateSemaphore.wait();
2078 if (target.empty()) {
2080 result.
addString(
"target port is not specified.\r\n");
2082 if (target == getName()) {
2084 if (!setParamPortMonitor(property,
true, errMsg)) {
2091 for (
auto* unit : m_units) {
2097 std::string msg =
"Configured connection to ";
2106 if (result.
size() == 0) {
2108 std::string msg =
"Could not find an incoming connection to ";
2114 m_stateSemaphore.post();
2118 auto handleAdminGetInCmd = [
this](
const std::string& target) {
2121 m_stateSemaphore.wait();
2122 if (target.empty()) {
2124 result.
addString(
"target port is not specified.\r\n");
2125 }
else if (target == getName()) {
2128 if (!getParamPortMonitor(property,
false, errMsg)) {
2135 for (
auto* unit : m_units) {
2146 if (result.
size() == 0) {
2148 std::string msg =
"Could not find an incoming connection from ";
2154 m_stateSemaphore.post();
2158 auto handleAdminGetOutCmd = [
this](
const std::string& target) {
2161 m_stateSemaphore.wait();
2162 if (target.empty()) {
2164 result.
addString(
"target port is not specified.\r\n");
2165 }
else if (target == getName()) {
2168 if (!getParamPortMonitor(property,
true, errMsg)) {
2175 for (
auto* unit : m_units) {
2186 if (result.
size() == 0) {
2188 std::string msg =
"Could not find an incoming connection to ";
2194 m_stateSemaphore.post();
2198 auto handleAdminPropGetCmd = [
this](
const std::string& key) {
2200 Property* p = acquireProperties(
false);
2206 if (key[0] ==
'/') {
2207 bool bFound =
false;
2209 if (key == getName()) {
2214 sched_prop.
put(
"tid",
static_cast<int>(this->getTid()));
2215 sched_prop.
put(
"priority", this->getPriority());
2216 sched_prop.
put(
"policy", this->getPolicy());
2222 proc_prop.
put(
"pid", info.
pid);
2223 proc_prop.
put(
"name", (info.
pid != -1) ? info.
name :
"unknown");
2224 proc_prop.
put(
"arguments", (info.
pid != -1) ? info.
arguments :
"unknown");
2232 platform_prop.
put(
"os", pinfo.
name);
2233 platform_prop.
put(
"hostname", m_address.getHost());
2235 unsigned int f = getFlags();
2242 port_prop.
put(
"is_input", is_input);
2243 port_prop.
put(
"is_output", is_output);
2244 port_prop.
put(
"is_rpc", is_rpc);
2245 port_prop.
put(
"type", getType().getName());
2247 for (
auto* unit : m_units) {
2248 if ((unit !=
nullptr) && !unit->
isFinished()) {
2251 if (key == coreName) {
2255 int tos = getTypeOfService(unit);
2256 int tid =
static_cast<int>(unit->
getTid());
2260 sched_prop.
put(
"tid", tid);
2261 sched_prop.
put(
"priority", priority);
2262 sched_prop.
put(
"policy", policy);
2266 qos_prop.
put(
"tos", tos);
2274 std::string msg =
"cannot find any connection to/from ";
2284 releaseProperties(p);
2288 auto handleAdminPropSetCmd = [
this](
const std::string& key,
2294 Property* p = acquireProperties(
false);
2301 if (!process.isNull()) {
2302 std::string portName = key;
2303 if ((!portName.empty()) && (portName[0] ==
'/')) {
2305 if (portName == getName()) {
2308 if (process_prop !=
nullptr) {
2311 if (process_prop->
check(
"priority")) {
2314 if (process_prop->
check(
"policy")) {
2317 bOk = setProcessSchedulingParam(prio, policy);
2328 if (!sched.isNull()) {
2329 if ((!key.empty()) && (key[0] ==
'/')) {
2331 for (
auto* unit : m_units) {
2332 if ((unit !=
nullptr) && !unit->
isFinished()) {
2336 if (portName == key) {
2338 if (sched_prop !=
nullptr) {
2341 if (sched_prop->
check(
"priority")) {
2344 if (sched_prop->
check(
"policy")) {
2361 if (!qos.isNull()) {
2362 if ((!key.empty()) && (key[0] ==
'/')) {
2364 for (
auto* unit : m_units) {
2365 if ((unit !=
nullptr) && !unit->
isFinished()) {
2368 if (portName == key) {
2370 if (qos_prop !=
nullptr) {
2372 if (qos_prop->
check(
"priority")) {
2397 }
else if (qos_prop->
check(
"dscp")) {
2402 auto dscp_val = qos_prop->
find(
"dscp");
2403 if (dscp_val.isInt32()) {
2407 dscp =
static_cast<int>(dscp_class);
2409 if ((dscp >= 0) && (dscp < 64)) {
2412 }
else if (qos_prop->
check(
"tos")) {
2414 auto tos_val = qos_prop->
find(
"tos");
2415 if (tos_val.isInt32()) {
2420 bOk = setTypeOfService(unit, tos);
2432 releaseProperties(p);
2439 auto handleAdminRosPublisherUpdateCmd = [
this](
const std::string& topic,
Bottle* pubs) {
2448 if (pubs !=
nullptr) {
2450 for (
size_t i = 0; i < pubs->size(); i++) {
2451 std::string pub = pubs->get(i).asString();
2455 m_stateSemaphore.wait();
2456 for (
auto* unit : m_units) {
2457 if ((unit !=
nullptr) && unit->
isPupped()) {
2460 if (!listed.
check(me)) {
2465 m_stateSemaphore.post();
2466 for (
size_t i = 0; i < pubs->size(); i++) {
2467 std::string pub = pubs->get(i).asString();
2468 if (!present.
check(pub)) {
2469 yCDebug(PORTCORE,
"ROS ADD %s", pub.c_str());
2479 yCDebug(PORTCORE,
"Sending [%s] to %s", req.
toString().c_str(), pub.c_str());
2481 if (!
__pc_rpc(c,
"xmlrpc", req, reply,
false)) {
2482 fprintf(stderr,
"Cannot connect to ROS subscriber %s\n", pub.c_str());
2484 __pc_rpc(c,
"xmlrpc", req, reply,
true);
2488 std::string hostname;
2489 std::string carrier;
2492 fprintf(stderr,
"Failure looking up topic %s: %s\n", topic.c_str(), reply.
toString().c_str());
2493 }
else if (pref ==
nullptr) {
2494 fprintf(stderr,
"Failure looking up topic %s: expected list of protocols\n", topic.c_str());
2496 fprintf(stderr,
"Failure looking up topic %s: unsupported protocol %s\n", topic.c_str(), pref->
get(0).
asString().c_str());
2502 carrier =
"tcpros+role.pub+topic.";
2504 yCDebug(PORTCORE,
"topic %s available at %s:%d", topic.c_str(), hostname.c_str(), portnum);
2507 Contact addr(hostname, portnum);
2511 if (op ==
nullptr) {
2512 fprintf(stderr,
"NO CONNECTION\n");
2522 m_stateSemaphore.wait();
2527 yCAssert(PORTCORE, unit !=
nullptr);
2530 m_units.push_back(unit);
2531 m_stateSemaphore.post();
2542 auto handleAdminRosRequestTopicCmd = [
this]() {
2556 auto handleAdminRosGetPidCmd = []() {
2565 auto handleAdminRosGetBusInfoCmd = []() {
2575 auto handleAdminUnknownCmd = [
this](
const Bottle& cmd) {
2578 if (m_adminReader !=
nullptr) {
2582 ok = m_adminReader->read(con.
getReader());
2590 result.
addString(
"send [help] for list of valid commands");
2595 const PortCoreCommand command = parseCommand(cmd.
get(0));
2597 case PortCoreCommand::Help:
2598 result = handleAdminHelpCmd();
2600 case PortCoreCommand::Ver:
2601 result = handleAdminVerCmd();
2603 case PortCoreCommand::Pray:
2604 result = handleAdminPrayCmd();
2606 case PortCoreCommand::Add: {
2609 result = handleAdminAddCmd(std::move(output), carrier);
2611 case PortCoreCommand::Del: {
2613 result = handleAdminDelCmd(dest);
2615 case PortCoreCommand::Atch: {
2616 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab());
2618 result = handleAdminAtchCmd(direction, std::move(prop));
2620 case PortCoreCommand::Dtch: {
2621 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab());
2622 result = handleAdminDtchCmd(direction);
2624 case PortCoreCommand::List: {
2625 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab(),
true);
2627 result = handleAdminListCmd(direction, target);
2629 case PortCoreCommand::Set: {
2630 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab(),
true);
2634 switch (direction) {
2635 case PortCoreConnectionDirection::In:
2636 result = handleAdminSetInCmd(target, property);
2638 case PortCoreConnectionDirection::Out:
2639 result = handleAdminSetOutCmd(target, property);
2641 case PortCoreConnectionDirection::Error:
2646 case PortCoreCommand::Get: {
2647 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab(),
true);
2649 switch (direction) {
2650 case PortCoreConnectionDirection::In:
2651 result = handleAdminGetInCmd(target);
2653 case PortCoreConnectionDirection::Out:
2654 result = handleAdminGetOutCmd(target);
2656 case PortCoreConnectionDirection::Error:
2661 case PortCoreCommand::Prop: {
2662 PortCorePropertyAction action = parsePropertyAction(cmd.
get(1).
asVocab());
2666 case PortCorePropertyAction::Get:
2667 result = handleAdminPropGetCmd(key);
2669 case PortCorePropertyAction::Set: {
2674 result = handleAdminPropSetCmd(key, value, process, sched, qos);
2676 case PortCorePropertyAction::Error:
2678 result.
addString(
"property action not known");
2682 case PortCoreCommand::RosPublisherUpdate: {
2687 result = handleAdminRosPublisherUpdateCmd(topic, pubs);
2690 case PortCoreCommand::RosRequestTopic:
2695 result = handleAdminRosRequestTopicCmd();
2698 case PortCoreCommand::RosGetPid:
2700 result = handleAdminRosGetPidCmd();
2703 case PortCoreCommand::RosGetBusInfo:
2705 result = handleAdminRosGetBusInfoCmd();
2708 case PortCoreCommand::Unknown:
2709 result = handleAdminUnknownCmd(cmd);
2714 if (writer !=
nullptr) {
2715 result.
write(*writer);
2722 bool PortCore::setTypeOfService(
PortCoreUnit* unit,
int tos)
2724 if (unit ==
nullptr) {
2728 yCDebug(PORTCORE,
"Trying to set TOS = %d", tos);
2732 if (outUnit !=
nullptr) {
2734 if (op !=
nullptr) {
2735 yCDebug(PORTCORE,
"Trying to set TOS = %d on output unit", tos);
2738 yCWarning(PORTCORE,
"Setting TOS on output unit failed");
2752 if (inUnit !=
nullptr) {
2755 yCDebug(PORTCORE,
"Trying to set TOS = %d on input unit", tos);
2758 yCWarning(PORTCORE,
"Setting TOS on input unit failed");
2770 if (unit ==
nullptr) {
2776 if (outUnit !=
nullptr) {
2778 if (op !=
nullptr) {
2791 if (inUnit !=
nullptr) {
2802 bool PortCore::attachPortMonitor(
yarp::os::Property& prop,
bool isOutput, std::string& errMsg)
2806 if (portmonitor ==
nullptr) {
2807 errMsg =
"Portmonitor carrier modifier cannot be find or it is not enabled in Yarp!";
2812 detachPortMonitor(
true);
2813 prop.
put(
"source", getName());
2814 prop.
put(
"destination",
"");
2815 prop.
put(
"sender_side", 1);
2816 prop.
put(
"receiver_side", 0);
2817 prop.
put(
"carrier",
"");
2818 m_modifier.outputMutex.lock();
2819 m_modifier.outputModifier = portmonitor;
2820 if (!m_modifier.outputModifier->configureFromProperty(prop)) {
2821 m_modifier.releaseOutModifier();
2822 errMsg =
"Failed to configure the portmonitor plug-in";
2823 m_modifier.outputMutex.unlock();
2826 m_modifier.outputMutex.unlock();
2828 detachPortMonitor(
false);
2829 prop.
put(
"source",
"");
2830 prop.
put(
"destination", getName());
2831 prop.
put(
"sender_side", 0);
2832 prop.
put(
"receiver_side", 1);
2833 prop.
put(
"carrier",
"");
2834 m_modifier.inputMutex.lock();
2835 m_modifier.inputModifier = portmonitor;
2836 if (!m_modifier.inputModifier->configureFromProperty(prop)) {
2837 m_modifier.releaseInModifier();
2838 errMsg =
"Failed to configure the portmonitor plug-in";
2839 m_modifier.inputMutex.unlock();
2842 m_modifier.inputMutex.unlock();
2848 bool PortCore::detachPortMonitor(
bool isOutput)
2851 m_modifier.outputMutex.lock();
2852 m_modifier.releaseOutModifier();
2853 m_modifier.outputMutex.unlock();
2855 m_modifier.inputMutex.lock();
2856 m_modifier.releaseInModifier();
2857 m_modifier.inputMutex.unlock();
2864 std::string& errMsg)
2867 m_modifier.outputMutex.lock();
2868 if (m_modifier.outputModifier ==
nullptr) {
2869 errMsg =
"No port modifier is attached to the output";
2870 m_modifier.outputMutex.unlock();
2874 m_modifier.outputMutex.unlock();
2876 m_modifier.inputMutex.lock();
2877 if (m_modifier.inputModifier ==
nullptr) {
2878 errMsg =
"No port modifier is attached to the input";
2879 m_modifier.inputMutex.unlock();
2882 m_modifier.inputModifier->setCarrierParams(param);
2883 m_modifier.inputMutex.unlock();
2890 std::string& errMsg)
2893 m_modifier.outputMutex.lock();
2894 if (m_modifier.outputModifier ==
nullptr) {
2895 errMsg =
"No port modifier is attached to the output";
2896 m_modifier.outputMutex.unlock();
2899 m_modifier.outputModifier->getCarrierParams(param);
2900 m_modifier.outputMutex.unlock();
2902 m_modifier.inputMutex.lock();
2903 if (m_modifier.inputModifier ==
nullptr) {
2904 errMsg =
"No port modifier is attached to the input";
2905 m_modifier.inputMutex.unlock();
2908 m_modifier.inputModifier->getCarrierParams(param);
2909 m_modifier.inputMutex.unlock();
2917 if (unit !=
nullptr) {
2918 bool isLog = (!unit->
getMode().empty());
2925 bool PortCore::setProcessSchedulingParam(
int priority,
int policy)
2927 #if defined(__linux__)
2929 struct sched_param sch_param;
2930 sch_param.__sched_priority = priority;
2933 char path[PATH_MAX];
2936 dir = opendir(path);
2937 if (dir ==
nullptr) {
2945 while ((d = readdir(dir)) !=
nullptr) {
2946 if (isdigit(
static_cast<unsigned char>(*d->d_name)) == 0) {
2950 tid = strtol(d->d_name, &end, 10);
2951 if (d->d_name == end || ((end !=
nullptr) && (*end != 0))) {
2955 ret &= (sched_setscheduler(
static_cast<pid_t
>(tid), policy, &sch_param) == 0);
2959 #elif defined(YARP_HAS_ACE)
2961 ACE_Sched_Params param(policy, (ACE_Sched_Priority)priority, ACE_SCOPE_PROCESS);
2971 m_stateSemaphore.wait();
2973 if (m_prop ==
nullptr) {
2983 m_stateSemaphore.post();
2988 return removeUnit(route, synch);
3001 int PortCore::getNextIndex()
3003 int result = m_counter;
3005 if (m_counter < 0) {
3018 m_address.setName(str);
3023 return m_readableCreator;
3028 m_controlRegistration = flag;
3043 return m_interrupted;
3048 m_timeout = timeout;
3051 #ifndef YARP_NO_DEPRECATED
3056 removeCallbackLock();
3057 if (mutex !=
nullptr) {
3058 m_old_mutex = mutex;
3059 m_mutexOwned =
false;
3062 m_mutexOwned =
true;
3071 removeCallbackLock();
3072 if (mutex !=
nullptr) {
3074 m_mutexOwned =
false;
3076 m_mutex =
new std::mutex;
3077 m_mutexOwned =
true;
3084 if (m_mutexOwned && (m_mutex !=
nullptr)) {
3088 #ifndef YARP_NO_DEPRECATED
3089 m_old_mutex =
nullptr;
3091 m_mutexOwned =
false;
3097 if (m_mutex ==
nullptr) {
3098 #ifndef YARP_NO_DEPRECATED
3099 if (m_old_mutex ==
nullptr) {
3102 m_old_mutex->lock();
3114 if (m_mutex ==
nullptr) {
3115 #ifndef YARP_NO_DEPRECATED
3116 if (m_old_mutex ==
nullptr) {
3119 return m_old_mutex->try_lock();
3124 return m_mutex->try_lock();
3129 if (m_mutex ==
nullptr) {
3130 #ifndef YARP_NO_DEPRECATED
3131 if (m_old_mutex ==
nullptr) {
3134 return m_old_mutex->unlock();
3150 if (!m_checkedType) {
3151 if (!m_type.isValid()) {
3154 m_checkedType =
true;
3156 m_typeMutex.unlock();
3163 m_typeMutex.unlock();
3171 m_typeMutex.unlock();
static bool __tcp_check(const Contact &c)
static bool __pc_rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader, bool verbose)
#define PORTCORE_IS_INPUT
#define PORTCORE_SEND_LOG
#define PORTCORE_IS_OUTPUT
#define PORTCORE_SEND_NORMAL
static bool rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader)
A simple collection of objects that can be described and transmitted in a portable way.
void add(const Value &value)
Add a Value to the bottle, at the end of the list.
void fromString(const std::string &text)
Initializes bottle from a string.
Property & addDict()
Places an empty key/value object in the bottle, at the end of the list.
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
size_type size() const
Gets the number of elements in the bottle.
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
void addVocab(int x)
Places a vocabulary item in the bottle, at the end of the list.
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
void addString(const char *str)
Places a string in the bottle, at the end of the list.
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
bool isConnectionless() const override=0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
bool isPush() const override
Check if carrier is "push" or "pull" style.
void setCarrierParams(const Property ¶ms) override
Configure carrier from port administrative commands.
static Face * listen(const Contact &address)
Create a "proto-carrier" interface object that waits for incoming connections prior to a carrier bein...
static Carrier * getCarrierTemplate(const std::string &name)
Get template for carrier.
static Carrier * chooseCarrier(const std::string &name)
Select a carrier by name.
static OutputProtocol * connect(const Contact &address)
Initiate a connection to an address.
An interface for reading from a network connection.
virtual void requestDrop()=0
Tag the connection to be dropped after the current message.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
An interface for writing to a network connection.
virtual bool isPush() const =0
Check if carrier is "push" or "pull" style.
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written the ConnectionWriter since it was las...
ConnectionReader & getReader()
Get the dummy ConnectionReader loaded with whatever was written the ConnectionWriter since it was las...
Basic wrapper for mutual exclusion.
Simple abstraction for a YARP port name.
Contact toAddress() const
Create an address from the name.
std::string getCarrierModifier(const char *mod, bool *hasModifier=nullptr)
static std::string toString(int x)
static bool initialized()
Returns true if YARP has been fully initialized.
static bool getLocalMode()
Get current value of flag "localMode", see setLocalMode function.
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
static NameStore * getQueryBypass()
static Contact queryName(const std::string &name)
Find out information about a registered name.
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
static bool writeToNameServer(PortWriter &cmd, PortReader &reply, const ContactStyle &style)
Variant write method specialized to name server.
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
static bool write(const Contact &contact, PortWriter &cmd, PortReader &reply, bool admin=false, bool quiet=false, double timeout=-1)
Send a single command to a port and await a single response.
The output side of an active connection between two ports.
virtual const Route & getRoute() const =0
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
virtual bool open(const Route &route)=0
Start negotiating a carrier, using the given route (this should generally match the name of the sendi...
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
virtual OutputStream & getOutputStream()=0
Access the output stream associated with the connection.
virtual InputProtocol & getInput()=0
Get an interface for doing read operations on the connection.
virtual void close()=0
Negotiate an end to operations.
virtual void rename(const Route &route)=0
Relabel the route after the fact (e.g.
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
virtual bool isOk() const =0
Check if the connection is valid and can be used.
virtual bool write(SizedWriter &writer)=0
Write a message on the connection.
Simple specification of the minimum functions needed from output streams.
virtual int getTypeOfService()
virtual bool setTypeOfService(int tos)
Information about a port connection or event.
std::string targetName
Name of connection target, if any.
std::string carrierName
Name of protocol type, if releveant.
bool incoming
True if a connection is incoming, false if outgoing.
std::string message
A human-readable description of contents.
std::string portName
Name of port.
int tag
Type of information.
std::string sourceName
Name of connection source, if any.
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
@ PORTINFO_MISC
Unspecified information.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
virtual Type getReadType() const
A base class for objects that want information about port status changes.
virtual void report(const PortInfo &info)=0
Callback for port event/state information.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
virtual void onCommencement() const
This is called when the port is about to begin writing operations.
A class for storing options and configuration information.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
std::string toString() const override
Return a standard text representation of the content of the object.
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
PacketPriorityDSCP
The PacketPriorityDSCP defines the packets quality of service (priority) using DSCP.
static PacketPriorityDSCP getDSCPByVocab(int vocab)
returns the IPV4/6 DSCP value given as DSCP code
static std::string fromRosName(const std::string &name)
Information about a connection between two ports.
const std::string & getToName() const
Get the destination of the route.
const std::string & getCarrierName() const
Get the carrier type of the route.
std::string toString() const
Render a text form of the route, "source->carrier->dest".
const std::string & getFromName() const
Get the source of the route.
void swapNames()
Swap from and to names.
void setToContact(const Contact &toContact)
Set the destination contact of the route.
An OutputStream that produces a string.
std::string toString() const
static ProcessInfo getProcessInfo(int pid=0)
gets the operating system process information given by its PID.
static PlatformInfo getPlatformInfo()
getPlatformInfo
A single value (typically within a Bottle).
virtual bool isString() const
Checks if value is a string.
virtual std::int32_t asInt32() const
Get 32-bit integer value.
virtual Bottle * asList() const
Get list value.
virtual std::int32_t asVocab() const
Get vocabulary identifier as an integer.
virtual std::string asString() const
Get string value.
A helper for creating cached object descriptions.
bool write(ConnectionWriter &connection) const override
Write this object to a network connection.
virtual void appendLine(const std::string &data)
Send a string along with a carriage-return-line-feed sequence.
A helper for recording entire message/reply transactions.
void init(yarp::os::ConnectionReader *wrappedReader)
Call this to wrap a specific ConnectionReader.
void fini()
Call this when all reading/writing has been done.
Manager for a single output from a port.
A single message, potentially being transmitted on multiple connections.
void setContent(const yarp::os::PortWriter *writable, bool owned=false, const yarp::os::PortWriter *callback=nullptr, bool ownedCallback=false)
Configure the object being sent and where to send notifications.
void inc()
Increment the usage count for this messagae.
void dec()
Decrement the usage count for this messagae.
This manages a single threaded resource related to a single input or output connection.
std::string getPupString() const
void setPupped(const std::string &pupString)
Tag this connection as having been created by a publisherUpdate message to the port's administrative ...
virtual void * send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader, const yarp::os::PortWriter *callback, void *tracker, const std::string &envelope, bool waitAfter=true, bool waitBefore=true, bool *gotReply=nullptr)
Send a message on the connection.
virtual bool isFinished()
std::string getMode(bool *hasMode=nullptr)
Read the "mode" of the connection - basically, whether it is used for logging or not.
virtual void getCarrierParams(yarp::os::Property ¶ms)
void setDoomed()
Request that this connection be shut down as soon as possible.
virtual void setCarrierParams(const yarp::os::Property ¶ms)
Set arbitrary parameters for this connection.
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
void resetPortName(const std::string &str)
std::string getEnvelope()
void setAdminReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming administrative messages.
int getOutputCount()
Check how many output connections there are.
bool readBlock(ConnectionReader &reader, void *id, yarp::os::OutputStream *os)
Read a block of regular payload data.
void setReportCallback(yarp::os::PortReport *reporter)
Set a callback to be notified of changes in port status.
void run() override
The body of the main thread.
bool start() override
Begin main thread.
void checkType(PortReader &reader)
void resume()
Undo an interrupt()
void setTimeout(float timeout)
yarp::os::PortReaderCreator * getReadCreator()
Get the creator of callbacks.
Property * acquireProperties(bool readOnly)
void interrupt()
Prepare the port to be shut down.
void setEnvelope(const std::string &envelope)
Set some envelope information to pass along with a message without actually being part of the message...
bool removeIO(const Route &route, bool synch=false)
Remove any connection matching the supplied route.
bool isInterrupted() const
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc) Generate a description of the connections associ...
bool listen(const Contact &address, bool shouldAnnounce=true)
Begin service at a given address.
bool sendHelper(const yarp::os::PortWriter &writer, int mode, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a message with a specific mode (normal or log).
void removeOutput(const std::string &dest, void *id, yarp::os::OutputStream *os)
Remove an output connection.
void setControlRegistration(bool flag)
Normally the port will unregister its name with the name server when shutting down.
bool setCallbackLock(yarp::os::Mutex *mutex)
int getInputCount()
Check how many input connections there are.
bool manualStart(const char *sourceName)
Start up the port, but without a main thread.
void setReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming data.
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
void promiseType(const Type &typ)
void releaseProperties(Property *prop)
int getEventCount()
A diagnostic for testing purposes.
void close() override
Shut down port.
const Contact & getAddress() const
Get the address associated with the port.
void describe(void *id, yarp::os::OutputStream *os)
Produce a text description of the port and its connections.
bool isWriting()
Check if a message is currently being sent.
bool adminBlock(ConnectionReader &reader, void *id)
Read a block of administrative data.
void removeInput(const std::string &src, void *id, yarp::os::OutputStream *os)
Remove an input connection.
bool send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a normal message.
void resetReportCallback()
Reset the callback to be notified of changes in port status.
void setReadCreator(yarp::os::PortReaderCreator &creator)
Set a callback for creating callbacks for incoming data.
yarp::os::impl::PortDataModifier & getPortModifier()
void setName(const std::string &name)
Set the name of this port.
bool removeCallbackLock()
bool addOutput(const std::string &dest, void *id, yarp::os::OutputStream *os, bool onlyIfNeeded=false)
Add an output connection to this port.
This is the heart of a yarp port.
Lets Readable objects read from the underlying InputStream associated with the connection between two...
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
int join(double seconds=-1)
int setPriority(int priority=-1, int policy=-1)
#define yCError(component,...)
#define yCAssert(component, x)
#define yCTrace(component,...)
#define yCWarning(component,...)
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
std::string getEnvironment(const char *key, bool *found=nullptr)
Read a variable from the environment.
NetInt32 encode(const std::string &str)
Convert a string into a vocabulary identifier.
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
int getpid()
Portable wrapper for the getppid() function.
std::int32_t NetInt32
Definition of the NetInt32 type.
constexpr yarp::conf::vocab32_t createVocab(char a, char b=0, char c=0, char d=0)
The main, catch-all namespace for YARP.
The ProcessInfo struct provides the operating system process information.
#define YARP_WARNING_POP
Ends a temporary alteration of the enabled warnings.
#define YARP_WARNING_PUSH
Starts a temporary alteration of the enabled warnings.
#define YARP_DISABLE_DEPRECATED_WARNING
Disable deprecated warnings in the following code.