YARP
Yet Another Robot Platform
PortCore.h
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
7 #ifndef YARP_OS_IMPL_PORTCORE_H
8 #define YARP_OS_IMPL_PORTCORE_H
9 
10 #include <yarp/os/Carriers.h>
11 #include <yarp/os/Contact.h>
12 #include <yarp/os/Contactable.h>
14 #include <yarp/os/PortReader.h>
16 #include <yarp/os/PortReport.h>
17 #include <yarp/os/PortWriter.h>
18 #include <yarp/os/Property.h>
19 #include <yarp/os/Type.h>
20 #include <yarp/os/Vocab.h>
24 
25 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
26 #define YARP_INCLUDING_DEPRECATED_HEADER_ON_PURPOSE
27 #include <yarp/os/Mutex.h>
28 #undef YARP_INCLUDING_DEPRECATED_HEADER_ON_PURPOSE
29 #endif
30 
31 #include <atomic>
32 #include <condition_variable>
33 #include <mutex>
34 #include <vector>
35 
36 namespace yarp::os::impl {
37 
38 class PortCoreUnit;
39 
40 #define PORTCORE_SEND_NORMAL (1)
41 #define PORTCORE_SEND_LOG (2)
42 
43 // some flags for restricting port behavior
44 #define PORTCORE_IS_NULL (0)
45 #define PORTCORE_IS_RPC (1)
46 #define PORTCORE_IS_INPUT (2)
47 #define PORTCORE_IS_OUTPUT (4)
48 
108 {
109 public:
111  outputModifier(nullptr),
112  inputModifier(nullptr)
113  {
114  }
115 
117  {
118  releaseOutModifier();
119  releaseInModifier();
120  }
121 
123  {
124  if (outputModifier != nullptr) {
125  outputModifier->close();
126  delete outputModifier;
127  outputModifier = nullptr;
128  }
129  }
130 
132  {
133  if (inputModifier != nullptr) {
134  inputModifier->close();
135  delete inputModifier;
136  inputModifier = nullptr;
137  }
138  }
139 
140 public:
143  std::mutex outputMutex;
144  std::mutex inputMutex;
145 };
146 
148  public ThreadImpl,
149  public yarp::os::PortReader
150 {
151 public:
156 
160  ~PortCore();
161 
171  bool addOutput(const std::string& dest,
172  void* id,
174  bool onlyIfNeeded = false);
175 
179  void addOutput(OutputProtocol* op);
180 
187  void removeInput(const std::string& src,
188  void* id,
190 
197  void removeOutput(const std::string& dest,
198  void* id,
200 
207  bool removeIO(const Route& route, bool synch = false);
208 
214  void describe(void* id, yarp::os::OutputStream* os);
215 
220  void describe(yarp::os::PortReport& reporter);
221 
228  bool readBlock(ConnectionReader& reader,
229  void* id,
231 
238  bool adminBlock(ConnectionReader& reader,
239  void* id);
240 
245  void setName(const std::string& name);
246 
250  std::string getName();
251 
257  void setEnvelope(const std::string& envelope);
258 
262  bool setEnvelope(yarp::os::PortWriter& envelope);
263 
264  std::string getEnvelope();
265 
269  bool getEnvelope(yarp::os::PortReader& envelope);
270 
277  void report(const yarp::os::PortInfo& info);
278 
286  void reportUnit(PortCoreUnit* unit, bool active);
287 
// Configure the port to meet certain restrictions in behavior.
//
// Overwrites m_flags with `flags`; the value is stored as given, with no
// validation or masking here. m_flags defaults to
// PORTCORE_IS_INPUT | PORTCORE_IS_OUTPUT.
// NOTE(review): `flags` is presumably a bitwise OR of the PORTCORE_IS_*
// values defined near the top of this header - confirm against callers.
291  void setFlags(unsigned int flags)
292  {
293  this->m_flags = flags;
294  }
295 
// Record the Contactable associated with this PortCore.
//
// Only the raw pointer is stored in m_contactable (nullptr by default);
// nothing is copied or dereferenced here.
// NOTE(review): this header does not show PortCore taking ownership of the
// pointer - presumably the caller keeps it alive; confirm in PortCore.cpp.
296  void setContactable(Contactable* contactable)
297  {
298  this->m_contactable = contactable;
299  }
300 
// Check current configuration of port.
//
// Returns the current value of m_flags, i.e. the value last passed to
// setFlags(), or the default PORTCORE_IS_INPUT | PORTCORE_IS_OUTPUT if it
// was never changed through that setter.
304  unsigned int getFlags()
305  {
306  return m_flags;
307  }
308 
312  bool listen(const Contact& address, bool shouldAnnounce = true);
313 
317  bool isWriting();
318 
322  int getInputCount();
323 
327  int getOutputCount();
328 
332  void setReadHandler(yarp::os::PortReader& reader);
333 
337  void setAdminReadHandler(yarp::os::PortReader& reader);
338 
342  void setReadCreator(yarp::os::PortReaderCreator& creator);
343 
// Upon being asked to send a message, should we wait for any existing
// message to be sent first?
//
// Stores the choice in m_waitBeforeSend (true by default). This setter only
// records the flag; the waiting itself is not implemented in this header.
348  void setWaitBeforeSend(bool waitBeforeSend)
349  {
350  this->m_waitBeforeSend = waitBeforeSend;
351  }
352 
// After sending a message, should we wait for it to be sent to all
// destinations before returning?
//
// Stores the choice in m_waitAfterSend (true by default). This setter only
// records the flag; the waiting itself is not implemented in this header.
357  void setWaitAfterSend(bool waitAfterSend)
358  {
359  this->m_waitAfterSend = waitAfterSend;
360  }
361 
// Callback for data.
//
// Override of the read() callback (PortCore derives from
// yarp::os::PortReader). This default implementation consumes nothing from
// `reader` and unconditionally returns true.
365  bool read(yarp::os::ConnectionReader& reader) override
366  {
367  // does nothing by default
368  YARP_UNUSED(reader); // marks the deliberately unused parameter
369  return true;
370  }
371 
375  bool start() override;
376 
380  bool manualStart(const char* sourceName);
381 
388  bool send(const yarp::os::PortWriter& writer,
389  yarp::os::PortReader* reader = nullptr,
390  const yarp::os::PortWriter* callback = nullptr);
391 
398  bool sendHelper(const yarp::os::PortWriter& writer,
399  int mode,
400  yarp::os::PortReader* reader = nullptr,
401  const yarp::os::PortWriter* callback = nullptr);
402 
406  void close() override;
407 
411  void run() override;
412 
416  int getEventCount();
417 
421  const Contact& getAddress() const;
422 
423  void resetPortName(const std::string& str);
424 
428  yarp::os::PortReaderCreator* getReadCreator();
429 
433  void notifyCompletion(void* tracker);
434 
439  void setControlRegistration(bool flag);
440 
444  void interrupt();
445 
449  void resume();
450 
454  void setReportCallback(yarp::os::PortReport* reporter);
455 
459  void resetReportCallback();
460 
465  bool isListening() const;
466 
471  bool isManual() const;
472 
476  bool isInterrupted() const;
477 
478  void setTimeout(float timeout);
479 
483  Property* acquireProperties(bool readOnly);
484 
488  void releaseProperties(Property* prop);
489 
490 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
494  bool setCallbackLock(yarp::os::Mutex* mutex);
496 #endif // YARP_NO_DEPRECATED
497 
498  bool setCallbackLock(std::mutex* mutex = nullptr);
499 
500  bool removeCallbackLock();
501 
502  bool lockCallback();
503 
504  bool tryLockCallback();
505 
506  void unlockCallback();
507 
508  yarp::os::impl::PortDataModifier& getPortModifier();
509 
510  void checkType(PortReader& reader);
511 
512  yarp::os::Type getType();
513 
514  void promiseType(const Type& typ);
515 
516 private:
517  // main internal PortCore state and operations
518  std::vector<PortCoreUnit *> m_units;
519  std::mutex m_stateMutex;
520  std::condition_variable m_stateCv;
521  std::mutex m_packetMutex;
522  std::condition_variable m_connectionChangeCv;
523  Face* m_face {nullptr};
524  std::string m_name;
525  yarp::os::Contact m_address;
526  yarp::os::PortReader *m_reader {nullptr};
527  yarp::os::PortReader *m_adminReader {nullptr};
528  yarp::os::PortReaderCreator *m_readableCreator {nullptr};
529  yarp::os::PortReport *m_eventReporter {nullptr};
530  std::atomic<bool> m_listening {false};
531  std::atomic<bool> m_running {false};
532  std::atomic<bool> m_starting {false};
533  std::atomic<bool> m_closing {false};
534  std::atomic<bool> m_finished {false};
535  bool m_finishing {false};
536  bool m_waitBeforeSend {true};
537  bool m_waitAfterSend {true};
538  bool m_controlRegistration {true};
539  bool m_interruptable {true};
540  bool m_interrupted {false};
541  bool m_manual {false};
542  int m_events {0};
543  int m_connectionListeners {0};
544  int m_inputCount {0};
545  int m_outputCount {0};
546  int m_dataOutputCount {0};
547  unsigned int m_flags {PORTCORE_IS_INPUT | PORTCORE_IS_OUTPUT};
548  bool m_logNeeded {false};
549  PortCorePackets m_packets {};
550  std::string m_envelope;
551  float m_timeout {-1};
552  int m_counter {1};
553  yarp::os::Property *m_prop {nullptr};
554  yarp::os::Contactable *m_contactable {nullptr};
555  std::mutex* m_mutex {nullptr};
556 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
559  yarp::os::Mutex* m_old_mutex {nullptr};
561 #endif // YARP_NO_DEPRECATED
562  bool m_mutexOwned {false};
563  BufferedConnectionWriter m_envelopeWriter {true};
564 
565  std::mutex m_typeMutex;
566  bool m_checkedType {false};
567  Type m_type;
568 
569  // port data modifier
571 
572  // set IP packet TOS
573  bool setTypeOfService(PortCoreUnit* unit, int tos);
574 
575  // get IP packet TOS
576  int getTypeOfService(PortCoreUnit* unit);
577 
578  // set the scheduling properties of all threads
579  // within the process scope.
580  bool setProcessSchedulingParam(int priority = -1, int policy = -1);
581 
582  // attach a portmonitor plugin to the port or to a specific connection
583  bool attachPortMonitor(yarp::os::Property& prop, bool isOutput, std::string& errMsg);
584 
585  // detach the portmonitor from the port or specific connection
586  bool detachPortMonitor(bool isOutput);
587 
588  // set the parameter for the portmonitor of the port (if any)
589  bool setParamPortMonitor(const yarp::os::Property& param, bool isOutput, std::string& errMsg);
590 
591  // get the parameters from the portmonitor of the port (if any)
592  bool getParamPortMonitor(yarp::os::Property& param, bool isOutput, std::string& errMsg);
593 
594  void closeMain();
595 
596  bool isUnit(const Route& route, int index);
597 
598  // only called in "finished" phase
599  void closeUnits();
600 
601  // called anytime, garbage collects terminated units
602  void cleanUnits(bool blocking = true);
603 
604  // only called by the manager
605  void reapUnits();
606 
607  // only called in "running" phase
608  void addInput(InputProtocol* ip);
609 
610  bool removeUnit(const Route& route, bool synch = false, bool* except = nullptr);
611 
612  int getNextIndex();
613 };
614 
615 } // namespace yarp::os::impl
616 
617 #endif // YARP_OS_IMPL_PORTCORE_H
#define PORTCORE_IS_INPUT
Definition: PortCore.h:46
#define PORTCORE_IS_OUTPUT
Definition: PortCore.h:47
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:44
An interface for reading from a network connection.
Represents how to reach a part of a YARP network.
Definition: Contact.h:33
An abstract port.
Definition: Contactable.h:34
The initial point-of-contact with a port.
Definition: Face.h:20
Basic wrapper for mutual exclusion.
Definition: Mutex.h:31
The output side of an active connection between two ports.
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:21
Information about a port connection or event.
Definition: PortInfo.h:25
A creator for readers.
Interface implemented by all objects that can read themselves from the network, such as Bottle objects.
Definition: PortReader.h:24
A base class for objects that want information about port status changes.
Definition: PortReport.h:25
Interface implemented by all objects that can write themselves to the network, such as Bottle objects.
Definition: PortWriter.h:23
A class for storing options and configuration information.
Definition: Property.h:33
Information about a connection between two ports.
Definition: Route.h:28
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:25
void setFlags(unsigned int flags)
Configure the port to meet certain restrictions in behavior.
Definition: PortCore.h:291
void setWaitAfterSend(bool waitAfterSend)
After sending a message, should we wait for it to be sent to all destinations before returning?
Definition: PortCore.h:357
void setWaitBeforeSend(bool waitBeforeSend)
Upon being asked to send a message, should we wait for any existing message to be sent to all destinations?
Definition: PortCore.h:348
void setContactable(Contactable *contactable)
Definition: PortCore.h:296
bool read(yarp::os::ConnectionReader &reader) override
Callback for data.
Definition: PortCore.h:365
unsigned int getFlags()
Check current configuration of port.
Definition: PortCore.h:304
This is the heart of a yarp port.
Definition: PortCore.h:108
yarp::os::Carrier * outputModifier
Definition: PortCore.h:141
yarp::os::Carrier * inputModifier
Definition: PortCore.h:142
An abstraction for a thread of execution.
Definition: ThreadImpl.h:21
#define YARP_DEPRECATED
Expands to either the standard [[deprecated]] attribute or a compiler-specific decorator such as __attribute__((__deprecated__)) used by GCC.
Definition: compiler.h:2884
The components from which ports and connections are built.
#define YARP_WARNING_POP
Ends a temporary alteration of the enabled warnings.
Definition: system.h:334
#define YARP_WARNING_PUSH
Starts a temporary alteration of the enabled warnings.
Definition: system.h:333
#define YARP_DISABLE_DEPRECATED_WARNING
Disable deprecated warnings in the following code.
Definition: system.h:335
#define YARP_UNUSED(var)
Definition: api.h:162
#define YARP_os_impl_API
Definition: api.h:46