36# include <ace/INET_Addr.h>
37# include <ace/Sched_Params.h>
46# include <sys/types.h>
59PortCore::PortCore() =
default;
74 yCIError(PORTCORE,
getName(),
"YARP not initialized; create a yarp::os::Network object before using ports");
82 std::lock_guard<std::mutex> lock(m_stateMutex);
105 if (m_face ==
nullptr) {
110 if (m_address.
getPort() <= 0) {
119 m_listening.store(
true);
123 if (shouldAnnounce) {
153 m_adminReader = &reader;
161 m_readableCreator = &creator;
187 std::lock_guard<std::mutex> lock(m_stateMutex);
188 m_running.store(
true);
189 m_starting.store(
false);
193 m_stateCv.notify_one();
201 bool shouldStop =
false;
202 while (!shouldStop) {
209 std::lock_guard<std::mutex> lock(m_stateMutex);
221 shouldStop |= m_closing.load();
249 std::lock_guard<std::mutex> lock(m_stateMutex);
250 m_connectionListeners = 0;
251 m_connectionChangeCv.notify_all();
257 std::lock_guard<std::mutex> lock(m_stateMutex);
258 m_connectionListeners = 0;
259 m_connectionChangeCv.notify_all();
260 m_finished.store(
true);
268 if (m_prop !=
nullptr) {
282 std::unique_lock<std::mutex> lock(m_stateMutex);
291 m_starting.store(
true);
301 m_stateCv.wait(lock, [&]{
return m_running.load(); });
318 m_interruptable =
false;
328 m_interrupted =
false;
336 if (!m_listening.load()) {
341 m_interrupted =
true;
346 if (!m_interruptable) {
356 std::lock_guard<std::mutex> lock(m_stateMutex);
357 if (m_reader !=
nullptr) {
358 yCIDebug(PORTCORE,
getName(),
"sending update-state message to listener");
368void PortCore::closeMain()
374 std::lock_guard<std::mutex> lock(m_stateMutex);
377 if (m_finishing || !(m_running.load() || m_manual)) {
396 std::string prevName;
399 std::string removeName;
402 std::lock_guard<std::mutex> lock(m_stateMutex);
403 for (
auto* unit : m_units) {
404 if ((unit !=
nullptr) && unit->isInput() && !unit->isDoomed()) {
405 Route r = unit->getRoute();
407 if (s.length() >= 1 && s[0] ==
'/' && s !=
getName() && s != prevName) {
416 yCIDebug(PORTCORE,
getName(),
"requesting removal of connection from %s", removeName.c_str());
425 prevName = removeName;
437 std::lock_guard<std::mutex> lock(m_stateMutex);
438 for (
auto* unit : m_units) {
439 if ((unit !=
nullptr) && unit->isOutput() && !unit->isFinished()) {
440 removeRoute = unit->getRoute();
449 removeUnit(removeRoute,
true);
453 bool stopRunning = m_running.load();
458 m_closing.store(
true);
481 std::lock_guard<std::mutex> lock(m_stateMutex);
482 m_finished.store(
false);
483 m_closing.store(
false);
484 m_running.store(
false);
490 if (m_listening.load()) {
495 m_listening.store(
false);
501 if (m_reader !=
nullptr) {
502 yCIDebug(PORTCORE,
getName(),
"sending end-of-port message to listener");
511 if (name != std::string(
"")) {
512 if (m_controlRegistration) {
535 std::lock_guard<std::mutex> lock(m_stateMutex);
541void PortCore::closeUnits()
549 for (
auto& i : m_units) {
551 if (unit !=
nullptr) {
566void PortCore::reapUnits()
570 if (!m_finished.load()) {
571 std::lock_guard<std::mutex> lock(m_stateMutex);
572 for (
auto* unit : m_units) {
575 yCIDebug(PORTCORE,
getName(),
"Informing connection %s that it is doomed", s.c_str());
579 yCIDebug(PORTCORE,
getName(),
"Joined thread of connection %s", s.c_str());
586void PortCore::cleanUnits(
bool blocking)
593 std::unique_lock<std::mutex> lock(m_stateMutex, std::defer_lock);
597 bool have_lock = lock.try_lock();
605 int updatedInputCount = 0;
606 int updatedOutputCount = 0;
607 int updatedDataOutputCount = 0;
608 yCIDebug(PORTCORE,
getName(),
"/ routine check of connections to this port begins");
609 if (!m_finished.load()) {
612 for (
auto& i : m_units) {
614 if (unit !=
nullptr) {
618 yCIDebug(PORTCORE,
getName(),
"| removing connection %s", con.c_str());
628 updatedOutputCount++;
630 updatedDataOutputCount++;
647 for (
size_t i2 = 0; i2 < m_units.size(); i2++) {
648 if (m_units[i2] !=
nullptr) {
650 m_units[rem] = m_units[i2];
651 m_units[i2] =
nullptr;
658 for (
size_t i3 = 0; i3 < m_units.size() - rem; i3++) {
664 m_dataOutputCount = updatedDataOutputCount;
667 m_packetMutex.lock();
668 m_inputCount = updatedInputCount;
669 m_outputCount = updatedOutputCount;
670 m_packetMutex.unlock();
671 yCIDebug(PORTCORE,
getName(),
"\\ routine check of connections to this port ends");
685 std::lock_guard<std::mutex> lock(m_stateMutex);
692 m_units.push_back(unit);
693 yCITrace(PORTCORE,
getName(),
"there are now %zu units", m_units.size());
709 if (!m_finished.load()) {
710 std::lock_guard<std::mutex> lock(m_stateMutex);
714 m_units.push_back(unit);
719bool PortCore::isUnit(
const Route& route,
int index)
722 bool needReap =
false;
723 if (!m_finished.load()) {
724 for (
auto* unit : m_units) {
725 if (unit !=
nullptr) {
727 std::string wild =
"*";
730 ok = ok && (unit->
getIndex() == index);
754bool PortCore::removeUnit(
const Route& route,
bool synch,
bool* except)
759 if (except !=
nullptr) {
768 std::vector<int> removals;
770 bool needReap =
false;
771 if (!m_finished.load()) {
772 std::lock_guard<std::mutex> lock(m_stateMutex);
773 for (
auto* unit : m_units) {
774 if (unit !=
nullptr) {
776 std::string wild =
"*";
785 if (except ==
nullptr) {
797 removals.push_back(unit->
getIndex());
810 yCIDebug(PORTCORE,
getName(),
"one or more connections need prodding to die");
822 yCIDebug(PORTCORE,
getName(),
"sent message to prod connection death");
829 std::unique_lock<std::mutex> lock(m_stateMutex);
830 while (std::any_of(removals.begin(), removals.end(), [&](
int removal){ return isUnit(route, removal); })) {
831 m_connectionListeners++;
832 m_connectionChangeCv.wait(lock, [&]{
return m_connectionListeners == 0; });
848 yCIDebug(PORTCORE,
getName(),
"asked to add output to %s", dest.c_str());
861 bw.
appendLine(std::string(
"Do not know how to connect to ") + dest);
878 yCIDebug(PORTCORE,
getName(),
"output already present to %s", dest.c_str());
879 bw.
appendLine(std::string(
"Desired connection already present from ") +
getName() +
" to " + dest);
893 aname = address.
toURI(
false);
910 bool is_log = (!mode.empty());
913 err =
"Logger configured as log." + mode +
", but only log.in is supported";
916 append =
"; " + r.
getFromName() +
" will forward messages and replies (if any) to " + r.
getToName();
927 err =
"Outputs not allowed";
932 if (m_dataOutputCount >= 1 && !is_log) {
933 err =
"RPC output already connected";
959 bool ok = op->
open(r);
969 bw.
appendLine(std::string(
"Cannot connect to ") + dest);
990 if (!m_finished.load()) {
991 std::lock_guard<std::mutex> lock(m_stateMutex);
998 m_units.push_back(unit);
1003 bw.
appendLine(std::string(
"Added connection from ") +
getName() +
" to " + dest + append);
1004 if (os !=
nullptr) {
1015 yCIDebug(PORTCORE,
getName(),
"asked to remove output to %s", dest.c_str());
1019 if (removeUnit(
Route(
"*", dest,
"*"),
true)) {
1020 bw.
appendLine(std::string(
"Removed connection from ") +
getName() +
" to " + dest);
1022 bw.
appendLine(std::string(
"Could not find an outgoing connection to ") + dest);
1024 if (os !=
nullptr) {
1033 yCIDebug(PORTCORE,
getName(),
"asked to remove input to %s", src.c_str());
1037 if (removeUnit(
Route(src,
"*",
"*"),
true)) {
1038 bw.
appendLine(std::string(
"Removing connection from ") + src +
" to " +
getName());
1040 bw.
appendLine(std::string(
"Could not find an incoming connection from ") + src);
1042 if (os !=
nullptr) {
1059 std::lock_guard<std::mutex> lock(m_stateMutex);
1066 for (
auto* unit : m_units) {
1075 bw.
appendLine(
"There are no outgoing connections");
1080 for (
auto* unit : m_units) {
1091 bw.
appendLine(
"There are no incoming connections");
1096 if (os !=
nullptr) {
1101 printf(
"%s\n", sos.
toString().c_str());
1110 std::lock_guard<std::mutex> lock(m_stateMutex);
1115 std::string portName = m_address.
getRegName();
1116 baseInfo.
message = std::string(
"This is ") + portName +
" at " + m_address.
toURI();
1117 reporter.
report(baseInfo);
1121 for (
auto* unit : m_units) {
1140 info.
message =
"There are no outgoing connections";
1146 for (
auto* unit : m_units) {
1165 info.
message =
"There are no incoming connections";
1173 std::lock_guard<std::mutex> lock(m_stateMutex);
1174 if (reporter !=
nullptr) {
1175 m_eventReporter = reporter;
1181 std::lock_guard<std::mutex> lock(m_stateMutex);
1182 m_eventReporter =
nullptr;
1193 if (m_eventReporter !=
nullptr) {
1194 m_eventReporter->
report(info);
1211 if (m_reader !=
nullptr && !m_interrupted) {
1212 m_interruptable =
false;
1214 bool haveOutputs = (m_outputCount != 0);
1216 if (m_logNeeded && haveOutputs) {
1223 recorder.
init(&reader);
1225 result = m_reader->
read(recorder);
1233 result = m_reader->
read(reader);
1237 m_interruptable =
true;
1242 result = b.
read(reader);
1277 if (m_interrupted || m_finishing) {
1282 bool gotReply =
false;
1284 std::string envelopeString = m_envelope;
1306 std::lock_guard<std::mutex> lock(m_stateMutex);
1309 if (m_finished.load()) {
1316 m_packetMutex.lock();
1319 packet->
setContent(&writer,
false, callback);
1320 m_packetMutex.unlock();
1323 for (
auto* unit : m_units) {
1325 bool log = (!unit->
getMode().empty());
1336 m_packetMutex.lock();
1338 m_packetMutex.unlock();
1340 bool gotReplyOne =
false;
1342 void* out = unit->
send(writer,
1344 (callback !=
nullptr) ? callback : (&writer),
1345 reinterpret_cast<void*
>(packet),
1350 gotReply = gotReply || gotReplyOne;
1352 if (out !=
nullptr) {
1354 m_packetMutex.lock();
1357 m_packetMutex.unlock();
1368 m_packetMutex.lock();
1376 m_packetMutex.unlock();
1380 if (logCount == 0) {
1381 m_logNeeded =
false;
1387 if (m_waitAfterSend && reader !=
nullptr) {
1388 all_ok = all_ok && gotReply;
1397 bool writing =
false;
1401 if (!m_finished.load()) {
1402 std::lock_guard<std::mutex> lock(m_stateMutex);
1403 for (
auto* unit : m_units) {
1417 m_packetMutex.lock();
1418 int result = m_inputCount;
1419 m_packetMutex.unlock();
1426 m_packetMutex.lock();
1427 int result = m_outputCount;
1428 m_packetMutex.unlock();
1436 m_packetMutex.lock();
1437 if (tracker !=
nullptr) {
1441 m_packetMutex.unlock();
1449 bool ok = envelope.
write(m_envelopeWriter);
1459 m_envelope = envelope;
1460 for (
size_t i = 0; i < m_envelope.length(); i++) {
1463 if (m_envelope[i] < 32) {
1464 m_envelope = m_envelope.substr(0, i);
1468 yCIDebug(PORTCORE,
getName(),
"set envelope to %s", m_envelope.c_str());
1479 sis.
add(m_envelope);
1483 sbr.
reset(sis,
nullptr, route, 0,
true);
1484 return envelope.
read(sbr);
1492 const char* carrier,
1498 style.
quiet = !verbose;
1515 result = addr.set(c.
getPort(),
"127.0.0.1");
1519 result = addr.set(c.
getPort(),
"127.0.1.1");
1568 if (cmd ==
"publisherUpdate") {
1569 return PortCoreCommand::RosPublisherUpdate;
1571 if (cmd ==
"requestTopic") {
1572 return PortCoreCommand::RosRequestTopic;
1574 if (cmd ==
"getPid") {
1575 return PortCoreCommand::RosGetPid;
1577 if (cmd ==
"getBusInfo") {
1578 return PortCoreCommand::RosGetBusInfo;
1582 auto cmd =
static_cast<PortCoreCommand
>(v.
asVocab32());
1584 case PortCoreCommand::Help:
1585 case PortCoreCommand::Ver:
1586 case PortCoreCommand::Pray:
1587 case PortCoreCommand::Add:
1588 case PortCoreCommand::Del:
1589 case PortCoreCommand::Atch:
1590 case PortCoreCommand::Dtch:
1591 case PortCoreCommand::List:
1592 case PortCoreCommand::Set:
1593 case PortCoreCommand::Get:
1594 case PortCoreCommand::Prop:
1595 case PortCoreCommand::RosPublisherUpdate:
1596 case PortCoreCommand::RosRequestTopic:
1597 case PortCoreCommand::RosGetPid:
1598 case PortCoreCommand::RosGetBusInfo:
1601 return PortCoreCommand::Unknown;
1605PortCoreConnectionDirection parseConnectionDirection(
yarp::conf::vocab32_t v,
bool errorIsOut =
false)
1607 auto dir =
static_cast<PortCoreConnectionDirection
>(v);
1609 case PortCoreConnectionDirection::In:
1610 case PortCoreConnectionDirection::Out:
1613 return errorIsOut ? PortCoreConnectionDirection::Out : PortCoreConnectionDirection::Error;
1619 auto action =
static_cast<PortCorePropertyAction
>(v);
1621 case PortCorePropertyAction::Get:
1622 case PortCorePropertyAction::Set:
1625 return PortCorePropertyAction::Error;
1629void describeRoute(
const Route& route,
Bottle& result)
1646 bconnectionless.
addString(
"connectionless");
1649 if (!carrier->
isPush()) {
1673 auto handleAdminHelpCmd = []() {
1677 result.
addString(
"[help] # give this help");
1678 result.
addString(
"[ver] # report protocol version information");
1679 result.
addString(
"[add] $portname # add an output connection");
1680 result.
addString(
"[add] $portname $car # add an output with a given protocol");
1681 result.
addString(
"[del] $portname # remove an input or output connection");
1682 result.
addString(
"[list] [in] # list input connections");
1683 result.
addString(
"[list] [out] # list output connections");
1684 result.
addString(
"[list] [in] $portname # give details for input");
1685 result.
addString(
"[list] [out] $portname # give details for output");
1686 result.
addString(
"[prop] [get] # get all user-defined port properties");
1687 result.
addString(
"[prop] [get] $prop # get a user-defined port property (prop, val)");
1688 result.
addString(
"[prop] [set] $prop $val # set a user-defined port property (prop, val)");
1689 result.
addString(
"[prop] [get] $portname # get Qos properties of a connection to/from a port");
1690 result.
addString(
"[prop] [set] $portname # set Qos properties of a connection to/from a port");
1691 result.
addString(
"[prop] [get] $cur_port # get information about current process (e.g., scheduling priority, pid)");
1692 result.
addString(
"[prop] [set] $cur_port # set properties of the current process (e.g., scheduling priority, pid)");
1693 result.
addString(
"[atch] [out] $prop # attach a portmonitor plug-in to the port's output");
1694 result.
addString(
"[atch] [in] $prop # attach a portmonitor plug-in to the port's input");
1695 result.
addString(
"[dtch] [out] # detach portmonitor plug-in from the port's output");
1696 result.
addString(
"[dtch] [in] # detach portmonitor plug-in from the port's input");
1702 auto handleAdminVerCmd = []() {
1713 auto handleAdminPrayCmd = [
this]() {
1725 while (name[0] ==
'/') {
1726 name = name.substr(1);
1729 auto i = name.find(
'/');
1730 if (i != std::string::npos) {
1731 name = name.substr(0, i);
1735 std::random_device rd;
1736 std::mt19937 mt(rd());
1737 std::uniform_int_distribution<int> dist2(0,1);
1738 auto d2 = std::bind(dist2, mt);
1740 result.
addString(
"You begin praying to " + name +
".");
1741 result.
addString(
"You finish your prayer.");
1743 static const char* godvoices[] = {
1749 std::uniform_int_distribution<int> godvoices_dist(0, (
sizeof(godvoices) /
sizeof(godvoices[0])) - 1);
1750 auto godvoice = [&]() {
1751 return std::string(godvoices[godvoices_dist(mt)]);
1754 static const char* creatures[] = {
1759 std::uniform_int_distribution<int> creatures_dist(0, (
sizeof(creatures) /
sizeof(creatures[0])) - 1);
1760 auto creature = [&]() {
1761 return std::string(creatures[creatures_dist(mt)]);
1764 static const char* auras[] = {
1772 std::uniform_int_distribution<int> auras_dist(0, (
sizeof(auras) /
sizeof(auras[0])) - 1);
1774 return std::string(auras[auras_dist(mt)]);
1777 static const char* items[] = {
1787 std::uniform_int_distribution<int> items_dist(0, (
sizeof(items) /
sizeof(items[0])) - 1);
1789 return std::string(items[items_dist(mt)]);
1792 static const char* blessings[] = {
1793 "You feel more limber.",
1794 "The slime disappears.",
1795 "Your amulet vanishes! You can breathe again.",
1796 "You can breathe again.",
1797 "You are back on solid ground.",
1798 "Your stomach feels content.",
1800 "You feel much better.",
1801 "Your surroundings change.",
1802 "Your shape becomes uncertain.",
1803 "Your chain disappears.",
1804 "There's a tiger in your tank.",
1805 "You feel in good health again.",
1806 "Your eye feels better.",
1807 "Your eyes feel better.",
1808 "Looks like you are back in Kansas.",
1809 "Your <ITEM> softly glows <AURA>.",
1811 std::uniform_int_distribution<int> blessings_dist(0, (
sizeof(blessings) /
sizeof(blessings[0])) - 1);
1812 auto blessing = [&](){
1813 auto blessing = std::string(blessings[blessings_dist(mt)]);
1814 blessing = std::regex_replace(blessing, std::regex(
"<ITEM>"), item());
1815 blessing = std::regex_replace(blessing, std::regex(
"<AURA>"), aura());
1819 std::uniform_int_distribution<int> dist13(0,12);
1820 switch(dist13(mt)) {
1823 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"bummed" :
"displeased") +
".");
1827 result.
addString(
"The voice of " + name +
" " + godvoice() +
1828 ": \"Thou " + (d2() ?
"hast strayed from the path" :
"art arrogant") +
1829 ", " + creature() +
". Thou must relearn thy lessons!\"");
1833 result.
addString(
"The voice of " + name +
" " + godvoice() +
1834 ": \"Thou hast angered me.\"");
1835 result.
addString(
"A black glow surrounds you.");
1838 result.
addString(
"The voice of " + name +
" " + godvoice() +
1839 ": \"Thou hast angered me.\"");
1843 result.
addString(
"The voice of " + name +
" " + godvoice() +
1844 ": \"Thou durst " + (d2() ?
"scorn" :
"call upon") +
1845 " me? Then die, " + creature() +
"!\"");
1848 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"pleased as punch" :
"well-pleased") +
".");
1852 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"ticklish" :
"pleased") +
".");
1856 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"full" :
"satisfied") +
".");
1860 result.
addString(
"The voice of " + name +
" " + godvoice() +
1861 ": \"Thou hast angered me.\"");
1862 result.
addString(
"Suddenly, a bolt of lightning strikes you!");
1863 result.
addString(
"You fry to a crisp!");
1870 auto handleAdminAddCmd = [
this,
id](std::string output,
1871 const std::string& carrier) {
1875 if (!carrier.empty()) {
1876 output = carrier +
":/" + output;
1880 int v = (r[0] ==
'A') ? 0 : -1;
1886 auto handleAdminDelCmd = [
this,
id](
const std::string& dest) {
1895 int v = (r1[0] ==
'R' || r2[0] ==
'R') ? 0 : -1;
1897 if (r1[0] ==
'R' && r2[0] !=
'R') {
1899 }
else if (r1[0] !=
'R' && r2[0] ==
'R') {
1907 auto handleAdminAtchCmd = [
this](PortCoreConnectionDirection direction,
1910 switch (direction) {
1911 case PortCoreConnectionDirection::Out: {
1913 if (!attachPortMonitor(prop,
true, errMsg)) {
1920 case PortCoreConnectionDirection::In: {
1922 if (!attachPortMonitor(prop,
false, errMsg)) {
1929 case PortCoreConnectionDirection::Error:
1931 result.
addString(
"attach command must be followed by [out] or [in]");
1936 auto handleAdminDtchCmd = [
this](PortCoreConnectionDirection direction) {
1938 switch (direction) {
1939 case PortCoreConnectionDirection::Out: {
1940 if (detachPortMonitor(
true)) {
1946 case PortCoreConnectionDirection::In: {
1947 if (detachPortMonitor(
false)) {
1953 case PortCoreConnectionDirection::Error:
1955 result.
addString(
"detach command must be followed by [out] or [in]");
1960 auto handleAdminListCmd = [
this](
const PortCoreConnectionDirection direction,
1961 const std::string& target) {
1963 switch (direction) {
1964 case PortCoreConnectionDirection::In: {
1966 std::lock_guard<std::mutex> lock(m_stateMutex);
1967 for (
auto* unit : m_units) {
1970 if (target.empty()) {
1972 if (!name.empty()) {
1976 describeRoute(route, result);
1981 case PortCoreConnectionDirection::Out: {
1983 std::lock_guard<std::mutex> lock(m_stateMutex);
1984 for (
auto* unit : m_units) {
1987 if (target.empty()) {
1989 }
else if (route.
getToName() == target) {
1990 describeRoute(route, result);
1995 case PortCoreConnectionDirection::Error:
2003 auto handleAdminSetInCmd = [
this](
const std::string& target,
2007 std::lock_guard<std::mutex> lock(m_stateMutex);
2008 if (target.empty()) {
2010 result.
addString(
"target port is not specified.\r\n");
2014 if (!setParamPortMonitor(property,
false, errMsg)) {
2021 for (
auto* unit : m_units) {
2027 std::string msg =
"Configured connection from ";
2036 if (result.
size() == 0) {
2038 std::string msg =
"Could not find an incoming connection from ";
2047 auto handleAdminSetOutCmd = [
this](
const std::string& target,
2051 std::lock_guard<std::mutex> lock(m_stateMutex);
2052 if (target.empty()) {
2054 result.
addString(
"target port is not specified.\r\n");
2058 if (!setParamPortMonitor(property,
true, errMsg)) {
2065 for (
auto* unit : m_units) {
2071 std::string msg =
"Configured connection to ";
2080 if (result.
size() == 0) {
2082 std::string msg =
"Could not find an incoming connection to ";
2091 auto handleAdminGetInCmd = [
this](
const std::string& target) {
2094 std::lock_guard<std::mutex> lock(m_stateMutex);
2095 if (target.empty()) {
2097 result.
addString(
"target port is not specified.\r\n");
2098 }
else if (target ==
getName()) {
2101 if (!getParamPortMonitor(property,
false, errMsg)) {
2108 for (
auto* unit : m_units) {
2119 if (result.
size() == 0) {
2121 std::string msg =
"Could not find an incoming connection from ";
2130 auto handleAdminGetOutCmd = [
this](
const std::string& target) {
2133 std::lock_guard<std::mutex> lock(m_stateMutex);
2134 if (target.empty()) {
2136 result.
addString(
"target port is not specified.\r\n");
2137 }
else if (target ==
getName()) {
2140 if (!getParamPortMonitor(property,
true, errMsg)) {
2147 for (
auto* unit : m_units) {
2158 if (result.
size() == 0) {
2160 std::string msg =
"Could not find an incoming connection to ";
2169 auto handleAdminPropGetCmd = [
this](
const std::string& key) {
2177 if (key[0] ==
'/') {
2178 bool bFound =
false;
2185 sched_prop.
put(
"tid",
static_cast<int>(this->
getTid()));
2193 proc_prop.
put(
"pid", info.
pid);
2194 proc_prop.
put(
"name", (info.
pid != -1) ? info.
name :
"unknown");
2195 proc_prop.
put(
"arguments", (info.
pid != -1) ? info.
arguments :
"unknown");
2203 platform_prop.
put(
"os", pinfo.
name);
2204 platform_prop.
put(
"hostname", m_address.
getHost());
2213 port_prop.
put(
"is_input", is_input);
2214 port_prop.
put(
"is_output", is_output);
2215 port_prop.
put(
"is_rpc", is_rpc);
2218 for (
auto* unit : m_units) {
2219 if ((unit !=
nullptr) && !unit->
isFinished()) {
2222 if (key == coreName) {
2226 int tos = getTypeOfService(unit);
2227 int tid =
static_cast<int>(unit->
getTid());
2231 sched_prop.
put(
"tid",
tid);
2232 sched_prop.
put(
"priority", priority);
2233 sched_prop.
put(
"policy", policy);
2237 qos_prop.
put(
"tos", tos);
2245 std::string msg =
"cannot find any connection to/from ";
2259 auto handleAdminPropSetCmd = [
this](
const std::string& key,
2272 if (!process.isNull()) {
2273 std::string portName = key;
2274 if ((!portName.empty()) && (portName[0] ==
'/')) {
2279 if (process_prop !=
nullptr) {
2282 if (process_prop->
check(
"priority")) {
2285 if (process_prop->
check(
"policy")) {
2288 bOk = setProcessSchedulingParam(prio, policy);
2299 if (!sched.isNull()) {
2300 if ((!key.empty()) && (key[0] ==
'/')) {
2302 for (
auto* unit : m_units) {
2303 if ((unit !=
nullptr) && !unit->
isFinished()) {
2307 if (portName == key) {
2309 if (sched_prop !=
nullptr) {
2312 if (sched_prop->
check(
"priority")) {
2315 if (sched_prop->
check(
"policy")) {
2332 if (!qos.isNull()) {
2333 if ((!key.empty()) && (key[0] ==
'/')) {
2335 for (
auto* unit : m_units) {
2336 if ((unit !=
nullptr) && !unit->
isFinished()) {
2339 if (portName == key) {
2341 if (qos_prop !=
nullptr) {
2343 if (qos_prop->
check(
"priority")) {
2368 }
else if (qos_prop->
check(
"dscp")) {
2373 auto dscp_val = qos_prop->
find(
"dscp");
2374 if (dscp_val.isInt32()) {
2378 dscp =
static_cast<int>(dscp_class);
2380 if ((dscp >= 0) && (dscp < 64)) {
2383 }
else if (qos_prop->
check(
"tos")) {
2385 auto tos_val = qos_prop->
find(
"tos");
2386 if (tos_val.isInt32()) {
2391 bOk = setTypeOfService(unit, tos);
2410 auto handleAdminRosPublisherUpdateCmd = [
this](
const std::string& topic,
Bottle* pubs) {
2419 if (pubs !=
nullptr) {
2421 for (
size_t i = 0; i < pubs->size(); i++) {
2422 std::string pub = pubs->get(i).asString();
2428 std::lock_guard<std::mutex> lock(m_stateMutex);
2429 for (
auto* unit : m_units) {
2430 if ((unit !=
nullptr) && unit->
isPupped()) {
2433 if (!listed.
check(me)) {
2439 for (
size_t i = 0; i < pubs->size(); i++) {
2440 std::string pub = pubs->get(i).asString();
2441 if (!present.
check(pub)) {
2454 if (!
__pc_rpc(c,
"xmlrpc", req, reply,
false)) {
2455 fprintf(stderr,
"Cannot connect to ROS subscriber %s\n", pub.c_str());
2457 __pc_rpc(c,
"xmlrpc", req, reply,
true);
2461 std::string hostname;
2462 std::string carrier;
2465 fprintf(stderr,
"Failure looking up topic %s: %s\n", topic.c_str(), reply.
toString().c_str());
2466 }
else if (pref ==
nullptr) {
2467 fprintf(stderr,
"Failure looking up topic %s: expected list of protocols\n", topic.c_str());
2469 fprintf(stderr,
"Failure looking up topic %s: unsupported protocol %s\n", topic.c_str(), pref->
get(0).
asString().c_str());
2475 carrier =
"tcpros+role.pub+topic.";
2477 yCIDebug(PORTCORE,
getName(),
"topic %s available at %s:%d", topic.c_str(), hostname.c_str(), portnum);
2480 Contact addr(hostname, portnum);
2484 if (op ==
nullptr) {
2485 fprintf(stderr,
"NO CONNECTION\n");
2497 std::lock_guard<std::mutex> lock(m_stateMutex);
2505 m_units.push_back(unit);
2517 auto handleAdminRosRequestTopicCmd = [
this]() {
2531 auto handleAdminRosGetPidCmd = []() {
2540 auto handleAdminRosGetBusInfoCmd = []() {
2550 auto handleAdminUnknownCmd = [
this](
const Bottle& cmd) {
2553 if (m_adminReader !=
nullptr) {
2565 result.
addString(
"send [help] for list of valid commands");
2570 const PortCoreCommand command = parseCommand(cmd.
get(0));
2572 case PortCoreCommand::Help:
2573 result = handleAdminHelpCmd();
2575 case PortCoreCommand::Ver:
2576 result = handleAdminVerCmd();
2578 case PortCoreCommand::Pray:
2579 result = handleAdminPrayCmd();
2581 case PortCoreCommand::Add: {
2584 result = handleAdminAddCmd(std::move(output), carrier);
2586 case PortCoreCommand::Del: {
2588 result = handleAdminDelCmd(dest);
2590 case PortCoreCommand::Atch: {
2591 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab32());
2593 result = handleAdminAtchCmd(direction, std::move(prop));
2595 case PortCoreCommand::Dtch: {
2596 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab32());
2597 result = handleAdminDtchCmd(direction);
2599 case PortCoreCommand::List: {
2600 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab32(),
true);
2602 result = handleAdminListCmd(direction, target);
2604 case PortCoreCommand::Set: {
2605 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab32(),
true);
2609 switch (direction) {
2610 case PortCoreConnectionDirection::In:
2611 result = handleAdminSetInCmd(target, property);
2613 case PortCoreConnectionDirection::Out:
2614 result = handleAdminSetOutCmd(target, property);
2616 case PortCoreConnectionDirection::Error:
2621 case PortCoreCommand::Get: {
2622 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab32(),
true);
2624 switch (direction) {
2625 case PortCoreConnectionDirection::In:
2626 result = handleAdminGetInCmd(target);
2628 case PortCoreConnectionDirection::Out:
2629 result = handleAdminGetOutCmd(target);
2631 case PortCoreConnectionDirection::Error:
2636 case PortCoreCommand::Prop: {
2637 PortCorePropertyAction action = parsePropertyAction(cmd.
get(1).
asVocab32());
2641 case PortCorePropertyAction::Get:
2642 result = handleAdminPropGetCmd(key);
2644 case PortCorePropertyAction::Set: {
2649 result = handleAdminPropSetCmd(key, value, process, sched, qos);
2651 case PortCorePropertyAction::Error:
2653 result.
addString(
"property action not known");
2657 case PortCoreCommand::RosPublisherUpdate: {
2662 result = handleAdminRosPublisherUpdateCmd(topic, pubs);
2665 case PortCoreCommand::RosRequestTopic:
2670 result = handleAdminRosRequestTopicCmd();
2673 case PortCoreCommand::RosGetPid:
2675 result = handleAdminRosGetPidCmd();
2678 case PortCoreCommand::RosGetBusInfo:
2680 result = handleAdminRosGetBusInfoCmd();
2683 case PortCoreCommand::Unknown:
2684 result = handleAdminUnknownCmd(cmd);
2689 if (writer !=
nullptr) {
2690 result.
write(*writer);
2697bool PortCore::setTypeOfService(
PortCoreUnit* unit,
int tos)
2699 if (unit ==
nullptr) {
2707 if (outUnit !=
nullptr) {
2709 if (op !=
nullptr) {
2710 yCIDebug(PORTCORE,
getName(),
"Trying to set TOS = %d on output unit", tos);
2727 if (inUnit !=
nullptr) {
2730 yCIDebug(PORTCORE,
getName(),
"Trying to set TOS = %d on input unit", tos);
2745 if (unit ==
nullptr) {
2751 if (outUnit !=
nullptr) {
2753 if (op !=
nullptr) {
2766 if (inUnit !=
nullptr) {
2777bool PortCore::attachPortMonitor(
yarp::os::Property& prop,
bool isOutput, std::string& errMsg)
2781 if (portmonitor ==
nullptr) {
2782 errMsg =
"Portmonitor carrier modifier cannot be find or it is not enabled in YARP!";
2787 detachPortMonitor(
true);
2789 prop.
put(
"destination",
"");
2790 prop.
put(
"sender_side", 1);
2791 prop.
put(
"receiver_side", 0);
2792 prop.
put(
"carrier",
"");
2797 errMsg =
"Failed to configure the portmonitor plug-in";
2803 detachPortMonitor(
false);
2804 prop.
put(
"source",
"");
2806 prop.
put(
"sender_side", 0);
2807 prop.
put(
"receiver_side", 1);
2808 prop.
put(
"carrier",
"");
2813 errMsg =
"Failed to configure the portmonitor plug-in";
2823bool PortCore::detachPortMonitor(
bool isOutput)
2839 std::string& errMsg)
2844 errMsg =
"No port modifier is attached to the output";
2853 errMsg =
"No port modifier is attached to the input";
2865 std::string& errMsg)
2870 errMsg =
"No port modifier is attached to the output";
2879 errMsg =
"No port modifier is attached to the input";
2892 if (unit !=
nullptr) {
2893 bool isLog = (!unit->
getMode().empty());
2900bool PortCore::setProcessSchedulingParam(
int priority,
int policy)
2902#if defined(__linux__)
2904 struct sched_param sch_param;
2905 sch_param.__sched_priority = priority;
2908 char path[PATH_MAX];
2911 dir = opendir(path);
2912 if (dir ==
nullptr) {
2920 while ((d = readdir(dir)) !=
nullptr) {
2921 if (isdigit(
static_cast<unsigned char>(*d->d_name)) == 0) {
2925 tid = strtol(d->d_name, &end, 10);
2926 if (d->d_name == end || ((end !=
nullptr) && (*end != 0))) {
2930 ret &= (sched_setscheduler(
static_cast<pid_t
>(
tid), policy, &sch_param) == 0);
2934#elif defined(YARP_HAS_ACE)
2936 ACE_Sched_Params param(policy, (ACE_Sched_Priority)priority, ACE_SCOPE_PROCESS);
2946 m_stateMutex.lock();
2948 if (m_prop ==
nullptr) {
2958 m_stateMutex.unlock();
2963 return removeUnit(route, synch);
2976int PortCore::getNextIndex()
2978 int result = m_counter;
2980 if (m_counter < 0) {
2998 return m_readableCreator;
3003 m_controlRegistration = flag;
3008 return m_listening.load();
3018 return m_interrupted;
3023 m_timeout = timeout;
3029 if (mutex !=
nullptr) {
3031 m_mutexOwned =
false;
3033 m_mutex =
new std::mutex;
3034 m_mutexOwned =
true;
3041 if (m_mutexOwned && (m_mutex !=
nullptr)) {
3045 m_mutexOwned =
false;
3051 if (m_mutex ==
nullptr) {
3060 if (m_mutex ==
nullptr) {
3063 return m_mutex->try_lock();
3068 if (m_mutex ==
nullptr) {
3082 if (!m_checkedType) {
3086 m_checkedType =
true;
3088 m_typeMutex.unlock();
3095 m_typeMutex.unlock();
3103 m_typeMutex.unlock();
static bool __tcp_check(const Contact &c)
static bool __pc_rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader, bool verbose)
#define PORTCORE_IS_INPUT
#define PORTCORE_SEND_LOG
#define PORTCORE_IS_OUTPUT
#define PORTCORE_SEND_NORMAL
static bool rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader)
A simple collection of objects that can be described and transmitted in a portable way.
void add(const Value &value)
Add a Value to the bottle, at the end of the list.
void addVocab32(yarp::conf::vocab32_t x)
Places a vocabulary item in the bottle, at the end of the list.
void fromString(const std::string &text)
Initializes bottle from a string.
Property & addDict()
Places an empty key/value object in the bottle, at the end of the list.
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
size_type size() const
Gets the number of elements in the bottle.
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
void addString(const char *str)
Places a string in the bottle, at the end of the list.
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
bool isConnectionless() const override=0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
void getCarrierParams(Property &params) const override
Get carrier configuration and deliver it by port administrative commands.
virtual bool configureFromProperty(yarp::os::Property &options)
bool isPush() const override
Check if carrier is "push" or "pull" style.
bool acceptOutgoingData(const PortWriter &writer) override
Determine whether outgoing data should be accepted.
const PortWriter & modifyOutgoingData(const PortWriter &writer) override
Modify outgoing payload data, if appropriate.
void setCarrierParams(const Property &params) override
Configure carrier from port administrative commands.
static Face * listen(const Contact &address)
Create a "proto-carrier" interface object that waits for incoming connections prior to a carrier bein...
static Carrier * getCarrierTemplate(const std::string &name)
Get template for carrier.
static Carrier * chooseCarrier(const std::string &name)
Select a carrier by name.
static OutputProtocol * connect(const Contact &address)
Initiate a connection to an address.
An interface for reading from a network connection.
virtual void requestDrop()=0
Tag the connection to be dropped after the current message.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
An interface for writing to a network connection.
virtual bool isPush() const =0
Check if carrier is "push" or "pull" style.
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written to the ConnectionWriter since it was las...
ConnectionReader & getReader(ConnectionWriter *replyWriter=nullptr)
Get the dummy ConnectionReader loaded with whatever was written to the ConnectionWriter since it was las...
virtual InputProtocol * read()=0
Block and wait for someone to talk to us.
virtual Contact getLocalAddress() const
Get address after open(), if more specific than the address provided to open() - otherwise an invalid...
virtual OutputProtocol * write(const Contact &address)=0
Try to reach out and talk to someone.
virtual void close()=0
Stop listening.
Simple abstraction for a YARP port name.
Contact toAddress() const
Create an address from the name.
std::string getCarrierModifier(const char *mod, bool *hasModifier=nullptr)
static bool initialized()
Returns true if YARP has been fully initialized.
static bool getLocalMode()
Get current value of flag "localMode", see setLocalMode function.
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
static NameStore * getQueryBypass()
static Contact queryName(const std::string &name)
Find out information about a registered name.
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
static bool writeToNameServer(PortWriter &cmd, PortReader &reply, const ContactStyle &style)
Variant write method specialized to name server.
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
static bool write(const Contact &contact, PortWriter &cmd, PortReader &reply, bool admin=false, bool quiet=false, double timeout=-1)
Send a single command to a port and await a single response.
The output side of an active connection between two ports.
virtual OutputStream & getOutputStream()=0
Access the output stream associated with the connection.
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
virtual bool open(const Route &route)=0
Start negotiating a carrier, using the given route (this should generally match the name of the sendi...
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
virtual const Route & getRoute() const =0
virtual InputProtocol & getInput()=0
Get an interface for doing read operations on the connection.
virtual void close()=0
Negotiate an end to operations.
virtual void rename(const Route &route)=0
Relabel the route after the fact (e.g.
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
virtual bool isOk() const =0
Check if the connection is valid and can be used.
Simple specification of the minimum functions needed from output streams.
virtual int getTypeOfService()
virtual bool setTypeOfService(int tos)
Information about a port connection or event.
std::string targetName
Name of connection target, if any.
std::string carrierName
Name of protocol type, if relevant.
bool incoming
True if a connection is incoming, false if outgoing.
std::string message
A human-readable description of contents.
std::string portName
Name of port.
int tag
Type of information.
std::string sourceName
Name of connection source, if any.
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
@ PORTINFO_MISC
Unspecified information.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
virtual Type getReadType() const
A base class for objects that want information about port status changes.
virtual void report(const PortInfo &info)=0
Callback for port event/state information.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
virtual void onCommencement() const
This is called when the port is about to begin writing operations.
A class for storing options and configuration information.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
std::string toString() const override
Return a standard text representation of the content of the object.
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
PacketPriorityDSCP
The PacketPriorityDSCP defines the packets quality of service (priority) using DSCP.
static PacketPriorityDSCP getDSCPByVocab(yarp::conf::vocab32_t vocab)
returns the IPV4/6 DSCP value given as DSCP code
static std::string fromRosName(const std::string &name)
Information about a connection between two ports.
const std::string & getToName() const
Get the destination of the route.
const std::string & getCarrierName() const
Get the carrier type of the route.
std::string toString() const
Render a text form of the route, "source->carrier->dest".
const std::string & getFromName() const
Get the source of the route.
void swapNames()
Swap from and to names.
void setToContact(const Contact &toContact)
Set the destination contact of the route.
An OutputStream that produces a string.
std::string toString() const
static ProcessInfo getProcessInfo(int pid=0)
gets the operating system process information given by its PID.
static PlatformInfo getPlatformInfo()
getPlatformInfo
std::string getName() const
A single value (typically within a Bottle).
virtual bool isString() const
Checks if value is a string.
virtual yarp::conf::vocab32_t asVocab32() const
Get vocabulary identifier as an integer.
virtual std::int32_t asInt32() const
Get 32-bit integer value.
virtual Bottle * asList() const
Get list value.
virtual std::string asString() const
Get string value.
A helper for creating cached object descriptions.
std::string toString() const
bool write(ConnectionWriter &connection) const override
Write this object to a network connection.
void restart()
Tell the writer that we will be serializing a new object, but to keep any cached buffers that already...
virtual void appendLine(const std::string &data)
Send a string along with a carriage-return-line-feed sequence.
A helper for recording entire message/reply transactions.
void init(yarp::os::ConnectionReader *wrappedReader)
Call this to wrap a specific ConnectionReader.
void fini()
Call this when all reading/writing has been done.
Manager for a single output from a port.
A single message, potentially being transmitted on multiple connections.
void setContent(const yarp::os::PortWriter *writable, bool owned=false, const yarp::os::PortWriter *callback=nullptr, bool ownedCallback=false)
Configure the object being sent and where to send notifications.
void inc()
Increment the usage count for this message.
void dec()
Decrement the usage count for this message.
PortCorePacket * getFreePacket()
Get a packet that we can prepare for sending.
bool checkPacket(PortCorePacket *packet)
Move a packet to the inactive state if it has finished being sent on all connections.
This manages a single threaded resource related to a single input or output connection.
std::string getPupString() const
void setPupped(const std::string &pupString)
Tag this connection as having been created by a publisherUpdate message to the port's administrative ...
virtual bool isFinished()
std::string getMode(bool *hasMode=nullptr)
Read the "mode" of the connection - basically, whether it is used for logging or not.
virtual void getCarrierParams(yarp::os::Property &params)
void setDoomed()
Request that this connection be shut down as soon as possible.
virtual void * send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader, const yarp::os::PortWriter *callback, void *tracker, const std::string &envelope, bool waitAfter=true, bool waitBefore=true, bool *gotReply=nullptr)
Send a message on the connection.
virtual void setCarrierParams(const yarp::os::Property &params)
Set arbitrary parameters for this connection.
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
void resetPortName(const std::string &str)
std::string getEnvelope()
void setAdminReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming administrative messages.
int getOutputCount()
Check how many output connections there are.
bool readBlock(ConnectionReader &reader, void *id, yarp::os::OutputStream *os)
Read a block of regular payload data.
void setReportCallback(yarp::os::PortReport *reporter)
Set a callback to be notified of changes in port status.
void run() override
The body of the main thread.
bool start() override
Begin main thread.
void checkType(PortReader &reader)
void resume()
Undo an interrupt()
void setTimeout(float timeout)
yarp::os::PortReaderCreator * getReadCreator()
Get the creator of callbacks.
Property * acquireProperties(bool readOnly)
void interrupt()
Prepare the port to be shut down.
bool setCallbackLock(std::mutex *mutex=nullptr)
void setEnvelope(const std::string &envelope)
Set some envelope information to pass along with a message without actually being part of the message...
bool removeIO(const Route &route, bool synch=false)
Remove any connection matching the supplied route.
bool isInterrupted() const
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc) Generate a description of the connections associ...
bool listen(const Contact &address, bool shouldAnnounce=true)
Begin service at a given address.
bool sendHelper(const yarp::os::PortWriter &writer, int mode, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a message with a specific mode (normal or log).
void removeOutput(const std::string &dest, void *id, yarp::os::OutputStream *os)
Remove an output connection.
void setControlRegistration(bool flag)
Normally the port will unregister its name with the name server when shutting down.
int getInputCount()
Check how many input connections there are.
bool manualStart(const char *sourceName)
Start up the port, but without a main thread.
void setReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming data.
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
void promiseType(const Type &typ)
void releaseProperties(Property *prop)
int getEventCount()
A diagnostic for testing purposes.
void close() override
Shut down port.
const Contact & getAddress() const
Get the address associated with the port.
void describe(void *id, yarp::os::OutputStream *os)
Produce a text description of the port and its connections.
bool isWriting()
Check if a message is currently being sent.
bool adminBlock(ConnectionReader &reader, void *id)
Read a block of administrative data.
void removeInput(const std::string &src, void *id, yarp::os::OutputStream *os)
Remove an input connection.
unsigned int getFlags()
Check current configuration of port.
bool send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a normal message.
void resetReportCallback()
Reset the callback to be notified of changes in port status.
void setReadCreator(yarp::os::PortReaderCreator &creator)
Set a callback for creating callbacks for incoming data.
yarp::os::impl::PortDataModifier & getPortModifier()
void setName(const std::string &name)
Set the name of this port.
bool removeCallbackLock()
bool addOutput(const std::string &dest, void *id, yarp::os::OutputStream *os, bool onlyIfNeeded=false)
Add an output connection to this port.
This is the heart of a yarp port.
yarp::os::Carrier * outputModifier
yarp::os::Carrier * inputModifier
void releaseOutModifier()
Lets Readable objects read from the underlying InputStream associated with the connection between two...
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
int join(double seconds=-1)
int setPriority(int priority=-1, int policy=-1)
#define yCIAssert(component, id, x)
#define yCWarning(component,...)
#define yCIError(component, id,...)
#define yCITrace(component, id,...)
#define yCIDebug(component, id,...)
#define yCIWarning(component, id,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
std::string get_string(const std::string &key, bool *found=nullptr)
Read a string from an environment variable.
std::string to_string(IntegerType x)
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
int getpid()
Portable wrapper for the getpid() function.
std::int32_t NetInt32
Definition of the NetInt32 type.
constexpr yarp::conf::vocab32_t createVocab32(char a, char b=0, char c=0, char d=0)
Create a vocab from chars.
The main, catch-all namespace for YARP.
The ProcessInfo struct provides the operating system process information.