YARP
Yet Another Robot Platform
PortCore.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #include <yarp/os/impl/PortCore.h>
11 
12 #include <yarp/conf/environment.h>
13 
14 #include <yarp/os/Bottle.h>
15 #include <yarp/os/DummyConnector.h>
16 #include <yarp/os/InputProtocol.h>
17 #include <yarp/os/Name.h>
18 #include <yarp/os/Network.h>
19 #include <yarp/os/PortInfo.h>
20 #include <yarp/os/RosNameSpace.h>
22 #include <yarp/os/SystemInfo.h>
23 #include <yarp/os/Time.h>
31 
32 #include <cstdio>
33 #include <functional>
34 #include <random>
35 #include <regex>
36 #include <vector>
37 
38 #ifdef YARP_HAS_ACE
39 # include <ace/INET_Addr.h>
40 # include <ace/Sched_Params.h>
41 // In one the ACE headers there is a definition of "main" for WIN32
42 # ifdef main
43 # undef main
44 # endif
45 #endif
46 
47 #if defined(__linux__) // used for configuring scheduling properties
48 # include <dirent.h>
49 # include <sys/types.h>
50 # include <unistd.h>
51 #endif
52 
53 
54 using namespace yarp::os::impl;
55 using namespace yarp::os;
56 using namespace yarp;
57 
// File-local logging component. PORTCORE is the handle passed to the
// yCDebug/yCTrace/yCError/yCAssert macros used throughout this file.
58 namespace {
59 YARP_OS_LOG_COMPONENT(PORTCORE, "yarp.os.impl.PortCore")
60 } // namespace
61 
// Default constructor: compiler-generated, no explicit work is done here.
62 PortCore::PortCore() = default;
63 
// NOTE(review): the signature line (listing line 64) is missing from this
// extraction; the position right after the constructor suggests this is the
// destructor body -- confirm against the real file.
// Shuts the port down via close() and then calls removeCallbackLock().
65 {
66  close();
67  removeCallbackLock();
68 }
69 
70 
71 bool PortCore::listen(const Contact& address, bool shouldAnnounce)
72 {
73  yCDebug(PORTCORE, "Starting listening on %s", address.toURI().c_str());
74  // If we're using ACE, we really need to have it initialized before
75  // this point.
76  if (!NetworkBase::initialized()) {
77  yCError(PORTCORE, "YARP not initialized; create a yarp::os::Network object before using ports");
78  return false;
79  }
80 
81  yCTrace(PORTCORE, "listen");
82 
83  m_stateSemaphore.wait();
84 
85  // This method assumes we are not already on the network.
86  // We can assume this because it is not a user-facing class,
87  // and we carefully never call this method again without
88  // calling close().
89  yCAssert(PORTCORE, !m_listening);
90  yCAssert(PORTCORE, !m_running);
91  yCAssert(PORTCORE, !m_closing);
92  yCAssert(PORTCORE, !m_finished);
93  yCAssert(PORTCORE, m_face == nullptr);
94 
95  // Try to put the port on the network, using the user-supplied
96  // address (which may be incomplete). You can think of
97  // this as getting a server socket.
98  m_address = address;
99  setName(address.getRegName());
100  if (m_timeout > 0) {
101  m_address.setTimeout(m_timeout);
102  }
103  m_face = Carriers::listen(m_address);
104 
105  // We failed, abort.
106  if (m_face == nullptr) {
107  m_stateSemaphore.post();
108  return false;
109  }
110 
111  // Update our address if it was incomplete.
112  if (m_address.getPort() <= 0) {
113  m_address = m_face->getLocalAddress();
114  if (m_address.getRegName() == "...") {
115  m_address.setName(std::string("/") + m_address.getHost() + "_" + NetType::toString(m_address.getPort()));
116  setName(m_address.getRegName());
117  }
118  }
119 
120  // Move into listening phase
121  m_listening = true;
122  m_stateSemaphore.post();
123 
124  // Now that we are on the network, we can let the name server know this.
125  if (shouldAnnounce) {
126  if (!(NetworkBase::getLocalMode() && NetworkBase::getQueryBypass() == nullptr)) {
127  std::string portName = address.getRegName();
128  Bottle cmd;
129  Bottle reply;
130  cmd.addString("announce");
131  cmd.addString(portName);
132  ContactStyle style;
133  NetworkBase::writeToNameServer(cmd, reply, style);
134  }
135  }
136 
137  // Success!
138  return true;
139 }
140 
141 
// NOTE(review): the signature line (listing line 142) is missing from this
// extraction. The body installs `reader` as m_reader, the handler that
// interrupt() and closeMain() later invoke.
// Must be called while the port is not running and before any reader has
// been set (both are asserted).
143 {
144  // Don't even try to do this when the port is hot, it'll burn you
145  yCAssert(PORTCORE, !m_running);
146  yCAssert(PORTCORE, m_reader == nullptr);
147  m_reader = &reader;
148 }
149 
// NOTE(review): the signature line (listing line 150) is missing from this
// extraction. The body installs `reader` as m_adminReader.
// Must be called while the port is not running and before any admin reader
// has been set (both are asserted).
151 {
152  // Don't even try to do this when the port is hot, it'll burn you
153  yCAssert(PORTCORE, !m_running);
154  yCAssert(PORTCORE, m_adminReader == nullptr);
155  m_adminReader = &reader;
156 }
157 
// NOTE(review): the signature line (listing line 158) is missing from this
// extraction. The body installs `creator` as m_readableCreator.
// Must be called while the port is not running and before any creator has
// been set (both are asserted).
159 {
160  // Don't even try to do this when the port is hot, it'll burn you
161  yCAssert(PORTCORE, !m_running);
162  yCAssert(PORTCORE, m_readableCreator == nullptr);
163  m_readableCreator = &creator;
164 }
165 
166 
// NOTE(review): the signature line (listing line 167) is missing from this
// extraction; the trace message identifies this as the body of run().
//
// Server-thread main loop. It accepts incoming connections from m_face,
// hands each one to addInput(), reaps defunct units, and wakes any thread
// waiting on m_connectionChangeSemaphore. The loop ends once m_closing is
// observed, after which m_finished is set.
168 {
169  yCTrace(PORTCORE, "run");
170 
171  // This is the server thread for the port. We listen on
172  // the network and handle any incoming connections.
173  // We don't touch those connections, just shove them
174  // in a list and move on. It is important that this
175  // thread doesn't make a connecting client wait just
176  // because some other client is slow.
177 
178  // We assume that listen() has succeeded and that
179  // start() has been called.
180  yCAssert(PORTCORE, m_listening);
181  yCAssert(PORTCORE, !m_running);
182  yCAssert(PORTCORE, !m_closing);
183  yCAssert(PORTCORE, !m_finished);
184  yCAssert(PORTCORE, m_starting);
185 
186  // Enter running phase
187  m_running = true;
188  m_starting = false;
189 
190  // This post is matched with a wait in start()
191  m_stateSemaphore.post();
192 
193  yCTrace(PORTCORE, "run running");
194 
195  // Enter main loop, where we block on incoming connections.
196  // The loop is exited when PortCore#closing is set. One last
197  // connection will be needed to de-block this thread and ensure
198  // that it checks PortCore#closing.
199  bool shouldStop = false;
200  while (!shouldStop) {
201 
202  // Block and wait for a connection
203  InputProtocol* ip = m_face->read();
204 
205  m_stateSemaphore.wait();
206 
207  // Attach the connection to this port and update its timeout setting
208  if (ip != nullptr) {
209  ip->attachPort(m_contactable);
210  yCDebug(PORTCORE, "received something");
211  if (m_timeout > 0) {
212  ip->setTimeout(m_timeout);
213  }
214  }
215 
216  // Check whether we should shut down
217  shouldStop |= m_closing;
218 
219  // Increment a global count of connection events
220  m_events++;
221 
222  m_stateSemaphore.post();
223 
224  // If we are not shutting down, spin off the connection.
225  // It starts life as an input connection (although it
226  // may later morph into an output).
227  if (!shouldStop) {
228  if (ip != nullptr) {
229  addInput(ip);
230  }
231  yCDebug(PORTCORE, "spun off a connection");
     // Ownership has been handed to addInput(); clearing ip prevents the
     // close/delete below from touching it.
232  ip = nullptr;
233  }
234 
235  // If the connection wasn't spun off, just shut it down.
236  if (ip != nullptr) {
237  ip->close();
238  delete ip;
239  ip = nullptr;
240  }
241 
242  // Remove any defunct connections.
243  reapUnits();
244 
245  // Notify anyone listening for connection changes.
246  // This should be using a condition variable once we have them,
247  // this is not solid TODO
248  m_stateSemaphore.wait();
249  for (int i = 0; i < m_connectionListeners; i++) {
250  m_connectionChangeSemaphore.post();
251  }
252  m_connectionListeners = 0;
253  m_stateSemaphore.post();
254  }
255 
256  yCTrace(PORTCORE, "run closing");
257 
258  // The server thread is shutting down.
     // Release any remaining waiters one last time, then flag completion so
     // that closeMain() can proceed.
259  m_stateSemaphore.wait();
260  for (int i = 0; i < m_connectionListeners; i++) {
261  m_connectionChangeSemaphore.post();
262  }
263  m_connectionListeners = 0;
264  m_finished = true;
265  m_stateSemaphore.post();
266 }
267 
268 
// NOTE(review): the signature line (listing line 269) is missing from this
// extraction; presumably this is close() -- confirm.
// Runs the full shutdown sequence (closeMain()), then frees the property
// object m_prop and releases the output and input modifiers.
270 {
271  closeMain();
272 
273  if (m_prop != nullptr) {
274  delete m_prop;
275  m_prop = nullptr;
276  }
277  m_modifier.releaseOutModifier();
278  m_modifier.releaseInModifier();
279 }
280 
281 
// NOTE(review): the signature line (listing line 282) is missing from this
// extraction; the trace message identifies this as the body of start().
//
// Launches the server thread and waits until it reports that it is running.
// Returns whether the thread could be started.
283 {
284  yCTrace(PORTCORE, "start");
285 
286  // This wait will, on success, be matched by a post in run()
287  m_stateSemaphore.wait();
288 
289  // We assume that listen() has been called.
290  yCAssert(PORTCORE, m_listening);
291  yCAssert(PORTCORE, !m_running);
292  yCAssert(PORTCORE, !m_starting);
293  yCAssert(PORTCORE, !m_finished);
294  yCAssert(PORTCORE, !m_closing);
295  m_starting = true;
296 
297  // Start the server thread.
298  bool started = ThreadImpl::start();
299  if (!started) {
300  // run() won't be happening
301  m_stateSemaphore.post();
302  } else {
303  // run() will signal stateSema once it is active
304  m_stateSemaphore.wait();
305  yCAssert(PORTCORE, m_running);
306 
307  // release stateSema for its normal task of controlling access to state
308  m_stateSemaphore.post();
309  }
310  return started;
311 }
312 
313 
314 bool PortCore::manualStart(const char* sourceName)
315 {
316  yCTrace(PORTCORE, "manualStart");
317 
318  // This variant of start() does not create a server thread.
319  // That is appropriate if we never expect to receive incoming
320  // connections for any reason. No incoming data, no requests
321  // for state information, no requests to change connections,
322  // nothing. We set the port's name to something fake, and
323  // act like nothing is wrong.
324  m_interruptable = false;
325  m_manual = true;
326  setName(sourceName);
327  return true;
328 }
329 
330 
// NOTE(review): the signature line (listing line 331) is missing from this
// extraction; presumably this is resume() -- confirm.
// Clears the m_interrupted flag that interrupt() sets.
332 {
333  // We are no longer interrupted.
334  m_interrupted = false;
335 }
336 
// NOTE(review): the signature line (listing line 337) is missing from this
// extraction; the trace message identifies this as the body of interrupt().
//
// Marks the port as interrupted and, if a user may be blocked on a read,
// pushes a message through m_reader so the reader can re-check its state.
338 {
339  yCTrace(PORTCORE, "interrupt");
340 
341  // This is a no-op if there is no server thread.
342  if (!m_listening) {
343  return;
344  }
345 
346  // Ignore any future incoming data
347  m_interrupted = true;
348 
349  // What about data that is already coming in?
350  // If interruptable is not currently set, no worries, the user
351  // did not or will not end up blocked on a read.
352  if (!m_interruptable) {
353  return;
354  }
355 
356  // Since interruptable is set, it is possible that the user
357  // may be blocked on a read. We send an empty message,
358  // which is reserved for giving blocking readers a chance to
359  // update their state.
360  m_stateSemaphore.wait();
361  if (m_reader != nullptr) {
362  yCDebug(PORTCORE, "sending update-state message to listener");
     // NOTE(review): listing line 363, which must declare `sbr`, is missing
     // from this extraction.
     // The read callback runs with the callback lock held.
364  lockCallback();
365  m_reader->read(sbr);
366  unlockCallback();
367  }
368  m_stateSemaphore.post();
369 }
370 
371 
// Full shutdown sequence for the port. In order:
//   1. enter the "finishing" phase (or return early if there is nothing to do),
//   2. ask remote senders to disconnect their inputs to this port,
//   3. tear down outgoing connections,
//   4. stop and join the server thread, then delete all units,
//   5. close the listening face,
//   6. notify a registered reader with a final message,
//   7. (when the server was running) handle name-server unregistration,
//   8. reset the phase flags and assert that the port is back to a clean state.
// NOTE(review): listing lines 422, 506 and 516 are missing from this
// extraction; each gap is marked below.
372 void PortCore::closeMain()
373 {
374  yCTrace(PORTCORE, "closeMain");
375 
376  m_stateSemaphore.wait();
377 
378  // We may not have anything to do.
379  if (m_finishing || !(m_running || m_manual)) {
380  yCTrace(PORTCORE, "closeMain - nothing to do");
381  m_stateSemaphore.post();
382  return;
383  }
384 
385  yCTrace(PORTCORE, "closeMain - Central");
386 
387  // Move into official "finishing" phase.
388  m_finishing = true;
389  yCDebug(PORTCORE, "now preparing to shut down port");
390  m_stateSemaphore.post();
391 
392  // Start disconnecting inputs. We ask the other side of the
393  // connection to do this, so it won't come as a surprise.
394  // The details of how disconnection works vary by carrier.
395  // While we are doing this, the server thread may be still running.
396  // This is because other ports may need to talk to the server
397  // to organize details of how a connection should be broken down.
398  bool done = false;
399  std::string prevName;
400  while (!done) {
401  done = true;
402  std::string removeName;
403  m_stateSemaphore.wait();
404  for (auto* unit : m_units) {
405  if ((unit != nullptr) && unit->isInput() && !unit->isDoomed()) {
406  Route r = unit->getRoute();
407  std::string s = r.getFromName();
     // Only named remote ports ("/...") qualify; our own name and the
     // source tried on the previous pass are skipped, so a source that
     // refuses to go away cannot stall this loop forever.
408  if (s.length() >= 1 && s[0] == '/' && s != getName() && s != prevName) {
409  removeName = s;
410  done = false;
411  break;
412  }
413  }
414  }
415  m_stateSemaphore.post();
416  if (!done) {
417  yCDebug(PORTCORE, "requesting removal of connection from %s", removeName.c_str());
418  bool result = NetworkBase::disconnect(removeName,
419  getName(),
420  true);
421  if (!result) {
     // NOTE(review): listing line 422 is missing here. It opened a fallback
     // call made when disconnect() fails; only its trailing arguments
     // survive below.
423  removeName,
424  true);
425  }
426  prevName = removeName;
427  }
428  }
429 
430  // Start disconnecting outputs. We don't negotiate with the
431  // other side, we just break them down.
432  done = false;
433  while (!done) {
434  done = true;
435  Route removeRoute;
436  m_stateSemaphore.wait();
437  for (auto* unit : m_units) {
438  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
439  removeRoute = unit->getRoute();
440  if (removeRoute.getFromName() == getName()) {
441  done = false;
442  break;
443  }
444  }
445  }
446  m_stateSemaphore.post();
447  if (!done) {
448  removeUnit(removeRoute, true);
449  }
450  }
451 
452  m_stateSemaphore.wait();
453  bool stopRunning = m_running;
454  m_stateSemaphore.post();
455 
456  // If the server thread is still running, we need to bring it down.
457  if (stopRunning) {
458  // Let the server thread know we no longer need its services.
459  m_stateSemaphore.wait();
460  m_closing = true;
461  m_stateSemaphore.post();
462 
463  // Wake up the server thread the only way we can, by sending
464  // a message to it. Then join it, it is done.
465  if (!m_manual) {
466  OutputProtocol* op = m_face->write(m_address);
467  if (op != nullptr) {
468  op->close();
469  delete op;
470  }
471  join();
472  }
473 
474  // We should be finished now.
475  m_stateSemaphore.wait();
476  yCAssert(PORTCORE, m_finished);
477  m_stateSemaphore.post();
478 
479  // Clean up our connection list. We couldn't do this earlier,
480  // because the server thread was running.
481  closeUnits();
482 
483  // Reset some state flags.
484  m_stateSemaphore.wait();
485  m_finished = false;
486  m_closing = false;
487  m_running = false;
488  m_stateSemaphore.post();
489  }
490 
491  // There should be no other threads at this point and we
492  // can stop listening on the network.
493  if (m_listening) {
494  yCAssert(PORTCORE, m_face != nullptr);
495  m_face->close();
496  delete m_face;
497  m_face = nullptr;
498  m_listening = false;
499  }
500 
501  // Check if the client is waiting for input. If so, wake them up
502  // with the bad news. An empty message signifies a request to
503  // check the port state.
504  if (m_reader != nullptr) {
505  yCDebug(PORTCORE, "sending end-of-port message to listener");
     // NOTE(review): listing line 506, which must declare `sbr`, is missing
     // from this extraction.
507  m_reader->read(sbr);
508  m_reader = nullptr;
509  }
510 
511  // We may need to unregister the port with the name server.
512  if (stopRunning) {
513  std::string name = getName();
514  if (name != std::string("")) {
515  if (m_controlRegistration) {
     // NOTE(review): listing line 516 is missing here; given the comment
     // above it presumably performs the unregistration -- confirm.
517  }
518  }
519  }
520 
521  // We are done with the finishing process.
522  m_finishing = false;
523 
524  // We are fresh as a daisy.
525  yCAssert(PORTCORE, !m_listening);
526  yCAssert(PORTCORE, !m_running);
527  yCAssert(PORTCORE, !m_starting);
528  yCAssert(PORTCORE, !m_closing);
529  yCAssert(PORTCORE, !m_finished);
530  yCAssert(PORTCORE, !m_finishing);
531  yCAssert(PORTCORE, m_face == nullptr);
532 }
533 
534 
// NOTE(review): the signature line (listing line 535) is missing from this
// extraction; presumably this is getEventCount() -- confirm.
// Returns a snapshot of m_events, the counter the server thread increments
// once per accepted connection event. It is read under the state semaphore.
536 {
537  // How many times has the server thread spun off a connection.
538  m_stateSemaphore.wait();
539  int ct = m_events;
540  m_stateSemaphore.post();
541  return ct;
542 }
543 
544 
545 void PortCore::closeUnits()
546 {
547  // Empty the PortCore#units list. This is only possible when
548  // the server thread is finished.
549  m_stateSemaphore.wait();
550  yCAssert(PORTCORE, m_finished);
551  m_stateSemaphore.post();
552 
553  // In the "finished" phase, nobody else touches the units,
554  // so we can go ahead and shut them down and delete them.
555  for (auto& i : m_units) {
556  PortCoreUnit* unit = i;
557  if (unit != nullptr) {
558  yCDebug(PORTCORE, "closing a unit");
559  unit->close();
560  yCDebug(PORTCORE, "joining a unit");
561  unit->join();
562  delete unit;
563  yCDebug(PORTCORE, "deleting a unit");
564  i = nullptr;
565  }
566  }
567 
568  // Get rid of all our nulls. Done!
569  m_units.clear();
570 }
571 
572 void PortCore::reapUnits()
573 {
574  // Connections that should be shut down get tagged as "doomed"
575  // but aren't otherwise touched until it is safe to do so.
576  m_stateSemaphore.wait();
577  if (!m_finished) {
578  for (auto* unit : m_units) {
579  if ((unit != nullptr) && unit->isDoomed() && !unit->isFinished()) {
580  std::string s = unit->getRoute().toString();
581  yCDebug(PORTCORE, "Informing connection %s that it is doomed", s.c_str());
582  unit->close();
583  yCDebug(PORTCORE, "Closed connection %s", s.c_str());
584  unit->join();
585  yCDebug(PORTCORE, "Joined thread of connection %s", s.c_str());
586  }
587  }
588  }
589  m_stateSemaphore.post();
590  cleanUnits();
591 }
592 
593 void PortCore::cleanUnits(bool blocking)
594 {
595  // We will remove any connections that have finished operating from
596  // the PortCore#units list.
597 
598  // Depending on urgency, either wait for a safe time to do this
599  // or skip if unsafe.
600  if (blocking) {
601  m_stateSemaphore.wait();
602  } else {
603  blocking = m_stateSemaphore.check();
604  if (!blocking) {
605  return;
606  }
607  }
608 
609  // We will update our connection counts as a by-product.
610  int updatedInputCount = 0;
611  int updatedOutputCount = 0;
612  int updatedDataOutputCount = 0;
613  yCDebug(PORTCORE, "/ routine check of connections to this port begins");
614  if (!m_finished) {
615 
616  // First, we delete and null out any defunct entries in the list.
617  for (auto& i : m_units) {
618  PortCoreUnit* unit = i;
619  if (unit != nullptr) {
620  yCDebug(PORTCORE, "| checking connection %s %s", unit->getRoute().toString().c_str(), unit->getMode().c_str());
621  if (unit->isFinished()) {
622  std::string con = unit->getRoute().toString();
623  yCDebug(PORTCORE, "| removing connection %s", con.c_str());
624  unit->close();
625  unit->join();
626  delete unit;
627  i = nullptr;
628  yCDebug(PORTCORE, "| removed connection %s", con.c_str());
629  } else {
630  // No work to do except updating connection counts.
631  if (!unit->isDoomed()) {
632  if (unit->isOutput()) {
633  updatedOutputCount++;
634  if (unit->getMode().empty()) {
635  updatedDataOutputCount++;
636  }
637  }
638  if (unit->isInput()) {
639  if (unit->getRoute().getFromName() != "admin") {
640  updatedInputCount++;
641  }
642  }
643  }
644  }
645  }
646  }
647 
648  // Now we do some awkward shuffling (list class may be from ACE
649  // or STL, if ACE it is quite limited). We move the nulls to
650  // the end of the list ...
651  size_t rem = 0;
652  for (size_t i2 = 0; i2 < m_units.size(); i2++) {
653  if (m_units[i2] != nullptr) {
654  if (rem < i2) {
655  m_units[rem] = m_units[i2];
656  m_units[i2] = nullptr;
657  }
658  rem++;
659  }
660  }
661 
662  // ... Now we get rid of the null entries
663  for (size_t i3 = 0; i3 < m_units.size() - rem; i3++) {
664  m_units.pop_back();
665  }
666  }
667 
668  // Finalize the connection counts.
669  m_dataOutputCount = updatedDataOutputCount;
670  m_stateSemaphore.post();
671  m_packetMutex.lock();
672  m_inputCount = updatedInputCount;
673  m_outputCount = updatedOutputCount;
674  m_packetMutex.unlock();
675  yCDebug(PORTCORE, "\\ routine check of connections to this port ends");
676 }
677 
678 
679 void PortCore::addInput(InputProtocol* ip)
680 {
681  yCTrace(PORTCORE, "addInput");
682 
683  // This method is only called by the server thread in its running phase.
684  // It wraps up a network connection as a unit and adds it to
685  // PortCore#units. The unit will have its own thread associated
686  // with it.
687 
688  yCAssert(PORTCORE, ip != nullptr);
689  m_stateSemaphore.wait();
690  PortCoreUnit* unit = new PortCoreInputUnit(*this,
691  getNextIndex(),
692  ip,
693  false);
694  yCAssert(PORTCORE, unit != nullptr);
695  unit->start();
696  m_units.push_back(unit);
697  yCTrace(PORTCORE, "there are now %zu units", m_units.size());
698  m_stateSemaphore.post();
699 }
700 
701 
// NOTE(review): the signature line (listing line 702) is missing from this
// extraction; the trace message and the use of `op` indicate this is the
// addOutput overload that takes an already-open output protocol.
//
// Wraps `op` in a PortCoreOutputUnit, starts it and appends it to m_units,
// unless the server thread has already finished, in which case nothing is
// added.
703 {
704  yCTrace(PORTCORE, "addOutput");
705 
706  // This method is called from threads associated with input
707  // connections.
708  // It wraps up a network connection as a unit and adds it to
709  // PortCore#units. The unit will start with its own thread
710  // associated with it, but that thread will be very short-lived
711  // if the port is not configured to do background writes.
712 
713  yCAssert(PORTCORE, op != nullptr);
714  m_stateSemaphore.wait();
715  if (!m_finished) {
716  PortCoreUnit* unit = new PortCoreOutputUnit(*this, getNextIndex(), op);
717  yCAssert(PORTCORE, unit != nullptr);
718  unit->start();
719  m_units.push_back(unit);
720  }
721  m_stateSemaphore.post();
722 }
723 
724 
725 bool PortCore::isUnit(const Route& route, int index)
726 {
727  // Check if a connection with a specified route (and optional ID) is present
728  bool needReap = false;
729  if (!m_finished) {
730  for (auto* unit : m_units) {
731  if (unit != nullptr) {
732  Route alt = unit->getRoute();
733  std::string wild = "*";
734  bool ok = true;
735  if (index >= 0) {
736  ok = ok && (unit->getIndex() == index);
737  }
738  if (ok) {
739  if (route.getFromName() != wild) {
740  ok = ok && (route.getFromName() == alt.getFromName());
741  }
742  if (route.getToName() != wild) {
743  ok = ok && (route.getToName() == alt.getToName());
744  }
745  if (route.getCarrierName() != wild) {
746  ok = ok && (route.getCarrierName() == alt.getCarrierName());
747  }
748  }
749  if (ok) {
750  needReap = true;
751  break;
752  }
753  }
754  }
755  }
756  return needReap;
757 }
758 
759 
760 bool PortCore::removeUnit(const Route& route, bool synch, bool* except)
761 {
762  // This is a request to remove a connection. It could arise from any
763  // input thread.
764 
765  if (except != nullptr) {
766  yCDebug(PORTCORE, "asked to remove connection in the way of %s", route.toString().c_str());
767  *except = false;
768  } else {
769  yCDebug(PORTCORE, "asked to remove connection %s", route.toString().c_str());
770  }
771 
772  // Scan for units that match the given route, and collect their IDs.
773  // Mark them as "doomed".
774  std::vector<int> removals;
775  m_stateSemaphore.wait();
776  bool needReap = false;
777  if (!m_finished) {
778  for (auto* unit : m_units) {
779  if (unit != nullptr) {
780  Route alt = unit->getRoute();
781  std::string wild = "*";
782  bool ok = true;
783  if (route.getFromName() != wild) {
784  ok = ok && (route.getFromName() == alt.getFromName());
785  }
786  if (route.getToName() != wild) {
787  ok = ok && (route.getToName() == alt.getToName());
788  }
789  if (route.getCarrierName() != wild) {
790  if (except == nullptr) {
791  ok = ok && (route.getCarrierName() == alt.getCarrierName());
792  } else {
793  if (route.getCarrierName() == alt.getCarrierName()) {
794  *except = true;
795  ok = false;
796  }
797  }
798  }
799 
800  if (ok) {
801  yCDebug(PORTCORE, "removing connection %s", alt.toString().c_str());
802  removals.push_back(unit->getIndex());
803  unit->setDoomed();
804  needReap = true;
805  }
806  }
807  }
808  }
809  m_stateSemaphore.post();
810 
811  // If we find one or more matches, we need to do some work.
812  // We've marked those matches as "doomed" so they'll get cleared
813  // up eventually, but we can speed this up by waking up the
814  // server thread.
815  if (needReap) {
816  yCDebug(PORTCORE, "one or more connections need prodding to die");
817 
818  if (m_manual) {
819  // No server thread, no problems.
820  reapUnits();
821  } else {
822  // Send a blank message to make up server thread.
823  OutputProtocol* op = m_face->write(m_address);
824  if (op != nullptr) {
825  op->close();
826  delete op;
827  }
828  yCDebug(PORTCORE, "sent message to prod connection death");
829 
830  if (synch) {
831  // Wait for connections to be cleaned up.
832  yCDebug(PORTCORE, "synchronizing with connection death");
833  bool cont = false;
834  do {
835  m_stateSemaphore.wait();
836  for (int removal : removals) {
837  cont = isUnit(route, removal);
838  if (cont) {
839  break;
840  }
841  }
842  if (cont) {
843  m_connectionListeners++;
844  }
845  m_stateSemaphore.post();
846  if (cont) {
847  m_connectionChangeSemaphore.wait();
848  }
849  } while (cont);
850  }
851  }
852  }
853  return needReap;
854 }
855 
856 
// Create an outgoing connection from this port to `dest`.
//
// dest         name (optionally carrying a carrier prefix) of the target.
// id           unused.
// os           if non-null, receives a one-line textual description of the
//              outcome (success or the reason for failure).
// onlyIfNeeded if true, an existing connection to the target that already
//              uses the requested carrier is kept and reported as success;
//              connections with other carriers are removed first. If false,
//              every existing connection to the target is removed first.
//
// Returns true if the connection exists when the call returns, false if the
// target is unknown, the port's flags forbid the connection, or the
// connection could not be opened.
// NOTE(review): listing line 937 is missing from this extraction; the gap
// is marked below.
857 bool PortCore::addOutput(const std::string& dest,
858  void* id,
859  OutputStream* os,
860  bool onlyIfNeeded)
861 {
862  YARP_UNUSED(id);
863  yCDebug(PORTCORE, "asked to add output to %s", dest.c_str());
864 
865  // Buffer to store text describing outcome (successful connection,
866  // or a failure).
867  BufferedConnectionWriter bw(true);
868 
869  // Look up the address we'll be connecting to.
870  Contact parts = Name(dest).toAddress();
871  Contact contact = NetworkBase::queryName(parts.getRegName());
872  Contact address = contact;
873 
874  // If we can't find it, say so and abort.
875  if (!address.isValid()) {
876  bw.appendLine(std::string("Do not know how to connect to ") + dest);
877  if (os != nullptr) {
878  bw.write(*os);
879  }
880  return false;
881  }
882 
883  // We clean all existing connections to the desired destination,
884  // optionally stopping if we find one with the right carrier.
885  if (onlyIfNeeded) {
886  // Remove any existing connections between source and destination
887  // with a different carrier. If we find a connection already
888  // present with the correct carrier, then we are done.
889  bool except = false;
890  removeUnit(Route(getName(), address.getRegName(), address.getCarrier()), true, &except);
891  if (except) {
892  // Connection already present.
893  yCDebug(PORTCORE, "output already present to %s", dest.c_str());
894  bw.appendLine(std::string("Desired connection already present from ") + getName() + " to " + dest);
895  if (os != nullptr) {
896  bw.write(*os);
897  }
898  return true;
899  }
900  } else {
901  // Remove any existing connections between source and destination.
902  removeUnit(Route(getName(), address.getRegName(), "*"), true);
903  }
904 
905  // Set up a named route for this connection.
906  std::string aname = address.getRegName();
907  if (aname.empty()) {
908  aname = address.toURI(false);
909  }
     // A carrier given explicitly in `dest` takes precedence over the one
     // returned by the name lookup.
910  Route r(getName(),
911  aname,
912  ((!parts.getCarrier().empty()) ? parts.getCarrier() : address.getCarrier()));
913  r.setToContact(contact);
914 
915  // Check for any restrictions on the port. Perhaps it can only
916  // read, or write.
917  bool allowed = true;
918  std::string err;
919  std::string append;
920  unsigned int f = getFlags();
921  bool allow_output = (f & PORTCORE_IS_OUTPUT) != 0;
922  bool rpc = (f & PORTCORE_IS_RPC) != 0;
     // The carrier name is parsed through a throw-away URI to extract its
     // "log" modifier, if any.
923  Name name(r.getCarrierName() + std::string("://test"));
924  std::string mode = name.getCarrierModifier("log");
925  bool is_log = (!mode.empty());
926  if (is_log) {
927  if (mode != "in") {
928  err = "Logger configured as log." + mode + ", but only log.in is supported";
929  allowed = false;
930  } else {
931  append = "; " + r.getFromName() + " will forward messages and replies (if any) to " + r.getToName();
932  }
933  }
934  if (!allow_output) {
935  if (!is_log) {
936  bool push = false;
     // NOTE(review): listing line 937, which must declare the pointer `c`
     // tested below, is missing from this extraction.
938  if (c != nullptr) {
939  push = c->isPush();
940  }
941  if (push) {
942  err = "Outputs not allowed";
943  allowed = false;
944  }
945  }
946  } else if (rpc) {
947  if (m_dataOutputCount >= 1 && !is_log) {
948  err = "RPC output already connected";
949  allowed = false;
950  }
951  }
952 
953  // If we found a relevant restriction, abort.
954  if (!allowed) {
955  bw.appendLine(err);
956  if (os != nullptr) {
957  bw.write(*os);
958  }
959  return false;
960  }
961 
962  // Ok! We can go ahead and make a connection.
963  OutputProtocol* op = nullptr;
964  if (m_timeout > 0) {
965  address.setTimeout(m_timeout);
966  }
967  op = Carriers::connect(address);
968  if (op != nullptr) {
969  op->attachPort(m_contactable);
970  if (m_timeout > 0) {
971  op->setTimeout(m_timeout);
972  }
973 
974  bool ok = op->open(r);
975  if (!ok) {
976  yCDebug(PORTCORE, "open route error");
977  delete op;
978  op = nullptr;
979  }
980  }
981 
982  // No connection, abort.
983  if (op == nullptr) {
984  bw.appendLine(std::string("Cannot connect to ") + dest);
985  if (os != nullptr) {
986  bw.write(*os);
987  }
988  return false;
989  }
990 
991  // Ok, we have a connection, now add it to PortCore#units
992  if (op->getConnection().isPush()) {
993  // This is the normal case
994  addOutput(op);
995  } else {
996  // This is the case for connections that are initiated
997  // in the opposite direction to native YARP connections.
998  // Native YARP has push connections, initiated by the
999  // sender. HTTP and ROS have pull connections, initiated
1000  // by the receiver.
1001  // We invert the route, flip the protocol direction, and add.
1002  r.swapNames();
1003  op->rename(r);
1004  InputProtocol* ip = &(op->getInput());
1005  m_stateSemaphore.wait();
1006  if (!m_finished) {
1007  PortCoreUnit* unit = new PortCoreInputUnit(*this,
1008  getNextIndex(),
1009  ip,
1010  true);
1011  yCAssert(PORTCORE, unit != nullptr);
1012  unit->start();
1013  m_units.push_back(unit);
1014  }
1015  m_stateSemaphore.post();
1016  }
1017 
1018  // Communicate the good news.
1019  bw.appendLine(std::string("Added connection from ") + getName() + " to " + dest + append);
1020  if (os != nullptr) {
1021  bw.write(*os);
1022  }
1023  cleanUnits();
1024  return true;
1025 }
1026 
1027 
1028 void PortCore::removeOutput(const std::string& dest, void* id, OutputStream* os)
1029 {
1030  YARP_UNUSED(id);
1031  yCDebug(PORTCORE, "asked to remove output to %s", dest.c_str());
1032 
1033  // All the real work done by removeUnit().
1034  BufferedConnectionWriter bw(true);
1035  if (removeUnit(Route("*", dest, "*"), true)) {
1036  bw.appendLine(std::string("Removed connection from ") + getName() + " to " + dest);
1037  } else {
1038  bw.appendLine(std::string("Could not find an outgoing connection to ") + dest);
1039  }
1040  if (os != nullptr) {
1041  bw.write(*os);
1042  }
1043  cleanUnits();
1044 }
1045 
1046 void PortCore::removeInput(const std::string& src, void* id, OutputStream* os)
1047 {
1048  YARP_UNUSED(id);
1049  yCDebug(PORTCORE, "asked to remove input to %s", src.c_str());
1050 
1051  // All the real work done by removeUnit().
1052  BufferedConnectionWriter bw(true);
1053  if (removeUnit(Route(src, "*", "*"), true)) {
1054  bw.appendLine(std::string("Removing connection from ") + src + " to " + getName());
1055  } else {
1056  bw.appendLine(std::string("Could not find an incoming connection from ") + src);
1057  }
1058  if (os != nullptr) {
1059  bw.write(*os);
1060  }
1061  cleanUnits();
1062 }
1063 
// NOTE(review): the signature line (listing line 1064) is missing from this
// extraction; the body uses `id` and an output-stream pointer `os`, so this
// is presumably the stream-based describe() overload -- confirm.
//
// Builds a human-readable summary of the port (name, address, outgoing and
// incoming connections) and writes it to `os`, or prints it to stdout when
// `os` is null.
1065 {
1066  YARP_UNUSED(id);
1067  cleanUnits();
1068 
1069  // Buffer to store a human-readable description of the port's
1070  // state.
1071  BufferedConnectionWriter bw(true);
1072 
1073  m_stateSemaphore.wait();
1074 
1075  // Report name and address.
1076  bw.appendLine(std::string("This is ") + m_address.getRegName() + " at " + m_address.toURI());
1077 
1078  // Report outgoing connections.
1079  int oct = 0;
1080  for (auto* unit : m_units) {
1081  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1082  Route route = unit->getRoute();
1083  std::string msg = "There is an output connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1084  bw.appendLine(msg);
1085  oct++;
1086  }
1087  }
1088  if (oct < 1) {
1089  bw.appendLine("There are no outgoing connections");
1090  }
1091 
1092  // Report incoming connections.
1093  int ict = 0;
1094  for (auto* unit : m_units) {
1095  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1096  Route route = unit->getRoute();
      // Inputs whose route has no carrier name are not listed.
1097  if (!route.getCarrierName().empty()) {
1098  std::string msg = "There is an input connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1099  bw.appendLine(msg);
1100  ict++;
1101  }
1102  }
1103  }
1104  if (ict < 1) {
1105  bw.appendLine("There are no incoming connections");
1106  }
1107 
1108  m_stateSemaphore.post();
1109 
1110  // Send description across network, or print it.
1111  if (os != nullptr) {
1112  bw.write(*os);
1113  } else {
1114  StringOutputStream sos;
1115  bw.write(sos);
1116  printf("%s\n", sos.toString().c_str());
1117  }
1118 }
1119 
1120 
1122 {
1123  cleanUnits();
1124 
1125  m_stateSemaphore.wait();
1126 
1127  // Report name and address of port.
1128  PortInfo baseInfo;
1130  std::string portName = m_address.getRegName();
1131  baseInfo.message = std::string("This is ") + portName + " at " + m_address.toURI();
1132  reporter.report(baseInfo);
1133 
1134  // Report outgoing connections.
1135  int oct = 0;
1136  for (auto* unit : m_units) {
1137  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1138  Route route = unit->getRoute();
1139  std::string msg = "There is an output connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1140  PortInfo info;
1141  info.message = msg;
1143  info.incoming = false;
1144  info.portName = portName;
1145  info.sourceName = route.getFromName();
1146  info.targetName = route.getToName();
1147  info.carrierName = route.getCarrierName();
1148  reporter.report(info);
1149  oct++;
1150  }
1151  }
1152  if (oct < 1) {
1153  PortInfo info;
1155  info.message = "There are no outgoing connections";
1156  reporter.report(info);
1157  }
1158 
1159  // Report incoming connections.
1160  int ict = 0;
1161  for (auto* unit : m_units) {
1162  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1163  Route route = unit->getRoute();
1164  std::string msg = "There is an input connection from " + route.getFromName() + " to " + route.getToName() + " using " + route.getCarrierName();
1165  PortInfo info;
1166  info.message = msg;
1168  info.incoming = true;
1169  info.portName = portName;
1170  info.sourceName = route.getFromName();
1171  info.targetName = route.getToName();
1172  info.carrierName = route.getCarrierName();
1173  reporter.report(info);
1174  ict++;
1175  }
1176  }
1177  if (ict < 1) {
1178  PortInfo info;
1180  info.message = "There are no incoming connections";
1181  reporter.report(info);
1182  }
1183 
1184  m_stateSemaphore.post();
1185 }
1186 
1187 
1189 {
1190  m_stateSemaphore.wait();
1191  if (reporter != nullptr) {
1192  m_eventReporter = reporter;
1193  }
1194  m_stateSemaphore.post();
1195 }
1196 
1198 {
1199  m_stateSemaphore.wait();
1200  m_eventReporter = nullptr;
1201  m_stateSemaphore.post();
1202 }
1203 
1204 void PortCore::report(const PortInfo& info)
1205 {
1206  // We are in the context of one of the input or output threads,
1207  // so our contact with the PortCore must be absolutely minimal.
1208  //
1209  // It is safe to pick up the address of the reporter if this is
1210  // kept constant over the lifetime of the input/output threads.
1211 
1212  if (m_eventReporter != nullptr) {
1213  m_eventReporter->report(info);
1214  }
1215 }
1216 
1217 
1219 {
1220  YARP_UNUSED(id);
1221  YARP_UNUSED(os);
1222  bool result = true;
1223 
1224  // We are in the context of one of the input threads,
1225  // so our contact with the PortCore must be absolutely minimal.
1226  //
1227  // It is safe to pick up the address of the reader since this is
1228  // constant over the lifetime of the input threads.
1229 
1230  if (m_reader != nullptr && !m_interrupted) {
1231  m_interruptable = false; // No mutexing; user of interrupt() has to be careful.
1232 
1233  bool haveOutputs = (m_outputCount != 0); // No mutexing, but failure modes are benign.
1234 
1235  if (m_logNeeded && haveOutputs) {
1236  // Normally, yarp doesn't pay attention to the content of
1237  // messages received by the client. Likewise, the content
1238  // of replies are not monitored. However it may sometimes
1239  // be useful to log this traffic.
1240 
1241  ConnectionRecorder recorder;
1242  recorder.init(&reader);
1243  lockCallback();
1244  result = m_reader->read(recorder);
1245  unlockCallback();
1246  recorder.fini();
1247  // send off a log of this transaction to whoever wants it
1248  sendHelper(recorder, PORTCORE_SEND_LOG);
1249  } else {
1250  // YARP is not needed as a middleman
1251  lockCallback();
1252  result = m_reader->read(reader);
1253  unlockCallback();
1254  }
1255 
1256  m_interruptable = true;
1257  } else {
1258  // Read and ignore message, there is no where to send it.
1259  yCDebug(PORTCORE, "data received, no reader for it");
1260  Bottle b;
1261  result = b.read(reader);
1262  }
1263  return result;
1264 }
1265 
1266 
1267 bool PortCore::send(const PortWriter& writer,
1268  PortReader* reader,
1269  const PortWriter* callback)
1270 {
1271  // check if there is any modifier
1272  // we need to protect this part while the modifier
1273  // plugin is loading or unloading!
1274  m_modifier.outputMutex.lock();
1275  if (m_modifier.outputModifier != nullptr) {
1276  if (!m_modifier.outputModifier->acceptOutgoingData(writer)) {
1277  m_modifier.outputMutex.unlock();
1278  return false;
1279  }
1280  m_modifier.outputModifier->modifyOutgoingData(writer);
1281  }
1282  m_modifier.outputMutex.unlock();
1283  if (!m_logNeeded) {
1284  return sendHelper(writer, PORTCORE_SEND_NORMAL, reader, callback);
1285  }
1286  // logging is desired, so we need to wrap up and log this send
1287  // (and any reply it gets) -- TODO not yet supported
1288  return sendHelper(writer, PORTCORE_SEND_NORMAL, reader, callback);
1289 }
1290 
1292  int mode,
1293  PortReader* reader,
1294  const PortWriter* callback)
1295 {
1296  if (m_interrupted || m_finishing) {
1297  return false;
1298  }
1299 
1300  bool all_ok = true;
1301  bool gotReply = false;
1302  int logCount = 0;
1303  std::string envelopeString = m_envelope;
1304 
1305  // Pass a message to all output units for sending on. We could
1306  // be doing more here to cache the serialization of the message
1307  // and reuse it across output connections. However, one key
1308  // optimization is present: external blocks written by
1309  // yarp::os::ConnectionWriter::appendExternalBlock are never
1310  // copied. So for example the core image array in a yarp::sig::Image
1311  // is untouched by the port communications code.
1312 
1313  yCTrace(PORTCORE, "------- send in real");
1314 
1315  // Give user the chance to know that this object is about to be
1316  // written.
1317  writer.onCommencement();
1318 
1319  // All user-facing parts of this port will be blocked on this
1320  // operation, so we'll want to be snappy. How long the
1321  // operation lasts will depend on these flags:
1322  // * waitAfterSend
1323  // * waitBeforeSend
1324  // set by setWaitAfterSend() and setWaitBeforeSend().
1325  m_stateSemaphore.wait();
1326 
1327  // If the port is shutting down, abort.
1328  if (m_finished) {
1329  m_stateSemaphore.post();
1330  return false;
1331  }
1332 
1333  yCTrace(PORTCORE, "------- send in");
1334  // Prepare a "packet" for tracking a single message which
1335  // may travel by multiple outputs.
1336  m_packetMutex.lock();
1337  PortCorePacket* packet = m_packets.getFreePacket();
1338  yCAssert(PORTCORE, packet != nullptr);
1339  packet->setContent(&writer, false, callback);
1340  m_packetMutex.unlock();
1341 
1342  // Scan connections, placing message everywhere we can.
1343  for (auto* unit : m_units) {
1344  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
1345  bool log = (!unit->getMode().empty());
1346  if (log) {
1347  // Some connections are for logging only.
1348  logCount++;
1349  }
1350  bool ok = (mode == PORTCORE_SEND_NORMAL) ? (!log) : (log);
1351  if (!ok) {
1352  continue;
1353  }
1354  bool waiter = m_waitAfterSend || (mode == PORTCORE_SEND_LOG);
1355  yCTrace(PORTCORE, "------- -- inc");
1356  m_packetMutex.lock();
1357  packet->inc(); // One more connection carrying message.
1358  m_packetMutex.unlock();
1359  yCTrace(PORTCORE, "------- -- pre-send");
1360  bool gotReplyOne = false;
1361  // Send the message off on this connection.
1362  void* out = unit->send(writer,
1363  reader,
1364  (callback != nullptr) ? callback : (&writer),
1365  reinterpret_cast<void*>(packet),
1366  envelopeString,
1367  waiter,
1368  m_waitBeforeSend,
1369  &gotReplyOne);
1370  gotReply = gotReply || gotReplyOne;
1371  yCTrace(PORTCORE, "------- -- send");
1372  if (out != nullptr) {
1373  // We got back a report of a message already sent.
1374  m_packetMutex.lock();
1375  (static_cast<PortCorePacket*>(out))->dec(); // Message on one fewer connections.
1376  m_packets.checkPacket(static_cast<PortCorePacket*>(out));
1377  m_packetMutex.unlock();
1378  }
1379  if (waiter) {
1380  if (unit->isFinished()) {
1381  all_ok = false;
1382  }
1383  }
1384  yCTrace(PORTCORE, "------- -- dec");
1385  }
1386  }
1387  yCTrace(PORTCORE, "------- pack check");
1388  m_packetMutex.lock();
1389 
1390  // We no longer concern ourselves with the message.
1391  // It may or may not be traveling on some connections.
1392  // But that is not our problem anymore.
1393  packet->dec();
1394 
1395  m_packets.checkPacket(packet);
1396  m_packetMutex.unlock();
1397  yCTrace(PORTCORE, "------- packed");
1398  yCTrace(PORTCORE, "------- send out");
1399  if (mode == PORTCORE_SEND_LOG) {
1400  if (logCount == 0) {
1401  m_logNeeded = false;
1402  }
1403  }
1404  m_stateSemaphore.post();
1405  yCTrace(PORTCORE, "------- send out real");
1406 
1407  if (m_waitAfterSend && reader != nullptr) {
1408  all_ok = all_ok && gotReply;
1409  }
1410 
1411  return all_ok;
1412 }
1413 
1414 
1416 {
1417  bool writing = false;
1418 
1419  m_stateSemaphore.wait();
1420 
1421  // Check if any port is currently writing. TODO optimize
1422  // this query by counting down with notifyCompletion().
1423  if (!m_finished) {
1424  for (auto* unit : m_units) {
1425  if ((unit != nullptr) && !unit->isFinished() && unit->isBusy()) {
1426  writing = true;
1427  }
1428  }
1429  }
1430 
1431  m_stateSemaphore.post();
1432 
1433  return writing;
1434 }
1435 
1436 
1438 {
1439  cleanUnits(false);
1440  m_packetMutex.lock();
1441  int result = m_inputCount;
1442  m_packetMutex.unlock();
1443  return result;
1444 }
1445 
1447 {
1448  cleanUnits(false);
1449  m_packetMutex.lock();
1450  int result = m_outputCount;
1451  m_packetMutex.unlock();
1452  return result;
1453 }
1454 
1455 
1456 void PortCore::notifyCompletion(void* tracker)
1457 {
1458  yCTrace(PORTCORE, "starting notifyCompletion");
1459  m_packetMutex.lock();
1460  if (tracker != nullptr) {
1461  (static_cast<PortCorePacket*>(tracker))->dec();
1462  m_packets.checkPacket(static_cast<PortCorePacket*>(tracker));
1463  }
1464  m_packetMutex.unlock();
1465  yCTrace(PORTCORE, "stopping notifyCompletion");
1466 }
1467 
1468 
1470 {
1471  m_envelopeWriter.restart();
1472  bool ok = envelope.write(m_envelopeWriter);
1473  if (ok) {
1474  setEnvelope(m_envelopeWriter.toString());
1475  }
1476  return ok;
1477 }
1478 
1479 
1480 void PortCore::setEnvelope(const std::string& envelope)
1481 {
1482  m_envelope = envelope;
1483  for (size_t i = 0; i < m_envelope.length(); i++) {
1484  // It looks like envelopes are constrained to be printable ASCII?
1485  // I'm not sure why this would be. TODO check.
1486  if (m_envelope[i] < 32) {
1487  m_envelope = m_envelope.substr(0, i);
1488  break;
1489  }
1490  }
1491  yCDebug(PORTCORE, "set envelope to %s", m_envelope.c_str());
1492 }
1493 
1495 {
1496  return m_envelope;
1497 }
1498 
1500 {
1501  StringInputStream sis;
1502  sis.add(m_envelope);
1503  sis.add("\r\n");
1505  Route route;
1506  sbr.reset(sis, nullptr, route, 0, true);
1507  return envelope.read(sbr);
1508 }
1509 
1510 // Make an RPC connection to talk to a ROS API, send a message, get reply.
1511 // NOTE: ROS support can now be moved out of here, once all documentation
1512 // of older ways to interoperate with it are purged and people stop
1513 // doing it.
1514 static bool __pc_rpc(const Contact& c,
1515  const char* carrier,
1516  Bottle& writer,
1517  Bottle& reader,
1518  bool verbose)
1519 {
1520  ContactStyle style;
1521  style.quiet = !verbose;
1522  style.timeout = 4;
1523  style.carrier = carrier;
1524  bool ok = Network::write(c, writer, reader, style);
1525  return ok;
1526 }
1527 
1528 // ACE is sometimes confused by localhost aliases, in a ROS-incompatible
1529 // way. This method does a quick sanity check if we are using ROS.
1530 static bool __tcp_check(const Contact& c)
1531 {
1532 #ifdef YARP_HAS_ACE
1533  ACE_INET_Addr addr;
1534  int result = addr.set(c.getPort(), c.getHost().c_str());
1535  if (result != 0) {
1536  yCWarning(PORTCORE, "ACE choked on %s:%d\n", c.getHost().c_str(), c.getPort());
1537  }
1538  result = addr.set(c.getPort(), "127.0.0.1");
1539  if (result != 0) {
1540  yCWarning(PORTCORE, "ACE choked on 127.0.0.1:%d\n", c.getPort());
1541  }
1542  result = addr.set(c.getPort(), "127.0.1.1");
1543  if (result != 0) {
1544  yCWarning(PORTCORE, "ACE choked on 127.0.1.1:%d\n", c.getPort());
1545  }
1546 #endif
1547  return true;
1548 }
1549 
1550 namespace {
1551 enum class PortCoreCommand : yarp::conf::vocab32_t
1552 {
1553  Unknown = 0,
1554  Help = yarp::os::createVocab('h', 'e', 'l', 'p'),
1555  Ver = yarp::os::createVocab('v', 'e', 'r'),
1556  Pray = yarp::os::createVocab('p', 'r', 'a', 'y'),
1557  Add = yarp::os::createVocab('a', 'd', 'd'),
1558  Del = yarp::os::createVocab('d', 'e', 'l'),
1559  Atch = yarp::os::createVocab('a', 't', 'c', 'h'),
1560  Dtch = yarp::os::createVocab('d', 't', 'c', 'h'),
1561  List = yarp::os::createVocab('l', 'i', 's', 't'),
1562  Set = yarp::os::createVocab('s', 'e', 't'),
1563  Get = yarp::os::createVocab('g', 'e', 't'),
1564  Prop = yarp::os::createVocab('p', 'r', 'o', 'p'),
1565  RosPublisherUpdate = yarp::os::createVocab('r', 'p', 'u', 'p'),
1566  RosRequestTopic = yarp::os::createVocab('r', 't', 'o', 'p'),
1567  RosGetPid = yarp::os::createVocab('p', 'i', 'd'),
1568  RosGetBusInfo = yarp::os::createVocab('b', 'u', 's'),
1569 };
1570 
1571 enum class PortCoreConnectionDirection : yarp::conf::vocab32_t
1572 {
1573  Error = 0,
1574  Out = yarp::os::createVocab('o', 'u', 't'),
1575  In = yarp::os::createVocab('i', 'n'),
1576 };
1577 
1578 enum class PortCorePropertyAction : yarp::conf::vocab32_t
1579 {
1580  Error = 0,
1581  Get = yarp::os::createVocab('g', 'e', 't'),
1582  Set = yarp::os::createVocab('s', 'e', 't')
1583 };
1584 
1585 PortCoreCommand parseCommand(const yarp::os::Value& v)
1586 {
1587  if (v.isString()) {
1588  // We support ROS client API these days. Here we recode some long ROS
1589  // command names, just for convenience.
1590  std::string cmd = v.asString();
1591  if (cmd == "publisherUpdate") {
1592  return PortCoreCommand::RosPublisherUpdate;
1593  }
1594  if (cmd == "requestTopic") {
1595  return PortCoreCommand::RosRequestTopic;
1596  }
1597  if (cmd == "getPid") {
1598  return PortCoreCommand::RosGetPid;
1599  }
1600  if (cmd == "getBusInfo") {
1601  return PortCoreCommand::RosGetBusInfo;
1602  }
1603  }
1604 
1605  auto cmd = static_cast<PortCoreCommand>(v.asVocab());
1606  switch (cmd) {
1607  case PortCoreCommand::Help:
1608  case PortCoreCommand::Ver:
1609  case PortCoreCommand::Pray:
1610  case PortCoreCommand::Add:
1611  case PortCoreCommand::Del:
1612  case PortCoreCommand::Atch:
1613  case PortCoreCommand::Dtch:
1614  case PortCoreCommand::List:
1615  case PortCoreCommand::Set:
1616  case PortCoreCommand::Get:
1617  case PortCoreCommand::Prop:
1618  case PortCoreCommand::RosPublisherUpdate:
1619  case PortCoreCommand::RosRequestTopic:
1620  case PortCoreCommand::RosGetPid:
1621  case PortCoreCommand::RosGetBusInfo:
1622  return cmd;
1623  default:
1624  return PortCoreCommand::Unknown;
1625  }
1626 }
1627 
1628 PortCoreConnectionDirection parseConnectionDirection(yarp::conf::vocab32_t v, bool errorIsOut = false)
1629 {
1630  auto dir = static_cast<PortCoreConnectionDirection>(v);
1631  switch (dir) {
1632  case PortCoreConnectionDirection::In:
1633  case PortCoreConnectionDirection::Out:
1634  return dir;
1635  default:
1636  return errorIsOut ? PortCoreConnectionDirection::Out : PortCoreConnectionDirection::Error;
1637  }
1638 }
1639 
1640 PortCorePropertyAction parsePropertyAction(yarp::conf::vocab32_t v)
1641 {
1642  auto action = static_cast<PortCorePropertyAction>(v);
1643  switch (action) {
1644  case PortCorePropertyAction::Get:
1645  case PortCorePropertyAction::Set:
1646  return action;
1647  default:
1648  return PortCorePropertyAction::Error;
1649  }
1650 }
1651 
1652 void describeRoute(const Route& route, Bottle& result)
1653 {
1654  Bottle& bfrom = result.addList();
1655  bfrom.addString("from");
1656  bfrom.addString(route.getFromName());
1657 
1658  Bottle& bto = result.addList();
1659  bto.addString("to");
1660  bto.addString(route.getToName());
1661 
1662  Bottle& bcarrier = result.addList();
1663  bcarrier.addString("carrier");
1664  bcarrier.addString(route.getCarrierName());
1665 
1666  Carrier* carrier = Carriers::chooseCarrier(route.getCarrierName());
1667  if (carrier->isConnectionless()) {
1668  Bottle& bconnectionless = result.addList();
1669  bconnectionless.addString("connectionless");
1670  bconnectionless.addInt32(1);
1671  }
1672  if (!carrier->isPush()) {
1673  Bottle& breverse = result.addList();
1674  breverse.addString("push");
1675  breverse.addInt32(0);
1676  }
1677  delete carrier;
1678 }
1679 
1680 } // namespace
1681 
1683  void* id)
1684 {
1685  Bottle cmd;
1686  Bottle result;
1687 
1688  // We've received a message to the port that is marked as administrative.
1689  // That means that instead of passing it along as data to the user of the
1690  // port, the port itself is responsible for reading and responding to
1691  // it. So let's read the message and see what we're supposed to do.
1692  cmd.read(reader);
1693 
1694  yCDebug(PORTCORE, "Port %s received command %s", getName().c_str(), cmd.toString().c_str());
1695 
1696  auto handleAdminHelpCmd = []() {
1697  Bottle result;
1698  // We give a list of the most useful administrative commands.
1699  result.addVocab(yarp::os::createVocab('m', 'a', 'n', 'y'));
1700  result.addString("[help] # give this help");
1701  result.addString("[ver] # report protocol version information");
1702  result.addString("[add] $portname # add an output connection");
1703  result.addString("[add] $portname $car # add an output with a given protocol");
1704  result.addString("[del] $portname # remove an input or output connection");
1705  result.addString("[list] [in] # list input connections");
1706  result.addString("[list] [out] # list output connections");
1707  result.addString("[list] [in] $portname # give details for input");
1708  result.addString("[list] [out] $portname # give details for output");
1709  result.addString("[prop] [get] # get all user-defined port properties");
1710  result.addString("[prop] [get] $prop # get a user-defined port property (prop, val)");
1711  result.addString("[prop] [set] $prop $val # set a user-defined port property (prop, val)");
1712  result.addString("[prop] [get] $portname # get Qos properties of a connection to/from a port");
1713  result.addString("[prop] [set] $portname # set Qos properties of a connection to/from a port");
1714  result.addString("[prop] [get] $cur_port # get information about current process (e.g., scheduling priority, pid)");
1715  result.addString("[prop] [set] $cur_port # set properties of the current process (e.g., scheduling priority, pid)");
1716  result.addString("[atch] [out] $prop # attach a portmonitor plug-in to the port's output");
1717  result.addString("[atch] [in] $prop # attach a portmonitor plug-in to the port's input");
1718  result.addString("[dtch] [out] # detach portmonitor plug-in from the port's output");
1719  result.addString("[dtch] [in] # detach portmonitor plug-in from the port's input");
1720  //result.addString("[atch] $portname $prop # attach a portmonitor plug-in to the connection to/from $portname");
1721  //result.addString("[dtch] $portname # detach any portmonitor plug-in from the connection to/from $portname");
1722  return result;
1723  };
1724 
1725  auto handleAdminVerCmd = []() {
1726  // Gives a version number for the administrative commands.
1727  // It is distinct from YARP library versioning.
1728  Bottle result;
1729  result.addVocab(Vocab::encode("ver"));
1730  result.addInt32(1);
1731  result.addInt32(2);
1732  result.addInt32(3);
1733  return result;
1734  };
1735 
1736  auto handleAdminPrayCmd = [this]() {
1737  // Strongly inspired by nethack #pray command:
1738  // https://nethackwiki.com/wiki/Prayer
1739  // http://www.steelypips.org/nethack/pray.html
1740 
1741  Bottle result;
1742 
1743  bool found = false;
1744  std::string name = yarp::conf::environment::getEnvironment("YARP_ROBOT_NAME", &found);
1745  if (!found) {
1746  name = getName();
1747  // Remove initial "/"
1748  while (name[0] == '/') {
1749  name = name.substr(1);
1750  }
1751  // Keep only the first part of the port name
1752  auto i = name.find('/');
1753  if (i != std::string::npos) {
1754  name = name.substr(0, i);
1755  }
1756  }
1757 
1758  std::random_device rd;
1759  std::mt19937 mt(rd());
1760  std::uniform_int_distribution<int> dist2(0,1);
1761  auto d2 = std::bind(dist2, mt);
1762 
1763  result.addString("You begin praying to " + name + ".");
1764  result.addString("You finish your prayer.");
1765 
1766  static const char* godvoices[] = {
1767  "booms out",
1768  "thunders",
1769  "rings out",
1770  "booms",
1771  };
1772  std::uniform_int_distribution<int> godvoices_dist(0, (sizeof(godvoices) / sizeof(godvoices[0])) - 1);
1773  auto godvoice = [&]() {
1774  return std::string(godvoices[godvoices_dist(mt)]);
1775  };
1776 
1777  static const char* creatures[] = {
1778  "mortal",
1779  "creature",
1780  "robot",
1781  };
1782  std::uniform_int_distribution<int> creatures_dist(0, (sizeof(creatures) / sizeof(creatures[0])) - 1);
1783  auto creature = [&]() {
1784  return std::string(creatures[creatures_dist(mt)]);
1785  };
1786 
1787  static const char* auras[] = {
1788  "amber",
1789  "light blue",
1790  "golden",
1791  "white",
1792  "orange",
1793  "black",
1794  };
1795  std::uniform_int_distribution<int> auras_dist(0, (sizeof(auras) / sizeof(auras[0])) - 1);
1796  auto aura = [&]() {
1797  return std::string(auras[auras_dist(mt)]);
1798  };
1799 
1800  static const char* items[] = {
1801  "keyboard",
1802  "mouse",
1803  "monitor",
1804  "headphones",
1805  "smartphone",
1806  "wallet",
1807  "eyeglasses",
1808  "shirt",
1809  };
1810  std::uniform_int_distribution<int> items_dist(0, (sizeof(items) / sizeof(items[0])) - 1);
1811  auto item = [&]() {
1812  return std::string(items[items_dist(mt)]);
1813  };
1814 
1815  static const char* blessings[] = {
1816  "You feel more limber.",
1817  "The slime disappears.",
1818  "Your amulet vanishes! You can breathe again.",
1819  "You can breathe again.",
1820  "You are back on solid ground.",
1821  "Your stomach feels content.",
1822  "You feel better.",
1823  "You feel much better.",
1824  "Your surroundings change.",
1825  "Your shape becomes uncertain.",
1826  "Your chain disappears.",
1827  "There's a tiger in your tank.",
1828  "You feel in good health again.",
1829  "Your eye feels better.",
1830  "Your eyes feel better.",
1831  "Looks like you are back in Kansas.",
1832  "Your <ITEM> softly glows <AURA>.",
1833  };
1834  std::uniform_int_distribution<int> blessings_dist(0, (sizeof(blessings) / sizeof(blessings[0])) - 1);
1835  auto blessing = [&](){
1836  auto blessing = std::string(blessings[blessings_dist(mt)]);
1837  blessing = std::regex_replace(blessing, std::regex("<ITEM>"), item());
1838  blessing = std::regex_replace(blessing, std::regex("<AURA>"), aura());
1839  return blessing;
1840  };
1841 
1842  std::uniform_int_distribution<int> dist13(0,12);
1843  switch(dist13(mt)) {
1844  case 0:
1845  case 1:
1846  result.addString("You feel that " + name + " is " + (d2() ? "bummed" : "displeased") + ".");
1847  break;
1848  case 2:
1849  case 3:
1850  result.addString("The voice of " + name + " " + godvoice() +
1851  ": \"Thou " + (d2() ? "hast strayed from the path" : "art arrogant") +
1852  ", " + creature() + ". Thou must relearn thy lessons!\"");
1853  break;
1854  case 4:
1855  case 5:
1856  result.addString("The voice of " + name + " " + godvoice() +
1857  ": \"Thou hast angered me.\"");
1858  result.addString("A black glow surrounds you.");
1859  break;
1860  case 6:
1861  result.addString("The voice of " + name + " " + godvoice() +
1862  ": \"Thou hast angered me.\"");
1863  break;
1864  case 7:
1865  case 8:
1866  result.addString("The voice of " + name + " " + godvoice() +
1867  ": \"Thou durst " + (d2() ? "scorn" : "call upon") +
1868  " me? Then die, " + creature() + "!\"");
1869  break;
1870  case 9:
1871  result.addString("You feel that " + name + " is " + (d2() ? "pleased as punch" : "well-pleased") + ".");
1872  result.addString(blessing());
1873  break;
1874  case 10:
1875  result.addString("You feel that " + name + " is " + (d2() ? "ticklish" : "pleased") + ".");
1876  result.addString(blessing());
1877  break;
1878  case 11:
1879  result.addString("You feel that " + name + " is " + (d2() ? "full" : "satisfied") + ".");
1880  result.addString(blessing());
1881  break;
1882  default:
1883  result.addString("The voice of " + name + " " + godvoice() +
1884  ": \"Thou hast angered me.\"");
1885  result.addString("Suddenly, a bolt of lightning strikes you!");
1886  result.addString("You fry to a crisp!");
1887  break;
1888  }
1889 
1890  return result;
1891  };
1892 
1893  auto handleAdminAddCmd = [this, id](std::string output,
1894  const std::string& carrier) {
1895  // Add an output to the port.
1896  Bottle result;
1897  StringOutputStream cache;
1898  if (!carrier.empty()) {
1899  output = carrier + ":/" + output;
1900  }
1901  addOutput(output, id, &cache, false);
1902  std::string r = cache.toString();
1903  int v = (r[0] == 'A') ? 0 : -1;
1904  result.addInt32(v);
1905  result.addString(r);
1906  return result;
1907  };
1908 
1909  auto handleAdminDelCmd = [this, id](const std::string& dest) {
1910  // Delete any inputs or outputs involving the named port.
1911  Bottle result;
1912  StringOutputStream cache;
1913  removeOutput(dest, id, &cache);
1914  std::string r1 = cache.toString();
1915  cache.reset();
1916  removeInput(dest, id, &cache);
1917  std::string r2 = cache.toString();
1918  int v = (r1[0] == 'R' || r2[0] == 'R') ? 0 : -1;
1919  result.addInt32(v);
1920  if (r1[0] == 'R' && r2[0] != 'R') {
1921  result.addString(r1);
1922  } else if (r1[0] != 'R' && r2[0] == 'R') {
1923  result.addString(r2);
1924  } else {
1925  result.addString(r1 + r2);
1926  }
1927  return result;
1928  };
1929 
1930  auto handleAdminAtchCmd = [this](PortCoreConnectionDirection direction,
1931  Property prop) {
1932  Bottle result;
1933  switch (direction) {
1934  case PortCoreConnectionDirection::Out: {
1935  std::string errMsg;
1936  if (!attachPortMonitor(prop, true, errMsg)) {
1937  result.addVocab(Vocab::encode("fail"));
1938  result.addString(errMsg);
1939  } else {
1940  result.addVocab(Vocab::encode("ok"));
1941  }
1942  } break;
1943  case PortCoreConnectionDirection::In: {
1944  std::string errMsg;
1945  if (!attachPortMonitor(prop, false, errMsg)) {
1946  result.addVocab(Vocab::encode("fail"));
1947  result.addString(errMsg);
1948  } else {
1949  result.addVocab(Vocab::encode("ok"));
1950  }
1951  } break;
1952  case PortCoreConnectionDirection::Error:
1953  result.addVocab(Vocab::encode("fail"));
1954  result.addString("attach command must be followed by [out] or [in]");
1955  }
1956  return result;
1957  };
1958 
1959  auto handleAdminDtchCmd = [this](PortCoreConnectionDirection direction) {
1960  Bottle result;
1961  switch (direction) {
1962  case PortCoreConnectionDirection::Out: {
1963  if (detachPortMonitor(true)) {
1964  result.addVocab(Vocab::encode("ok"));
1965  } else {
1966  result.addVocab(Vocab::encode("fail"));
1967  }
1968  } break;
1969  case PortCoreConnectionDirection::In: {
1970  if (detachPortMonitor(false)) {
1971  result.addVocab(Vocab::encode("ok"));
1972  } else {
1973  result.addVocab(Vocab::encode("fail"));
1974  }
1975  } break;
1976  case PortCoreConnectionDirection::Error:
1977  result.addVocab(Vocab::encode("fail"));
1978  result.addString("detach command must be followed by [out] or [in]");
1979  };
1980  return result;
1981  };
1982 
1983  auto handleAdminListCmd = [this](const PortCoreConnectionDirection direction,
1984  const std::string& target) {
1985  Bottle result;
1986  switch (direction) {
1987  case PortCoreConnectionDirection::In: {
1988  // Return a list of all input connections.
1989  m_stateSemaphore.wait();
1990  for (auto* unit : m_units) {
1991  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
1992  Route route = unit->getRoute();
1993  if (target.empty()) {
1994  const std::string& name = route.getFromName();
1995  if (!name.empty()) {
1996  result.addString(name);
1997  }
1998  } else if (route.getFromName() == target) {
1999  describeRoute(route, result);
2000  }
2001  }
2002  }
2003  m_stateSemaphore.post();
2004  } break;
2005  case PortCoreConnectionDirection::Out: {
2006  // Return a list of all output connections.
2007  m_stateSemaphore.wait();
2008  for (auto* unit : m_units) {
2009  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
2010  Route route = unit->getRoute();
2011  if (target.empty()) {
2012  result.addString(route.getToName());
2013  } else if (route.getToName() == target) {
2014  describeRoute(route, result);
2015  }
2016  }
2017  }
2018  m_stateSemaphore.post();
2019  } break;
2020  case PortCoreConnectionDirection::Error:
2021  // Should never happen
2022  yCAssert(PORTCORE, false);
2023  break;
2024  }
2025  return result;
2026  };
2027 
2028  auto handleAdminSetInCmd = [this](const std::string& target,
2029  const Property& property) {
2030  Bottle result;
2031  // Set carrier parameters on a given input connection.
2032  m_stateSemaphore.wait();
2033  if (target.empty()) {
2034  result.addInt32(-1);
2035  result.addString("target port is not specified.\r\n");
2036  } else {
2037  if (target == getName()) {
2038  std::string errMsg;
2039  if (!setParamPortMonitor(property, false, errMsg)) {
2040  result.addVocab(Vocab::encode("fail"));
2041  result.addString(errMsg);
2042  } else {
2043  result.addVocab(Vocab::encode("ok"));
2044  }
2045  } else {
2046  for (auto* unit : m_units) {
2047  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
2048  Route route = unit->getRoute();
2049  if (route.getFromName() == target) {
2050  unit->setCarrierParams(property);
2051  result.addInt32(0);
2052  std::string msg = "Configured connection from ";
2053  msg += route.getFromName();
2054  msg += "\r\n";
2055  result.addString(msg);
2056  break;
2057  }
2058  }
2059  }
2060  }
2061  if (result.size() == 0) {
2062  result.addInt32(-1);
2063  std::string msg = "Could not find an incoming connection from ";
2064  msg += target;
2065  msg += "\r\n";
2066  result.addString(msg);
2067  }
2068  }
2069  m_stateSemaphore.post();
2070  return result;
2071  };
2072 
2073  auto handleAdminSetOutCmd = [this](const std::string& target,
2074  const Property& property) {
2075  Bottle result;
2076  // Set carrier parameters on a given output connection.
2077  m_stateSemaphore.wait();
2078  if (target.empty()) {
2079  result.addInt32(-1);
2080  result.addString("target port is not specified.\r\n");
2081  } else {
2082  if (target == getName()) {
2083  std::string errMsg;
2084  if (!setParamPortMonitor(property, true, errMsg)) {
2085  result.addVocab(Vocab::encode("fail"));
2086  result.addString(errMsg);
2087  } else {
2088  result.addVocab(Vocab::encode("ok"));
2089  }
2090  } else {
2091  for (auto* unit : m_units) {
2092  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
2093  Route route = unit->getRoute();
2094  if (route.getToName() == target) {
2095  unit->setCarrierParams(property);
2096  result.addInt32(0);
2097  std::string msg = "Configured connection to ";
2098  msg += route.getToName();
2099  msg += "\r\n";
2100  result.addString(msg);
2101  break;
2102  }
2103  }
2104  }
2105  }
2106  if (result.size() == 0) {
2107  result.addInt32(-1);
2108  std::string msg = "Could not find an incoming connection to ";
2109  msg += target;
2110  msg += "\r\n";
2111  result.addString(msg);
2112  }
2113  }
2114  m_stateSemaphore.post();
2115  return result;
2116  };
2117 
2118  auto handleAdminGetInCmd = [this](const std::string& target) {
2119  Bottle result;
2120  // Get carrier parameters for a given input connection.
2121  m_stateSemaphore.wait();
2122  if (target.empty()) {
2123  result.addInt32(-1);
2124  result.addString("target port is not specified.\r\n");
2125  } else if (target == getName()) {
2126  yarp::os::Property property;
2127  std::string errMsg;
2128  if (!getParamPortMonitor(property, false, errMsg)) {
2129  result.addVocab(Vocab::encode("fail"));
2130  result.addString(errMsg);
2131  } else {
2132  result.addDict() = property;
2133  }
2134  } else {
2135  for (auto* unit : m_units) {
2136  if ((unit != nullptr) && unit->isInput() && !unit->isFinished()) {
2137  Route route = unit->getRoute();
2138  if (route.getFromName() == target) {
2139  yarp::os::Property property;
2140  unit->getCarrierParams(property);
2141  result.addDict() = property;
2142  break;
2143  }
2144  }
2145  }
2146  if (result.size() == 0) {
2147  result.addInt32(-1);
2148  std::string msg = "Could not find an incoming connection from ";
2149  msg += target;
2150  msg += "\r\n";
2151  result.addString(msg);
2152  }
2153  }
2154  m_stateSemaphore.post();
2155  return result;
2156  };
2157 
2158  auto handleAdminGetOutCmd = [this](const std::string& target) {
2159  Bottle result;
2160  // Get carrier parameters for a given output connection.
2161  m_stateSemaphore.wait();
2162  if (target.empty()) {
2163  result.addInt32(-1);
2164  result.addString("target port is not specified.\r\n");
2165  } else if (target == getName()) {
2166  yarp::os::Property property;
2167  std::string errMsg;
2168  if (!getParamPortMonitor(property, true, errMsg)) {
2169  result.addVocab(Vocab::encode("fail"));
2170  result.addString(errMsg);
2171  } else {
2172  result.addDict() = property;
2173  }
2174  } else {
2175  for (auto* unit : m_units) {
2176  if ((unit != nullptr) && unit->isOutput() && !unit->isFinished()) {
2177  Route route = unit->getRoute();
2178  if (route.getToName() == target) {
2179  yarp::os::Property property;
2180  unit->getCarrierParams(property);
2181  result.addDict() = property;
2182  break;
2183  }
2184  }
2185  }
2186  if (result.size() == 0) {
2187  result.addInt32(-1);
2188  std::string msg = "Could not find an incoming connection to ";
2189  msg += target;
2190  msg += "\r\n";
2191  result.addString(msg);
2192  }
2193  }
2194  m_stateSemaphore.post();
2195  return result;
2196  };
2197 
 // Admin "prop get" handler.
 //  - empty key: the reply is the whole property set, re-parsed from its
 //    string form.
 //  - key starting with '/': the key is treated as a port name. For this
 //    port's own name the reply holds "sched", "process", "platform" and
 //    "port" groups; for any other name the live units are searched and each
 //    connection to/from that name contributes a "sched" and a "qos" group.
 //    When nothing matches, the reply is [fail] plus a message.
 //  - any other key: the reply is the value stored under that key.
 // Nothing is added to the reply when acquireProperties() returns nullptr.
