YARP
Yet Another Robot Platform
PortCore.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
8 
10 
11 #include <yarp/os/Bottle.h>
12 #include <yarp/os/DummyConnector.h>
13 #include <yarp/os/InputProtocol.h>
14 #include <yarp/os/Name.h>
15 #include <yarp/os/Network.h>
16 #include <yarp/os/PortInfo.h>
17 #include <yarp/os/RosNameSpace.h>
19 #include <yarp/os/SystemInfo.h>
20 #include <yarp/os/Time.h>
28 
29 #include <cstdio>
30 #include <functional>
31 #include <random>
32 #include <regex>
33 #include <vector>
34 
35 #ifdef YARP_HAS_ACE
36 # include <ace/INET_Addr.h>
37 # include <ace/Sched_Params.h>
38 // In one the ACE headers there is a definition of "main" for WIN32
39 # ifdef main
40 # undef main
41 # endif
42 #endif
43 
44 #if defined(__linux__) // used for configuring scheduling properties
45 # include <dirent.h>
46 # include <sys/types.h>
47 # include <unistd.h>
48 #endif
49 
50 
51 using namespace yarp::os::impl;
52 using namespace yarp::os;
53 using namespace yarp;
54 
55 namespace {
56 YARP_OS_LOG_COMPONENT(PORTCORE, "yarp.os.impl.PortCore")
57 } // namespace
58 
59 PortCore::PortCore() = default;
60 
62 {
63  close();
64  removeCallbackLock();
65 }
66 
67 
68 bool PortCore::listen(const Contact& address, bool shouldAnnounce)
69 {
70  yCDebug(PORTCORE, "Starting listening on %s", address.toURI().c_str());
71  // If we're using ACE, we really need to have it initialized before
72  // this point.
73  if (!NetworkBase::initialized()) {
74  yCError(PORTCORE, "YARP not initialized; create a yarp::os::Network object before using ports");
75  return false;
76  }
77 
78  yCTrace(PORTCORE, "listen");
79 
80  {
81  // Critical section
82  std::lock_guard<std::mutex> lock(m_stateMutex);
83 
84  // This method assumes we are not already on the network.
85  // We can assume this because it is not a user-facing class,
86  // and we carefully never call this method again without
87  // calling close().
88  yCAssert(PORTCORE, !m_listening);
89  yCAssert(PORTCORE, !m_running);
90  yCAssert(PORTCORE, !m_closing.load());
91  yCAssert(PORTCORE, !m_finished.load());
92  yCAssert(PORTCORE, m_face == nullptr);
93 
94  // Try to put the port on the network, using the user-supplied
95  // address (which may be incomplete). You can think of
96  // this as getting a server socket.
97  m_address = address;
98  setName(address.getRegName());
99  if (m_timeout > 0) {
100  m_address.setTimeout(m_timeout);
101  }
102  m_face = Carriers::listen(m_address);
103 
104  // We failed, abort.
105  if (m_face == nullptr) {
106  return false;
107  }
108 
109  // Update our address if it was incomplete.
110  if (m_address.getPort() <= 0) {
111  m_address = m_face->getLocalAddress();
112  if (m_address.getRegName() == "...") {
113  m_address.setName(std::string("/") + m_address.getHost() + "_" + yarp::conf::numeric::to_string(m_address.getPort()));
114  setName(m_address.getRegName());
115  }
116  }
117 
118  // Move into listening phase
119  m_listening.store(true);
120  }
121 
122  // Now that we are on the network, we can let the name server know this.
123  if (shouldAnnounce) {
124  if (!(NetworkBase::getLocalMode() && NetworkBase::getQueryBypass() == nullptr)) {
125  std::string portName = address.getRegName();
126  Bottle cmd;
127  Bottle reply;
128  cmd.addString("announce");
129  cmd.addString(portName);
130  ContactStyle style;
131  NetworkBase::writeToNameServer(cmd, reply, style);
132  }
133  }
134 
135  // Success!
136  return true;
137 }
138 
139 
141 {
142  // Don't even try to do this when the port is hot, it'll burn you
143  yCAssert(PORTCORE, !m_running.load());
144  yCAssert(PORTCORE, m_reader == nullptr);
145  m_reader = &reader;
146 }
147 
149 {
150  // Don't even try to do this when the port is hot, it'll burn you
151  yCAssert(PORTCORE, !m_running.load());
152  yCAssert(PORTCORE, m_adminReader == nullptr);
153  m_adminReader = &reader;
154 }
155 
157 {
158  // Don't even try to do this when the port is hot, it'll burn you
159  yCAssert(PORTCORE, !m_running.load());
160  yCAssert(PORTCORE, m_readableCreator == nullptr);
161  m_readableCreator = &creator;
162 }
163 
164 
166 {
167  yCTrace(PORTCORE, "run");
168 
169  // This is the server thread for the port. We listen on
170  // the network and handle any incoming connections.
171  // We don't touch those connections, just shove them
172  // in a list and move on. It is important that this
173  // thread doesn't make a connecting client wait just
174  // because some other client is slow.
175 
176  // We assume that listen() has succeeded and that
177  // start() has been called.
178  yCAssert(PORTCORE, m_listening.load());
179  yCAssert(PORTCORE, !m_running.load());
180  yCAssert(PORTCORE, !m_closing.load());
181  yCAssert(PORTCORE, !m_finished.load());
182  yCAssert(PORTCORE, m_starting.load());
183 
184  // Enter running phase
185  {
186  // Critical section
187  std::lock_guard<std::mutex> lock(m_stateMutex);
188  m_running.store(true);
189  m_starting.store(false);
190  }
191 
192  // Notify the start() thread that the run() thread is running
193  m_stateCv.notify_one();
194 
195  yCTrace(PORTCORE, "run running");
196 
197  // Enter main loop, where we block on incoming connections.
198  // The loop is exited when PortCore#closing is set. One last
199  // connection will be needed to de-block this thread and ensure
200  // that it checks PortCore#closing.
201  bool shouldStop = false;
202  while (!shouldStop) {
203 
204  // Block and wait for a connection
205  InputProtocol* ip = m_face->read();
206 
207  {
208  // Critical section
209  std::lock_guard<std::mutex> lock(m_stateMutex);
210 
211  // Attach the connection to this port and update its timeout setting
212  if (ip != nullptr) {
213  ip->attachPort(m_contactable);
214  yCDebug(PORTCORE, "received something");
215  if (m_timeout > 0) {
216  ip->setTimeout(m_timeout);
217  }
218  }
219 
220  // Check whether we should shut down
221  shouldStop |= m_closing.load();
222 
223  // Increment a global count of connection events
224  m_events++;
225  }
226 
227  // It we are not shutting down, spin off the connection.
228  // It starts life as an input connection (although it
229  // may later morph into an output).
230  if (!shouldStop) {
231  if (ip != nullptr) {
232  addInput(ip);
233  }
234  yCDebug(PORTCORE, "spun off a connection");
235  ip = nullptr;
236  }
237 
238  // If the connection wasn't spun off, just shut it down.
239  if (ip != nullptr) {
240  ip->close();
241  delete ip;
242  ip = nullptr;
243  }
244 
245  // Remove any defunct connections.
246  reapUnits();
247 
248  // Notify anyone listening for connection changes.
249  std::lock_guard<std::mutex> lock(m_stateMutex);
250  m_connectionListeners = 0;
251  m_connectionChangeCv.notify_all();
252  }
253 
254  yCTrace(PORTCORE, "run closing");
255 
256  // The server thread is shutting down.
257  std::lock_guard<std::mutex> lock(m_stateMutex);
258  m_connectionListeners = 0;
259  m_connectionChangeCv.notify_all();
260  m_finished.store(true);
261 }
262 
263 
265 {
266  closeMain();
267 
268  if (m_prop != nullptr) {
269  delete m_prop;
270  m_prop = nullptr;
271  }
272  m_modifier.releaseOutModifier();
273  m_modifier.releaseInModifier();
274 }
275 
276 
278 {
279  yCTrace(PORTCORE, "start");
280 
281  // This wait will, on success, be matched by a post in run()
282  std::unique_lock<std::mutex> lock(m_stateMutex);
283 
284  // We assume that listen() has been called.
285  yCAssert(PORTCORE, m_listening.load());
286  yCAssert(PORTCORE, !m_running.load());
287  yCAssert(PORTCORE, !m_starting.load());
288  yCAssert(PORTCORE, !m_finished.load());
289  yCAssert(PORTCORE, !m_closing.load());
290 
291  m_starting.store(true);
292 
293  // Start the server thread.
294  bool started = ThreadImpl::start();
295  if (!started) {
296  // run() won't be happening
297  yAssert(false);
298 
299  } else {
300  // Wait for the signal from the run thread before returning.
301  m_stateCv.wait(lock, [&]{ return m_running.load(); });
302  yCAssert(PORTCORE, m_running.load());
303  }
304  return started;
305 }
306 
307 
308 bool PortCore::manualStart(const char* sourceName)
309 {
310  yCTrace(PORTCORE, "manualStart");
311 
312  // This variant of start() does not create a server thread.
313  // That is appropriate if we never expect to receive incoming
314  // connections for any reason. No incoming data, no requests
315  // for state information, no requests to change connections,
316  // nothing. We set the port's name to something fake, and
317  // act like nothing is wrong.
318  m_interruptable = false;
319  m_manual = true;
320  setName(sourceName);
321  return true;
322 }
323 
324 
326 {
327  // We are no longer interrupted.
328  m_interrupted = false;
329 }
330 
332 {
333  yCTrace(PORTCORE, "interrupt");
334 
335  // This is a no-op if there is no server thread.
336  if (!m_listening.load()) {
337  return;
338  }
339 
340  // Ignore any future incoming data
341  m_interrupted = true;
342 
343  // What about data that is already coming in?
344  // If interruptable is not currently set, no worries, the user
345  // did not or will not end up blocked on a read.
346  if (!m_interruptable) {
347  return;
348  }
349 
350  // Since interruptable is set, it is possible that the user
351  // may be blocked on a read. We send an empty message,
352  // which is reserved for giving blocking readers a chance to
353  // update their state.
354  {
355  // Critical section
356  std::lock_guard<std::mutex> lock(m_stateMutex);
357  if (m_reader != nullptr) {
358  yCDebug(PORTCORE, "sending update-state message to listener");
360  lockCallback();
361  m_reader->read(sbr);
362  unlockCallback();
363  }
364  }
365 }
366 
367 
368 void PortCore::closeMain()
369 {
370  yCTrace(PORTCORE, "closeMain");
371 
372  {
373  // Critical section
374  std::lock_guard<std::mutex> lock(m_stateMutex);
375 
376  // We may not have anything to do.
377  if (m_finishing || !(m_running.load() || m_manual)) {
378  yCTrace(PORTCORE, "closeMain - nothing to do");
379  return;
380  }
381 
382  yCTrace(PORTCORE, "closeMain - Central");
383 
384  // Move into official "finishing" phase.
385  m_finishing = true;
386  yCDebug(PORTCORE, "now preparing to shut down port");
387  }
388 
389  // Start disconnecting inputs. We ask the other side of the
390  // connection to do this, so it won't come as a surprise.
391  // The details of how disconnection works vary by carrier.
392  // While we are doing this, the server thread may be still running.
393  // This is because other ports may need to talk to the server
394  // to organize details of how a connection should be broken down.
395  bool done = false;
396  std::string prevName;
397  while (!done) {
398  done = true;
399  std::string removeName;
400  {
401  // Critical section
402  std::lock_guard<std::mutex> lock(m_stateMutex);
403  for (auto* unit : m_units) {
404  if ((unit != nullptr) && unit->isInput() && !unit->isDoomed()) {
405  Route r = unit->getRoute();
406  std::string s = r.getFromName();
407  if (s.length() >= 1 && s[0] == '/' && s != getName() && s != prevName) {
408  removeName = s;
409  done = false;
410  break;
411  }
412  }
413  }
414  }
415  if (!done) {
416  yCDebug(PORTCORE, "requesting removal of connection from %s", removeName.c_str());
417  bool result = NetworkBase::disconnect(removeName,
418  getName(),
419  true);
420  if (!result) {
422  removeName,
423  true);
424  }
425  prevName = removeName;
426  }
427  }
428 
429  // Start disconnecting outputs. We don't negotiate with the
430  // other side, we just break them down.
431  done = false;
432  while (!done) {
433  done = true;
434  Route removeRoute;
435  {
436  // Critical section
437  std::lock_guard<std::mutex> lock(m_stateMutex);
438  for (auto* unit : m_units) {
439  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
440  removeRoute = unit->getRoute();
441  if (removeRoute.getFromName() == getName()) {
442  done = false;
443  break;
444  }
445  }
446  }
447  }
448  if (!done) {
449  removeUnit(removeRoute, true);
450  }
451  }
452 
453  bool stopRunning = m_running.load();
454 
455  // If the server thread is still running, we need to bring it down.
456  if (stopRunning) {
457  // Let the server thread know we no longer need its services.
458  m_closing.store(true);
459 
460  // Wake up the server thread the only way we can, by sending
461  // a message to it. Then join it, it is done.
462  if (!m_manual) {
463  OutputProtocol* op = m_face->write(m_address);
464  if (op != nullptr) {
465  op->close();
466  delete op;
467  }
468  join();
469  }
470 
471  // We should be finished now.
472  yCAssert(PORTCORE, m_finished.load());
473 
474  // Clean up our connection list. We couldn't do this earlier,
475  // because the server thread was running.
476  closeUnits();
477 
478  // Reset some state flags.
479  {
480  // Critical section
481  std::lock_guard<std::mutex> lock(m_stateMutex);
482  m_finished.store(false);
483  m_closing.store(false);
484  m_running.store(false);
485  }
486  }
487 
488  // There should be no other threads at this point and we
489  // can stop listening on the network.
490  if (m_listening.load()) {
491  yCAssert(PORTCORE, m_face != nullptr);
492  m_face->close();
493  delete m_face;
494  m_face = nullptr;
495  m_listening.store(false);
496  }
497 
498  // Check if the client is waiting for input. If so, wake them up
499  // with the bad news. An empty message signifies a request to
500  // check the port state.
501  if (m_reader != nullptr) {
502  yCDebug(PORTCORE, "sending end-of-port message to listener");
504  m_reader->read(sbr);
505  m_reader = nullptr;
506  }
507 
508  // We may need to unregister the port with the name server.
509  if (stopRunning) {
510  std::string name = getName();
511  if (name != std::string("")) {
512  if (m_controlRegistration) {
514  }
515  }
516  }
517 
518  // We are done with the finishing process.
519  m_finishing = false;
520 
521  // We are fresh as a daisy.
522  yCAssert(PORTCORE, !m_listening.load());
523  yCAssert(PORTCORE, !m_running.load());
524  yCAssert(PORTCORE, !m_starting.load());
525  yCAssert(PORTCORE, !m_closing.load());
526  yCAssert(PORTCORE, !m_finished.load());
527  yCAssert(PORTCORE, !m_finishing);
528  yCAssert(PORTCORE, m_face == nullptr);
529 }
530 
531 
533 {
534  // How many times has the server thread spun off a connection.
535  std::lock_guard<std::mutex> lock(m_stateMutex);
536  int ct = m_events;
537  return ct;
538 }
539 
540 
541 void PortCore::closeUnits()
542 {
543  // Empty the PortCore#units list. This is only possible when
544  // the server thread is finished.
545  yCAssert(PORTCORE, m_finished.load());
546 
547  // In the "finished" phase, nobody else touches the units,
548  // so we can go ahead and shut them down and delete them.
549  for (auto& i : m_units) {
550  PortCoreUnit* unit = i;
551  if (unit != nullptr) {
552  yCDebug(PORTCORE, "closing a unit");
553  unit->close();
554  yCDebug(PORTCORE, "joining a unit");
555  unit->join();
556  delete unit;
557  yCDebug(PORTCORE, "deleting a unit");
558  i = nullptr;
559  }
560  }
561 
562  // Get rid of all our nulls. Done!
563  m_units.clear();
564 }
565 
566 void PortCore::reapUnits()
567 {
568  // Connections that should be shut down get tagged as "doomed"
569  // but aren't otherwise touched until it is safe to do so.
570  if (!m_finished.load()) {
571  std::lock_guard<std::mutex> lock(m_stateMutex);
572  for (auto* unit : m_units) {
573  if ((unit != nullptr) && unit->isDoomed() && !unit->isFinished()) {
574  std::string s = unit->getRoute().toString();
575  yCDebug(PORTCORE, "Informing connection %s that it is doomed", s.c_str());
576  unit->close();
577  yCDebug(PORTCORE, "Closed connection %s", s.c_str());
578  unit->join();
579  yCDebug(PORTCORE, "Joined thread of connection %s", s.c_str());
580  }
581  }
582  }
583  cleanUnits();
584 }
585 
586 void PortCore::cleanUnits(bool blocking)
587 {
588  // We will remove any connections that have finished operating from
589  // the PortCore#units list.
590 
591  // Depending on urgency, either wait for a safe time to do this
592  // or skip if unsafe.
593  std::unique_lock<std::mutex> lock(m_stateMutex, std::defer_lock);
594  if (blocking) {
595  lock.lock();
596  } else {
597  bool have_lock = lock.try_lock();
598  if (!have_lock) {
599  return;
600  }
601  }
602  // here we have the lock
603 
604  // We will update our connection counts as a by-product.
605  int updatedInputCount = 0;
606  int updatedOutputCount = 0;
607  int updatedDataOutputCount = 0;
608  yCDebug(PORTCORE, "/ routine check of connections to this port begins");
609  if (!m_finished.load()) {
610 
611  // First, we delete and null out any defunct entries in the list.
612  for (auto& i : m_units) {
613  PortCoreUnit* unit = i;
614  if (unit != nullptr) {
615  yCDebug(PORTCORE, "| checking connection %s %s", unit->getRoute().toString().c_str(), unit->getMode().c_str());
616  if (unit->isFinished()) {
617  std::string con = unit->getRoute().toString();
618  yCDebug(PORTCORE, "| removing connection %s", con.c_str());
619  unit->close();
620  unit->join();
621  delete unit;
622  i = nullptr;
623  yCDebug(PORTCORE, "| removed connection %s", con.c_str());
624  } else {
625  // No work to do except updating connection counts.
626  if (!unit->isDoomed()) {
627  if (unit->isOutput()) {
628  updatedOutputCount++;
629  if (unit->getMode().empty()) {
630  updatedDataOutputCount++;
631  }
632  }
633  if (unit->isInput()) {
634  if (unit->getRoute().getFromName() != "admin") {
635  updatedInputCount++;
636  }
637  }
638  }
639  }
640  }
641  }
642 
643  // Now we do some awkward shuffling (list class may be from ACE
644  // or STL, if ACE it is quite limited). We move the nulls to
645  // the end of the list ...
646  size_t rem = 0;
647  for (size_t i2 = 0; i2 < m_units.size(); i2++) {
648  if (m_units[i2] != nullptr) {
649  if (rem < i2) {
650  m_units[rem] = m_units[i2];
651  m_units[i2] = nullptr;
652  }
653  rem++;
654  }
655  }
656 
657  // ... Now we get rid of the null entries
658  for (size_t i3 = 0; i3 < m_units.size() - rem; i3++) {
659  m_units.pop_back();
660  }
661  }
662 
663  // Finalize the connection counts.
664  m_dataOutputCount = updatedDataOutputCount;
665  lock.unlock();
666 
667  m_packetMutex.lock();
668  m_inputCount = updatedInputCount;
669  m_outputCount = updatedOutputCount;
670  m_packetMutex.unlock();
671  yCDebug(PORTCORE, "\\ routine check of connections to this port ends");
672 }
673 
674 
675 void PortCore::addInput(InputProtocol* ip)
676 {
677  yCTrace(PORTCORE, "addInput");
678 
679  // This method is only called by the server thread in its running phase.
680  // It wraps up a network connection as a unit and adds it to
681  // PortCore#units. The unit will have its own thread associated
682  // with it.
683 
684  yCAssert(PORTCORE, ip != nullptr);
685  std::lock_guard<std::mutex> lock(m_stateMutex);
686  PortCoreUnit* unit = new PortCoreInputUnit(*this,
687  getNextIndex(),
688  ip,
689  false);
690  yCAssert(PORTCORE, unit != nullptr);
691  unit->start();
692  m_units.push_back(unit);
693  yCTrace(PORTCORE, "there are now %zu units", m_units.size());
694 }
695 
696 
698 {
699  yCTrace(PORTCORE, "addOutput");
700 
701  // This method is called from threads associated with input
702  // connections.
703  // It wraps up a network connection as a unit and adds it to
704  // PortCore#units. The unit will start with its own thread
705  // associated with it, but that thread will be very short-lived
706  // if the port is not configured to do background writes.
707 
708  yCAssert(PORTCORE, op != nullptr);
709  if (!m_finished.load()) {
710  std::lock_guard<std::mutex> lock(m_stateMutex);
711  PortCoreUnit* unit = new PortCoreOutputUnit(*this, getNextIndex(), op);
712  yCAssert(PORTCORE, unit != nullptr);
713  unit->start();
714  m_units.push_back(unit);
715  }
716 }
717 
718 
719 bool PortCore::isUnit(const Route& route, int index)
720 {
721  // Check if a connection with a specified route (and optional ID) is present
722  bool needReap = false;
723  if (!m_finished.load()) {
724  for (auto* unit : m_units) {
725  if (unit != nullptr) {
726  Route alt = unit->getRoute();
727  std::string wild = "*";
728  bool ok = true;
729  if (index >= 0) {
730  ok = ok && (unit->getIndex() == index);
731  }
732  if (ok) {
733  if (route.getFromName() != wild) {
734  ok = ok && (route.getFromName() == alt.getFromName());
735  }
736  if (route.getToName() != wild) {
737  ok = ok && (route.getToName() == alt.getToName());
738  }
739  if (route.getCarrierName() != wild) {
740  ok = ok && (route.getCarrierName() == alt.getCarrierName());
741  }
742  }
743  if (ok) {
744  needReap = true;
745  break;
746  }
747  }
748  }
749  }
750  return needReap;
751 }
752 
753 
754 bool PortCore::removeUnit(const Route& route, bool synch, bool* except)
755 {
756  // This is a request to remove a connection. It could arise from any
757  // input thread.
758 
759  if (except != nullptr) {
760  yCDebug(PORTCORE, "asked to remove connection in the way of %s", route.toString().c_str());
761  *except = false;
762  } else {
763  yCDebug(PORTCORE, "asked to remove connection %s", route.toString().c_str());
764  }
765 
766  // Scan for units that match the given route, and collect their IDs.
767  // Mark them as "doomed".
768  std::vector<int> removals;
769 
770  bool needReap = false;
771  if (!m_finished.load()) {
772  std::lock_guard<std::mutex> lock(m_stateMutex);
773  for (auto* unit : m_units) {
774  if (unit != nullptr) {
775  Route alt = unit->getRoute();
776  std::string wild = "*";
777  bool ok = true;
778  if (route.getFromName() != wild) {
779  ok = ok && (route.getFromName() == alt.getFromName());
780  }
781  if (route.getToName() != wild) {
782  ok = ok && (route.getToName() == alt.getToName());
783  }
784  if (route.getCarrierName() != wild) {
785  if (except == nullptr) {
786  ok = ok && (route.getCarrierName() == alt.getCarrierName());
787  } else {
788  if (route.getCarrierName() == alt.getCarrierName()) {
789  *except = true;
790  ok = false;
791  }
792  }
793  }
794 
795  if (ok) {
796  yCDebug(PORTCORE, "removing connection %s", alt.toString().c_str());
797  removals.push_back(unit->getIndex());
798  unit->setDoomed();
799  needReap = true;
800  }
801  }
802  }
803  }
804 
805  // If we find one or more matches, we need to do some work.
806  // We've marked those matches as "doomed" so they'll get cleared
807  // up eventually, but we can speed this up by waking up the
808  // server thread.
809  if (needReap) {
810  yCDebug(PORTCORE, "one or more connections need prodding to die");
811 
812  if (m_manual) {
813  // No server thread, no problems.
814  reapUnits();
815  } else {
816  // Send a blank message to make up server thread.
817  OutputProtocol* op = m_face->write(m_address);
818  if (op != nullptr) {
819  op->close();
820  delete op;
821  }
822  yCDebug(PORTCORE, "sent message to prod connection death");
823 
824  if (synch) {
825  // Wait for connections to be cleaned up.
826  yCDebug(PORTCORE, "synchronizing with connection death");
827  {
828  // Critical section
829  std::unique_lock<std::mutex> lock(m_stateMutex);
830  while (std::any_of(removals.begin(), removals.end(), [&](int removal){ return isUnit(route, removal); })) {
831  m_connectionListeners++;
832  m_connectionChangeCv.wait(lock, [&]{ return m_connectionListeners == 0; });
833  }
834  }
835  }
836  }
837  }
838  return needReap;
839 }
840 
841 
842 bool PortCore::addOutput(const std::string& dest,
843  void* id,
844  OutputStream* os,
845  bool onlyIfNeeded)
846 {
847  YARP_UNUSED(id);
848  yCDebug(PORTCORE, "asked to add output to %s", dest.c_str());
849 
850  // Buffer to store text describing outcome (successful connection,
851  // or a failure).
852  BufferedConnectionWriter bw(true);
853 
854  // Look up the address we'll be connecting to.
855  Contact parts = Name(dest).toAddress();
856  Contact contact = NetworkBase::queryName(parts.getRegName());
857  Contact address = contact;
858 
859  // If we can't find it, say so and abort.
860  if (!address.isValid()) {
861  bw.appendLine(std::string("Do not know how to connect to ") + dest);
862  if (os != nullptr) {
863  bw.write(*os);
864  }
865  return false;
866  }
867 
868  // We clean all existing connections to the desired destination,
869  // optionally stopping if we find one with the right carrier.
870  if (onlyIfNeeded) {
871  // Remove any existing connections between source and destination
872  // with a different carrier. If we find a connection already
873  // present with the correct carrier, then we are done.
874  bool except = false;
875  removeUnit(Route(getName(), address.getRegName(), address.getCarrier()), true, &except);
876  if (except) {
877  // Connection already present.
878  yCDebug(PORTCORE, "output already present to %s", dest.c_str());
879  bw.appendLine(std::string("Desired connection already present from ") + getName() + " to " + dest);
880  if (os != nullptr) {
881  bw.write(*os);
882  }
883  return true;
884  }
885  } else {
886  // Remove any existing connections between source and destination.
887  removeUnit(Route(getName(), address.getRegName(), "*"), true);
888  }
889 
890  // Set up a named route for this connection.
891  std::string aname = address.getRegName();
892  if (aname.empty()) {
893  aname = address.toURI(false);
894  }
895  Route r(getName(),
896  aname,
897  ((!parts.getCarrier().empty()) ? parts.getCarrier() : address.getCarrier()));
898  r.setToContact(contact);
899 
900  // Check for any restrictions on the port. Perhaps it can only
901  // read, or write.
902  bool allowed = true;
903  std::string err;
904  std::string append;
905  unsigned int f = getFlags();
906  bool allow_output = (f & PORTCORE_IS_OUTPUT) != 0;
907  bool rpc = (f & PORTCORE_IS_RPC) != 0;
908  Name name(r.getCarrierName() + std::string("://test"));
909  std::string mode = name.getCarrierModifier("log");
910  bool is_log = (!mode.empty());
911  if (is_log) {
912  if (mode != "in") {
913  err = "Logger configured as log." + mode + ", but only log.in is supported";
914  allowed = false;
915  } else {
916  append = "; " + r.getFromName() + " will forward messages and replies (if any) to " + r.getToName();
917  }
918  }
919  if (!allow_output) {
920  if (!is_log) {
921  bool push = false;
923  if (c != nullptr) {
924  push = c->isPush();
925  }
926  if (push) {
927  err = "Outputs not allowed";
928  allowed = false;
929  }
930  }
931  } else if (rpc) {
932  if (m_dataOutputCount >= 1 && !is_log) {
933  err = "RPC output already connected";
934  allowed = false;
935  }
936  }
937 
938  // If we found a relevant restriction, abort.
939  if (!allowed) {
940  bw.appendLine(err);
941  if (os != nullptr) {
942  bw.write(*os);
943  }
944  return false;
945  }
946 
947  // Ok! We can go ahead and make a connection.
948  OutputProtocol* op = nullptr;
949  if (m_timeout > 0) {
950  address.setTimeout(m_timeout);
951  }
952  op = Carriers::connect(address);
953  if (op != nullptr) {
954  op->attachPort(m_contactable);
955  if (m_timeout > 0) {
956  op->setTimeout(m_timeout);
957  }
958 
959  bool ok = op->open(r);
960  if (!ok) {
961  yCDebug(PORTCORE, "open route error");
962  delete op;
963  op = nullptr;
964  }
965  }
966 
967  // No connection, abort.
968  if (op == nullptr) {
969  bw.appendLine(std::string("Cannot connect to ") + dest);
970  if (os != nullptr) {
971  bw.write(*os);
972  }
973  return false;
974  }
975 
976  // Ok, we have a connection, now add it to PortCore#units
977  if (op->getConnection().isPush()) {
978  // This is the normal case
979  addOutput(op);
980  } else {
981  // This is the case for connections that are initiated
982  // in the opposite direction to native YARP connections.
983  // Native YARP has push connections, initiated by the
984  // sender. HTTP and ROS have pull connections, initiated
985  // by the receiver.
986  // We invert the route, flip the protocol direction, and add.
987  r.swapNames();
988  op->rename(r);
989  InputProtocol* ip = &(op->getInput());
990  if (!m_finished.load()) {
991  std::lock_guard<std::mutex> lock(m_stateMutex);
992  PortCoreUnit* unit = new PortCoreInputUnit(*this,
993  getNextIndex(),
994  ip,
995  true);
996  yCAssert(PORTCORE, unit != nullptr);
997  unit->start();
998  m_units.push_back(unit);
999  }
1000  }
1001 
1002  // Communicated the good news.
1003  bw.appendLine(std::string("Added connection from ") + getName() + " to " + dest + append);
1004  if (os != nullptr) {
1005  bw.write(*os);
1006  }
1007  cleanUnits();
1008  return true;
1009 }
1010 
1011 
1012 void PortCore::removeOutput(const std::string& dest, void* id, OutputStream* os)
1013 {
1014  YARP_UNUSED(id);
1015  yCDebug(PORTCORE, "asked to remove output to %s", dest.c_str());
1016 
1017  // All the real work done by removeUnit().
1018  BufferedConnectionWriter bw(true);
1019  if (removeUnit(Route("*", dest, "*"), true)) {
1020  bw.appendLine(std::string("Removed connection from ") + getName() + " to " + dest);
1021  } else {
1022  bw.appendLine(std::string("Could not find an outgoing connection to ") + dest);
1023  }
1024  if (os != nullptr) {
1025  bw.write(*os);
1026  }
1027  cleanUnits();
1028 }
1029 
1030 void PortCore::removeInput(const std::string& src, void* id, OutputStream* os)
1031 {
1032  YARP_UNUSED(id);
1033  yCDebug(PORTCORE, "asked to remove input to %s", src.c_str());
1034 
1035  // All the real work done by removeUnit().
1036  BufferedConnectionWriter bw(true);
1037  if (removeUnit(Route(src, "*", "*"), true)) {
1038  bw.appendLine(std::string("Removing connection from ") + src + " to " + getName());
1039  } else {
1040  bw.appendLine(std::string("Could not find an incoming connection from ") + src);
1041  }
1042  if (os != nullptr) {
1043  bw.write(*os);
1044  }
1045  cleanUnits();
1046 }
1047 
1049 {
1050  YARP_UNUSED(id);
1051  cleanUnits();
1052 
1053  // Buffer to store a human-readable description of the port's
1054  // state.
1055  BufferedConnectionWriter bw(true);
1056 
1057  {
1058  // Critical section
1059  std::lock_guard<std::mutex> lock(m_stateMutex);
1060 
1061  // Report name and address.
1062  bw.appendLine(std::string("This is ") + m_address.getRegName() + " at " + m_address.toURI());
1063 
1064  // Report outgoing connections.
1065  int oct = 0;
1066  for (auto* unit : m_units) {
1067  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1068  Route route = unit->getRoute();
1069  std::string msg = "There is an output connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1070  bw.appendLine(msg);
1071  oct++;
1072  }
1073  }
1074  if (oct < 1) {
1075  bw.appendLine("There are no outgoing connections");
1076  }
1077 
1078  // Report incoming connections.
1079  int ict = 0;
1080  for (auto* unit : m_units) {
1081  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1082  Route route = unit->getRoute();
1083  if (!route.getCarrierName().empty()) {
1084  std::string msg = "There is an input connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1085  bw.appendLine(msg);
1086  ict++;
1087  }
1088  }
1089  }
1090  if (ict < 1) {
1091  bw.appendLine("There are no incoming connections");
1092  }
1093  }
1094 
1095  // Send description across network, or print it.
1096  if (os != nullptr) {
1097  bw.write(*os);
1098  } else {
1099  StringOutputStream sos;
1100  bw.write(sos);
1101  printf("%s\n", sos.toString().c_str());
1102  }
1103 }
1104 
1105 
1107 {
1108  cleanUnits();
1109 
1110  std::lock_guard<std::mutex> lock(m_stateMutex);
1111 
1112  // Report name and address of port.
1113  PortInfo baseInfo;
1115  std::string portName = m_address.getRegName();
1116  baseInfo.message = std::string("This is ") + portName + " at " + m_address.toURI();
1117  reporter.report(baseInfo);
1118 
1119  // Report outgoing connections.
1120  int oct = 0;
1121  for (auto* unit : m_units) {
1122  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1123  Route route = unit->getRoute();
1124  std::string msg = "There is an output connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1125  PortInfo info;
1126  info.message = msg;
1128  info.incoming = false;
1129  info.portName = portName;
1130  info.sourceName = route.getFromName();
1131  info.targetName = route.getToName();
1132  info.carrierName = route.getCarrierName();
1133  reporter.report(info);
1134  oct++;
1135  }
1136  }
1137  if (oct < 1) {
1138  PortInfo info;
1140  info.message = "There are no outgoing connections";
1141  reporter.report(info);
1142  }
1143 
1144  // Report incoming connections.
1145  int ict = 0;
1146  for (auto* unit : m_units) {
1147  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1148  Route route = unit->getRoute();
1149  std::string msg = "There is an input connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1150  PortInfo info;
1151  info.message = msg;
1153  info.incoming = true;
1154  info.portName = portName;
1155  info.sourceName = route.getFromName();
1156  info.targetName = route.getToName();
1157  info.carrierName = route.getCarrierName();
1158  reporter.report(info);
1159  ict++;
1160  }
1161  }
1162  if (ict < 1) {
1163  PortInfo info;
1165  info.message = "There are no incoming connections";
1166  reporter.report(info);
1167  }
1168 }
1169 
1170 
1172 {
1173  std::lock_guard<std::mutex> lock(m_stateMutex);
1174  if (reporter != nullptr) {
1175  m_eventReporter = reporter;
1176  }
1177 }
1178 
1180 {
1181  std::lock_guard<std::mutex> lock(m_stateMutex);
1182  m_eventReporter = nullptr;
1183 }
1184 
1185 void PortCore::report(const PortInfo& info)
1186 {
1187  // We are in the context of one of the input or output threads,
1188  // so our contact with the PortCore must be absolutely minimal.
1189  //
1190  // It is safe to pick up the address of the reporter if this is
1191  // kept constant over the lifetime of the input/output threads.
1192 
1193  if (m_eventReporter != nullptr) {
1194  m_eventReporter->report(info);
1195  }
1196 }
1197 
1198 
1200 {
1201  YARP_UNUSED(id);
1202  YARP_UNUSED(os);
1203  bool result = true;
1204 
1205  // We are in the context of one of the input threads,
1206  // so our contact with the PortCore must be absolutely minimal.
1207  //
1208  // It is safe to pick up the address of the reader since this is
1209  // constant over the lifetime of the input threads.
1210 
1211  if (m_reader != nullptr && !m_interrupted) {
1212  m_interruptable = false; // No mutexing; user of interrupt() has to be careful.
1213 
1214  bool haveOutputs = (m_outputCount != 0); // No mutexing, but failure modes are benign.
1215 
1216  if (m_logNeeded && haveOutputs) {
1217  // Normally, yarp doesn't pay attention to the content of
1218  // messages received by the client. Likewise, the content
1219  // of replies are not monitored. However it may sometimes
1220  // be useful to log this traffic.
1221 
1222  ConnectionRecorder recorder;
1223  recorder.init(&reader);
1224  lockCallback();
1225  result = m_reader->read(recorder);
1226  unlockCallback();
1227  recorder.fini();
1228  // send off a log of this transaction to whoever wants it
1229  sendHelper(recorder, PORTCORE_SEND_LOG);
1230  } else {
1231  // YARP is not needed as a middleman
1232  lockCallback();
1233  result = m_reader->read(reader);
1234  unlockCallback();
1235  }
1236 
1237  m_interruptable = true;
1238  } else {
1239  // Read and ignore message, there is no where to send it.
1240  yCDebug(PORTCORE, "data received, no reader for it");
1241  Bottle b;
1242  result = b.read(reader);
1243  }
1244  return result;
1245 }
1246 
1247 
1248 bool PortCore::send(const PortWriter& writer,
1249  PortReader* reader,
1250  const PortWriter* callback)
1251 {
1252  // check if there is any modifier
1253  // we need to protect this part while the modifier
1254  // plugin is loading or unloading!
1255  m_modifier.outputMutex.lock();
1256  if (m_modifier.outputModifier != nullptr) {
1257  if (!m_modifier.outputModifier->acceptOutgoingData(writer)) {
1258  m_modifier.outputMutex.unlock();
1259  return false;
1260  }
1261  m_modifier.outputModifier->modifyOutgoingData(writer);
1262  }
1263  m_modifier.outputMutex.unlock();
1264  if (!m_logNeeded) {
1265  return sendHelper(writer, PORTCORE_SEND_NORMAL, reader, callback);
1266  }
1267  // logging is desired, so we need to wrap up and log this send
1268  // (and any reply it gets) -- TODO not yet supported
1269  return sendHelper(writer, PORTCORE_SEND_NORMAL, reader, callback);
1270 }
1271 
1273  int mode,
1274  PortReader* reader,
1275  const PortWriter* callback)
1276 {
1277  if (m_interrupted || m_finishing) {
1278  return false;
1279  }
1280 
1281  bool all_ok = true;
1282  bool gotReply = false;
1283  int logCount = 0;
1284  std::string envelopeString = m_envelope;
1285 
1286  // Pass a message to all output units for sending on. We could
1287  // be doing more here to cache the serialization of the message
1288  // and reuse it across output connections. However, one key
1289  // optimization is present: external blocks written by
1290  // yarp::os::ConnectionWriter::appendExternalBlock are never
1291  // copied. So for example the core image array in a yarp::sig::Image
1292  // is untouched by the port communications code.
1293 
1294  yCTrace(PORTCORE, "------- send in real");
1295 
1296  // Give user the chance to know that this object is about to be
1297  // written.
1298  writer.onCommencement();
1299 
1300  // All user-facing parts of this port will be blocked on this
1301  // operation, so we'll want to be snappy. How long the
1302  // operation lasts will depend on these flags:
1303  // * waitAfterSend
1304  // * waitBeforeSend
1305  // set by setWaitAfterSend() and setWaitBeforeSend().
1306  std::lock_guard<std::mutex> lock(m_stateMutex);
1307 
1308  // If the port is shutting down, abort.
1309  if (m_finished.load()) {
1310  return false;
1311  }
1312 
1313  yCTrace(PORTCORE, "------- send in");
1314  // Prepare a "packet" for tracking a single message which
1315  // may travel by multiple outputs.
1316  m_packetMutex.lock();
1317  PortCorePacket* packet = m_packets.getFreePacket();
1318  yCAssert(PORTCORE, packet != nullptr);
1319  packet->setContent(&writer, false, callback);
1320  m_packetMutex.unlock();
1321 
1322  // Scan connections, placing message everywhere we can.
1323  for (auto* unit : m_units) {
1324  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1325  bool log = (!unit->getMode().empty());
1326  if (log) {
1327  // Some connections are for logging only.
1328  logCount++;
1329  }
1330  bool ok = (mode == PORTCORE_SEND_NORMAL) ? (!log) : (log);
1331  if (!ok) {
1332  continue;
1333  }
1334  bool waiter = m_waitAfterSend || (mode == PORTCORE_SEND_LOG);
1335  yCTrace(PORTCORE, "------- -- inc");
1336  m_packetMutex.lock();
1337  packet->inc(); // One more connection carrying message.
1338  m_packetMutex.unlock();
1339  yCTrace(PORTCORE, "------- -- pre-send");
1340  bool gotReplyOne = false;
1341  // Send the message off on this connection.
1342  void* out = unit->send(writer,
1343  reader,
1344  (callback != nullptr) ? callback : (&writer),
1345  reinterpret_cast<void*>(packet),
1346  envelopeString,
1347  waiter,
1348  m_waitBeforeSend,
1349  &gotReplyOne);
1350  gotReply = gotReply || gotReplyOne;
1351  yCTrace(PORTCORE, "------- -- send");
1352  if (out != nullptr) {
1353  // We got back a report of a message already sent.
1354  m_packetMutex.lock();
1355  (static_cast<PortCorePacket*>(out))->dec(); // Message on one fewer connections.
1356  m_packets.checkPacket(static_cast<PortCorePacket*>(out));
1357  m_packetMutex.unlock();
1358  }
1359  if (waiter) {
1360  if (unit->isFinished()) {
1361  all_ok = false;
1362  }
1363  }
1364  yCTrace(PORTCORE, "------- -- dec");
1365  }
1366  }
1367  yCTrace(PORTCORE, "------- pack check");
1368  m_packetMutex.lock();
1369 
1370  // We no longer concern ourselves with the message.
1371  // It may or may not be traveling on some connections.
1372  // But that is not our problem anymore.
1373  packet->dec();
1374 
1375  m_packets.checkPacket(packet);
1376  m_packetMutex.unlock();
1377  yCTrace(PORTCORE, "------- packed");
1378  yCTrace(PORTCORE, "------- send out");
1379  if (mode == PORTCORE_SEND_LOG) {
1380  if (logCount == 0) {
1381  m_logNeeded = false;
1382  }
1383  }
1384 
1385  yCTrace(PORTCORE, "------- send out real");
1386 
1387  if (m_waitAfterSend && reader != nullptr) {
1388  all_ok = all_ok && gotReply;
1389  }
1390 
1391  return all_ok;
1392 }
1393 
1394 
1396 {
1397  bool writing = false;
1398 
1399  // Check if any port is currently writing. TODO optimize
1400  // this query by counting down with notifyCompletion().
1401  if (!m_finished.load()) {
1402  std::lock_guard<std::mutex> lock(m_stateMutex);
1403  for (auto* unit : m_units) {
1404  if ((unit != nullptr) && !unit->isFinished() && unit->isBusy()) {
1405  writing = true;
1406  }
1407  }
1408  }
1409 
1410  return writing;
1411 }
1412 
1413 
1415 {
1416  cleanUnits(false);
1417  m_packetMutex.lock();
1418  int result = m_inputCount;
1419  m_packetMutex.unlock();
1420  return result;
1421 }
1422 
1424 {
1425  cleanUnits(false);
1426  m_packetMutex.lock();
1427  int result = m_outputCount;
1428  m_packetMutex.unlock();
1429  return result;
1430 }
1431 
1432 
1433 void PortCore::notifyCompletion(void* tracker)
1434 {
1435  yCTrace(PORTCORE, "starting notifyCompletion");
1436  m_packetMutex.lock();
1437  if (tracker != nullptr) {
1438  (static_cast<PortCorePacket*>(tracker))->dec();
1439  m_packets.checkPacket(static_cast<PortCorePacket*>(tracker));
1440  }
1441  m_packetMutex.unlock();
1442  yCTrace(PORTCORE, "stopping notifyCompletion");
1443 }
1444 
1445 
1447 {
1448  m_envelopeWriter.restart();
1449  bool ok = envelope.write(m_envelopeWriter);
1450  if (ok) {
1451  setEnvelope(m_envelopeWriter.toString());
1452  }
1453  return ok;
1454 }
1455 
1456 
1457 void PortCore::setEnvelope(const std::string& envelope)
1458 {
1459  m_envelope = envelope;
1460  for (size_t i = 0; i < m_envelope.length(); i++) {
1461  // It looks like envelopes are constrained to be printable ASCII?
1462  // I'm not sure why this would be. TODO check.
1463  if (m_envelope[i] < 32) {
1464  m_envelope = m_envelope.substr(0, i);
1465  break;
1466  }
1467  }
1468  yCDebug(PORTCORE, "set envelope to %s", m_envelope.c_str());
1469 }
1470 
1472 {
1473  return m_envelope;
1474 }
1475 
1477 {
1478  StringInputStream sis;
1479  sis.add(m_envelope);
1480  sis.add("\r\n");
1482  Route route;
1483  sbr.reset(sis, nullptr, route, 0, true);
1484  return envelope.read(sbr);
1485 }
1486 
1487 // Make an RPC connection to talk to a ROS API, send a message, get reply.
1488 // NOTE: ROS support can now be moved out of here, once all documentation
1489 // of older ways to interoperate with it are purged and people stop
1490 // doing it.
1491 static bool __pc_rpc(const Contact& c,
1492  const char* carrier,
1493  Bottle& writer,
1494  Bottle& reader,
1495  bool verbose)
1496 {
1497  ContactStyle style;
1498  style.quiet = !verbose;
1499  style.timeout = 4;
1500  style.carrier = carrier;
1501  bool ok = Network::write(c, writer, reader, style);
1502  return ok;
1503 }
1504 
1505 // ACE is sometimes confused by localhost aliases, in a ROS-incompatible
1506 // way. This method does a quick sanity check if we are using ROS.
1507 static bool __tcp_check(const Contact& c)
1508 {
1509 #ifdef YARP_HAS_ACE
1510  ACE_INET_Addr addr;
1511  int result = addr.set(c.getPort(), c.getHost().c_str());
1512  if (result != 0) {
1513  yCWarning(PORTCORE, "ACE choked on %s:%d\n", c.getHost().c_str(), c.getPort());
1514  }
1515  result = addr.set(c.getPort(), "127.0.0.1");
1516  if (result != 0) {
1517  yCWarning(PORTCORE, "ACE choked on 127.0.0.1:%d\n", c.getPort());
1518  }
1519  result = addr.set(c.getPort(), "127.0.1.1");
1520  if (result != 0) {
1521  yCWarning(PORTCORE, "ACE choked on 127.0.1.1:%d\n", c.getPort());
1522  }
1523 #endif
1524  return true;
1525 }
1526 
1527 namespace {
1528 enum class PortCoreCommand : yarp::conf::vocab32_t
1529 {
1530  Unknown = 0,
1531  Help = yarp::os::createVocab32('h', 'e', 'l', 'p'),
1532  Ver = yarp::os::createVocab32('v', 'e', 'r'),
1533  Pray = yarp::os::createVocab32('p', 'r', 'a', 'y'),
1534  Add = yarp::os::createVocab32('a', 'd', 'd'),
1535  Del = yarp::os::createVocab32('d', 'e', 'l'),
1536  Atch = yarp::os::createVocab32('a', 't', 'c', 'h'),
1537  Dtch = yarp::os::createVocab32('d', 't', 'c', 'h'),
1538  List = yarp::os::createVocab32('l', 'i', 's', 't'),
1539  Set = yarp::os::createVocab32('s', 'e', 't'),
1540  Get = yarp::os::createVocab32('g', 'e', 't'),
1541  Prop = yarp::os::createVocab32('p', 'r', 'o', 'p'),
1542  RosPublisherUpdate = yarp::os::createVocab32('r', 'p', 'u', 'p'),
1543  RosRequestTopic = yarp::os::createVocab32('r', 't', 'o', 'p'),
1544  RosGetPid = yarp::os::createVocab32('p', 'i', 'd'),
1545  RosGetBusInfo = yarp::os::createVocab32('b', 'u', 's'),
1546 };
1547 
1548 enum class PortCoreConnectionDirection : yarp::conf::vocab32_t
1549 {
1550  Error = 0,
1551  Out = yarp::os::createVocab32('o', 'u', 't'),
1552  In = yarp::os::createVocab32('i', 'n'),
1553 };
1554 
1555 enum class PortCorePropertyAction : yarp::conf::vocab32_t
1556 {
1557  Error = 0,
1558  Get = yarp::os::createVocab32('g', 'e', 't'),
1559  Set = yarp::os::createVocab32('s', 'e', 't')
1560 };
1561 
1562 PortCoreCommand parseCommand(const yarp::os::Value& v)
1563 {
1564  if (v.isString()) {
1565  // We support ROS client API these days. Here we recode some long ROS
1566  // command names, just for convenience.
1567  std::string cmd = v.asString();
1568  if (cmd == "publisherUpdate") {
1569  return PortCoreCommand::RosPublisherUpdate;
1570  }
1571  if (cmd == "requestTopic") {
1572  return PortCoreCommand::RosRequestTopic;
1573  }
1574  if (cmd == "getPid") {
1575  return PortCoreCommand::RosGetPid;
1576  }
1577  if (cmd == "getBusInfo") {
1578  return PortCoreCommand::RosGetBusInfo;
1579  }
1580  }
1581 
1582  auto cmd = static_cast<PortCoreCommand>(v.asVocab32());
1583  switch (cmd) {
1584  case PortCoreCommand::Help:
1585  case PortCoreCommand::Ver:
1586  case PortCoreCommand::Pray:
1587  case PortCoreCommand::Add:
1588  case PortCoreCommand::Del:
1589  case PortCoreCommand::Atch:
1590  case PortCoreCommand::Dtch:
1591  case PortCoreCommand::List:
1592  case PortCoreCommand::Set:
1593  case PortCoreCommand::Get:
1594  case PortCoreCommand::Prop:
1595  case PortCoreCommand::RosPublisherUpdate:
1596  case PortCoreCommand::RosRequestTopic:
1597  case PortCoreCommand::RosGetPid:
1598  case PortCoreCommand::RosGetBusInfo:
1599  return cmd;
1600  default:
1601  return PortCoreCommand::Unknown;
1602  }
1603 }
1604 
1605 PortCoreConnectionDirection parseConnectionDirection(yarp::conf::vocab32_t v, bool errorIsOut = false)
1606 {
1607  auto dir = static_cast<PortCoreConnectionDirection>(v);
1608  switch (dir) {
1609  case PortCoreConnectionDirection::In:
1610  case PortCoreConnectionDirection::Out:
1611  return dir;
1612  default:
1613  return errorIsOut ? PortCoreConnectionDirection::Out : PortCoreConnectionDirection::Error;
1614  }
1615 }
1616 
1617 PortCorePropertyAction parsePropertyAction(yarp::conf::vocab32_t v)
1618 {
1619  auto action = static_cast<PortCorePropertyAction>(v);
1620  switch (action) {
1621  case PortCorePropertyAction::Get:
1622  case PortCorePropertyAction::Set:
1623  return action;
1624  default:
1625  return PortCorePropertyAction::Error;
1626  }
1627 }
1628 
1629 void describeRoute(const Route& route, Bottle& result)
1630 {
1631  Bottle& bfrom = result.addList();
1632  bfrom.addString("from");
1633  bfrom.addString(route.getFromName());
1634 
1635  Bottle& bto = result.addList();
1636  bto.addString("to");
1637  bto.addString(route.getToName());
1638 
1639  Bottle& bcarrier = result.addList();
1640  bcarrier.addString("carrier");
1641  bcarrier.addString(route.getCarrierName());
1642 
1643  Carrier* carrier = Carriers::chooseCarrier(route.getCarrierName());
1644  if (carrier->isConnectionless()) {
1645  Bottle& bconnectionless = result.addList();
1646  bconnectionless.addString("connectionless");
1647  bconnectionless.addInt32(1);
1648  }
1649  if (!carrier->isPush()) {
1650  Bottle& breverse = result.addList();
1651  breverse.addString("push");
1652  breverse.addInt32(0);
1653  }
1654  delete carrier;
1655 }
1656 
1657 } // namespace
1658 
1660  void* id)
1661 {
1662  Bottle cmd;
1663  Bottle result;
1664 
1665  // We've received a message to the port that is marked as administrative.
1666  // That means that instead of passing it along as data to the user of the
1667  // port, the port itself is responsible for reading and responding to
1668  // it. So let's read the message and see what we're supposed to do.
1669  cmd.read(reader);
1670 
1671  yCDebug(PORTCORE, "Port %s received command %s", getName().c_str(), cmd.toString().c_str());
1672 
1673  auto handleAdminHelpCmd = []() {
1674  Bottle result;
1675  // We give a list of the most useful administrative commands.
1676  result.addVocab32('m', 'a', 'n', 'y');
1677  result.addString("[help] # give this help");
1678  result.addString("[ver] # report protocol version information");
1679  result.addString("[add] $portname # add an output connection");
1680  result.addString("[add] $portname $car # add an output with a given protocol");
1681  result.addString("[del] $portname # remove an input or output connection");
1682  result.addString("[list] [in] # list input connections");
1683  result.addString("[list] [out] # list output connections");
1684  result.addString("[list] [in] $portname # give details for input");
1685  result.addString("[list] [out] $portname # give details for output");
1686  result.addString("[prop] [get] # get all user-defined port properties");
1687  result.addString("[prop] [get] $prop # get a user-defined port property (prop, val)");
1688  result.addString("[prop] [set] $prop $val # set a user-defined port property (prop, val)");
1689  result.addString("[prop] [get] $portname # get Qos properties of a connection to/from a port");
1690  result.addString("[prop] [set] $portname # set Qos properties of a connection to/from a port");
1691  result.addString("[prop] [get] $cur_port # get information about current process (e.g., scheduling priority, pid)");
1692  result.addString("[prop] [set] $cur_port # set properties of the current process (e.g., scheduling priority, pid)");
1693  result.addString("[atch] [out] $prop # attach a portmonitor plug-in to the port's output");
1694  result.addString("[atch] [in] $prop # attach a portmonitor plug-in to the port's input");
1695  result.addString("[dtch] [out] # detach portmonitor plug-in from the port's output");
1696  result.addString("[dtch] [in] # detach portmonitor plug-in from the port's input");
1697  //result.addString("[atch] $portname $prop # attach a portmonitor plug-in to the connection to/from $portname");
1698  //result.addString("[dtch] $portname # detach any portmonitor plug-in from the connection to/from $portname");
1699  return result;
1700  };
1701 
1702  auto handleAdminVerCmd = []() {
1703  // Gives a version number for the administrative commands.
1704  // It is distinct from YARP library versioning.
1705  Bottle result;
1706  result.addVocab32("ver");
1707  result.addInt32(1);
1708  result.addInt32(2);
1709  result.addInt32(3);
1710  return result;
1711  };
1712 
1713  auto handleAdminPrayCmd = [this]() {
1714  // Strongly inspired by nethack #pray command:
1715  // https://nethackwiki.com/wiki/Prayer
1716  // http://www.steelypips.org/nethack/pray.html
1717 
1718  Bottle result;
1719 
1720  bool found = false;
1721  std::string name = yarp::conf::environment::get_string("YARP_ROBOT_NAME", &found);
1722  if (!found) {
1723  name = getName();
1724  // Remove initial "/"
1725  while (name[0] == '/') {
1726  name = name.substr(1);
1727  }
1728  // Keep only the first part of the port name
1729  auto i = name.find('/');
1730  if (i != std::string::npos) {
1731  name = name.substr(0, i);
1732  }
1733  }
1734 
1735  std::random_device rd;
1736  std::mt19937 mt(rd());
1737  std::uniform_int_distribution<int> dist2(0,1);
1738  auto d2 = std::bind(dist2, mt);
1739 
1740  result.addString("You begin praying to " + name + ".");
1741  result.addString("You finish your prayer.");
1742 
1743  static const char* godvoices[] = {
1744  "booms out",
1745  "thunders",
1746  "rings out",
1747  "booms",
1748  };
1749  std::uniform_int_distribution<int> godvoices_dist(0, (sizeof(godvoices) / sizeof(godvoices[0])) - 1);
1750  auto godvoice = [&]() {
1751  return std::string(godvoices[godvoices_dist(mt)]);
1752  };
1753 
1754  static const char* creatures[] = {
1755  "mortal",
1756  "creature",
1757  "robot",
1758  };
1759  std::uniform_int_distribution<int> creatures_dist(0, (sizeof(creatures) / sizeof(creatures[0])) - 1);
1760  auto creature = [&]() {
1761  return std::string(creatures[creatures_dist(mt)]);
1762  };
1763 
1764  static const char* auras[] = {
1765  "amber",
1766  "light blue",
1767  "golden",
1768  "white",
1769  "orange",
1770  "black",
1771  };
1772  std::uniform_int_distribution<int> auras_dist(0, (sizeof(auras) / sizeof(auras[0])) - 1);
1773  auto aura = [&]() {
1774  return std::string(auras[auras_dist(mt)]);
1775  };
1776 
1777  static const char* items[] = {
1778  "keyboard",
1779  "mouse",
1780  "monitor",
1781  "headphones",
1782  "smartphone",
1783  "wallet",
1784  "eyeglasses",
1785  "shirt",
1786  };
1787  std::uniform_int_distribution<int> items_dist(0, (sizeof(items) / sizeof(items[0])) - 1);
1788  auto item = [&]() {
1789  return std::string(items[items_dist(mt)]);
1790  };
1791 
1792  static const char* blessings[] = {
1793  "You feel more limber.",
1794  "The slime disappears.",
1795  "Your amulet vanishes! You can breathe again.",
1796  "You can breathe again.",
1797  "You are back on solid ground.",
1798  "Your stomach feels content.",
1799  "You feel better.",
1800  "You feel much better.",
1801  "Your surroundings change.",
1802  "Your shape becomes uncertain.",
1803  "Your chain disappears.",
1804  "There's a tiger in your tank.",
1805  "You feel in good health again.",
1806  "Your eye feels better.",
1807  "Your eyes feel better.",
1808  "Looks like you are back in Kansas.",
1809  "Your <ITEM> softly glows <AURA>.",
1810  };
1811  std::uniform_int_distribution<int> blessings_dist(0, (sizeof(blessings) / sizeof(blessings[0])) - 1);
1812  auto blessing = [&](){
1813  auto blessing = std::string(blessings[blessings_dist(mt)]);
1814  blessing = std::regex_replace(blessing, std::regex("<ITEM>"), item());
1815  blessing = std::regex_replace(blessing, std::regex("<AURA>"), aura());
1816  return blessing;
1817  };
1818 
1819  std::uniform_int_distribution<int> dist13(0,12);
1820  switch(dist13(mt)) {
1821  case 0:
1822  case 1:
1823  result.addString("You feel that " + name + " is " + (d2() ? "bummed" : "displeased") + ".");
1824  break;
1825  case 2:
1826  case 3:
1827  result.addString("The voice of " + name + " " + godvoice() +
1828  ": \"Thou " + (d2() ? "hast strayed from the path" : "art arrogant") +
1829  ", " + creature() + ". Thou must relearn thy lessons!\"");
1830  break;
1831  case 4:
1832  case 5:
1833  result.addString("The voice of " + name + " " + godvoice() +
1834  ": \"Thou hast angered me.\"");
1835  result.addString("A black glow surrounds you.");
1836  break;
1837  case 6:
1838  result.addString("The voice of " + name + " " + godvoice() +
1839  ": \"Thou hast angered me.\"");
1840  break;
1841  case 7:
1842  case 8:
1843  result.addString("The voice of " + name + " " + godvoice() +
1844  ": \"Thou durst " + (d2() ? "scorn" : "call upon") +
1845  " me? Then die, " + creature() + "!\"");
1846  break;
1847  case 9:
1848  result.addString("You feel that " + name + " is " + (d2() ? "pleased as punch" : "well-pleased") + ".");
1849  result.addString(blessing());
1850  break;
1851  case 10:
1852  result.addString("You feel that " + name + " is " + (d2() ? "ticklish" : "pleased") + ".");
1853  result.addString(blessing());
1854  break;
1855  case 11:
1856  result.addString("You feel that " + name + " is " + (d2() ? "full" : "satisfied") + ".");
1857  result.addString(blessing());
1858  break;
1859  default:
1860  result.addString("The voice of " + name + " " + godvoice() +
1861  ": \"Thou hast angered me.\"");
1862  result.addString("Suddenly, a bolt of lightning strikes you!");
1863  result.addString("You fry to a crisp!");
1864  break;
1865  }
1866 
1867  return result;
1868  };
1869 
1870  auto handleAdminAddCmd = [this, id](std::string output,
1871  const std::string& carrier) {
1872  // Add an output to the port.
1873  Bottle result;
1874  StringOutputStream cache;
1875  if (!carrier.empty()) {
1876  output = carrier + ":/" + output;
1877  }
1878  addOutput(output, id, &cache, false);
1879  std::string r = cache.toString();
1880  int v = (r[0] == 'A') ? 0 : -1;
1881  result.addInt32(v);
1882  result.addString(r);
1883  return result;
1884  };
1885 
1886  auto handleAdminDelCmd = [this, id](const std::string& dest) {
1887  // Delete any inputs or outputs involving the named port.
1888  Bottle result;
1889  StringOutputStream cache;
1890  removeOutput(dest, id, &cache);
1891  std::string r1 = cache.toString();
1892  cache.reset();
1893  removeInput(dest, id, &cache);
1894  std::string r2 = cache.toString();
1895  int v = (r1[0] == 'R' || r2[0] == 'R') ? 0 : -1;
1896  result.addInt32(v);
1897  if (r1[0] == 'R' && r2[0] != 'R') {
1898  result.addString(r1);
1899  } else if (r1[0] != 'R' && r2[0] == 'R') {
1900  result.addString(r2);
1901  } else {
1902  result.addString(r1 + r2);
1903  }
1904  return result;
1905  };
1906 
1907  auto handleAdminAtchCmd = [this](PortCoreConnectionDirection direction,
1908  Property prop) {
1909  Bottle result;
1910  switch (direction) {
1911  case PortCoreConnectionDirection::Out: {
1912  std::string errMsg;
1913  if (!attachPortMonitor(prop, true, errMsg)) {
1914  result.addVocab32("fail");
1915  result.addString(errMsg);
1916  } else {
1917  result.addVocab32("ok");
1918  }
1919  } break;
1920  case PortCoreConnectionDirection::In: {
1921  std::string errMsg;
1922  if (!attachPortMonitor(prop, false, errMsg)) {
1923  result.addVocab32("fail");
1924  result.addString(errMsg);
1925  } else {
1926  result.addVocab32("ok");
1927  }
1928  } break;
1929  case PortCoreConnectionDirection::Error:
1930  result.addVocab32("fail");
1931  result.addString("attach command must be followed by [out] or [in]");
1932  }
1933  return result;
1934  };
1935 
1936  auto handleAdminDtchCmd = [this](PortCoreConnectionDirection direction) {
1937  Bottle result;
1938  switch (direction) {
1939  case PortCoreConnectionDirection::Out: {
1940  if (detachPortMonitor(true)) {
1941  result.addVocab32("ok");
1942  } else {
1943  result.addVocab32("fail");
1944  }
1945  } break;
1946  case PortCoreConnectionDirection::In: {
1947  if (detachPortMonitor(false)) {
1948  result.addVocab32("ok");
1949  } else {
1950  result.addVocab32("fail");
1951  }
1952  } break;
1953  case PortCoreConnectionDirection::Error:
1954  result.addVocab32("fail");
1955  result.addString("detach command must be followed by [out] or [in]");
1956  };
1957  return result;
1958  };
1959 
1960  auto handleAdminListCmd = [this](const PortCoreConnectionDirection direction,
1961  const std::string& target) {
1962  Bottle result;
1963  switch (direction) {
1964  case PortCoreConnectionDirection::In: {
1965  // Return a list of all input connections.
1966  std::lock_guard<std::mutex> lock(m_stateMutex);
1967  for (auto* unit : m_units) {
1968  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1969  Route route = unit->getRoute();
1970  if (target.empty()) {
1971  const std::string& name = route.getFromName();
1972  if (!name.empty()) {
1973  result.addString(name);
1974  }
1975  } else if (route.getFromName() == target) {
1976  describeRoute(route, result);
1977  }
1978  }
1979  }
1980  } break;
1981  case PortCoreConnectionDirection::Out: {
1982  // Return a list of all output connections.
1983  std::lock_guard<std::mutex> lock(m_stateMutex);
1984  for (auto* unit : m_units) {
1985  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1986  Route route = unit->getRoute();
1987  if (target.empty()) {
1988  result.addString(route.getToName());
1989  } else if (route.getToName() == target) {
1990  describeRoute(route, result);
1991  }
1992  }
1993  }
1994  } break;
1995  case PortCoreConnectionDirection::Error:
1996  // Should never happen
1997  yCAssert(PORTCORE, false);
1998  break;
1999  }
2000  return result;
2001  };
2002 
2003  auto handleAdminSetInCmd = [this](const std::string& target,
2004  const Property& property) {
2005  Bottle result;
2006  // Set carrier parameters on a given input connection.
2007  std::lock_guard<std::mutex> lock(m_stateMutex);
2008  if (target.empty()) {
2009  result.addInt32(-1);
2010  result.addString("target port is not specified.\r\n");
2011  } else {
2012  if (target == getName()) {
2013  std::string errMsg;
2014  if (!setParamPortMonitor(property, false, errMsg)) {
2015  result.addVocab32("fail");
2016  result.addString(errMsg);
2017  } else {
2018  result.addVocab32("ok");
2019  }
2020  } else {
2021  for (auto* unit : m_units) {
2022  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
2023  Route route = unit->getRoute();
2024  if (route.getFromName() == target) {
2025  unit->setCarrierParams(property);
2026  result.addInt32(0);
2027  std::string msg = "Configured connection from ";
2028  msg += route.getFromName();
2029  msg += "\r\n";
2030  result.addString(msg);
2031  break;
2032  }
2033  }
2034  }
2035  }
2036  if (result.size() == 0) {
2037  result.addInt32(-1);
2038  std::string msg = "Could not find an incoming connection from ";
2039  msg += target;
2040  msg += "\r\n";
2041  result.addString(msg);
2042  }
2043  }
2044  return result;
2045  };
2046 
2047  auto handleAdminSetOutCmd = [this](const std::string& target,
2048  const Property& property) {
2049  Bottle result;
2050  // Set carrier parameters on a given output connection.
2051  std::lock_guard<std::mutex> lock(m_stateMutex);
2052  if (target.empty()) {
2053  result.addInt32(-1);
2054  result.addString("target port is not specified.\r\n");
2055  } else {
2056  if (target == getName()) {
2057  std::string errMsg;
2058  if (!setParamPortMonitor(property, true, errMsg)) {
2059  result.addVocab32("fail");
2060  result.addString(errMsg);
2061  } else {
2062  result.addVocab32("ok");
2063  }
2064  } else {
2065  for (auto* unit : m_units) {
2066  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
2067  Route route = unit->getRoute();
2068  if (route.getToName() == target) {
2069  unit->setCarrierParams(property);
2070  result.addInt32(0);
2071  std::string msg = "Configured connection to ";
2072  msg += route.getToName();
2073  msg += "\r\n";
2074  result.addString(msg);
2075  break;
2076  }
2077  }
2078  }
2079  }
2080  if (result.size() == 0) {
2081  result.addInt32(-1);
2082  std::string msg = "Could not find an incoming connection to ";
2083  msg += target;
2084  msg += "\r\n";
2085  result.addString(msg);
2086  }
2087  }
2088  return result;
2089  };
2090 
2091  auto handleAdminGetInCmd = [this](const std::string& target) {
2092  Bottle result;
2093  // Get carrier parameters for a given input connection.
2094  std::lock_guard<std::mutex> lock(m_stateMutex);
2095  if (target.empty()) {
2096  result.addInt32(-1);
2097  result.addString("target port is not specified.\r\n");
2098  } else if (target == getName()) {
2099  yarp::os::Property property;
2100  std::string errMsg;
2101  if (!getParamPortMonitor(property, false, errMsg)) {
2102  result.addVocab32("fail");
2103  result.addString(errMsg);
2104  } else {
2105  result.addDict() = property;
2106  }
2107  } else {
2108  for (auto* unit : m_units) {
2109  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
2110  Route route = unit->getRoute();
2111  if (route.getFromName() == target) {
2112  yarp::os::Property property;
2113  unit->getCarrierParams(property);
2114  result.addDict() = property;
2115  break;
2116  }
2117  }
2118  }
2119  if (result.size() == 0) {
2120  result.addInt32(-1);
2121  std::string msg = "Could not find an incoming connection from ";
2122  msg += target;
2123  msg += "\r\n";
2124  result.addString(msg);
2125  }
2126  }
2127  return result;
2128  };
2129 
2130  auto handleAdminGetOutCmd = [this](const std::string& target) {
2131  Bottle result;
2132  // Get carrier parameters for a given output connection.
2133  std::lock_guard<std::mutex> lock(m_stateMutex);
2134  if (target.empty()) {
2135  result.addInt32(-1);
2136  result.addString("target port is not specified.\r\n");
2137  } else if (target == getName()) {
2138  yarp::os::Property property;
2139  std::string errMsg;
2140  if (!getParamPortMonitor(property, true, errMsg)) {
2141  result.addVocab32("fail");
2142  result.addString(errMsg);
2143  } else {
2144  result.addDict() = property;
2145  }
2146  } else {
2147  for (auto* unit : m_units) {
2148  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
2149  Route route = unit->getRoute();
2150  if (route.getToName() == target) {
2151  yarp::os::Property property;
2152  unit->getCarrierParams(property);
2153  result.addDict() = property;
2154  break;
2155  }
2156  }
2157  }
2158  if (result.size() == 0) {
2159  result.addInt32(-1);
2160  std::string msg = "Could not find an incoming connection to ";
2161  msg += target;
2162  msg += "\r\n";
2163  result.addString(msg);
2164  }
2165  }
2166  return result;
2167  };
2168 
2169  auto handleAdminPropGetCmd = [this](const std::string& key) {
2170  Bottle result;
2171  Property* p = acquireProperties(false);
2172  if (p != nullptr) {
2173  if (key.empty()) {
2174  result.fromString(p->toString());
2175  } else {
2176  // request: "prop get /portname"
2177  if (key[0] == '/') {
2178  bool bFound = false;
2179  // check for their own name
2180  if (key == getName()) {
2181  bFound = true;
2182  Bottle& sched = result.addList();
2183  sched.addString("sched");
2184  Property& sched_prop = sched.addDict();
2185  sched_prop.put("tid", static_cast<int>(this->getTid()));
2186  sched_prop.put("priority", this->getPriority());
2187  sched_prop.put("policy", this->getPolicy());
2188 
2190  Bottle& proc = result.addList();
2191  proc.addString("process");
2192  Property& proc_prop = proc.addDict();
2193  proc_prop.put("pid", info.pid);
2194  proc_prop.put("name", (info.pid != -1) ? info.name : "unknown");
2195  proc_prop.put("arguments", (info.pid != -1) ? info.arguments : "unknown");
2196  proc_prop.put("priority", info.schedPriority);
2197  proc_prop.put("policy", info.schedPolicy);
2198 
2200  Bottle& platform = result.addList();
2201  platform.addString("platform");
2202  Property& platform_prop = platform.addDict();
2203  platform_prop.put("os", pinfo.name);
2204  platform_prop.put("hostname", m_address.getHost());
2205 
2206  unsigned int f = getFlags();
2207  bool is_input = (f & PORTCORE_IS_INPUT) != 0;
2208  bool is_output = (f & PORTCORE_IS_OUTPUT) != 0;
2209  bool is_rpc = (f & PORTCORE_IS_RPC) != 0;
2210  Bottle& port = result.addList();
2211  port.addString("port");
2212  Property& port_prop = port.addDict();
2213  port_prop.put("is_input", is_input);
2214  port_prop.put("is_output", is_output);
2215  port_prop.put("is_rpc", is_rpc);
2216  port_prop.put("type", getType().getName());
2217  } else {
2218  for (auto* unit : m_units) {
2219  if ((unit != nullptr) && !unit->isFinished()) {
2220  Route route = unit->getRoute();
2221  std::string coreName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2222  if (key == coreName) {
2223  bFound = true;
2224  int priority = unit->getPriority();
2225  int policy = unit->getPolicy();
2226  int tos = getTypeOfService(unit);
2227  int tid = static_cast<int>(unit->getTid());
2228  Bottle& sched = result.addList();
2229  sched.addString("sched");
2230  Property& sched_prop = sched.addDict();
2231  sched_prop.put("tid", tid);
2232  sched_prop.put("priority", priority);
2233  sched_prop.put("policy", policy);
2234  Bottle& qos = result.addList();
2235  qos.addString("qos");
2236  Property& qos_prop = qos.addDict();
2237  qos_prop.put("tos", tos);
2238  }
2239  } // end isFinished()
2240  } // end for loop
2241  } // end portName == getname()
2242 
2243  if (!bFound) { // cannot find any port matches the requested one
2244  result.addVocab32("fail");
2245  std::string msg = "cannot find any connection to/from ";
2246  msg = msg + key;
2247  result.addString(msg);
2248  }
2249  // end of (portName[0] == '/')
2250  } else {
2251  result.add(p->find(key));
2252  }
2253  }
2254  }
2255  releaseProperties(p);
2256  return result;
2257  };
2258 
2259  auto handleAdminPropSetCmd = [this](const std::string& key,
2260  const Value& value,
2261  const Bottle& process,
2262  const Bottle& sched,
2263  const Bottle& qos) {
2264  Bottle result;
2265  Property* p = acquireProperties(false);
2266  bool bOk = true;
2267  if (p != nullptr) {
2268  p->put(key, value);
2269  // setting scheduling properties of all threads within the process
2270  // scope through the admin port
2271  // e.g. prop set /current_port (process ((priority 30) (policy 1)))
2272  if (!process.isNull()) {
2273  std::string portName = key;
2274  if ((!portName.empty()) && (portName[0] == '/')) {
2275  // check for their own name
2276  if (portName == getName()) {
2277  bOk = false;
2278  Bottle* process_prop = process.find("process").asList();
2279  if (process_prop != nullptr) {
2280  int prio = -1;
2281  int policy = -1;
2282  if (process_prop->check("priority")) {
2283  prio = process_prop->find("priority").asInt32();
2284  }
2285  if (process_prop->check("policy")) {
2286  policy = process_prop->find("policy").asInt32();
2287  }
2288  bOk = setProcessSchedulingParam(prio, policy);
2289  }
2290  }
2291  }
2292  }
2293  // check if we need to set the PortCoreUnit scheduling policy
2294  // e.g., "prop set /portname (sched ((priority 30) (policy 1)))"
2295  // The priority and policy values on Linux are:
2296  // SCHED_OTHER : policy=0, priority=[0 .. 0]
2297  // SCHED_FIFO : policy=1, priority=[1 .. 99]
2298  // SCHED_RR : policy=2, priority=[1 .. 99]
2299  if (!sched.isNull()) {
2300  if ((!key.empty()) && (key[0] == '/')) {
2301  bOk = false;
2302  for (auto* unit : m_units) {
2303  if ((unit != nullptr) && !unit->isFinished()) {
2304  Route route = unit->getRoute();
2305  std::string portName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2306 
2307  if (portName == key) {
2308  Bottle* sched_prop = sched.find("sched").asList();
2309  if (sched_prop != nullptr) {
2310  int prio = -1;
2311  int policy = -1;
2312  if (sched_prop->check("priority")) {
2313  prio = sched_prop->find("priority").asInt32();
2314  }
2315  if (sched_prop->check("policy")) {
2316  policy = sched_prop->find("policy").asInt32();
2317  }
2318  bOk = (unit->setPriority(prio, policy) != -1);
2319  } else {
2320  bOk = false;
2321  }
2322  break;
2323  }
2324  }
2325  }
2326  }
2327  }
2328  // check if we need to set the packet QOS policy
2329  // e.g., "prop set /portname (qos ((priority HIGH)))"
2330  // e.g., "prop set /portname (qos ((dscp AF12)))"
2331  // e.g., "prop set /portname (qos ((tos 12)))"
2332  if (!qos.isNull()) {
2333  if ((!key.empty()) && (key[0] == '/')) {
2334  bOk = false;
2335  for (auto* unit : m_units) {
2336  if ((unit != nullptr) && !unit->isFinished()) {
2337  Route route = unit->getRoute();
2338  std::string portName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2339  if (portName == key) {
2340  Bottle* qos_prop = qos.find("qos").asList();
2341  if (qos_prop != nullptr) {
2342  int tos = -1;
2343  if (qos_prop->check("priority")) {
2344  // set the packet TOS value on the socket based on some predefined
2345  // priority levels.
2346  // the expected levels are: LOW, NORM, HIGH, CRIT
2347  NetInt32 priority = qos_prop->find("priority").asVocab32();
2348  int dscp;
2349  switch (priority) {
2350  case yarp::os::createVocab32('L', 'O', 'W'):
2351  dscp = 10;
2352  break;
2353  case yarp::os::createVocab32('N', 'O', 'R', 'M'):
2354  dscp = 0;
2355  break;
2356  case yarp::os::createVocab32('H', 'I', 'G', 'H'):
2357  dscp = 36;
2358  break;
2359  case yarp::os::createVocab32('C', 'R', 'I', 'T'):
2360  dscp = 44;
2361  break;
2362  default:
2363  dscp = -1;
2364  }
2365  if (dscp >= 0) {
2366  tos = (dscp << 2);
2367  }
2368  } else if (qos_prop->check("dscp")) {
2369  // Set the packet TOS value on the socket based on the DSCP level
2370  QosStyle::PacketPriorityDSCP dscp_class = QosStyle::getDSCPByVocab(qos_prop->find("dscp").asVocab32());
2371  int dscp = -1;
2372  if (dscp_class == QosStyle::DSCP_Invalid) {
2373  auto dscp_val = qos_prop->find("dscp");
2374  if (dscp_val.isInt32()) {
2375  dscp = dscp_val.asInt32();
2376  }
2377  } else {
2378  dscp = static_cast<int>(dscp_class);
2379  }
2380  if ((dscp >= 0) && (dscp < 64)) {
2381  tos = (dscp << 2);
2382  }
2383  } else if (qos_prop->check("tos")) {
2384  // Set the TOS value directly
2385  auto tos_val = qos_prop->find("tos");
2386  if (tos_val.isInt32()) {
2387  tos = tos_val.asInt32();
2388  }
2389  }
2390  if (tos >= 0) {
2391  bOk = setTypeOfService(unit, tos);
2392  }
2393  } else {
2394  bOk = false;
2395  }
2396  break;
2397  }
2398  }
2399  }
2400  }
2401  }
2402  }
2403  releaseProperties(p);
2404  result.addVocab32((bOk) ? "ok" : "fail");
2405  return result;
2406  };
2407 
2408  // NOTE: YARP partially supports the ROS Slave API https://wiki.ros.org/ROS/Slave_API
2409 
2410  auto handleAdminRosPublisherUpdateCmd = [this](const std::string& topic, Bottle* pubs) {
2411  // When running against a ROS name server, we need to
2412  // support ROS-style callbacks for connecting publishers
2413  // with subscribers. Note: this should not be necessary
2414  // anymore, now that a dedicated yarp::os::Node class
2415  // has been implemented, but is still needed for older
2416  // ways of interfacing with ROS without using dedicated
2417  // node ports.
2418  Bottle result;
2419  if (pubs != nullptr) {
2420  Property listed;
2421  for (size_t i = 0; i < pubs->size(); i++) {
2422  std::string pub = pubs->get(i).asString();
2423  listed.put(pub, 1);
2424  }
2425  Property present;
2426  {
2427  // Critical section
2428  std::lock_guard<std::mutex> lock(m_stateMutex);
2429  for (auto* unit : m_units) {
2430  if ((unit != nullptr) && unit->isPupped()) {
2431  std::string me = unit->getPupString();
2432  present.put(me, 1);
2433  if (!listed.check(me)) {
2434  unit->setDoomed();
2435  }
2436  }
2437  }
2438  }
2439  for (size_t i = 0; i < pubs->size(); i++) {
2440  std::string pub = pubs->get(i).asString();
2441  if (!present.check(pub)) {
2442  yCDebug(PORTCORE, "ROS ADD %s", pub.c_str());
2443  Bottle req;
2444  Bottle reply;
2445  req.addString("requestTopic");
2446  NestedContact nc(getName());
2447  req.addString(nc.getNodeName());
2448  req.addString(topic);
2449  Bottle& lst = req.addList();
2450  Bottle& sublst = lst.addList();
2451  sublst.addString("TCPROS");
2452  yCDebug(PORTCORE, "Sending [%s] to %s", req.toString().c_str(), pub.c_str());
2453  Contact c = Contact::fromString(pub);
2454  if (!__pc_rpc(c, "xmlrpc", req, reply, false)) {
2455  fprintf(stderr, "Cannot connect to ROS subscriber %s\n", pub.c_str());
2456  // show diagnosics
2457  __pc_rpc(c, "xmlrpc", req, reply, true);
2458  __tcp_check(c);
2459  } else {
2460  Bottle* pref = reply.get(2).asList();
2461  std::string hostname;
2462  std::string carrier;
2463  int portnum = 0;
2464  if (reply.get(0).asInt32() != 1) {
2465  fprintf(stderr, "Failure looking up topic %s: %s\n", topic.c_str(), reply.toString().c_str());
2466  } else if (pref == nullptr) {
2467  fprintf(stderr, "Failure looking up topic %s: expected list of protocols\n", topic.c_str());
2468  } else if (pref->get(0).asString() != "TCPROS") {
2469  fprintf(stderr, "Failure looking up topic %s: unsupported protocol %s\n", topic.c_str(), pref->get(0).asString().c_str());
2470  } else {
2471  Value hostname2 = pref->get(1);
2472  Value portnum2 = pref->get(2);
2473  hostname = hostname2.asString();
2474  portnum = portnum2.asInt32();
2475  carrier = "tcpros+role.pub+topic.";
2476  carrier += topic;
2477  yCDebug(PORTCORE, "topic %s available at %s:%d", topic.c_str(), hostname.c_str(), portnum);
2478  }
2479  if (portnum != 0) {
2480  Contact addr(hostname, portnum);
2481  OutputProtocol* op = nullptr;
2482  Route r = Route(getName(), pub, carrier);
2483  op = Carriers::connect(addr);
2484  if (op == nullptr) {
2485  fprintf(stderr, "NO CONNECTION\n");
2486  std::exit(1);
2487  } else {
2488  op->attachPort(m_contactable);
2489  op->open(r);
2490  }
2491  Route route = op->getRoute();
2492  route.swapNames();
2493  op->rename(route);
2494  InputProtocol* ip = &(op->getInput());
2495  {
2496  // Critical section
2497  std::lock_guard<std::mutex> lock(m_stateMutex);
2498  PortCoreUnit* unit = new PortCoreInputUnit(*this,
2499  getNextIndex(),
2500  ip,
2501  true);
2502  yCAssert(PORTCORE, unit != nullptr);
2503  unit->setPupped(pub);
2504  unit->start();
2505  m_units.push_back(unit);
2506  }
2507  }
2508  }
2509  }
2510  }
2511  }
2512  result.addInt32(1);
2513  result.addString("ok");
2514  return result;
2515  };
2516 
2517  auto handleAdminRosRequestTopicCmd = [this]() {
2518  // ROS-style query for topics.
2519  Bottle result;
2520  result.addInt32(1);
2521  NestedContact nc(getName());
2522  result.addString(nc.getNodeName());
2523  Bottle& lst = result.addList();
2524  Contact addr = getAddress();
2525  lst.addString("TCPROS");
2526  lst.addString(addr.getHost());
2527  lst.addInt32(addr.getPort());
2528  return result;
2529  };
2530 
2531  auto handleAdminRosGetPidCmd = []() {
2532  // ROS-style query for PID.
2533  Bottle result;
2534  result.addInt32(1);
2535  result.addString("");
2536  result.addInt32(yarp::os::impl::getpid());
2537  return result;
2538  };
2539 
2540  auto handleAdminRosGetBusInfoCmd = []() {
2541  // ROS-style query for bus information - we support this
2542  // in yarp::os::Node but not otherwise.
2543  Bottle result;
2544  result.addInt32(1);
2545  result.addString("");
2546  result.addList().addList();
2547  return result;
2548  };
2549 
2550  auto handleAdminUnknownCmd = [this](const Bottle& cmd) {
2551  Bottle result;
2552  bool ok = false;
2553  if (m_adminReader != nullptr) {
2554  DummyConnector con;
2555  cmd.write(con.getWriter());
2556  lockCallback();
2557  ok = m_adminReader->read(con.getReader());
2558  unlockCallback();
2559  if (ok) {
2560  result.read(con.getReader());
2561  }
2562  }
2563  if (!ok) {
2564  result.addVocab32("fail");
2565  result.addString("send [help] for list of valid commands");
2566  }
2567  return result;
2568  };
2569 
2570  const PortCoreCommand command = parseCommand(cmd.get(0));
2571  switch (command) {
2572  case PortCoreCommand::Help:
2573  result = handleAdminHelpCmd();
2574  break;
2575  case PortCoreCommand::Ver:
2576  result = handleAdminVerCmd();
2577  break;
2578  case PortCoreCommand::Pray:
2579  result = handleAdminPrayCmd();
2580  break;
2581  case PortCoreCommand::Add: {
2582  std::string output = cmd.get(1).asString();
2583  std::string carrier = cmd.get(2).asString();
2584  result = handleAdminAddCmd(std::move(output), carrier);
2585  } break;
2586  case PortCoreCommand::Del: {
2587  const std::string dest = cmd.get(1).asString();
2588  result = handleAdminDelCmd(dest);
2589  } break;
2590  case PortCoreCommand::Atch: {
2591  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32());
2592  Property prop(cmd.get(2).asString().c_str());
2593  result = handleAdminAtchCmd(direction, std::move(prop));
2594  } break;
2595  case PortCoreCommand::Dtch: {
2596  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32());
2597  result = handleAdminDtchCmd(direction);
2598  } break;
2599  case PortCoreCommand::List: {
2600  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32(), true);
2601  const std::string target = cmd.get(2).asString();
2602  result = handleAdminListCmd(direction, target);
2603  } break;
2604  case PortCoreCommand::Set: {
2605  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32(), true);
2606  const std::string target = cmd.get(2).asString();
2607  yarp::os::Property property;
2608  property.fromString(cmd.toString());
2609  switch (direction) {
2610  case PortCoreConnectionDirection::In:
2611  result = handleAdminSetInCmd(target, property);
2612  break;
2613  case PortCoreConnectionDirection::Out:
2614  result = handleAdminSetOutCmd(target, property);
2615  break;
2616  case PortCoreConnectionDirection::Error:
2617  yCAssert(PORTCORE, false); // Should never happen (error is out)
2618  break;
2619  }
2620  } break;
2621  case PortCoreCommand::Get: {
2622  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab32(), true);
2623  const std::string target = cmd.get(2).asString();
2624  switch (direction) {
2625  case PortCoreConnectionDirection::In:
2626  result = handleAdminGetInCmd(target);
2627  break;
2628  case PortCoreConnectionDirection::Out:
2629  result = handleAdminGetOutCmd(target);
2630  break;
2631  case PortCoreConnectionDirection::Error:
2632  yCAssert(PORTCORE, false); // Should never happen (error is out)
2633  break;
2634  }
2635  } break;
2636  case PortCoreCommand::Prop: {
2637  PortCorePropertyAction action = parsePropertyAction(cmd.get(1).asVocab32());
2638  const std::string key = cmd.get(2).asString();
2639  // Set/get arbitrary properties on a port.
2640  switch (action) {
2641  case PortCorePropertyAction::Get:
2642  result = handleAdminPropGetCmd(key);
2643  break;
2644  case PortCorePropertyAction::Set: {
2645  const Value& value = cmd.get(3);
2646  const Bottle& process = cmd.findGroup("process");
2647  const Bottle& sched = cmd.findGroup("sched");
2648  const Bottle& qos = cmd.findGroup("qos");
2649  result = handleAdminPropSetCmd(key, value, process, sched, qos);
2650  } break;
2651  case PortCorePropertyAction::Error:
2652  result.addVocab32("fail");
2653  result.addString("property action not known");
2654  break;
2655  }
2656  } break;
2657  case PortCoreCommand::RosPublisherUpdate: {
2658  yCDebug(PORTCORE, "publisherUpdate! --> %s", cmd.toString().c_str());
2659  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2660  std::string topic = RosNameSpace::fromRosName(cmd.get(2).asString());
2661  Bottle* pubs = cmd.get(3).asList();
2662  result = handleAdminRosPublisherUpdateCmd(topic, pubs);
2663  reader.requestDrop(); // ROS needs us to close down.
2664  } break;
2665  case PortCoreCommand::RosRequestTopic:
2666  yCDebug(PORTCORE, "requestTopic! --> %s", cmd.toString().c_str());
2667  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2668  // std::string topic = RosNameSpace::fromRosName(cmd.get(2).asString()); // Currently unused
2669  // Bottle protocols = cmd.get(3).asList(); // Currently unused
2670  result = handleAdminRosRequestTopicCmd();
2671  reader.requestDrop(); // ROS likes to close down.
2672  break;
2673  case PortCoreCommand::RosGetPid:
2674  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2675  result = handleAdminRosGetPidCmd();
2676  reader.requestDrop(); // ROS likes to close down.
2677  break;
2678  case PortCoreCommand::RosGetBusInfo:
2679  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2680  result = handleAdminRosGetBusInfoCmd();
2681  reader.requestDrop(); // ROS likes to close down.
2682  break;
2683  case PortCoreCommand::Unknown:
2684  result = handleAdminUnknownCmd(cmd);
2685  break;
2686  }
2687 
2688  ConnectionWriter* writer = reader.getWriter();
2689  if (writer != nullptr) {
2690  result.write(*writer);
2691  }
2692 
2693  return true;
2694 }
2695 
2696 
2697 bool PortCore::setTypeOfService(PortCoreUnit* unit, int tos)
2698 {
2699  if (unit == nullptr) {
2700  return false;
2701  }
2702 
2703  yCDebug(PORTCORE, "Trying to set TOS = %d", tos);
2704 
2705  if (unit->isOutput()) {
2706  auto* outUnit = dynamic_cast<PortCoreOutputUnit*>(unit);
2707  if (outUnit != nullptr) {
2708  OutputProtocol* op = outUnit->getOutPutProtocol();
2709  if (op != nullptr) {
2710  yCDebug(PORTCORE, "Trying to set TOS = %d on output unit", tos);
2711  bool ok = op->getOutputStream().setTypeOfService(tos);
2712  if (!ok) {
2713  yCWarning(PORTCORE, "Setting TOS on output unit failed");
2714  }
2715  return ok;
2716  }
2717  }
2718  }
2719 
2720  // Some of the input units may have output stream object to write back to
2721  // the connection (e.g., tcp ack and reply). Thus the QoS preferences should be
2722  // also configured for them.
2723 
2724 
2725  if (unit->isInput()) {
2726  auto* inUnit = dynamic_cast<PortCoreInputUnit*>(unit);
2727  if (inUnit != nullptr) {
2728  InputProtocol* ip = inUnit->getInPutProtocol();
2729  if ((ip != nullptr) && ip->getOutput().isOk()) {
2730  yCDebug(PORTCORE, "Trying to set TOS = %d on input unit", tos);
2731  bool ok = ip->getOutput().getOutputStream().setTypeOfService(tos);
2732  if (!ok) {
2733  yCWarning(PORTCORE, "Setting TOS on input unit failed");
2734  }
2735  return ok;
2736  }
2737  }
2738  }
2739  // if there is nothing to be set, returns true
2740  return true;
2741 }
2742 
2743 int PortCore::getTypeOfService(PortCoreUnit* unit)
2744 {
2745  if (unit == nullptr) {
2746  return -1;
2747  }
2748 
2749  if (unit->isOutput()) {
2750  auto* outUnit = dynamic_cast<PortCoreOutputUnit*>(unit);
2751  if (outUnit != nullptr) {
2752  OutputProtocol* op = outUnit->getOutPutProtocol();
2753  if (op != nullptr) {
2754  return op->getOutputStream().getTypeOfService();
2755  }
2756  }
2757  }
2758 
2759  // Some of the input units may have output stream object to write back to
2760  // the connection (e.g., tcp ack and reply). Thus the QoS preferences should be
2761  // also configured for them.
2762 
2763 
2764  if (unit->isInput()) {
2765  auto* inUnit = dynamic_cast<PortCoreInputUnit*>(unit);
2766  if (inUnit != nullptr) {
2767  InputProtocol* ip = inUnit->getInPutProtocol();
2768  if ((ip != nullptr) && ip->getOutput().isOk()) {
2769  return ip->getOutput().getOutputStream().getTypeOfService();
2770  }
2771  }
2772  }
2773  return -1;
2774 }
2775 
2776 // attach a portmonitor plugin to the port or to a specific connection
2777 bool PortCore::attachPortMonitor(yarp::os::Property& prop, bool isOutput, std::string& errMsg)
2778 {
2779  // attach to the current port
2780  Carrier* portmonitor = Carriers::chooseCarrier("portmonitor");
2781  if (portmonitor == nullptr) {
2782  errMsg = "Portmonitor carrier modifier cannot be find or it is not enabled in YARP!";
2783  return false;
2784  }
2785 
2786  if (isOutput) {
2787  detachPortMonitor(true);
2788  prop.put("source", getName());
2789  prop.put("destination", "");
2790  prop.put("sender_side", 1);
2791  prop.put("receiver_side", 0);
2792  prop.put("carrier", "");
2793  m_modifier.outputMutex.lock();
2794  m_modifier.outputModifier = portmonitor;
2795  if (!m_modifier.outputModifier->configureFromProperty(prop)) {
2796  m_modifier.releaseOutModifier();
2797  errMsg = "Failed to configure the portmonitor plug-in";
2798  m_modifier.outputMutex.unlock();
2799  return false;
2800  }
2801  m_modifier.outputMutex.unlock();
2802  } else {
2803  detachPortMonitor(false);
2804  prop.put("source", "");
2805  prop.put("destination", getName());
2806  prop.put("sender_side", 0);
2807  prop.put("receiver_side", 1);
2808  prop.put("carrier", "");
2809  m_modifier.inputMutex.lock();
2810  m_modifier.inputModifier = portmonitor;
2811  if (!m_modifier.inputModifier->configureFromProperty(prop)) {
2812  m_modifier.releaseInModifier();
2813  errMsg = "Failed to configure the portmonitor plug-in";
2814  m_modifier.inputMutex.unlock();
2815  return false;
2816  }
2817  m_modifier.inputMutex.unlock();
2818  }
2819  return true;
2820 }
2821 
2822 // detach the portmonitor from the port or specific connection
2823 bool PortCore::detachPortMonitor(bool isOutput)
2824 {
2825  if (isOutput) {
2826  m_modifier.outputMutex.lock();
2827  m_modifier.releaseOutModifier();
2828  m_modifier.outputMutex.unlock();
2829  } else {
2830  m_modifier.inputMutex.lock();
2831  m_modifier.releaseInModifier();
2832  m_modifier.inputMutex.unlock();
2833  }
2834  return true;
2835 }
2836 
2837 bool PortCore::setParamPortMonitor(const yarp::os::Property& param,
2838  bool isOutput,
2839  std::string& errMsg)
2840 {
2841  if (isOutput) {
2842  m_modifier.outputMutex.lock();
2843  if (m_modifier.outputModifier == nullptr) {
2844  errMsg = "No port modifier is attached to the output";
2845  m_modifier.outputMutex.unlock();
2846  return false;
2847  }
2848  m_modifier.outputModifier->setCarrierParams(param);
2849  m_modifier.outputMutex.unlock();
2850  } else {
2851  m_modifier.inputMutex.lock();
2852  if (m_modifier.inputModifier == nullptr) {
2853  errMsg = "No port modifier is attached to the input";
2854  m_modifier.inputMutex.unlock();
2855  return false;
2856  }
2857  m_modifier.inputModifier->setCarrierParams(param);
2858  m_modifier.inputMutex.unlock();
2859  }
2860  return true;
2861 }
2862 
2863 bool PortCore::getParamPortMonitor(yarp::os::Property& param,
2864  bool isOutput,
2865  std::string& errMsg)
2866 {
2867  if (isOutput) {
2868  m_modifier.outputMutex.lock();
2869  if (m_modifier.outputModifier == nullptr) {
2870  errMsg = "No port modifier is attached to the output";
2871  m_modifier.outputMutex.unlock();
2872  return false;
2873  }
2874  m_modifier.outputModifier->getCarrierParams(param);
2875  m_modifier.outputMutex.unlock();
2876  } else {
2877  m_modifier.inputMutex.lock();
2878  if (m_modifier.inputModifier == nullptr) {
2879  errMsg = "No port modifier is attached to the input";
2880  m_modifier.inputMutex.unlock();
2881  return false;
2882  }
2883  m_modifier.inputModifier->getCarrierParams(param);
2884  m_modifier.inputMutex.unlock();
2885  }
2886  return true;
2887 }
2888 
2889 void PortCore::reportUnit(PortCoreUnit* unit, bool active)
2890 {
2891  YARP_UNUSED(active);
2892  if (unit != nullptr) {
2893  bool isLog = (!unit->getMode().empty());
2894  if (isLog) {
2895  m_logNeeded = true;
2896  }
2897  }
2898 }
2899 
2900 bool PortCore::setProcessSchedulingParam(int priority, int policy)
2901 {
2902 #if defined(__linux__)
2903  // set the sched properties of all threads within the process
2904  struct sched_param sch_param;
2905  sch_param.__sched_priority = priority;
2906 
2907  DIR* dir;
2908  char path[PATH_MAX];
2909  sprintf(path, "/proc/%d/task/", yarp::os::impl::getpid());
2910 
2911  dir = opendir(path);
2912  if (dir == nullptr) {
2913  return false;
2914  }
2915 
2916  struct dirent* d;
2917  char* end;
2918  long tid = 0;
2919  bool ret = true;
2920  while ((d = readdir(dir)) != nullptr) {
2921  if (isdigit(static_cast<unsigned char>(*d->d_name)) == 0) {
2922  continue;
2923  }
2924 
2925  tid = strtol(d->d_name, &end, 10);
2926  if (d->d_name == end || ((end != nullptr) && (*end != 0))) {
2927  closedir(dir);
2928  return false;
2929  }
2930  ret &= (sched_setscheduler(static_cast<pid_t>(tid), policy, &sch_param) == 0);
2931  }
2932  closedir(dir);
2933  return ret;
2934 #elif defined(YARP_HAS_ACE) // for other platforms
2935  // TODO: Check how to set the scheduling properties of all process's threads in Windows
2936  ACE_Sched_Params param(policy, (ACE_Sched_Priority)priority, ACE_SCOPE_PROCESS);
2937  int ret = ACE_OS::set_scheduling_params(param, yarp::os::impl::getpid());
2938  return (ret != -1);
2939 #else
2940  return false;
2941 #endif
2942 }
2943 
2945 {
2946  m_stateMutex.lock();
2947  if (!readOnly) {
2948  if (m_prop == nullptr) {
2949  m_prop = new Property();
2950  }
2951  }
2952  return m_prop;
2953 }
2954 
2956 {
2957  YARP_UNUSED(prop);
2958  m_stateMutex.unlock();
2959 }
2960 
2961 bool PortCore::removeIO(const Route& route, bool synch)
2962 {
2963  return removeUnit(route, synch);
2964 }
2965 
2966 void PortCore::setName(const std::string& name)
2967 {
2968  m_name = name;
2969 }
2970 
2971 std::string PortCore::getName()
2972 {
2973  return m_name;
2974 }
2975 
2976 int PortCore::getNextIndex()
2977 {
2978  int result = m_counter;
2979  m_counter++;
2980  if (m_counter < 0) {
2981  m_counter = 1;
2982  }
2983  return result;
2984 }
2985 
2987 {
2988  return m_address;
2989 }
2990 
2991 void PortCore::resetPortName(const std::string& str)
2992 {
2993  m_address.setName(str);
2994 }
2995 
2997 {
2998  return m_readableCreator;
2999 }
3000 
3002 {
3003  m_controlRegistration = flag;
3004 }
3005 
3007 {
3008  return m_listening.load();
3009 }
3010 
3012 {
3013  return m_manual;
3014 }
3015 
3017 {
3018  return m_interrupted;
3019 }
3020 
3021 void PortCore::setTimeout(float timeout)
3022 {
3023  m_timeout = timeout;
3024 }
3025 
3026 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3030 {
3031  removeCallbackLock();
3032  if (mutex != nullptr) {
3033  m_old_mutex = mutex;
3034  m_mutexOwned = false;
3035  } else {
3036  m_old_mutex = new yarp::os::Mutex();
3037  m_mutexOwned = true;
3038  }
3039  return true;
3040 }
3042 #endif // YARP_NO_DEPRECATED
3043 
3044 bool PortCore::setCallbackLock(std::mutex* mutex)
3045 {
3046  removeCallbackLock();
3047  if (mutex != nullptr) {
3048  m_mutex = mutex;
3049  m_mutexOwned = false;
3050  } else {
3051  m_mutex = new std::mutex;
3052  m_mutexOwned = true;
3053  }
3054  return true;
3055 }
3056 
3058 {
3059  if (m_mutexOwned && (m_mutex != nullptr)) {
3060  delete m_mutex;
3061  }
3062  m_mutex = nullptr;
3063 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3064  m_old_mutex = nullptr;
3065 #endif // YARP_NO_DEPRECATED
3066  m_mutexOwned = false;
3067  return true;
3068 }
3069 
3071 {
3072  if (m_mutex == nullptr) {
3073 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3074  if (m_old_mutex == nullptr) {
3075  return false;
3076  }
3077  m_old_mutex->lock();
3078  return true;
3079 #else // YARP_NO_DEPRECATED
3080  return false;
3081 #endif // YARP_NO_DEPRECATED
3082  }
3083  m_mutex->lock();
3084  return true;
3085 }
3086 
3088 {
3089  if (m_mutex == nullptr) {
3090 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3091  if (m_old_mutex == nullptr) {
3092  return true;
3093  }
3094  return m_old_mutex->try_lock();
3095 #else // YARP_NO_DEPRECATED
3096  return true;
3097 #endif // YARP_NO_DEPRECATED
3098  }
3099  return m_mutex->try_lock();
3100 }
3101 
3103 {
3104  if (m_mutex == nullptr) {
3105 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3106  if (m_old_mutex == nullptr) {
3107  return;
3108  }
3109  return m_old_mutex->unlock();
3110 #else // YARP_NO_DEPRECATED
3111  return;
3112 #endif // YARP_NO_DEPRECATED
3113  }
3114  m_mutex->unlock();
3115 }
3116 
3118 {
3119  return m_modifier;
3120 }
3121 
3123 {
3124  m_typeMutex.lock();
3125  if (!m_checkedType) {
3126  if (!m_type.isValid()) {
3127  m_type = reader.getReadType();
3128  }
3129  m_checkedType = true;
3130  }
3131  m_typeMutex.unlock();
3132 }
3133 
3135 {
3136  m_typeMutex.lock();
3137  Type t = m_type;
3138  m_typeMutex.unlock();
3139  return t;
3140 }
3141 
3142 void PortCore::promiseType(const Type& typ)
3143 {
3144  m_typeMutex.lock();
3145  m_type = typ;
3146  m_typeMutex.unlock();
3147 }
float t
bool ret
#define yAssert(x)
Definition: Log.h:294
static bool __tcp_check(const Contact &c)
Definition: PortCore.cpp:1507
static bool __pc_rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader, bool verbose)
Definition: PortCore.cpp:1491
#define PORTCORE_IS_INPUT
Definition: PortCore.h:48
#define PORTCORE_SEND_LOG
Definition: PortCore.h:43
#define PORTCORE_IS_RPC
Definition: PortCore.h:47
#define PORTCORE_IS_OUTPUT
Definition: PortCore.h:49
#define PORTCORE_SEND_NORMAL
Definition: PortCore.h:42
static bool rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader)
Definition: RosLookup.cpp:19
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:74
void add(const Value &value)
Add a Value to the bottle, at the end of the list.
Definition: Bottle.cpp:336
void addVocab32(yarp::conf::vocab32_t x)
Places a vocabulary item in the bottle, at the end of the list.
Definition: Bottle.cpp:164
void fromString(const std::string &text)
Initializes bottle from a string.
Definition: Bottle.cpp:204
Property & addDict()
Places an empty key/value object in the bottle, at the end of the list.
Definition: Bottle.cpp:188
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
Definition: Bottle.cpp:182
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:251
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition: Bottle.cpp:240
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:246
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Definition: Bottle.cpp:302
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Bottle.cpp:277
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
Definition: Bottle.cpp:230
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
Definition: Bottle.cpp:140
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:170
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:211
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
Definition: Bottle.cpp:287
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:45
bool isConnectionless() const override=0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
bool isPush() const override
Check if carrier is "push" or "pull" style.
Definition: Carrier.cpp:23
void setCarrierParams(const Property &params) override
Configure carrier from port administrative commands.
Definition: Carrier.cpp:122
static Face * listen(const Contact &address)
Create a "proto-carrier" interface object that waits for incoming connections prior to a carrier bein...
Definition: Carriers.cpp:250
static Carrier * getCarrierTemplate(const std::string &name)
Get template for carrier.
Definition: Carriers.cpp:238
static Carrier * chooseCarrier(const std::string &name)
Select a carrier by name.
Definition: Carriers.cpp:233
static OutputProtocol * connect(const Contact &address)
Initiate a connection to an address.
Definition: Carriers.cpp:282
An interface for reading from a network connection.
virtual void requestDrop()=0
Tag the connection to be dropped after the current message.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
An interface for writing to a network connection.
virtual bool isPush() const =0
Check if carrier is "push" or "pull" style.
Preferences for how to communicate with a contact.
Definition: ContactStyle.h:24
double timeout
Set a timeout for communication (in units of seconds, fractional seconds allowed).
Definition: ContactStyle.h:47
bool quiet
Suppress all outputs and warnings.
Definition: ContactStyle.h:36
std::string carrier
Request that communication be made using a particular carrier.
Definition: ContactStyle.h:53
Represents how to reach a part of a YARP network.
Definition: Contact.h:36
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:298
std::string getRegName() const
Get the name associated with this Contact.
Definition: Contact.cpp:217
static Contact fromString(const std::string &txt)
Factory method.
Definition: Contact.cpp:139
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition: Contact.cpp:313
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:239
void setTimeout(float timeout)
Set timeout for this Contact.
Definition: Contact.cpp:282
std::string getCarrier() const
Get the carrier associated with this Contact for socket communication.
Definition: Contact.cpp:250
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:228
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written the ConnectionWriter since it was las...
ConnectionReader & getReader(ConnectionWriter *replyWriter=nullptr)
Get the dummy ConnectionReader loaded with whatever was written the ConnectionWriter since it was las...
The input side of an active connection between two ports.
Definition: InputProtocol.h:35
virtual void close()=0
Negotiate an end to operations.
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
virtual OutputProtocol & getOutput()=0
Get an interface for doing write operations on the connection.
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
Basic wrapper for mutual exclusion.
Definition: Mutex.h:32
Simple abstraction for a YARP port name.
Definition: Name.h:19
Contact toAddress() const
Create an address from the name.
Definition: Name.cpp:27
std::string getCarrierModifier(const char *mod, bool *hasModifier=nullptr)
Definition: Name.cpp:44
A placeholder for rich contact information.
Definition: NestedContact.h:24
std::string getNodeName() const
static bool initialized()
Returns true if YARP has been fully initialized.
Definition: Network.cpp:1389
static bool getLocalMode()
Get current value of flag "localMode", see setLocalMode function.
Definition: Network.cpp:1054
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
Definition: Network.cpp:1023
static NameStore * getQueryBypass()
Definition: Network.cpp:1412
static Contact queryName(const std::string &name)
Find out information about a registered name.
Definition: Network.cpp:995
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
Definition: Network.cpp:1552
static bool writeToNameServer(PortWriter &cmd, PortReader &reply, const ContactStyle &style)
Variant write method specialized to name server.
Definition: Network.cpp:1983
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
Definition: Network.cpp:700
static bool write(const Contact &contact, PortWriter &cmd, PortReader &reply, bool admin=false, bool quiet=false, double timeout=-1)
Send a single command to a port and await a single response.
Definition: Network.cpp:1226
The output side of an active connection between two ports.
virtual const Route & getRoute() const =0
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
virtual bool open(const Route &route)=0
Start negotiating a carrier, using the given route (this should generally match the name of the sendi...
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
virtual OutputStream & getOutputStream()=0
Access the output stream associated with the connection.
virtual InputProtocol & getInput()=0
Get an interface for doing read operations on the connection.
virtual void close()=0
Negotiate an end to operations.
virtual void rename(const Route &route)=0
Relabel the route after the fact (e.g.
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
virtual bool isOk() const =0
Check if the connection is valid and can be used.
virtual bool write(SizedWriter &writer)=0
Write a message on the connection.
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:22
virtual int getTypeOfService()
virtual bool setTypeOfService(int tos)
Information about a port connection or event.
Definition: PortInfo.h:26
std::string targetName
Name of connection target, if any.
Definition: PortInfo.h:63
std::string carrierName
Name of protocol type, if releveant.
Definition: PortInfo.h:66
bool incoming
True if a connection is incoming, false if outgoing.
Definition: PortInfo.h:51
std::string message
A human-readable description of contents.
Definition: PortInfo.h:69
std::string portName
Name of port.
Definition: PortInfo.h:57
int tag
Type of information.
Definition: PortInfo.h:48
std::string sourceName
Name of connection source, if any.
Definition: PortInfo.h:60
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
Definition: PortInfo.h:40
@ PORTINFO_MISC
Unspecified information.
Definition: PortInfo.h:43
A creator for readers.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:25
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
virtual Type getReadType() const
Definition: PortReader.cpp:12
A base class for objects that want information about port status changes.
Definition: PortReport.h:28
virtual void report(const PortInfo &info)=0
Callback for port event/state information.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:24
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
virtual void onCommencement() const
This is called when the port is about to begin writing operations.
Definition: PortWriter.cpp:17
A class for storing options and configuration information.
Definition: Property.h:34
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
Definition: Property.cpp:1051
std::string toString() const override
Return a standard text representation of the content of the object.
Definition: Property.cpp:1069
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
Definition: Property.cpp:1063
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
Definition: Property.cpp:1015
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Property.cpp:1041
PacketPriorityDSCP
The PacketPriorityDSCP defines the packets quality of service (priority) using DSCP.
Definition: QosStyle.h:46
static PacketPriorityDSCP getDSCPByVocab(yarp::conf::vocab32_t vocab)
returns the IPV4/6 DSCP value given as DSCP code
Definition: QosStyle.cpp:155
static std::string fromRosName(const std::string &name)
Information about a connection between two ports.
Definition: Route.h:29
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:103
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition: Route.cpp:123
std::string toString() const
Render a text form of the route, "source->carrier->dest".
Definition: Route.cpp:138
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:93
void swapNames()
Swap from and to names.
Definition: Route.cpp:133
void setToContact(const Contact &toContact)
Set the destination contact of the route.
Definition: Route.cpp:118
An InputStream that reads from a string.
void add(const std::string &txt)
An OutputStream that produces a string.
static ProcessInfo getProcessInfo(int pid=0)
gets the operating system process information given by its PID.
Definition: SystemInfo.cpp:805
static PlatformInfo getPlatformInfo()
getPlatformInfo
Definition: SystemInfo.cpp:597
A single value (typically within a Bottle).
Definition: Value.h:45
virtual bool isString() const
Checks if value is a string.
Definition: Value.cpp:156
virtual yarp::conf::vocab32_t asVocab32() const
Get vocabulary identifier as an integer.
Definition: Value.cpp:228
virtual std::int32_t asInt32() const
Get 32-bit integer value.
Definition: Value.cpp:204
virtual Bottle * asList() const
Get list value.
Definition: Value.cpp:240
virtual std::string asString() const
Get string value.
Definition: Value.cpp:234
A helper for creating cached object descriptions.
bool write(ConnectionWriter &connection) const override
Write this object to a network connection.
virtual void appendLine(const std::string &data)
Send a string along with a carriage-return-line-feed sequence.
A helper for recording entire message/reply transactions.
void init(yarp::os::ConnectionReader *wrappedReader)
Call this to wrap a specific ConnectionReader.
void fini()
Call this when all reading/writing has been done.
Manager for a single input to a port.
Manager for a single output from a port.
A single message, potentially being transmitted on multiple connections.
void setContent(const yarp::os::PortWriter *writable, bool owned=false, const yarp::os::PortWriter *callback=nullptr, bool ownedCallback=false)
Configure the object being sent and where to send notifications.
void inc()
Increment the usage count for this messagae.
void dec()
Decrement the usage count for this messagae.
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:27
std::string getPupString() const
Definition: PortCoreUnit.h:234
void setPupped(const std::string &pupString)
Tag this connection as having been created by a publisherUpdate message to the port's administrative ...
Definition: PortCoreUnit.h:248
virtual void * send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader, const yarp::os::PortWriter *callback, void *tracker, const std::string &envelope, bool waitAfter=true, bool waitBefore=true, bool *gotReply=nullptr)
Send a message on the connection.
Definition: PortCoreUnit.h:127
std::string getMode(bool *hasMode=nullptr)
Read the "mode" of the connection - basically, whether it is used for logging or not.
Definition: PortCoreUnit.h:208
virtual void getCarrierParams(yarp::os::Property &params)
Definition: PortCoreUnit.h:266
void setDoomed()
Request that this connection be shut down as soon as possible.
Definition: PortCoreUnit.h:97
virtual void setCarrierParams(const yarp::os::Property &params)
Set arbitrary parameters for this connection.
Definition: PortCoreUnit.h:258
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
Definition: PortCore.cpp:1433
void resetPortName(const std::string &str)
Definition: PortCore.cpp:2991
std::string getEnvelope()
Definition: PortCore.cpp:1471
void setAdminReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming administrative messages.
Definition: PortCore.cpp:148
int getOutputCount()
Check how many output connections there are.
Definition: PortCore.cpp:1423
bool readBlock(ConnectionReader &reader, void *id, yarp::os::OutputStream *os)
Read a block of regular payload data.
Definition: PortCore.cpp:1199
void setReportCallback(yarp::os::PortReport *reporter)
Set a callback to be notified of changes in port status.
Definition: PortCore.cpp:1171
void run() override
The body of the main thread.
Definition: PortCore.cpp:165
bool start() override
Begin main thread.
Definition: PortCore.cpp:277
void checkType(PortReader &reader)
Definition: PortCore.cpp:3122
void resume()
Undo an interrupt()
Definition: PortCore.cpp:325
void setTimeout(float timeout)
Definition: PortCore.cpp:3021
yarp::os::PortReaderCreator * getReadCreator()
Get the creator of callbacks.
Definition: PortCore.cpp:2996
Property * acquireProperties(bool readOnly)
Definition: PortCore.cpp:2944
void interrupt()
Prepare the port to be shut down.
Definition: PortCore.cpp:331
void setEnvelope(const std::string &envelope)
Set some envelope information to pass along with a message without actually being part of the message...
Definition: PortCore.cpp:1457
bool removeIO(const Route &route, bool synch=false)
Remove any connection matching the supplied route.
Definition: PortCore.cpp:2961
~PortCore()
Destructor.
Definition: PortCore.cpp:61
bool isInterrupted() const
Definition: PortCore.cpp:3016
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc) Generate a description of the connections associ...
Definition: PortCore.cpp:1185
bool listen(const Contact &address, bool shouldAnnounce=true)
Begin service at a given address.
Definition: PortCore.cpp:68
bool sendHelper(const yarp::os::PortWriter &writer, int mode, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a message with a specific mode (normal or log).
Definition: PortCore.cpp:1272
void removeOutput(const std::string &dest, void *id, yarp::os::OutputStream *os)
Remove an output connection.
Definition: PortCore.cpp:1012
bool isListening() const
Definition: PortCore.cpp:3006
void setControlRegistration(bool flag)
Normally the port will unregister its name with the name server when shutting down.
Definition: PortCore.cpp:3001
bool setCallbackLock(yarp::os::Mutex *mutex)
Definition: PortCore.cpp:3029
int getInputCount()
Check how many input connections there are.
Definition: PortCore.cpp:1414
bool manualStart(const char *sourceName)
Start up the port, but without a main thread.
Definition: PortCore.cpp:308
void setReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming data.
Definition: PortCore.cpp:140
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
Definition: PortCore.cpp:2889
void promiseType(const Type &typ)
Definition: PortCore.cpp:3142
void releaseProperties(Property *prop)
Definition: PortCore.cpp:2955
int getEventCount()
A diagnostic for testing purposes.
Definition: PortCore.cpp:532
void close() override
Shut down port.
Definition: PortCore.cpp:264
const Contact & getAddress() const
Get the address associated with the port.
Definition: PortCore.cpp:2986
void describe(void *id, yarp::os::OutputStream *os)
Produce a text description of the port and its connections.
Definition: PortCore.cpp:1048
yarp::os::Type getType()
Definition: PortCore.cpp:3134
bool isWriting()
Check if a message is currently being sent.
Definition: PortCore.cpp:1395
bool adminBlock(ConnectionReader &reader, void *id)
Read a block of administrative data.
Definition: PortCore.cpp:1659
void removeInput(const std::string &src, void *id, yarp::os::OutputStream *os)
Remove an input connection.
Definition: PortCore.cpp:1030
bool send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a normal message.
Definition: PortCore.cpp:1248
void resetReportCallback()
Reset the callback to be notified of changes in port status.
Definition: PortCore.cpp:1179
void setReadCreator(yarp::os::PortReaderCreator &creator)
Set a callback for creating callbacks for incoming data.
Definition: PortCore.cpp:156
yarp::os::impl::PortDataModifier & getPortModifier()
Definition: PortCore.cpp:3117
void setName(const std::string &name)
Set the name of this port.
Definition: PortCore.cpp:2966
bool addOutput(const std::string &dest, void *id, yarp::os::OutputStream *os, bool onlyIfNeeded=false)
Add an output connection to this port.
Definition: PortCore.cpp:842
This is the heart of a yarp port.
Definition: PortCore.h:110
Lets Readable objects read from the underlying InputStream associated with the connection between two...
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
int join(double seconds=-1)
Definition: ThreadImpl.cpp:120
int setPriority(int priority=-1, int policy=-1)
Definition: ThreadImpl.cpp:246
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCAssert(component, x)
Definition: LogComponent.h:169
#define yCTrace(component,...)
Definition: LogComponent.h:85
#define yCWarning(component,...)
Definition: LogComponent.h:143
#define yCDebug(component,...)
Definition: LogComponent.h:109
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:34
std::string get_string(const std::string &key, bool *found=nullptr)
Read a string from an environment variable.
Definition: environment.h:68
std::string to_string(IntegerType x)
Definition: numeric.h:115
ContainerT::value_type join(typename ContainerT::const_iterator begin, typename ContainerT::const_iterator end, const typename ContainerT::value_type &separator=", ")
Utility to join the elements in a container to a single string separated by a separator.
Definition: string.h:93
std::int32_t vocab32_t
Definition: numeric.h:78
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
int getpid()
Portable wrapper for the getppid() function.
Definition: Os.cpp:91
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition: NetInt32.h:30
constexpr yarp::conf::vocab32_t createVocab32(char a, char b=0, char c=0, char d=0)
Create a vocab from chars.
Definition: Vocab.h:28
The main, catch-all namespace for YARP.
Definition: dirs.h:16
The PlatformInfo struct holds the operating system information.
Definition: SystemInfo.h:81
The ProcessInfo struct provides the operating system process information.
Definition: SystemInfo.h:113
#define YARP_WARNING_POP
Ends a temporary alteration of the enabled warnings.
Definition: system.h:332
#define YARP_WARNING_PUSH
Starts a temporary alteration of the enabled warnings.
Definition: system.h:331
#define YARP_DISABLE_DEPRECATED_WARNING
Disable deprecated warnings in the following code.
Definition: system.h:333
#define YARP_UNUSED(var)
Definition: api.h:162