36 # include <ace/INET_Addr.h>
37 # include <ace/Sched_Params.h>
44 #if defined(__linux__)
46 # include <sys/types.h>
70 yCDebug(PORTCORE,
"Starting listening on %s", address.
toURI().c_str());
74 yCError(PORTCORE,
"YARP not initialized; create a yarp::os::Network object before using ports");
82 std::lock_guard<std::mutex> lock(m_stateMutex);
90 yCAssert(PORTCORE, !m_closing.load());
91 yCAssert(PORTCORE, !m_finished.load());
92 yCAssert(PORTCORE, m_face ==
nullptr);
100 m_address.setTimeout(m_timeout);
105 if (m_face ==
nullptr) {
110 if (m_address.getPort() <= 0) {
111 m_address = m_face->getLocalAddress();
112 if (m_address.getRegName() ==
"...") {
114 setName(m_address.getRegName());
119 m_listening.store(
true);
123 if (shouldAnnounce) {
143 yCAssert(PORTCORE, !m_running.load());
144 yCAssert(PORTCORE, m_reader ==
nullptr);
151 yCAssert(PORTCORE, !m_running.load());
152 yCAssert(PORTCORE, m_adminReader ==
nullptr);
153 m_adminReader = &reader;
159 yCAssert(PORTCORE, !m_running.load());
160 yCAssert(PORTCORE, m_readableCreator ==
nullptr);
161 m_readableCreator = &creator;
178 yCAssert(PORTCORE, m_listening.load());
179 yCAssert(PORTCORE, !m_running.load());
180 yCAssert(PORTCORE, !m_closing.load());
181 yCAssert(PORTCORE, !m_finished.load());
182 yCAssert(PORTCORE, m_starting.load());
187 std::lock_guard<std::mutex> lock(m_stateMutex);
188 m_running.store(
true);
189 m_starting.store(
false);
193 m_stateCv.notify_one();
195 yCTrace(PORTCORE,
"run running");
201 bool shouldStop =
false;
202 while (!shouldStop) {
209 std::lock_guard<std::mutex> lock(m_stateMutex);
214 yCDebug(PORTCORE,
"received something");
221 shouldStop |= m_closing.load();
234 yCDebug(PORTCORE,
"spun off a connection");
249 std::lock_guard<std::mutex> lock(m_stateMutex);
250 m_connectionListeners = 0;
251 m_connectionChangeCv.notify_all();
254 yCTrace(PORTCORE,
"run closing");
257 std::lock_guard<std::mutex> lock(m_stateMutex);
258 m_connectionListeners = 0;
259 m_connectionChangeCv.notify_all();
260 m_finished.store(
true);
268 if (m_prop !=
nullptr) {
272 m_modifier.releaseOutModifier();
273 m_modifier.releaseInModifier();
282 std::unique_lock<std::mutex> lock(m_stateMutex);
285 yCAssert(PORTCORE, m_listening.load());
286 yCAssert(PORTCORE, !m_running.load());
287 yCAssert(PORTCORE, !m_starting.load());
288 yCAssert(PORTCORE, !m_finished.load());
289 yCAssert(PORTCORE, !m_closing.load());
291 m_starting.store(
true);
301 m_stateCv.wait(lock, [&]{
return m_running.load(); });
302 yCAssert(PORTCORE, m_running.load());
310 yCTrace(PORTCORE,
"manualStart");
318 m_interruptable =
false;
328 m_interrupted =
false;
333 yCTrace(PORTCORE,
"interrupt");
336 if (!m_listening.load()) {
341 m_interrupted =
true;
346 if (!m_interruptable) {
356 std::lock_guard<std::mutex> lock(m_stateMutex);
357 if (m_reader !=
nullptr) {
358 yCDebug(PORTCORE,
"sending update-state message to listener");
368 void PortCore::closeMain()
370 yCTrace(PORTCORE,
"closeMain");
374 std::lock_guard<std::mutex> lock(m_stateMutex);
377 if (m_finishing || !(m_running.load() || m_manual)) {
378 yCTrace(PORTCORE,
"closeMain - nothing to do");
382 yCTrace(PORTCORE,
"closeMain - Central");
386 yCDebug(PORTCORE,
"now preparing to shut down port");
396 std::string prevName;
399 std::string removeName;
402 std::lock_guard<std::mutex> lock(m_stateMutex);
403 for (
auto* unit : m_units) {
404 if ((unit !=
nullptr) && unit->isInput() && !unit->isDoomed()) {
405 Route r = unit->getRoute();
407 if (s.length() >= 1 && s[0] ==
'/' && s != getName() && s != prevName) {
416 yCDebug(PORTCORE,
"requesting removal of connection from %s", removeName.c_str());
425 prevName = removeName;
437 std::lock_guard<std::mutex> lock(m_stateMutex);
438 for (
auto* unit : m_units) {
439 if ((unit !=
nullptr) && unit->isOutput() && !unit->isFinished()) {
440 removeRoute = unit->getRoute();
449 removeUnit(removeRoute,
true);
453 bool stopRunning = m_running.load();
458 m_closing.store(
true);
472 yCAssert(PORTCORE, m_finished.load());
481 std::lock_guard<std::mutex> lock(m_stateMutex);
482 m_finished.store(
false);
483 m_closing.store(
false);
484 m_running.store(
false);
490 if (m_listening.load()) {
491 yCAssert(PORTCORE, m_face !=
nullptr);
495 m_listening.store(
false);
501 if (m_reader !=
nullptr) {
502 yCDebug(PORTCORE,
"sending end-of-port message to listener");
510 std::string name = getName();
511 if (name != std::string(
"")) {
512 if (m_controlRegistration) {
522 yCAssert(PORTCORE, !m_listening.load());
523 yCAssert(PORTCORE, !m_running.load());
524 yCAssert(PORTCORE, !m_starting.load());
525 yCAssert(PORTCORE, !m_closing.load());
526 yCAssert(PORTCORE, !m_finished.load());
528 yCAssert(PORTCORE, m_face ==
nullptr);
535 std::lock_guard<std::mutex> lock(m_stateMutex);
541 void PortCore::closeUnits()
545 yCAssert(PORTCORE, m_finished.load());
549 for (
auto& i : m_units) {
551 if (unit !=
nullptr) {
552 yCDebug(PORTCORE,
"closing a unit");
554 yCDebug(PORTCORE,
"joining a unit");
557 yCDebug(PORTCORE,
"deleting a unit");
566 void PortCore::reapUnits()
570 if (!m_finished.load()) {
571 std::lock_guard<std::mutex> lock(m_stateMutex);
572 for (
auto* unit : m_units) {
575 yCDebug(PORTCORE,
"Informing connection %s that it is doomed", s.c_str());
577 yCDebug(PORTCORE,
"Closed connection %s", s.c_str());
579 yCDebug(PORTCORE,
"Joined thread of connection %s", s.c_str());
586 void PortCore::cleanUnits(
bool blocking)
593 std::unique_lock<std::mutex> lock(m_stateMutex, std::defer_lock);
597 bool have_lock = lock.try_lock();
605 int updatedInputCount = 0;
606 int updatedOutputCount = 0;
607 int updatedDataOutputCount = 0;
608 yCDebug(PORTCORE,
"/ routine check of connections to this port begins");
609 if (!m_finished.load()) {
612 for (
auto& i : m_units) {
614 if (unit !=
nullptr) {
618 yCDebug(PORTCORE,
"| removing connection %s", con.c_str());
623 yCDebug(PORTCORE,
"| removed connection %s", con.c_str());
628 updatedOutputCount++;
630 updatedDataOutputCount++;
647 for (
size_t i2 = 0; i2 < m_units.size(); i2++) {
648 if (m_units[i2] !=
nullptr) {
650 m_units[rem] = m_units[i2];
651 m_units[i2] =
nullptr;
658 for (
size_t i3 = 0; i3 < m_units.size() - rem; i3++) {
664 m_dataOutputCount = updatedDataOutputCount;
667 m_packetMutex.lock();
668 m_inputCount = updatedInputCount;
669 m_outputCount = updatedOutputCount;
670 m_packetMutex.unlock();
671 yCDebug(PORTCORE,
"\\ routine check of connections to this port ends");
685 std::lock_guard<std::mutex> lock(m_stateMutex);
690 yCAssert(PORTCORE, unit !=
nullptr);
692 m_units.push_back(unit);
693 yCTrace(PORTCORE,
"there are now %zu units", m_units.size());
699 yCTrace(PORTCORE,
"addOutput");
709 if (!m_finished.load()) {
710 std::lock_guard<std::mutex> lock(m_stateMutex);
712 yCAssert(PORTCORE, unit !=
nullptr);
714 m_units.push_back(unit);
719 bool PortCore::isUnit(
const Route& route,
int index)
722 bool needReap =
false;
723 if (!m_finished.load()) {
724 for (
auto* unit : m_units) {
725 if (unit !=
nullptr) {
727 std::string wild =
"*";
730 ok = ok && (unit->
getIndex() == index);
754 bool PortCore::removeUnit(
const Route& route,
bool synch,
bool* except)
759 if (except !=
nullptr) {
760 yCDebug(PORTCORE,
"asked to remove connection in the way of %s", route.
toString().c_str());
763 yCDebug(PORTCORE,
"asked to remove connection %s", route.
toString().c_str());
768 std::vector<int> removals;
770 bool needReap =
false;
771 if (!m_finished.load()) {
772 std::lock_guard<std::mutex> lock(m_stateMutex);
773 for (
auto* unit : m_units) {
774 if (unit !=
nullptr) {
776 std::string wild =
"*";
785 if (except ==
nullptr) {
797 removals.push_back(unit->
getIndex());
810 yCDebug(PORTCORE,
"one or more connections need prodding to die");
822 yCDebug(PORTCORE,
"sent message to prod connection death");
826 yCDebug(PORTCORE,
"synchronizing with connection death");
829 std::unique_lock<std::mutex> lock(m_stateMutex);
830 while (std::any_of(removals.begin(), removals.end(), [&](
int removal){ return isUnit(route, removal); })) {
831 m_connectionListeners++;
832 m_connectionChangeCv.wait(lock, [&]{
return m_connectionListeners == 0; });
848 yCDebug(PORTCORE,
"asked to add output to %s", dest.c_str());
861 bw.
appendLine(std::string(
"Do not know how to connect to ") + dest);
878 yCDebug(PORTCORE,
"output already present to %s", dest.c_str());
879 bw.
appendLine(std::string(
"Desired connection already present from ") + getName() +
" to " + dest);
893 aname = address.
toURI(
false);
905 unsigned int f = getFlags();
910 bool is_log = (!mode.empty());
913 err =
"Logger configured as log." + mode +
", but only log.in is supported";
916 append =
"; " + r.
getFromName() +
" will forward messages and replies (if any) to " + r.
getToName();
927 err =
"Outputs not allowed";
932 if (m_dataOutputCount >= 1 && !is_log) {
933 err =
"RPC output already connected";
959 bool ok = op->
open(r);
961 yCDebug(PORTCORE,
"open route error");
969 bw.
appendLine(std::string(
"Cannot connect to ") + dest);
990 if (!m_finished.load()) {
991 std::lock_guard<std::mutex> lock(m_stateMutex);
996 yCAssert(PORTCORE, unit !=
nullptr);
998 m_units.push_back(unit);
1003 bw.
appendLine(std::string(
"Added connection from ") + getName() +
" to " + dest + append);
1004 if (os !=
nullptr) {
1015 yCDebug(PORTCORE,
"asked to remove output to %s", dest.c_str());
1019 if (removeUnit(
Route(
"*", dest,
"*"),
true)) {
1020 bw.
appendLine(std::string(
"Removed connection from ") + getName() +
" to " + dest);
1022 bw.
appendLine(std::string(
"Could not find an outgoing connection to ") + dest);
1024 if (os !=
nullptr) {
1033 yCDebug(PORTCORE,
"asked to remove input to %s", src.c_str());
1037 if (removeUnit(
Route(src,
"*",
"*"),
true)) {
1038 bw.
appendLine(std::string(
"Removing connection from ") + src +
" to " + getName());
1040 bw.
appendLine(std::string(
"Could not find an incoming connection from ") + src);
1042 if (os !=
nullptr) {
1059 std::lock_guard<std::mutex> lock(m_stateMutex);
1062 bw.
appendLine(std::string(
"This is ") + m_address.getRegName() +
" at " + m_address.toURI());
1066 for (
auto* unit : m_units) {
1075 bw.
appendLine(
"There are no outgoing connections");
1080 for (
auto* unit : m_units) {
1091 bw.
appendLine(
"There are no incoming connections");
1096 if (os !=
nullptr) {
1101 printf(
"%s\n", sos.
toString().c_str());
1110 std::lock_guard<std::mutex> lock(m_stateMutex);
1115 std::string portName = m_address.getRegName();
1116 baseInfo.
message = std::string(
"This is ") + portName +
" at " + m_address.toURI();
1117 reporter.
report(baseInfo);
1121 for (
auto* unit : m_units) {
1140 info.
message =
"There are no outgoing connections";
1146 for (
auto* unit : m_units) {
1165 info.
message =
"There are no incoming connections";
1173 std::lock_guard<std::mutex> lock(m_stateMutex);
1174 if (reporter !=
nullptr) {
1175 m_eventReporter = reporter;
1181 std::lock_guard<std::mutex> lock(m_stateMutex);
1182 m_eventReporter =
nullptr;
1193 if (m_eventReporter !=
nullptr) {
1194 m_eventReporter->report(info);
1211 if (m_reader !=
nullptr && !m_interrupted) {
1212 m_interruptable =
false;
1214 bool haveOutputs = (m_outputCount != 0);
1216 if (m_logNeeded && haveOutputs) {
1223 recorder.
init(&reader);
1225 result = m_reader->read(recorder);
1233 result = m_reader->read(reader);
1237 m_interruptable =
true;
1240 yCDebug(PORTCORE,
"data received, no reader for it");
1242 result = b.
read(reader);
1255 m_modifier.outputMutex.lock();
1256 if (m_modifier.outputModifier !=
nullptr) {
1257 if (!m_modifier.outputModifier->acceptOutgoingData(writer)) {
1258 m_modifier.outputMutex.unlock();
1261 m_modifier.outputModifier->modifyOutgoingData(writer);
1263 m_modifier.outputMutex.unlock();
1277 if (m_interrupted || m_finishing) {
1282 bool gotReply =
false;
1284 std::string envelopeString = m_envelope;
1294 yCTrace(PORTCORE,
"------- send in real");
1306 std::lock_guard<std::mutex> lock(m_stateMutex);
1309 if (m_finished.load()) {
1313 yCTrace(PORTCORE,
"------- send in");
1316 m_packetMutex.lock();
1318 yCAssert(PORTCORE, packet !=
nullptr);
1319 packet->
setContent(&writer,
false, callback);
1320 m_packetMutex.unlock();
1323 for (
auto* unit : m_units) {
1325 bool log = (!unit->
getMode().empty());
1335 yCTrace(PORTCORE,
"------- -- inc");
1336 m_packetMutex.lock();
1338 m_packetMutex.unlock();
1339 yCTrace(PORTCORE,
"------- -- pre-send");
1340 bool gotReplyOne =
false;
1342 void* out = unit->
send(writer,
1344 (callback !=
nullptr) ? callback : (&writer),
1345 reinterpret_cast<void*
>(packet),
1350 gotReply = gotReply || gotReplyOne;
1351 yCTrace(PORTCORE,
"------- -- send");
1352 if (out !=
nullptr) {
1354 m_packetMutex.lock();
1357 m_packetMutex.unlock();
1364 yCTrace(PORTCORE,
"------- -- dec");
1367 yCTrace(PORTCORE,
"------- pack check");
1368 m_packetMutex.lock();
1375 m_packets.checkPacket(packet);
1376 m_packetMutex.unlock();
1377 yCTrace(PORTCORE,
"------- packed");
1378 yCTrace(PORTCORE,
"------- send out");
1380 if (logCount == 0) {
1381 m_logNeeded =
false;
1385 yCTrace(PORTCORE,
"------- send out real");
1387 if (m_waitAfterSend && reader !=
nullptr) {
1388 all_ok = all_ok && gotReply;
1397 bool writing =
false;
1401 if (!m_finished.load()) {
1402 std::lock_guard<std::mutex> lock(m_stateMutex);
1403 for (
auto* unit : m_units) {
1417 m_packetMutex.lock();
1418 int result = m_inputCount;
1419 m_packetMutex.unlock();
1426 m_packetMutex.lock();
1427 int result = m_outputCount;
1428 m_packetMutex.unlock();
1435 yCTrace(PORTCORE,
"starting notifyCompletion");
1436 m_packetMutex.lock();
1437 if (tracker !=
nullptr) {
1441 m_packetMutex.unlock();
1442 yCTrace(PORTCORE,
"stopping notifyCompletion");
1448 m_envelopeWriter.restart();
1449 bool ok = envelope.
write(m_envelopeWriter);
1451 setEnvelope(m_envelopeWriter.toString());
1459 m_envelope = envelope;
1460 for (
size_t i = 0; i < m_envelope.length(); i++) {
1463 if (m_envelope[i] < 32) {
1464 m_envelope = m_envelope.substr(0, i);
1468 yCDebug(PORTCORE,
"set envelope to %s", m_envelope.c_str());
1479 sis.
add(m_envelope);
1483 sbr.
reset(sis,
nullptr, route, 0,
true);
1484 return envelope.
read(sbr);
1492 const char* carrier,
1498 style.
quiet = !verbose;
1515 result = addr.set(c.
getPort(),
"127.0.0.1");
1519 result = addr.set(c.
getPort(),
"127.0.1.1");
1568 if (cmd ==
"publisherUpdate") {
1569 return PortCoreCommand::RosPublisherUpdate;
1571 if (cmd ==
"requestTopic") {
1572 return PortCoreCommand::RosRequestTopic;
1574 if (cmd ==
"getPid") {
1575 return PortCoreCommand::RosGetPid;
1577 if (cmd ==
"getBusInfo") {
1578 return PortCoreCommand::RosGetBusInfo;
1582 auto cmd =
static_cast<PortCoreCommand
>(v.
asVocab32());
1584 case PortCoreCommand::Help:
1585 case PortCoreCommand::Ver:
1586 case PortCoreCommand::Pray:
1587 case PortCoreCommand::Add:
1588 case PortCoreCommand::Del:
1589 case PortCoreCommand::Atch:
1590 case PortCoreCommand::Dtch:
1591 case PortCoreCommand::List:
1592 case PortCoreCommand::Set:
1593 case PortCoreCommand::Get:
1594 case PortCoreCommand::Prop:
1595 case PortCoreCommand::RosPublisherUpdate:
1596 case PortCoreCommand::RosRequestTopic:
1597 case PortCoreCommand::RosGetPid:
1598 case PortCoreCommand::RosGetBusInfo:
1601 return PortCoreCommand::Unknown;
1605 PortCoreConnectionDirection parseConnectionDirection(
yarp::conf::vocab32_t v,
bool errorIsOut =
false)
1607 auto dir =
static_cast<PortCoreConnectionDirection
>(v);
1609 case PortCoreConnectionDirection::In:
1610 case PortCoreConnectionDirection::Out:
1613 return errorIsOut ? PortCoreConnectionDirection::Out : PortCoreConnectionDirection::Error;
1619 auto action =
static_cast<PortCorePropertyAction
>(v);
1621 case PortCorePropertyAction::Get:
1622 case PortCorePropertyAction::Set:
1625 return PortCorePropertyAction::Error;
1629 void describeRoute(
const Route& route,
Bottle& result)
1646 bconnectionless.
addString(
"connectionless");
1649 if (!carrier->
isPush()) {
1671 yCDebug(PORTCORE,
"Port %s received command %s", getName().c_str(), cmd.
toString().c_str());
1673 auto handleAdminHelpCmd = []() {
1677 result.
addString(
"[help] # give this help");
1678 result.
addString(
"[ver] # report protocol version information");
1679 result.
addString(
"[add] $portname # add an output connection");
1680 result.
addString(
"[add] $portname $car # add an output with a given protocol");
1681 result.
addString(
"[del] $portname # remove an input or output connection");
1682 result.
addString(
"[list] [in] # list input connections");
1683 result.
addString(
"[list] [out] # list output connections");
1684 result.
addString(
"[list] [in] $portname # give details for input");
1685 result.
addString(
"[list] [out] $portname # give details for output");
1686 result.
addString(
"[prop] [get] # get all user-defined port properties");
1687 result.
addString(
"[prop] [get] $prop # get a user-defined port property (prop, val)");
1688 result.
addString(
"[prop] [set] $prop $val # set a user-defined port property (prop, val)");
1689 result.
addString(
"[prop] [get] $portname # get Qos properties of a connection to/from a port");
1690 result.
addString(
"[prop] [set] $portname # set Qos properties of a connection to/from a port");
1691 result.
addString(
"[prop] [get] $cur_port # get information about current process (e.g., scheduling priority, pid)");
1692 result.
addString(
"[prop] [set] $cur_port # set properties of the current process (e.g., scheduling priority, pid)");
1693 result.
addString(
"[atch] [out] $prop # attach a portmonitor plug-in to the port's output");
1694 result.
addString(
"[atch] [in] $prop # attach a portmonitor plug-in to the port's input");
1695 result.
addString(
"[dtch] [out] # detach portmonitor plug-in from the port's output");
1696 result.
addString(
"[dtch] [in] # detach portmonitor plug-in from the port's input");
1702 auto handleAdminVerCmd = []() {
1713 auto handleAdminPrayCmd = [
this]() {
1725 while (name[0] ==
'/') {
1726 name = name.substr(1);
1729 auto i = name.find(
'/');
1730 if (i != std::string::npos) {
1731 name = name.substr(0, i);
1735 std::random_device rd;
1736 std::mt19937 mt(rd());
1737 std::uniform_int_distribution<int> dist2(0,1);
1738 auto d2 = std::bind(dist2, mt);
1740 result.
addString(
"You begin praying to " + name +
".");
1741 result.
addString(
"You finish your prayer.");
1743 static const char* godvoices[] = {
1749 std::uniform_int_distribution<int> godvoices_dist(0, (
sizeof(godvoices) /
sizeof(godvoices[0])) - 1);
1750 auto godvoice = [&]() {
1751 return std::string(godvoices[godvoices_dist(mt)]);
1754 static const char* creatures[] = {
1759 std::uniform_int_distribution<int> creatures_dist(0, (
sizeof(creatures) /
sizeof(creatures[0])) - 1);
1760 auto creature = [&]() {
1761 return std::string(creatures[creatures_dist(mt)]);
1764 static const char* auras[] = {
1772 std::uniform_int_distribution<int> auras_dist(0, (
sizeof(auras) /
sizeof(auras[0])) - 1);
1774 return std::string(auras[auras_dist(mt)]);
1777 static const char* items[] = {
1787 std::uniform_int_distribution<int> items_dist(0, (
sizeof(items) /
sizeof(items[0])) - 1);
1789 return std::string(items[items_dist(mt)]);
1792 static const char* blessings[] = {
1793 "You feel more limber.",
1794 "The slime disappears.",
1795 "Your amulet vanishes! You can breathe again.",
1796 "You can breathe again.",
1797 "You are back on solid ground.",
1798 "Your stomach feels content.",
1800 "You feel much better.",
1801 "Your surroundings change.",
1802 "Your shape becomes uncertain.",
1803 "Your chain disappears.",
1804 "There's a tiger in your tank.",
1805 "You feel in good health again.",
1806 "Your eye feels better.",
1807 "Your eyes feel better.",
1808 "Looks like you are back in Kansas.",
1809 "Your <ITEM> softly glows <AURA>.",
1811 std::uniform_int_distribution<int> blessings_dist(0, (
sizeof(blessings) /
sizeof(blessings[0])) - 1);
1812 auto blessing = [&](){
1813 auto blessing = std::string(blessings[blessings_dist(mt)]);
1814 blessing = std::regex_replace(blessing, std::regex(
"<ITEM>"), item());
1815 blessing = std::regex_replace(blessing, std::regex(
"<AURA>"), aura());
1819 std::uniform_int_distribution<int> dist13(0,12);
1820 switch(dist13(mt)) {
1823 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"bummed" :
"displeased") +
".");
1827 result.
addString(
"The voice of " + name +
" " + godvoice() +
1828 ": \"Thou " + (d2() ?
"hast strayed from the path" :
"art arrogant") +
1829 ", " + creature() +
". Thou must relearn thy lessons!\"");
1833 result.
addString(
"The voice of " + name +
" " + godvoice() +
1834 ": \"Thou hast angered me.\"");
1835 result.
addString(
"A black glow surrounds you.");
1838 result.
addString(
"The voice of " + name +
" " + godvoice() +
1839 ": \"Thou hast angered me.\"");
1843 result.
addString(
"The voice of " + name +
" " + godvoice() +
1844 ": \"Thou durst " + (d2() ?
"scorn" :
"call upon") +
1845 " me? Then die, " + creature() +
"!\"");
1848 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"pleased as punch" :
"well-pleased") +
".");
1852 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"ticklish" :
"pleased") +
".");
1856 result.
addString(
"You feel that " + name +
" is " + (d2() ?
"full" :
"satisfied") +
".");
1860 result.
addString(
"The voice of " + name +
" " + godvoice() +
1861 ": \"Thou hast angered me.\"");
1862 result.
addString(
"Suddenly, a bolt of lightning strikes you!");
1863 result.
addString(
"You fry to a crisp!");
1870 auto handleAdminAddCmd = [
this, id](std::string output,
1871 const std::string& carrier) {
1875 if (!carrier.empty()) {
1876 output = carrier +
":/" + output;
1878 addOutput(output,
id, &cache,
false);
1880 int v = (r[0] ==
'A') ? 0 : -1;
1886 auto handleAdminDelCmd = [
this, id](
const std::string& dest) {
1890 removeOutput(dest,
id, &cache);
1893 removeInput(dest,
id, &cache);
1895 int v = (r1[0] ==
'R' || r2[0] ==
'R') ? 0 : -1;
1897 if (r1[0] ==
'R' && r2[0] !=
'R') {
1899 }
else if (r1[0] !=
'R' && r2[0] ==
'R') {
1907 auto handleAdminAtchCmd = [
this](PortCoreConnectionDirection direction,
1910 switch (direction) {
1911 case PortCoreConnectionDirection::Out: {
1913 if (!attachPortMonitor(prop,
true, errMsg)) {
1920 case PortCoreConnectionDirection::In: {
1922 if (!attachPortMonitor(prop,
false, errMsg)) {
1929 case PortCoreConnectionDirection::Error:
1931 result.
addString(
"attach command must be followed by [out] or [in]");
1936 auto handleAdminDtchCmd = [
this](PortCoreConnectionDirection direction) {
1938 switch (direction) {
1939 case PortCoreConnectionDirection::Out: {
1940 if (detachPortMonitor(
true)) {
1946 case PortCoreConnectionDirection::In: {
1947 if (detachPortMonitor(
false)) {
1953 case PortCoreConnectionDirection::Error:
1955 result.
addString(
"detach command must be followed by [out] or [in]");
1960 auto handleAdminListCmd = [
this](
const PortCoreConnectionDirection direction,
1961 const std::string& target) {
1963 switch (direction) {
1964 case PortCoreConnectionDirection::In: {
1966 std::lock_guard<std::mutex> lock(m_stateMutex);
1967 for (
auto* unit : m_units) {
1970 if (target.empty()) {
1972 if (!name.empty()) {
1976 describeRoute(route, result);
1981 case PortCoreConnectionDirection::Out: {
1983 std::lock_guard<std::mutex> lock(m_stateMutex);
1984 for (
auto* unit : m_units) {
1987 if (target.empty()) {
1989 }
else if (route.
getToName() == target) {
1990 describeRoute(route, result);
1995 case PortCoreConnectionDirection::Error:
2003 auto handleAdminSetInCmd = [
this](
const std::string& target,
2007 std::lock_guard<std::mutex> lock(m_stateMutex);
2008 if (target.empty()) {
2010 result.
addString(
"target port is not specified.\r\n");
2012 if (target == getName()) {
2014 if (!setParamPortMonitor(property,
false, errMsg)) {
2021 for (
auto* unit : m_units) {
2027 std::string msg =
"Configured connection from ";
2036 if (result.
size() == 0) {
2038 std::string msg =
"Could not find an incoming connection from ";
2047 auto handleAdminSetOutCmd = [
this](
const std::string& target,
2051 std::lock_guard<std::mutex> lock(m_stateMutex);
2052 if (target.empty()) {
2054 result.
addString(
"target port is not specified.\r\n");
2056 if (target == getName()) {
2058 if (!setParamPortMonitor(property,
true, errMsg)) {
2065 for (
auto* unit : m_units) {
2071 std::string msg =
"Configured connection to ";
2080 if (result.
size() == 0) {
2082 std::string msg =
"Could not find an incoming connection to ";
2091 auto handleAdminGetInCmd = [
this](
const std::string& target) {
2094 std::lock_guard<std::mutex> lock(m_stateMutex);
2095 if (target.empty()) {
2097 result.
addString(
"target port is not specified.\r\n");
2098 }
else if (target == getName()) {
2101 if (!getParamPortMonitor(property,
false, errMsg)) {
2108 for (
auto* unit : m_units) {
2119 if (result.
size() == 0) {
2121 std::string msg =
"Could not find an incoming connection from ";
2130 auto handleAdminGetOutCmd = [
this](
const std::string& target) {
2133 std::lock_guard<std::mutex> lock(m_stateMutex);
2134 if (target.empty()) {
2136 result.
addString(
"target port is not specified.\r\n");
2137 }
else if (target == getName()) {
2140 if (!getParamPortMonitor(property,
true, errMsg)) {
2147 for (
auto* unit : m_units) {
2158 if (result.
size() == 0) {
2160 std::string msg =
"Could not find an incoming connection to ";
2169 auto handleAdminPropGetCmd = [
this](
const std::string& key) {
2171 Property* p = acquireProperties(
false);
2177 if (key[0] ==
'/') {
2178 bool bFound =
false;
2180 if (key == getName()) {
2185 sched_prop.
put(
"tid",
static_cast<int>(this->getTid()));
2186 sched_prop.
put(
"priority", this->getPriority());
2187 sched_prop.
put(
"policy", this->getPolicy());
2193 proc_prop.
put(
"pid", info.
pid);
2194 proc_prop.
put(
"name", (info.
pid != -1) ? info.
name :
"unknown");
2195 proc_prop.
put(
"arguments", (info.
pid != -1) ? info.
arguments :
"unknown");
2203 platform_prop.
put(
"os", pinfo.
name);
2204 platform_prop.
put(
"hostname", m_address.getHost());
2206 unsigned int f = getFlags();
2213 port_prop.
put(
"is_input", is_input);
2214 port_prop.
put(
"is_output", is_output);
2215 port_prop.
put(
"is_rpc", is_rpc);
2216 port_prop.
put(
"type", getType().getName());
2218 for (
auto* unit : m_units) {
2219 if ((unit !=
nullptr) && !unit->
isFinished()) {
2222 if (key == coreName) {
2226 int tos = getTypeOfService(unit);
2227 int tid =
static_cast<int>(unit->
getTid());
2231 sched_prop.
put(
"tid", tid);
2232 sched_prop.
put(
"priority", priority);
2233 sched_prop.
put(
"policy", policy);
2237 qos_prop.
put(
"tos", tos);
2245 std::string msg =
"cannot find any connection to/from ";
2255 releaseProperties(p);
2259 auto handleAdminPropSetCmd = [
this](
const std::string& key,
2265 Property* p = acquireProperties(
false);
2272 if (!process.isNull()) {
2273 std::string portName = key;
2274 if ((!portName.empty()) && (portName[0] ==
'/')) {
2276 if (portName == getName()) {
2279 if (process_prop !=
nullptr) {
2282 if (process_prop->
check(
"priority")) {
2285 if (process_prop->
check(
"policy")) {
2288 bOk = setProcessSchedulingParam(prio, policy);
2299 if (!sched.isNull()) {
2300 if ((!key.empty()) && (key[0] ==
'/')) {
2302 for (
auto* unit : m_units) {
2303 if ((unit !=
nullptr) && !unit->
isFinished()) {
2307 if (portName == key) {
2309 if (sched_prop !=
nullptr) {
2312 if (sched_prop->
check(
"priority")) {
2315 if (sched_prop->
check(
"policy")) {
2332 if (!qos.isNull()) {
2333 if ((!key.empty()) && (key[0] ==
'/')) {
2335 for (
auto* unit : m_units) {
2336 if ((unit !=
nullptr) && !unit->
isFinished()) {
2339 if (portName == key) {
2341 if (qos_prop !=
nullptr) {
2343 if (qos_prop->
check(
"priority")) {
2368 }
else if (qos_prop->
check(
"dscp")) {
2373 auto dscp_val = qos_prop->
find(
"dscp");
2374 if (dscp_val.isInt32()) {
2378 dscp =
static_cast<int>(dscp_class);
2380 if ((dscp >= 0) && (dscp < 64)) {
2383 }
else if (qos_prop->
check(
"tos")) {
2385 auto tos_val = qos_prop->
find(
"tos");
2386 if (tos_val.isInt32()) {
2391 bOk = setTypeOfService(unit, tos);
2403 releaseProperties(p);
2410 auto handleAdminRosPublisherUpdateCmd = [
this](
const std::string& topic,
Bottle* pubs) {
2419 if (pubs !=
nullptr) {
2421 for (
size_t i = 0; i < pubs->size(); i++) {
2422 std::string pub = pubs->get(i).asString();
2428 std::lock_guard<std::mutex> lock(m_stateMutex);
2429 for (
auto* unit : m_units) {
2430 if ((unit !=
nullptr) && unit->
isPupped()) {
2433 if (!listed.
check(me)) {
2439 for (
size_t i = 0; i < pubs->size(); i++) {
2440 std::string pub = pubs->get(i).asString();
2441 if (!present.
check(pub)) {
2442 yCDebug(PORTCORE,
"ROS ADD %s", pub.c_str());
2452 yCDebug(PORTCORE,
"Sending [%s] to %s", req.
toString().c_str(), pub.c_str());
2454 if (!
__pc_rpc(c,
"xmlrpc", req, reply,
false)) {
2455 fprintf(stderr,
"Cannot connect to ROS subscriber %s\n", pub.c_str());
2457 __pc_rpc(c,
"xmlrpc", req, reply,
true);
2461 std::string hostname;
2462 std::string carrier;
2465 fprintf(stderr,
"Failure looking up topic %s: %s\n", topic.c_str(), reply.
toString().c_str());
2466 }
else if (pref ==
nullptr) {
2467 fprintf(stderr,
"Failure looking up topic %s: expected list of protocols\n", topic.c_str());
2469 fprintf(stderr,
"Failure looking up topic %s: unsupported protocol %s\n", topic.c_str(), pref->
get(0).
asString().c_str());
2475 carrier =
"tcpros+role.pub+topic.";
2477 yCDebug(PORTCORE,
"topic %s available at %s:%d", topic.c_str(), hostname.c_str(), portnum);
2480 Contact addr(hostname, portnum);
2484 if (op ==
nullptr) {
2485 fprintf(stderr,
"NO CONNECTION\n");
2497 std::lock_guard<std::mutex> lock(m_stateMutex);
2502 yCAssert(PORTCORE, unit !=
nullptr);
2505 m_units.push_back(unit);
2517 auto handleAdminRosRequestTopicCmd = [
this]() {
2531 auto handleAdminRosGetPidCmd = []() {
2540 auto handleAdminRosGetBusInfoCmd = []() {
2550 auto handleAdminUnknownCmd = [
this](
const Bottle& cmd) {
2553 if (m_adminReader !=
nullptr) {
2557 ok = m_adminReader->read(con.
getReader());
2565 result.
addString(
"send [help] for list of valid commands");
2570 const PortCoreCommand command = parseCommand(cmd.
get(0));
2572 case PortCoreCommand::Help:
2573 result = handleAdminHelpCmd();
2575 case PortCoreCommand::Ver:
2576 result = handleAdminVerCmd();
2578 case PortCoreCommand::Pray:
2579 result = handleAdminPrayCmd();
2581 case PortCoreCommand::Add: {
2584 result = handleAdminAddCmd(std::move(output), carrier);
2586 case PortCoreCommand::Del: {
2588 result = handleAdminDelCmd(dest);
2590 case PortCoreCommand::Atch: {
2591 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab32());
2593 result = handleAdminAtchCmd(direction, std::move(prop));
2595 case PortCoreCommand::Dtch: {
2596 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab32());
2597 result = handleAdminDtchCmd(direction);
2599 case PortCoreCommand::List: {
2600 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab32(),
true);
2602 result = handleAdminListCmd(direction, target);
2604 case PortCoreCommand::Set: {
2605 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab32(),
true);
2609 switch (direction) {
2610 case PortCoreConnectionDirection::In:
2611 result = handleAdminSetInCmd(target, property);
2613 case PortCoreConnectionDirection::Out:
2614 result = handleAdminSetOutCmd(target, property);
2616 case PortCoreConnectionDirection::Error:
2621 case PortCoreCommand::Get: {
2622 const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.
get(1).
asVocab32(),
true);
2624 switch (direction) {
2625 case PortCoreConnectionDirection::In:
2626 result = handleAdminGetInCmd(target);
2628 case PortCoreConnectionDirection::Out:
2629 result = handleAdminGetOutCmd(target);
2631 case PortCoreConnectionDirection::Error:
2636 case PortCoreCommand::Prop: {
2637 PortCorePropertyAction action = parsePropertyAction(cmd.
get(1).
asVocab32());
2641 case PortCorePropertyAction::Get:
2642 result = handleAdminPropGetCmd(key);
2644 case PortCorePropertyAction::Set: {
2649 result = handleAdminPropSetCmd(key, value, process, sched, qos);
2651 case PortCorePropertyAction::Error:
2653 result.
addString(
"property action not known");
2657 case PortCoreCommand::RosPublisherUpdate: {
2662 result = handleAdminRosPublisherUpdateCmd(topic, pubs);
2665 case PortCoreCommand::RosRequestTopic:
2670 result = handleAdminRosRequestTopicCmd();
2673 case PortCoreCommand::RosGetPid:
2675 result = handleAdminRosGetPidCmd();
2678 case PortCoreCommand::RosGetBusInfo:
2680 result = handleAdminRosGetBusInfoCmd();
2683 case PortCoreCommand::Unknown:
2684 result = handleAdminUnknownCmd(cmd);
2689 if (writer !=
nullptr) {
2690 result.
write(*writer);
2697 bool PortCore::setTypeOfService(
PortCoreUnit* unit,
int tos)
2699 if (unit ==
nullptr) {
2703 yCDebug(PORTCORE,
"Trying to set TOS = %d", tos);
2707 if (outUnit !=
nullptr) {
2709 if (op !=
nullptr) {
2710 yCDebug(PORTCORE,
"Trying to set TOS = %d on output unit", tos);
2713 yCWarning(PORTCORE,
"Setting TOS on output unit failed");
2727 if (inUnit !=
nullptr) {
2730 yCDebug(PORTCORE,
"Trying to set TOS = %d on input unit", tos);
2733 yCWarning(PORTCORE,
"Setting TOS on input unit failed");
2745 if (unit ==
nullptr) {
2751 if (outUnit !=
nullptr) {
2753 if (op !=
nullptr) {
2766 if (inUnit !=
nullptr) {
2777 bool PortCore::attachPortMonitor(
yarp::os::Property& prop,
bool isOutput, std::string& errMsg)
2781 if (portmonitor ==
nullptr) {
2782 errMsg =
"Portmonitor carrier modifier cannot be find or it is not enabled in YARP!";
2787 detachPortMonitor(
true);
2788 prop.
put(
"source", getName());
2789 prop.
put(
"destination",
"");
2790 prop.
put(
"sender_side", 1);
2791 prop.
put(
"receiver_side", 0);
2792 prop.
put(
"carrier",
"");
2793 m_modifier.outputMutex.lock();
2794 m_modifier.outputModifier = portmonitor;
2795 if (!m_modifier.outputModifier->configureFromProperty(prop)) {
2796 m_modifier.releaseOutModifier();
2797 errMsg =
"Failed to configure the portmonitor plug-in";
2798 m_modifier.outputMutex.unlock();
2801 m_modifier.outputMutex.unlock();
2803 detachPortMonitor(
false);
2804 prop.
put(
"source",
"");
2805 prop.
put(
"destination", getName());
2806 prop.
put(
"sender_side", 0);
2807 prop.
put(
"receiver_side", 1);
2808 prop.
put(
"carrier",
"");
2809 m_modifier.inputMutex.lock();
2810 m_modifier.inputModifier = portmonitor;
2811 if (!m_modifier.inputModifier->configureFromProperty(prop)) {
2812 m_modifier.releaseInModifier();
2813 errMsg =
"Failed to configure the portmonitor plug-in";
2814 m_modifier.inputMutex.unlock();
2817 m_modifier.inputMutex.unlock();
2823 bool PortCore::detachPortMonitor(
bool isOutput)
2826 m_modifier.outputMutex.lock();
2827 m_modifier.releaseOutModifier();
2828 m_modifier.outputMutex.unlock();
2830 m_modifier.inputMutex.lock();
2831 m_modifier.releaseInModifier();
2832 m_modifier.inputMutex.unlock();
2839 std::string& errMsg)
2842 m_modifier.outputMutex.lock();
2843 if (m_modifier.outputModifier ==
nullptr) {
2844 errMsg =
"No port modifier is attached to the output";
2845 m_modifier.outputMutex.unlock();
2849 m_modifier.outputMutex.unlock();
2851 m_modifier.inputMutex.lock();
2852 if (m_modifier.inputModifier ==
nullptr) {
2853 errMsg =
"No port modifier is attached to the input";
2854 m_modifier.inputMutex.unlock();
2857 m_modifier.inputModifier->setCarrierParams(param);
2858 m_modifier.inputMutex.unlock();
2865 std::string& errMsg)
2868 m_modifier.outputMutex.lock();
2869 if (m_modifier.outputModifier ==
nullptr) {
2870 errMsg =
"No port modifier is attached to the output";
2871 m_modifier.outputMutex.unlock();
2874 m_modifier.outputModifier->getCarrierParams(param);
2875 m_modifier.outputMutex.unlock();
2877 m_modifier.inputMutex.lock();
2878 if (m_modifier.inputModifier ==
nullptr) {
2879 errMsg =
"No port modifier is attached to the input";
2880 m_modifier.inputMutex.unlock();
2883 m_modifier.inputModifier->getCarrierParams(param);
2884 m_modifier.inputMutex.unlock();
2892 if (unit !=
nullptr) {
2893 bool isLog = (!unit->
getMode().empty());
2900 bool PortCore::setProcessSchedulingParam(
int priority,
int policy)
2902 #if defined(__linux__)
2904 struct sched_param sch_param;
2905 sch_param.__sched_priority = priority;
2908 char path[PATH_MAX];
2911 dir = opendir(path);
2912 if (dir ==
nullptr) {
2920 while ((d = readdir(dir)) !=
nullptr) {
2921 if (isdigit(
static_cast<unsigned char>(*d->d_name)) == 0) {
2925 tid = strtol(d->d_name, &end, 10);
2926 if (d->d_name == end || ((end !=
nullptr) && (*end != 0))) {
2930 ret &= (sched_setscheduler(
static_cast<pid_t
>(tid), policy, &sch_param) == 0);
2934 #elif defined(YARP_HAS_ACE)
2936 ACE_Sched_Params param(policy, (ACE_Sched_Priority)priority, ACE_SCOPE_PROCESS);
2946 m_stateMutex.lock();
2948 if (m_prop ==
nullptr) {
2958 m_stateMutex.unlock();
2963 return removeUnit(route, synch);
2976 int PortCore::getNextIndex()
2978 int result = m_counter;
2980 if (m_counter < 0) {
2993 m_address.setName(str);
2998 return m_readableCreator;
3003 m_controlRegistration = flag;
3008 return m_listening.load();
3018 return m_interrupted;
3023 m_timeout = timeout;
3026 #ifndef YARP_NO_DEPRECATED
3031 removeCallbackLock();
3032 if (mutex !=
nullptr) {
3033 m_old_mutex = mutex;
3034 m_mutexOwned =
false;
3037 m_mutexOwned =
true;
3046 removeCallbackLock();
3047 if (mutex !=
nullptr) {
3049 m_mutexOwned =
false;
3051 m_mutex =
new std::mutex;
3052 m_mutexOwned =
true;
3059 if (m_mutexOwned && (m_mutex !=
nullptr)) {
3063 #ifndef YARP_NO_DEPRECATED
3064 m_old_mutex =
nullptr;
3066 m_mutexOwned =
false;
3072 if (m_mutex ==
nullptr) {
3073 #ifndef YARP_NO_DEPRECATED
3074 if (m_old_mutex ==
nullptr) {
3077 m_old_mutex->lock();
3089 if (m_mutex ==
nullptr) {
3090 #ifndef YARP_NO_DEPRECATED
3091 if (m_old_mutex ==
nullptr) {
3094 return m_old_mutex->try_lock();
3099 return m_mutex->try_lock();
3104 if (m_mutex ==
nullptr) {
3105 #ifndef YARP_NO_DEPRECATED
3106 if (m_old_mutex ==
nullptr) {
3109 return m_old_mutex->unlock();
3125 if (!m_checkedType) {
3126 if (!m_type.isValid()) {
3129 m_checkedType =
true;
3131 m_typeMutex.unlock();
3138 m_typeMutex.unlock();
3146 m_typeMutex.unlock();
static bool __tcp_check(const Contact &c)
static bool __pc_rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader, bool verbose)
#define PORTCORE_IS_INPUT
#define PORTCORE_SEND_LOG
#define PORTCORE_IS_OUTPUT
#define PORTCORE_SEND_NORMAL
static bool rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader)
A simple collection of objects that can be described and transmitted in a portable way.
void add(const Value &value)
Add a Value to the bottle, at the end of the list.
void addVocab32(yarp::conf::vocab32_t x)
Places a vocabulary item in the bottle, at the end of the list.
void fromString(const std::string &text)
Initializes bottle from a string.
Property & addDict()
Places an empty key/value object in the bottle, at the end of the list.
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
size_type size() const
Gets the number of elements in the bottle.
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
void addString(const char *str)
Places a string in the bottle, at the end of the list.
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
bool isConnectionless() const override=0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
bool isPush() const override
Check if carrier is "push" or "pull" style.
void setCarrierParams(const Property ¶ms) override
Configure carrier from port administrative commands.
static Face * listen(const Contact &address)
Create a "proto-carrier" interface object that waits for incoming connections prior to a carrier bein...
static Carrier * getCarrierTemplate(const std::string &name)
Get template for carrier.
static Carrier * chooseCarrier(const std::string &name)
Select a carrier by name.
static OutputProtocol * connect(const Contact &address)
Initiate a connection to an address.
An interface for reading from a network connection.
virtual void requestDrop()=0
Tag the connection to be dropped after the current message.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
An interface for writing to a network connection.
virtual bool isPush() const =0
Check if carrier is "push" or "pull" style.
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written the ConnectionWriter since it was las...
ConnectionReader & getReader(ConnectionWriter *replyWriter=nullptr)
Get the dummy ConnectionReader loaded with whatever was written the ConnectionWriter since it was las...
Basic wrapper for mutual exclusion.
Simple abstraction for a YARP port name.
Contact toAddress() const
Create an address from the name.
std::string getCarrierModifier(const char *mod, bool *hasModifier=nullptr)
static bool initialized()
Returns true if YARP has been fully initialized.
static bool getLocalMode()
Get current value of flag "localMode", see setLocalMode function.
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
static NameStore * getQueryBypass()
static Contact queryName(const std::string &name)
Find out information about a registered name.
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
static bool writeToNameServer(PortWriter &cmd, PortReader &reply, const ContactStyle &style)
Variant write method specialized to name server.
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
static bool write(const Contact &contact, PortWriter &cmd, PortReader &reply, bool admin=false, bool quiet=false, double timeout=-1)
Send a single command to a port and await a single response.
The output side of an active connection between two ports.
virtual const Route & getRoute() const =0
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
virtual bool open(const Route &route)=0
Start negotiating a carrier, using the given route (this should generally match the name of the sendi...
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
virtual OutputStream & getOutputStream()=0
Access the output stream associated with the connection.
virtual InputProtocol & getInput()=0
Get an interface for doing read operations on the connection.
virtual void close()=0
Negotiate an end to operations.
virtual void rename(const Route &route)=0
Relabel the route after the fact (e.g.
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
virtual bool isOk() const =0
Check if the connection is valid and can be used.
virtual bool write(SizedWriter &writer)=0
Write a message on the connection.
Simple specification of the minimum functions needed from output streams.
virtual int getTypeOfService()
virtual bool setTypeOfService(int tos)
Information about a port connection or event.
std::string targetName
Name of connection target, if any.
std::string carrierName
Name of protocol type, if releveant.
bool incoming
True if a connection is incoming, false if outgoing.
std::string message
A human-readable description of contents.
std::string portName
Name of port.
int tag
Type of information.
std::string sourceName
Name of connection source, if any.
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
@ PORTINFO_MISC
Unspecified information.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
virtual Type getReadType() const
A base class for objects that want information about port status changes.
virtual void report(const PortInfo &info)=0
Callback for port event/state information.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
virtual void onCommencement() const
This is called when the port is about to begin writing operations.
A class for storing options and configuration information.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
std::string toString() const override
Return a standard text representation of the content of the object.
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
PacketPriorityDSCP
The PacketPriorityDSCP defines the packets quality of service (priority) using DSCP.
static PacketPriorityDSCP getDSCPByVocab(yarp::conf::vocab32_t vocab)
returns the IPV4/6 DSCP value given as DSCP code
static std::string fromRosName(const std::string &name)
Information about a connection between two ports.
const std::string & getToName() const
Get the destination of the route.
const std::string & getCarrierName() const
Get the carrier type of the route.
std::string toString() const
Render a text form of the route, "source->carrier->dest".
const std::string & getFromName() const
Get the source of the route.
void swapNames()
Swap from and to names.
void setToContact(const Contact &toContact)
Set the destination contact of the route.
An OutputStream that produces a string.
std::string toString() const
static ProcessInfo getProcessInfo(int pid=0)
gets the operating system process information given by its PID.
static PlatformInfo getPlatformInfo()
getPlatformInfo
A single value (typically within a Bottle).
virtual bool isString() const
Checks if value is a string.
virtual yarp::conf::vocab32_t asVocab32() const
Get vocabulary identifier as an integer.
virtual std::int32_t asInt32() const
Get 32-bit integer value.
virtual Bottle * asList() const
Get list value.
virtual std::string asString() const
Get string value.
A helper for creating cached object descriptions.
bool write(ConnectionWriter &connection) const override
Write this object to a network connection.
virtual void appendLine(const std::string &data)
Send a string along with a carriage-return-line-feed sequence.
A helper for recording entire message/reply transactions.
void init(yarp::os::ConnectionReader *wrappedReader)
Call this to wrap a specific ConnectionReader.
void fini()
Call this when all reading/writing has been done.
Manager for a single output from a port.
A single message, potentially being transmitted on multiple connections.
void setContent(const yarp::os::PortWriter *writable, bool owned=false, const yarp::os::PortWriter *callback=nullptr, bool ownedCallback=false)
Configure the object being sent and where to send notifications.
void inc()
Increment the usage count for this messagae.
void dec()
Decrement the usage count for this messagae.
This manages a single threaded resource related to a single input or output connection.
std::string getPupString() const
void setPupped(const std::string &pupString)
Tag this connection as having been created by a publisherUpdate message to the port's administrative ...
virtual void * send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader, const yarp::os::PortWriter *callback, void *tracker, const std::string &envelope, bool waitAfter=true, bool waitBefore=true, bool *gotReply=nullptr)
Send a message on the connection.
virtual bool isFinished()
std::string getMode(bool *hasMode=nullptr)
Read the "mode" of the connection - basically, whether it is used for logging or not.
virtual void getCarrierParams(yarp::os::Property ¶ms)
void setDoomed()
Request that this connection be shut down as soon as possible.
virtual void setCarrierParams(const yarp::os::Property ¶ms)
Set arbitrary parameters for this connection.
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
void resetPortName(const std::string &str)
std::string getEnvelope()
void setAdminReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming administrative messages.
int getOutputCount()
Check how many output connections there are.
bool readBlock(ConnectionReader &reader, void *id, yarp::os::OutputStream *os)
Read a block of regular payload data.
void setReportCallback(yarp::os::PortReport *reporter)
Set a callback to be notified of changes in port status.
void run() override
The body of the main thread.
bool start() override
Begin main thread.
void checkType(PortReader &reader)
void resume()
Undo an interrupt()
void setTimeout(float timeout)
yarp::os::PortReaderCreator * getReadCreator()
Get the creator of callbacks.
Property * acquireProperties(bool readOnly)
void interrupt()
Prepare the port to be shut down.
void setEnvelope(const std::string &envelope)
Set some envelope information to pass along with a message without actually being part of the message...
bool removeIO(const Route &route, bool synch=false)
Remove any connection matching the supplied route.
bool isInterrupted() const
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc) Generate a description of the connections associ...
bool listen(const Contact &address, bool shouldAnnounce=true)
Begin service at a given address.
bool sendHelper(const yarp::os::PortWriter &writer, int mode, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a message with a specific mode (normal or log).
void removeOutput(const std::string &dest, void *id, yarp::os::OutputStream *os)
Remove an output connection.
void setControlRegistration(bool flag)
Normally the port will unregister its name with the name server when shutting down.
bool setCallbackLock(yarp::os::Mutex *mutex)
int getInputCount()
Check how many input connections there are.
bool manualStart(const char *sourceName)
Start up the port, but without a main thread.
void setReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming data.
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
void promiseType(const Type &typ)
void releaseProperties(Property *prop)
int getEventCount()
A diagnostic for testing purposes.
void close() override
Shut down port.
const Contact & getAddress() const
Get the address associated with the port.
void describe(void *id, yarp::os::OutputStream *os)
Produce a text description of the port and its connections.
bool isWriting()
Check if a message is currently being sent.
bool adminBlock(ConnectionReader &reader, void *id)
Read a block of administrative data.
void removeInput(const std::string &src, void *id, yarp::os::OutputStream *os)
Remove an input connection.
bool send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a normal message.
void resetReportCallback()
Reset the callback to be notified of changes in port status.
void setReadCreator(yarp::os::PortReaderCreator &creator)
Set a callback for creating callbacks for incoming data.
yarp::os::impl::PortDataModifier & getPortModifier()
void setName(const std::string &name)
Set the name of this port.
bool removeCallbackLock()
bool addOutput(const std::string &dest, void *id, yarp::os::OutputStream *os, bool onlyIfNeeded=false)
Add an output connection to this port.
This is the heart of a yarp port.
Lets Readable objects read from the underlying InputStream associated with the connection between two...
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
int join(double seconds=-1)
int setPriority(int priority=-1, int policy=-1)
#define yCError(component,...)
#define yCAssert(component, x)
#define yCTrace(component,...)
#define yCWarning(component,...)
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
std::string get_string(const std::string &key, bool *found=nullptr)
Read a string from an environment variable.
std::string to_string(IntegerType x)
ContainerT::value_type join(typename ContainerT::const_iterator begin, typename ContainerT::const_iterator end, const typename ContainerT::value_type &separator=", ")
Utility to join the elements in a container to a single string separated by a separator.
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
int getpid()
Portable wrapper for the getppid() function.
std::int32_t NetInt32
Definition of the NetInt32 type.
constexpr yarp::conf::vocab32_t createVocab32(char a, char b=0, char c=0, char d=0)
Create a vocab from chars.
The main, catch-all namespace for YARP.
The ProcessInfo struct provides the operating system process information.
#define YARP_WARNING_POP
Ends a temporary alteration of the enabled warnings.
#define YARP_WARNING_PUSH
Starts a temporary alteration of the enabled warnings.
#define YARP_DISABLE_DEPRECATED_WARNING
Disable deprecated warnings in the following code.