2198  auto handleAdminPropGetCmd = [this](const std::string& key) {
2199  Bottle result;
 // NOTE(review): presumably acquireProperties()/releaseProperties() bracket a
 // lock on the port state - confirm against their definitions.
2200  Property* p = acquireProperties(false);
2201  if (p != nullptr) {
2202  if (key.empty()) {
2203  result.fromString(p->toString());
2204  } else {
2205  // request: "prop get /portname"
2206  if (key[0] == '/') {
2207  bool bFound = false;
2208  // check for their own name
2209  if (key == getName()) {
2210  bFound = true;
 // Scheduling information of the port itself (thread id, priority, policy).
2211  Bottle& sched = result.addList();
2212  sched.addString("sched");
2213  Property& sched_prop = sched.addDict();
2214  sched_prop.put("tid", static_cast<int>(this->getTid()));
2215  sched_prop.put("priority", this->getPriority());
2216  sched_prop.put("policy", this->getPolicy());
2217
 // NOTE(review): `info` is not declared in the visible lines (listing line
 // 2218 is missing from this extract) - presumably a process-information
 // query; confirm against the full source.
 // A pid of -1 makes "name" and "arguments" report "unknown".
2219  Bottle& proc = result.addList();
2220  proc.addString("process");
2221  Property& proc_prop = proc.addDict();
2222  proc_prop.put("pid", info.pid);
2223  proc_prop.put("name", (info.pid != -1) ? info.name : "unknown");
2224  proc_prop.put("arguments", (info.pid != -1) ? info.arguments : "unknown");
2225  proc_prop.put("priority", info.schedPriority);
2226  proc_prop.put("policy", info.schedPolicy);
2227
 // NOTE(review): `pinfo` is likewise not declared in the visible lines
 // (listing line 2228 is missing) - confirm against the full source.
2229  Bottle& platform = result.addList();
2230  platform.addString("platform");
2231  Property& platform_prop = platform.addDict();
2232  platform_prop.put("os", pinfo.name);
2233  platform_prop.put("hostname", m_address.getHost());
2234
 // Decode the port flags into three booleans and report them with the type name.
2235  unsigned int f = getFlags();
2236  bool is_input = (f & PORTCORE_IS_INPUT) != 0;
2237  bool is_output = (f & PORTCORE_IS_OUTPUT) != 0;
2238  bool is_rpc = (f & PORTCORE_IS_RPC) != 0;
2239  Bottle& port = result.addList();
2240  port.addString("port");
2241  Property& port_prop = port.addDict();
2242  port_prop.put("is_input", is_input);
2243  port_prop.put("is_output", is_output);
2244  port_prop.put("is_rpc", is_rpc);
2245  port_prop.put("type", getType().getName());
2246  } else {
 // Not our own name: scan every live unit. The loop has no break, so each
 // unit whose remote end matches the key adds its own groups to the reply.
2247  for (auto* unit : m_units) {
2248  if ((unit != nullptr) && !unit->isFinished()) {
2249  Route route = unit->getRoute();
 // The remote end is the destination for outputs and the source for inputs.
2250  std::string coreName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2251  if (key == coreName) {
2252  bFound = true;
2253  int priority = unit->getPriority();
2254  int policy = unit->getPolicy();
2255  int tos = getTypeOfService(unit);
2256  int tid = static_cast<int>(unit->getTid());
2257  Bottle& sched = result.addList();
2258  sched.addString("sched");
2259  Property& sched_prop = sched.addDict();
2260  sched_prop.put("tid", tid);
2261  sched_prop.put("priority", priority);
2262  sched_prop.put("policy", policy);
2263  Bottle& qos = result.addList();
2264  qos.addString("qos");
2265  Property& qos_prop = qos.addDict();
2266  qos_prop.put("tos", tos);
2267  }
2268  } // end isFinished()
2269  } // end for loop
2270  } // end portName == getname()
2271
2272  if (!bFound) { // cannot find any port matches the requested one
2273  result.addVocab(Vocab::encode("fail"));
2274  std::string msg = "cannot find any connection to/from ";
2275  msg = msg + key;
2276  result.addString(msg);
2277  }
2278  // end of (portName[0] == '/')
2279  } else {
 // Plain key: return whatever value the property set holds for it.
2280  result.add(p->find(key));
2281  }
2282  }
2283  }
 // Called on every path, including when p is nullptr.
2284  releaseProperties(p);
2285  return result;
2286  };
2287 
2288  auto handleAdminPropSetCmd = [this](const std::string& key,
2289  const Value& value,
2290  const Bottle& process,
2291  const Bottle& sched,
2292  const Bottle& qos) {
2293  Bottle result;
2294  Property* p = acquireProperties(false);
2295  bool bOk = true;
2296  if (p != nullptr) {
2297  p->put(key, value);
2298  // setting scheduling properties of all threads within the process
2299  // scope through the admin port
2300  // e.g. prop set /current_port (process ((priority 30) (policy 1)))
2301  if (!process.isNull()) {
2302  std::string portName = key;
2303  if ((!portName.empty()) && (portName[0] == '/')) {
2304  // check for their own name
2305  if (portName == getName()) {
2306  bOk = false;
2307  Bottle* process_prop = process.find("process").asList();
2308  if (process_prop != nullptr) {
2309  int prio = -1;
2310  int policy = -1;
2311  if (process_prop->check("priority")) {
2312  prio = process_prop->find("priority").asInt32();
2313  }
2314  if (process_prop->check("policy")) {
2315  policy = process_prop->find("policy").asInt32();
2316  }
2317  bOk = setProcessSchedulingParam(prio, policy);
2318  }
2319  }
2320  }
2321  }
2322  // check if we need to set the PortCoreUnit scheduling policy
2323  // e.g., "prop set /portname (sched ((priority 30) (policy 1)))"
2324  // The priority and policy values on Linux are:
2325  // SCHED_OTHER : policy=0, priority=[0 .. 0]
2326  // SCHED_FIFO : policy=1, priority=[1 .. 99]
2327  // SCHED_RR : policy=2, priority=[1 .. 99]
2328  if (!sched.isNull()) {
2329  if ((!key.empty()) && (key[0] == '/')) {
2330  bOk = false;
2331  for (auto* unit : m_units) {
2332  if ((unit != nullptr) && !unit->isFinished()) {
2333  Route route = unit->getRoute();
2334  std::string portName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2335 
2336  if (portName == key) {
2337  Bottle* sched_prop = sched.find("sched").asList();
2338  if (sched_prop != nullptr) {
2339  int prio = -1;
2340  int policy = -1;
2341  if (sched_prop->check("priority")) {
2342  prio = sched_prop->find("priority").asInt32();
2343  }
2344  if (sched_prop->check("policy")) {
2345  policy = sched_prop->find("policy").asInt32();
2346  }
2347  bOk = (unit->setPriority(prio, policy) != -1);
2348  } else {
2349  bOk = false;
2350  }
2351  break;
2352  }
2353  }
2354  }
2355  }
2356  }
2357  // check if we need to set the packet QOS policy
2358  // e.g., "prop set /portname (qos ((priority HIGH)))"
2359  // e.g., "prop set /portname (qos ((dscp AF12)))"
2360  // e.g., "prop set /portname (qos ((tos 12)))"
2361  if (!qos.isNull()) {
2362  if ((!key.empty()) && (key[0] == '/')) {
2363  bOk = false;
2364  for (auto* unit : m_units) {
2365  if ((unit != nullptr) && !unit->isFinished()) {
2366  Route route = unit->getRoute();
2367  std::string portName = (unit->isOutput()) ? route.getToName() : route.getFromName();
2368  if (portName == key) {
2369  Bottle* qos_prop = qos.find("qos").asList();
2370  if (qos_prop != nullptr) {
2371  int tos = -1;
2372  if (qos_prop->check("priority")) {
2373  // set the packet TOS value on the socket based on some predefined
2374  // priority levels.
2375  // the expected levels are: LOW, NORM, HIGH, CRIT
2376  NetInt32 priority = qos_prop->find("priority").asVocab();
2377  int dscp;
2378  switch (priority) {
2379  case yarp::os::createVocab('L', 'O', 'W'):
2380  dscp = 10;
2381  break;
2382  case yarp::os::createVocab('N', 'O', 'R', 'M'):
2383  dscp = 0;
2384  break;
2385  case yarp::os::createVocab('H', 'I', 'G', 'H'):
2386  dscp = 36;
2387  break;
2388  case yarp::os::createVocab('C', 'R', 'I', 'T'):
2389  dscp = 44;
2390  break;
2391  default:
2392  dscp = -1;
2393  }
2394  if (dscp >= 0) {
2395  tos = (dscp << 2);
2396  }
2397  } else if (qos_prop->check("dscp")) {
2398  // Set the packet TOS value on the socket based on the DSCP level
2399  QosStyle::PacketPriorityDSCP dscp_class = QosStyle::getDSCPByVocab(qos_prop->find("dscp").asVocab());
2400  int dscp = -1;
2401  if (dscp_class == QosStyle::DSCP_Invalid) {
2402  auto dscp_val = qos_prop->find("dscp");
2403  if (dscp_val.isInt32()) {
2404  dscp = dscp_val.asInt32();
2405  }
2406  } else {
2407  dscp = static_cast<int>(dscp_class);
2408  }
2409  if ((dscp >= 0) && (dscp < 64)) {
2410  tos = (dscp << 2);
2411  }
2412  } else if (qos_prop->check("tos")) {
2413  // Set the TOS value directly
2414  auto tos_val = qos_prop->find("tos");
2415  if (tos_val.isInt32()) {
2416  tos = tos_val.asInt32();
2417  }
2418  }
2419  if (tos >= 0) {
2420  bOk = setTypeOfService(unit, tos);
2421  }
2422  } else {
2423  bOk = false;
2424  }
2425  break;
2426  }
2427  }
2428  }
2429  }
2430  }
2431  }
2432  releaseProperties(p);
2433  result.addVocab((bOk) ? Vocab::encode("ok") : Vocab::encode("fail"));
2434  return result;
2435  };
2436 
2437  // NOTE: YARP partially supports the ROS Slave API https://wiki.ros.org/ROS/Slave_API
2438 
2439  auto handleAdminRosPublisherUpdateCmd = [this](const std::string& topic, Bottle* pubs) {
2440  // When running against a ROS name server, we need to
2441  // support ROS-style callbacks for connecting publishers
2442  // with subscribers. Note: this should not be necessary
2443  // anymore, now that a dedicated yarp::os::Node class
2444  // has been implemented, but is still needed for older
2445  // ways of interfacing with ROS without using dedicated
2446  // node ports.
2447  Bottle result;
2448  if (pubs != nullptr) {
2449  Property listed;
2450  for (size_t i = 0; i < pubs->size(); i++) {
2451  std::string pub = pubs->get(i).asString();
2452  listed.put(pub, 1);
2453  }
2454  Property present;
2455  m_stateSemaphore.wait();
2456  for (auto* unit : m_units) {
2457  if ((unit != nullptr) && unit->isPupped()) {
2458  std::string me = unit->getPupString();
2459  present.put(me, 1);
2460  if (!listed.check(me)) {
2461  unit->setDoomed();
2462  }
2463  }
2464  }
2465  m_stateSemaphore.post();
2466  for (size_t i = 0; i < pubs->size(); i++) {
2467  std::string pub = pubs->get(i).asString();
2468  if (!present.check(pub)) {
2469  yCDebug(PORTCORE, "ROS ADD %s", pub.c_str());
2470  Bottle req;
2471  Bottle reply;
2472  req.addString("requestTopic");
2473  NestedContact nc(getName());
2474  req.addString(nc.getNodeName());
2475  req.addString(topic);
2476  Bottle& lst = req.addList();
2477  Bottle& sublst = lst.addList();
2478  sublst.addString("TCPROS");
2479  yCDebug(PORTCORE, "Sending [%s] to %s", req.toString().c_str(), pub.c_str());
2480  Contact c = Contact::fromString(pub);
2481  if (!__pc_rpc(c, "xmlrpc", req, reply, false)) {
2482  fprintf(stderr, "Cannot connect to ROS subscriber %s\n", pub.c_str());
2483  // show diagnosics
2484  __pc_rpc(c, "xmlrpc", req, reply, true);
2485  __tcp_check(c);
2486  } else {
2487  Bottle* pref = reply.get(2).asList();
2488  std::string hostname;
2489  std::string carrier;
2490  int portnum = 0;
2491  if (reply.get(0).asInt32() != 1) {
2492  fprintf(stderr, "Failure looking up topic %s: %s\n", topic.c_str(), reply.toString().c_str());
2493  } else if (pref == nullptr) {
2494  fprintf(stderr, "Failure looking up topic %s: expected list of protocols\n", topic.c_str());
2495  } else if (pref->get(0).asString() != "TCPROS") {
2496  fprintf(stderr, "Failure looking up topic %s: unsupported protocol %s\n", topic.c_str(), pref->get(0).asString().c_str());
2497  } else {
2498  Value hostname2 = pref->get(1);
2499  Value portnum2 = pref->get(2);
2500  hostname = hostname2.asString();
2501  portnum = portnum2.asInt32();
2502  carrier = "tcpros+role.pub+topic.";
2503  carrier += topic;
2504  yCDebug(PORTCORE, "topic %s available at %s:%d", topic.c_str(), hostname.c_str(), portnum);
2505  }
2506  if (portnum != 0) {
2507  Contact addr(hostname, portnum);
2508  OutputProtocol* op = nullptr;
2509  Route r = Route(getName(), pub, carrier);
2510  op = Carriers::connect(addr);
2511  if (op == nullptr) {
2512  fprintf(stderr, "NO CONNECTION\n");
2513  std::exit(1);
2514  } else {
2515  op->attachPort(m_contactable);
2516  op->open(r);
2517  }
2518  Route route = op->getRoute();
2519  route.swapNames();
2520  op->rename(route);
2521  InputProtocol* ip = &(op->getInput());
2522  m_stateSemaphore.wait();
2523  PortCoreUnit* unit = new PortCoreInputUnit(*this,
2524  getNextIndex(),
2525  ip,
2526  true);
2527  yCAssert(PORTCORE, unit != nullptr);
2528  unit->setPupped(pub);
2529  unit->start();
2530  m_units.push_back(unit);
2531  m_stateSemaphore.post();
2532  }
2533  }
2534  }
2535  }
2536  }
2537  result.addInt32(1);
2538  result.addString("ok");
2539  return result;
2540  };
2541 
2542  auto handleAdminRosRequestTopicCmd = [this]() {
2543  // ROS-style query for topics.
2544  Bottle result;
2545  result.addInt32(1);
2546  NestedContact nc(getName());
2547  result.addString(nc.getNodeName());
2548  Bottle& lst = result.addList();
2549  Contact addr = getAddress();
2550  lst.addString("TCPROS");
2551  lst.addString(addr.getHost());
2552  lst.addInt32(addr.getPort());
2553  return result;
2554  };
2555 
2556  auto handleAdminRosGetPidCmd = []() {
2557  // ROS-style query for PID.
2558  Bottle result;
2559  result.addInt32(1);
2560  result.addString("");
2561  result.addInt32(yarp::os::impl::getpid());
2562  return result;
2563  };
2564 
2565  auto handleAdminRosGetBusInfoCmd = []() {
2566  // ROS-style query for bus information - we support this
2567  // in yarp::os::Node but not otherwise.
2568  Bottle result;
2569  result.addInt32(1);
2570  result.addString("");
2571  result.addList().addList();
2572  return result;
2573  };
2574 
2575  auto handleAdminUnknownCmd = [this](const Bottle& cmd) {
2576  Bottle result;
2577  bool ok = false;
2578  if (m_adminReader != nullptr) {
2579  DummyConnector con;
2580  cmd.write(con.getWriter());
2581  lockCallback();
2582  ok = m_adminReader->read(con.getReader());
2583  unlockCallback();
2584  if (ok) {
2585  result.read(con.getReader());
2586  }
2587  }
2588  if (!ok) {
2589  result.addVocab(Vocab::encode("fail"));
2590  result.addString("send [help] for list of valid commands");
2591  }
2592  return result;
2593  };
2594 
2595  const PortCoreCommand command = parseCommand(cmd.get(0));
2596  switch (command) {
2597  case PortCoreCommand::Help:
2598  result = handleAdminHelpCmd();
2599  break;
2600  case PortCoreCommand::Ver:
2601  result = handleAdminVerCmd();
2602  break;
2603  case PortCoreCommand::Pray:
2604  result = handleAdminPrayCmd();
2605  break;
2606  case PortCoreCommand::Add: {
2607  std::string output = cmd.get(1).asString();
2608  std::string carrier = cmd.get(2).asString();
2609  result = handleAdminAddCmd(std::move(output), carrier);
2610  } break;
2611  case PortCoreCommand::Del: {
2612  const std::string dest = cmd.get(1).asString();
2613  result = handleAdminDelCmd(dest);
2614  } break;
2615  case PortCoreCommand::Atch: {
2616  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab());
2617  Property prop(cmd.get(2).asString().c_str());
2618  result = handleAdminAtchCmd(direction, std::move(prop));
2619  } break;
2620  case PortCoreCommand::Dtch: {
2621  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab());
2622  result = handleAdminDtchCmd(direction);
2623  } break;
2624  case PortCoreCommand::List: {
2625  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab(), true);
2626  const std::string target = cmd.get(2).asString();
2627  result = handleAdminListCmd(direction, target);
2628  } break;
2629  case PortCoreCommand::Set: {
2630  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab(), true);
2631  const std::string target = cmd.get(2).asString();
2632  yarp::os::Property property;
2633  property.fromString(cmd.toString());
2634  switch (direction) {
2635  case PortCoreConnectionDirection::In:
2636  result = handleAdminSetInCmd(target, property);
2637  break;
2638  case PortCoreConnectionDirection::Out:
2639  result = handleAdminSetOutCmd(target, property);
2640  break;
2641  case PortCoreConnectionDirection::Error:
2642  yCAssert(PORTCORE, false); // Should never happen (error is out)
2643  break;
2644  }
2645  } break;
2646  case PortCoreCommand::Get: {
2647  const PortCoreConnectionDirection direction = parseConnectionDirection(cmd.get(1).asVocab(), true);
2648  const std::string target = cmd.get(2).asString();
2649  switch (direction) {
2650  case PortCoreConnectionDirection::In:
2651  result = handleAdminGetInCmd(target);
2652  break;
2653  case PortCoreConnectionDirection::Out:
2654  result = handleAdminGetOutCmd(target);
2655  break;
2656  case PortCoreConnectionDirection::Error:
2657  yCAssert(PORTCORE, false); // Should never happen (error is out)
2658  break;
2659  }
2660  } break;
2661  case PortCoreCommand::Prop: {
2662  PortCorePropertyAction action = parsePropertyAction(cmd.get(1).asVocab());
2663  const std::string key = cmd.get(2).asString();
2664  // Set/get arbitrary properties on a port.
2665  switch (action) {
2666  case PortCorePropertyAction::Get:
2667  result = handleAdminPropGetCmd(key);
2668  break;
2669  case PortCorePropertyAction::Set: {
2670  const Value& value = cmd.get(3);
2671  const Bottle& process = cmd.findGroup("process");
2672  const Bottle& sched = cmd.findGroup("sched");
2673  const Bottle& qos = cmd.findGroup("qos");
2674  result = handleAdminPropSetCmd(key, value, process, sched, qos);
2675  } break;
2676  case PortCorePropertyAction::Error:
2677  result.addVocab(Vocab::encode("fail"));
2678  result.addString("property action not known");
2679  break;
2680  }
2681  } break;
2682  case PortCoreCommand::RosPublisherUpdate: {
2683  yCDebug(PORTCORE, "publisherUpdate! --> %s", cmd.toString().c_str());
2684  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2685  std::string topic = RosNameSpace::fromRosName(cmd.get(2).asString());
2686  Bottle* pubs = cmd.get(3).asList();
2687  result = handleAdminRosPublisherUpdateCmd(topic, pubs);
2688  reader.requestDrop(); // ROS needs us to close down.
2689  } break;
2690  case PortCoreCommand::RosRequestTopic:
2691  yCDebug(PORTCORE, "requestTopic! --> %s", cmd.toString().c_str());
2692  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2693  // std::string topic = RosNameSpace::fromRosName(cmd.get(2).asString()); // Currently unused
2694  // Bottle protocols = cmd.get(3).asList(); // Currently unused
2695  result = handleAdminRosRequestTopicCmd();
2696  reader.requestDrop(); // ROS likes to close down.
2697  break;
2698  case PortCoreCommand::RosGetPid:
2699  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2700  result = handleAdminRosGetPidCmd();
2701  reader.requestDrop(); // ROS likes to close down.
2702  break;
2703  case PortCoreCommand::RosGetBusInfo:
2704  // std::string caller_id = cmd.get(1).asString(); // Currently unused
2705  result = handleAdminRosGetBusInfoCmd();
2706  reader.requestDrop(); // ROS likes to close down.
2707  break;
2708  case PortCoreCommand::Unknown:
2709  result = handleAdminUnknownCmd(cmd);
2710  break;
2711  }
2712 
2713  ConnectionWriter* writer = reader.getWriter();
2714  if (writer != nullptr) {
2715  result.write(*writer);
2716  }
2717 
2718  return true;
2719 }
2720 
2721 
2722 bool PortCore::setTypeOfService(PortCoreUnit* unit, int tos)
2723 {
2724  if (unit == nullptr) {
2725  return false;
2726  }
2727 
2728  yCDebug(PORTCORE, "Trying to set TOS = %d", tos);
2729 
2730  if (unit->isOutput()) {
2731  auto* outUnit = dynamic_cast<PortCoreOutputUnit*>(unit);
2732  if (outUnit != nullptr) {
2733  OutputProtocol* op = outUnit->getOutPutProtocol();
2734  if (op != nullptr) {
2735  yCDebug(PORTCORE, "Trying to set TOS = %d on output unit", tos);
2736  bool ok = op->getOutputStream().setTypeOfService(tos);
2737  if (!ok) {
2738  yCWarning(PORTCORE, "Setting TOS on output unit failed");
2739  }
2740  return ok;
2741  }
2742  }
2743  }
2744 
2745  // Some of the input units may have output stream object to write back to
2746  // the connection (e.g., tcp ack and reply). Thus the QoS preferences should be
2747  // also configured for them.
2748 
2749 
2750  if (unit->isInput()) {
2751  auto* inUnit = dynamic_cast<PortCoreInputUnit*>(unit);
2752  if (inUnit != nullptr) {
2753  InputProtocol* ip = inUnit->getInPutProtocol();
2754  if ((ip != nullptr) && ip->getOutput().isOk()) {
2755  yCDebug(PORTCORE, "Trying to set TOS = %d on input unit", tos);
2756  bool ok = ip->getOutput().getOutputStream().setTypeOfService(tos);
2757  if (!ok) {
2758  yCWarning(PORTCORE, "Setting TOS on input unit failed");
2759  }
2760  return ok;
2761  }
2762  }
2763  }
2764  // if there is nothing to be set, returns true
2765  return true;
2766 }
2767 
2768 int PortCore::getTypeOfService(PortCoreUnit* unit)
2769 {
2770  if (unit == nullptr) {
2771  return -1;
2772  }
2773 
2774  if (unit->isOutput()) {
2775  auto* outUnit = dynamic_cast<PortCoreOutputUnit*>(unit);
2776  if (outUnit != nullptr) {
2777  OutputProtocol* op = outUnit->getOutPutProtocol();
2778  if (op != nullptr) {
2779  return op->getOutputStream().getTypeOfService();
2780  }
2781  }
2782  }
2783 
2784  // Some of the input units may have output stream object to write back to
2785  // the connection (e.g., tcp ack and reply). Thus the QoS preferences should be
2786  // also configured for them.
2787 
2788 
2789  if (unit->isInput()) {
2790  auto* inUnit = dynamic_cast<PortCoreInputUnit*>(unit);
2791  if (inUnit != nullptr) {
2792  InputProtocol* ip = inUnit->getInPutProtocol();
2793  if ((ip != nullptr) && ip->getOutput().isOk()) {
2794  return ip->getOutput().getOutputStream().getTypeOfService();
2795  }
2796  }
2797  }
2798  return -1;
2799 }
2800 
2801 // attach a portmonitor plugin to the port or to a specific connection
2802 bool PortCore::attachPortMonitor(yarp::os::Property& prop, bool isOutput, std::string& errMsg)
2803 {
2804  // attach to the current port
2805  Carrier* portmonitor = Carriers::chooseCarrier("portmonitor");
2806  if (portmonitor == nullptr) {
2807  errMsg = "Portmonitor carrier modifier cannot be find or it is not enabled in Yarp!";
2808  return false;
2809  }
2810 
2811  if (isOutput) {
2812  detachPortMonitor(true);
2813  prop.put("source", getName());
2814  prop.put("destination", "");
2815  prop.put("sender_side", 1);
2816  prop.put("receiver_side", 0);
2817  prop.put("carrier", "");
2818  m_modifier.outputMutex.lock();
2819  m_modifier.outputModifier = portmonitor;
2820  if (!m_modifier.outputModifier->configureFromProperty(prop)) {
2821  m_modifier.releaseOutModifier();
2822  errMsg = "Failed to configure the portmonitor plug-in";
2823  m_modifier.outputMutex.unlock();
2824  return false;
2825  }
2826  m_modifier.outputMutex.unlock();
2827  } else {
2828  detachPortMonitor(false);
2829  prop.put("source", "");
2830  prop.put("destination", getName());
2831  prop.put("sender_side", 0);
2832  prop.put("receiver_side", 1);
2833  prop.put("carrier", "");
2834  m_modifier.inputMutex.lock();
2835  m_modifier.inputModifier = portmonitor;
2836  if (!m_modifier.inputModifier->configureFromProperty(prop)) {
2837  m_modifier.releaseInModifier();
2838  errMsg = "Failed to configure the portmonitor plug-in";
2839  m_modifier.inputMutex.unlock();
2840  return false;
2841  }
2842  m_modifier.inputMutex.unlock();
2843  }
2844  return true;
2845 }
2846 
2847 // detach the portmonitor from the port or specific connection
2848 bool PortCore::detachPortMonitor(bool isOutput)
2849 {
2850  if (isOutput) {
2851  m_modifier.outputMutex.lock();
2852  m_modifier.releaseOutModifier();
2853  m_modifier.outputMutex.unlock();
2854  } else {
2855  m_modifier.inputMutex.lock();
2856  m_modifier.releaseInModifier();
2857  m_modifier.inputMutex.unlock();
2858  }
2859  return true;
2860 }
2861 
2862 bool PortCore::setParamPortMonitor(const yarp::os::Property& param,
2863  bool isOutput,
2864  std::string& errMsg)
2865 {
2866  if (isOutput) {
2867  m_modifier.outputMutex.lock();
2868  if (m_modifier.outputModifier == nullptr) {
2869  errMsg = "No port modifier is attached to the output";
2870  m_modifier.outputMutex.unlock();
2871  return false;
2872  }
2873  m_modifier.outputModifier->setCarrierParams(param);
2874  m_modifier.outputMutex.unlock();
2875  } else {
2876  m_modifier.inputMutex.lock();
2877  if (m_modifier.inputModifier == nullptr) {
2878  errMsg = "No port modifier is attached to the input";
2879  m_modifier.inputMutex.unlock();
2880  return false;
2881  }
2882  m_modifier.inputModifier->setCarrierParams(param);
2883  m_modifier.inputMutex.unlock();
2884  }
2885  return true;
2886 }
2887 
2888 bool PortCore::getParamPortMonitor(yarp::os::Property& param,
2889  bool isOutput,
2890  std::string& errMsg)
2891 {
2892  if (isOutput) {
2893  m_modifier.outputMutex.lock();
2894  if (m_modifier.outputModifier == nullptr) {
2895  errMsg = "No port modifier is attached to the output";
2896  m_modifier.outputMutex.unlock();
2897  return false;
2898  }
2899  m_modifier.outputModifier->getCarrierParams(param);
2900  m_modifier.outputMutex.unlock();
2901  } else {
2902  m_modifier.inputMutex.lock();
2903  if (m_modifier.inputModifier == nullptr) {
2904  errMsg = "No port modifier is attached to the input";
2905  m_modifier.inputMutex.unlock();
2906  return false;
2907  }
2908  m_modifier.inputModifier->getCarrierParams(param);
2909  m_modifier.inputMutex.unlock();
2910  }
2911  return true;
2912 }
2913 
2914 void PortCore::reportUnit(PortCoreUnit* unit, bool active)
2915 {
2916  YARP_UNUSED(active);
2917  if (unit != nullptr) {
2918  bool isLog = (!unit->getMode().empty());
2919  if (isLog) {
2920  m_logNeeded = true;
2921  }
2922  }
2923 }
2924 
2925 bool PortCore::setProcessSchedulingParam(int priority, int policy)
2926 {
2927 #if defined(__linux__)
2928  // set the sched properties of all threads within the process
2929  struct sched_param sch_param;
2930  sch_param.__sched_priority = priority;
2931 
2932  DIR* dir;
2933  char path[PATH_MAX];
2934  sprintf(path, "/proc/%d/task/", yarp::os::impl::getpid());
2935 
2936  dir = opendir(path);
2937  if (dir == nullptr) {
2938  return false;
2939  }
2940 
2941  struct dirent* d;
2942  char* end;
2943  long tid = 0;
2944  bool ret = true;
2945  while ((d = readdir(dir)) != nullptr) {
2946  if (isdigit(static_cast<unsigned char>(*d->d_name)) == 0) {
2947  continue;
2948  }
2949 
2950  tid = strtol(d->d_name, &end, 10);
2951  if (d->d_name == end || ((end != nullptr) && (*end != 0))) {
2952  closedir(dir);
2953  return false;
2954  }
2955  ret &= (sched_setscheduler(static_cast<pid_t>(tid), policy, &sch_param) == 0);
2956  }
2957  closedir(dir);
2958  return ret;
2959 #elif defined(YARP_HAS_ACE) // for other platforms
2960  // TODO: Check how to set the scheduling properties of all process's threads in Windows
2961  ACE_Sched_Params param(policy, (ACE_Sched_Priority)priority, ACE_SCOPE_PROCESS);
2962  int ret = ACE_OS::set_scheduling_params(param, yarp::os::impl::getpid());
2963  return (ret != -1);
2964 #else
2965  return false;
2966 #endif
2967 }
2968 
2970 {
2971  m_stateSemaphore.wait();
2972  if (!readOnly) {
2973  if (m_prop == nullptr) {
2974  m_prop = new Property();
2975  }
2976  }
2977  return m_prop;
2978 }
2979 
2981 {
2982  YARP_UNUSED(prop);
2983  m_stateSemaphore.post();
2984 }
2985 
2986 bool PortCore::removeIO(const Route& route, bool synch)
2987 {
2988  return removeUnit(route, synch);
2989 }
2990 
2991 void PortCore::setName(const std::string& name)
2992 {
2993  m_name = name;
2994 }
2995 
2996 std::string PortCore::getName()
2997 {
2998  return m_name;
2999 }
3000 
3001 int PortCore::getNextIndex()
3002 {
3003  int result = m_counter;
3004  m_counter++;
3005  if (m_counter < 0) {
3006  m_counter = 1;
3007  }
3008  return result;
3009 }
3010 
3012 {
3013  return m_address;
3014 }
3015 
3016 void PortCore::resetPortName(const std::string& str)
3017 {
3018  m_address.setName(str);
3019 }
3020 
3022 {
3023  return m_readableCreator;
3024 }
3025 
3027 {
3028  m_controlRegistration = flag;
3029 }
3030 
3032 {
3033  return m_listening;
3034 }
3035 
3037 {
3038  return m_manual;
3039 }
3040 
3042 {
3043  return m_interrupted;
3044 }
3045 
3046 void PortCore::setTimeout(float timeout)
3047 {
3048  m_timeout = timeout;
3049 }
3050 
3051 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3055 {
3056  removeCallbackLock();
3057  if (mutex != nullptr) {
3058  m_old_mutex = mutex;
3059  m_mutexOwned = false;
3060  } else {
3061  m_old_mutex = new yarp::os::Mutex();
3062  m_mutexOwned = true;
3063  }
3064  return true;
3065 }
3067 #endif // YARP_NO_DEPRECATED
3068 
3069 bool PortCore::setCallbackLock(std::mutex* mutex)
3070 {
3071  removeCallbackLock();
3072  if (mutex != nullptr) {
3073  m_mutex = mutex;
3074  m_mutexOwned = false;
3075  } else {
3076  m_mutex = new std::mutex;
3077  m_mutexOwned = true;
3078  }
3079  return true;
3080 }
3081 
3083 {
3084  if (m_mutexOwned && (m_mutex != nullptr)) {
3085  delete m_mutex;
3086  }
3087  m_mutex = nullptr;
3088 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3089  m_old_mutex = nullptr;
3090 #endif // YARP_NO_DEPRECATED
3091  m_mutexOwned = false;
3092  return true;
3093 }
3094 
3096 {
3097  if (m_mutex == nullptr) {
3098 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3099  if (m_old_mutex == nullptr) {
3100  return false;
3101  }
3102  m_old_mutex->lock();
3103  return true;
3104 #else // YARP_NO_DEPRECATED
3105  return false;
3106 #endif // YARP_NO_DEPRECATED
3107  }
3108  m_mutex->lock();
3109  return true;
3110 }
3111 
3113 {
3114  if (m_mutex == nullptr) {
3115 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3116  if (m_old_mutex == nullptr) {
3117  return true;
3118  }
3119  return m_old_mutex->try_lock();
3120 #else // YARP_NO_DEPRECATED
3121  return true;
3122 #endif // YARP_NO_DEPRECATED
3123  }
3124  return m_mutex->try_lock();
3125 }
3126 
3128 {
3129  if (m_mutex == nullptr) {
3130 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
3131  if (m_old_mutex == nullptr) {
3132  return;
3133  }
3134  return m_old_mutex->unlock();
3135 #else // YARP_NO_DEPRECATED
3136  return;
3137 #endif // YARP_NO_DEPRECATED
3138  }
3139  m_mutex->unlock();
3140 }
3141 
3143 {
3144  return m_modifier;
3145 }
3146 
3148 {
3149  m_typeMutex.lock();
3150  if (!m_checkedType) {
3151  if (!m_type.isValid()) {
3152  m_type = reader.getReadType();
3153  }
3154  m_checkedType = true;
3155  }
3156  m_typeMutex.unlock();
3157 }
3158 
3160 {
3161  m_typeMutex.lock();
3162  Type t = m_type;
3163  m_typeMutex.unlock();
3164  return t;
3165 }
3166 
3167 void PortCore::promiseType(const Type& typ)
3168 {
3169  m_typeMutex.lock();
3170  m_type = typ;
3171  m_typeMutex.unlock();
3172 }
float t
bool ret
static bool __tcp_check(const Contact &c)
Definition: PortCore.cpp:1530
static bool __pc_rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader, bool verbose)
Definition: PortCore.cpp:1514
#define PORTCORE_IS_INPUT
Definition: PortCore.h:50
#define PORTCORE_SEND_LOG
Definition: PortCore.h:45
#define PORTCORE_IS_RPC
Definition: PortCore.h:49
#define PORTCORE_IS_OUTPUT
Definition: PortCore.h:51
#define PORTCORE_SEND_NORMAL
Definition: PortCore.h:44
static bool rpc(const Contact &c, const char *carrier, Bottle &writer, Bottle &reader)
Definition: RosLookup.cpp:22
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:73
void add(const Value &value)
Add a Value to the bottle, at the end of the list.
Definition: Bottle.cpp:339
void fromString(const std::string &text)
Initializes bottle from a string.
Definition: Bottle.cpp:207
Property & addDict()
Places an empty key/value object in the bottle, at the end of the list.
Definition: Bottle.cpp:191
Bottle & addList()
Places an empty nested list in the bottle, at the end of the list.
Definition: Bottle.cpp:185
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:254
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition: Bottle.cpp:243
void addVocab(int x)
Places a vocabulary item in the bottle, at the end of the list.
Definition: Bottle.cpp:167
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:249
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Definition: Bottle.cpp:305
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Bottle.cpp:280
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
Definition: Bottle.cpp:233
void addInt32(std::int32_t x)
Places a 32-bit integer in the bottle, at the end of the list.
Definition: Bottle.cpp:143
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:173
std::string toString() const override
Gives a human-readable textual representation of the bottle.
Definition: Bottle.cpp:214
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
Definition: Bottle.cpp:290
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:48
bool isConnectionless() const override=0
Check if this carrier is connectionless (like udp, mcast) or connection based (like tcp).
bool isPush() const override
Check if carrier is "push" or "pull" style.
Definition: Carrier.cpp:25
void setCarrierParams(const Property &params) override
Configure carrier from port administrative commands.
Definition: Carrier.cpp:118
static Face * listen(const Contact &address)
Create a "proto-carrier" interface object that waits for incoming connections prior to a carrier bein...
Definition: Carriers.cpp:253
static Carrier * getCarrierTemplate(const std::string &name)
Get template for carrier.
Definition: Carriers.cpp:241
static Carrier * chooseCarrier(const std::string &name)
Select a carrier by name.
Definition: Carriers.cpp:236
static OutputProtocol * connect(const Contact &address)
Initiate a connection to an address.
Definition: Carriers.cpp:285
An interface for reading from a network connection.
virtual void requestDrop()=0
Tag the connection to be dropped after the current message.
virtual ConnectionWriter * getWriter()=0
Gets a way to reply to the message, if possible.
An interface for writing to a network connection.
virtual bool isPush() const =0
Check if carrier is "push" or "pull" style.
Preferences for how to communicate with a contact.
Definition: ContactStyle.h:27
double timeout
Set a timeout for communication (in units of seconds, fractional seconds allowed).
Definition: ContactStyle.h:50
bool quiet
Suppress all outputs and warnings.
Definition: ContactStyle.h:39
std::string carrier
Request that communication be made using a particular carrier.
Definition: ContactStyle.h:56
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:301
std::string getRegName() const
Get the name associated with this Contact.
Definition: Contact.cpp:220
static Contact fromString(const std::string &txt)
Factory method.
Definition: Contact.cpp:142
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition: Contact.cpp:316
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:242
void setTimeout(float timeout)
Set timeout for this Contact.
Definition: Contact.cpp:285
std::string getCarrier() const
Get the carrier associated with this Contact for socket communication.
Definition: Contact.cpp:253
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:231
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written the ConnectionWriter since it was las...
ConnectionReader & getReader()
Get the dummy ConnectionReader loaded with whatever was written the ConnectionWriter since it was las...
The input side of an active connection between two ports.
Definition: InputProtocol.h:38
virtual void close()=0
Negotiate an end to operations.
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
virtual OutputProtocol & getOutput()=0
Get an interface for doing write operations on the connection.
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
Basic wrapper for mutual exclusion.
Definition: Mutex.h:35
Simple abstraction for a YARP port name.
Definition: Name.h:22
Contact toAddress() const
Create an address from the name.
Definition: Name.cpp:30
std::string getCarrierModifier(const char *mod, bool *hasModifier=nullptr)
Definition: Name.cpp:47
A placeholder for rich contact information.
Definition: NestedContact.h:27
std::string getNodeName() const
static std::string toString(int x)
Definition: NetType.cpp:138
static bool initialized()
Returns true if YARP has been fully initialized.
Definition: Network.cpp:1392
static bool getLocalMode()
Get current value of flag "localMode", see setLocalMode function.
Definition: Network.cpp:1057
static Contact unregisterName(const std::string &name)
Removes the registration for a name from the name server.
Definition: Network.cpp:1026
static NameStore * getQueryBypass()
Definition: Network.cpp:1415
static Contact queryName(const std::string &name)
Find out information about a registered name.
Definition: Network.cpp:998
static int disconnectInput(const std::string &src, const std::string &dest, bool silent=false)
Sends a disconnection command to the specified port.
Definition: Network.cpp:1555
static bool writeToNameServer(PortWriter &cmd, PortReader &reply, const ContactStyle &style)
Variant write method specialized to name server.
Definition: Network.cpp:1986
static bool disconnect(const std::string &src, const std::string &dest, bool quiet)
Request that an output port disconnect from an input port.
Definition: Network.cpp:703
static bool write(const Contact &contact, PortWriter &cmd, PortReader &reply, bool admin=false, bool quiet=false, double timeout=-1)
Send a single command to a port and await a single response.
Definition: Network.cpp:1229
The output side of an active connection between two ports.
virtual const Route & getRoute() const =0
virtual Connection & getConnection()=0
Get the connection whose protocol operations we are managing.
virtual bool open(const Route &route)=0
Start negotiating a carrier, using the given route (this should generally match the name of the sendi...
virtual void attachPort(Contactable *port)=0
Set the port to be associated with the connection.
virtual OutputStream & getOutputStream()=0
Access the output stream associated with the connection.
virtual InputProtocol & getInput()=0
Get an interface for doing read operations on the connection.
virtual void close()=0
Negotiate an end to operations.
virtual void rename(const Route &route)=0
Relabel the route after the fact (e.g.
virtual bool setTimeout(double timeout)=0
Set the timeout to be used for network operations.
virtual bool isOk() const =0
Check if the connection is valid and can be used.
virtual bool write(SizedWriter &writer)=0
Write a message on the connection.
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:25
virtual int getTypeOfService()
virtual bool setTypeOfService(int tos)
Information about a port connection or event.
Definition: PortInfo.h:29
std::string targetName
Name of connection target, if any.
Definition: PortInfo.h:66
std::string carrierName
Name of protocol type, if relevant.
Definition: PortInfo.h:69
bool incoming
True if a connection is incoming, false if outgoing.
Definition: PortInfo.h:54
std::string message
A human-readable description of contents.
Definition: PortInfo.h:72
std::string portName
Name of port.
Definition: PortInfo.h:60
int tag
Type of information.
Definition: PortInfo.h:51
std::string sourceName
Name of connection source, if any.
Definition: PortInfo.h:63
@ PORTINFO_CONNECTION
Information about an incoming or outgoing connection.
Definition: PortInfo.h:43
@ PORTINFO_MISC
Unspecified information.
Definition: PortInfo.h:46
A creator for readers.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:28
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
virtual Type getReadType() const
Definition: PortReader.cpp:15
A base class for objects that want information about port status changes.
Definition: PortReport.h:31
virtual void report(const PortInfo &info)=0
Callback for port event/state information.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:27
virtual bool write(ConnectionWriter &writer) const =0
Write this object to a network connection.
virtual void onCommencement() const
This is called when the port is about to begin writing operations.
Definition: PortWriter.cpp:20
A class for storing options and configuration information.
Definition: Property.h:37
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
Definition: Property.cpp:1034
std::string toString() const override
Return a standard text representation of the content of the object.
Definition: Property.cpp:1052
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
Definition: Property.cpp:1046
void put(const std::string &key, const std::string &value)
Associate the given key with the given string.
Definition: Property.cpp:998
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Property.cpp:1024
PacketPriorityDSCP
The PacketPriorityDSCP defines the packets quality of service (priority) using DSCP.
Definition: QosStyle.h:48
static PacketPriorityDSCP getDSCPByVocab(int vocab)
returns the IPV4/6 DSCP value given as DSCP code
Definition: QosStyle.cpp:158
static std::string fromRosName(const std::string &name)
Information about a connection between two ports.
Definition: Route.h:32
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:106
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition: Route.cpp:126
std::string toString() const
Render a text form of the route, "source->carrier->dest".
Definition: Route.cpp:141
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:96
void swapNames()
Swap from and to names.
Definition: Route.cpp:136
void setToContact(const Contact &toContact)
Set the destination contact of the route.
Definition: Route.cpp:121
An InputStream that reads from a string.
void add(const std::string &txt)
An OutputStream that produces a string.
static ProcessInfo getProcessInfo(int pid=0)
gets the operating system process information given by its PID.
Definition: SystemInfo.cpp:808
static PlatformInfo getPlatformInfo()
getPlatformInfo
Definition: SystemInfo.cpp:600
A single value (typically within a Bottle).
Definition: Value.h:47
virtual bool isString() const
Checks if value is a string.
Definition: Value.cpp:159
virtual std::int32_t asInt32() const
Get 32-bit integer value.
Definition: Value.cpp:207
virtual Bottle * asList() const
Get list value.
Definition: Value.cpp:243
virtual std::int32_t asVocab() const
Get vocabulary identifier as an integer.
Definition: Value.cpp:231
virtual std::string asString() const
Get string value.
Definition: Value.cpp:237
A helper for creating cached object descriptions.
bool write(ConnectionWriter &connection) const override
Write this object to a network connection.
virtual void appendLine(const std::string &data)
Send a string along with a carriage-return-line-feed sequence.
A helper for recording entire message/reply transactions.
void init(yarp::os::ConnectionReader *wrappedReader)
Call this to wrap a specific ConnectionReader.
void fini()
Call this when all reading/writing has been done.
Manager for a single input to a port.
Manager for a single output from a port.
A single message, potentially being transmitted on multiple connections.
void setContent(const yarp::os::PortWriter *writable, bool owned=false, const yarp::os::PortWriter *callback=nullptr, bool ownedCallback=false)
Configure the object being sent and where to send notifications.
void inc()
Increment the usage count for this message.
void dec()
Decrement the usage count for this message.
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:30
std::string getPupString() const
Definition: PortCoreUnit.h:237
void setPupped(const std::string &pupString)
Tag this connection as having been created by a publisherUpdate message to the port's administrative ...
Definition: PortCoreUnit.h:251
virtual void * send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader, const yarp::os::PortWriter *callback, void *tracker, const std::string &envelope, bool waitAfter=true, bool waitBefore=true, bool *gotReply=nullptr)
Send a message on the connection.
Definition: PortCoreUnit.h:130
std::string getMode(bool *hasMode=nullptr)
Read the "mode" of the connection - basically, whether it is used for logging or not.
Definition: PortCoreUnit.h:211
virtual void getCarrierParams(yarp::os::Property &params)
Definition: PortCoreUnit.h:269
void setDoomed()
Request that this connection be shut down as soon as possible.
Definition: PortCoreUnit.h:100
virtual void setCarrierParams(const yarp::os::Property &params)
Set arbitrary parameters for this connection.
Definition: PortCoreUnit.h:261
void notifyCompletion(void *tracker)
Call the right onCompletion() after sending message.
Definition: PortCore.cpp:1456
void resetPortName(const std::string &str)
Definition: PortCore.cpp:3016
std::string getEnvelope()
Definition: PortCore.cpp:1494
void setAdminReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming administrative messages.
Definition: PortCore.cpp:150
int getOutputCount()
Check how many output connections there are.
Definition: PortCore.cpp:1446
bool readBlock(ConnectionReader &reader, void *id, yarp::os::OutputStream *os)
Read a block of regular payload data.
Definition: PortCore.cpp:1218
void setReportCallback(yarp::os::PortReport *reporter)
Set a callback to be notified of changes in port status.
Definition: PortCore.cpp:1188
void run() override
The body of the main thread.
Definition: PortCore.cpp:167
bool start() override
Begin main thread.
Definition: PortCore.cpp:282
void checkType(PortReader &reader)
Definition: PortCore.cpp:3147
void resume()
Undo an interrupt()
Definition: PortCore.cpp:331
void setTimeout(float timeout)
Definition: PortCore.cpp:3046
yarp::os::PortReaderCreator * getReadCreator()
Get the creator of callbacks.
Definition: PortCore.cpp:3021
Property * acquireProperties(bool readOnly)
Definition: PortCore.cpp:2969
void interrupt()
Prepare the port to be shut down.
Definition: PortCore.cpp:337
void setEnvelope(const std::string &envelope)
Set some envelope information to pass along with a message without actually being part of the message...
Definition: PortCore.cpp:1480
bool removeIO(const Route &route, bool synch=false)
Remove any connection matching the supplied route.
Definition: PortCore.cpp:2986
~PortCore()
Destructor.
Definition: PortCore.cpp:64
bool isInterrupted() const
Definition: PortCore.cpp:3041
void report(const yarp::os::PortInfo &info)
Handle a port event (connection, disconnection, etc) Generate a description of the connections associ...
Definition: PortCore.cpp:1204
bool listen(const Contact &address, bool shouldAnnounce=true)
Begin service at a given address.
Definition: PortCore.cpp:71
bool sendHelper(const yarp::os::PortWriter &writer, int mode, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a message with a specific mode (normal or log).
Definition: PortCore.cpp:1291
void removeOutput(const std::string &dest, void *id, yarp::os::OutputStream *os)
Remove an output connection.
Definition: PortCore.cpp:1028
bool isListening() const
Definition: PortCore.cpp:3031
void setControlRegistration(bool flag)
Normally the port will unregister its name with the name server when shutting down.
Definition: PortCore.cpp:3026
bool setCallbackLock(yarp::os::Mutex *mutex)
Definition: PortCore.cpp:3054
int getInputCount()
Check how many input connections there are.
Definition: PortCore.cpp:1437
bool manualStart(const char *sourceName)
Start up the port, but without a main thread.
Definition: PortCore.cpp:314
void setReadHandler(yarp::os::PortReader &reader)
Set a callback for incoming data.
Definition: PortCore.cpp:142
void reportUnit(PortCoreUnit *unit, bool active)
Called by a connection handler with active=true just after it is fully configured,...
Definition: PortCore.cpp:2914
void promiseType(const Type &typ)
Definition: PortCore.cpp:3167
void releaseProperties(Property *prop)
Definition: PortCore.cpp:2980
int getEventCount()
A diagnostic for testing purposes.
Definition: PortCore.cpp:535
void close() override
Shut down port.
Definition: PortCore.cpp:269
const Contact & getAddress() const
Get the address associated with the port.
Definition: PortCore.cpp:3011
void describe(void *id, yarp::os::OutputStream *os)
Produce a text description of the port and its connections.
Definition: PortCore.cpp:1064
yarp::os::Type getType()
Definition: PortCore.cpp:3159
bool isWriting()
Check if a message is currently being sent.
Definition: PortCore.cpp:1415
bool adminBlock(ConnectionReader &reader, void *id)
Read a block of administrative data.
Definition: PortCore.cpp:1682
void removeInput(const std::string &src, void *id, yarp::os::OutputStream *os)
Remove an input connection.
Definition: PortCore.cpp:1046
bool send(const yarp::os::PortWriter &writer, yarp::os::PortReader *reader=nullptr, const yarp::os::PortWriter *callback=nullptr)
Send a normal message.
Definition: PortCore.cpp:1267
void resetReportCallback()
Reset the callback to be notified of changes in port status.
Definition: PortCore.cpp:1197
void setReadCreator(yarp::os::PortReaderCreator &creator)
Set a callback for creating callbacks for incoming data.
Definition: PortCore.cpp:158
yarp::os::impl::PortDataModifier & getPortModifier()
Definition: PortCore.cpp:3142
void setName(const std::string &name)
Set the name of this port.
Definition: PortCore.cpp:2991
bool addOutput(const std::string &dest, void *id, yarp::os::OutputStream *os, bool onlyIfNeeded=false)
Add an output connection to this port.
Definition: PortCore.cpp:857
This is the heart of a yarp port.
Definition: PortCore.h:112
Lets Readable objects read from the underlying InputStream associated with the connection between two...
void reset(yarp::os::InputStream &in, TwoWayStream *str, const Route &route, size_t len, bool textMode, bool bareMode=false)
int join(double seconds=-1)
Definition: ThreadImpl.cpp:123
int setPriority(int priority=-1, int policy=-1)
Definition: ThreadImpl.cpp:249
#define yCError(component,...)
Definition: LogComponent.h:157
#define yCAssert(component, x)
Definition: LogComponent.h:172
#define yCTrace(component,...)
Definition: LogComponent.h:88
#define yCWarning(component,...)
Definition: LogComponent.h:146
#define yCDebug(component,...)
Definition: LogComponent.h:112
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
std::string getEnvironment(const char *key, bool *found=nullptr)
Read a variable from the environment.
Definition: environment.h:31
std::int32_t vocab32_t
Definition: numeric.h:52
NetInt32 encode(const std::string &str)
Convert a string into a vocabulary identifier.
Definition: Vocab.cpp:14
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
int getpid()
Portable wrapper for the getpid() function.
Definition: Os.cpp:94
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition: NetInt32.h:33
constexpr yarp::conf::vocab32_t createVocab(char a, char b=0, char c=0, char d=0)
Definition: Vocab.h:22
The main, catch-all namespace for YARP.
Definition: environment.h:18
The PlatformInfo struct holds the operating system information.
Definition: SystemInfo.h:84
The ProcessInfo struct provides the operating system process information.
Definition: SystemInfo.h:116
#define YARP_WARNING_POP
Ends a temporary alteration of the enabled warnings.
Definition: system.h:335
#define YARP_WARNING_PUSH
Starts a temporary alteration of the enabled warnings.
Definition: system.h:334
#define YARP_DISABLE_DEPRECATED_WARNING
Disable deprecated warnings in the following code.
Definition: system.h:336
#define YARP_UNUSED(var)
Definition: api.h:159