YARP
Yet Another Robot Platform
PortCore.h
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
7 #ifndef YARP_OS_IMPL_PORTCORE_H
8 #define YARP_OS_IMPL_PORTCORE_H
9 
10 #include <yarp/os/Carriers.h>
11 #include <yarp/os/Contact.h>
12 #include <yarp/os/Contactable.h>
14 #include <yarp/os/PortReader.h>
16 #include <yarp/os/PortReport.h>
17 #include <yarp/os/PortWriter.h>
18 #include <yarp/os/Property.h>
19 #include <yarp/os/Type.h>
20 #include <yarp/os/Vocab.h>
24 
25 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
26 #define YARP_INCLUDING_DEPRECATED_HEADER_ON_PURPOSE
27 #include <yarp/os/Mutex.h>
28 #undef YARP_INCLUDING_DEPRECATED_HEADER_ON_PURPOSE
29 #endif
30 
31 #include <atomic>
32 #include <condition_variable>
33 #include <mutex>
34 #include <vector>
35 
36 namespace yarp {
37 namespace os {
38 namespace impl {
39 
40 class PortCoreUnit;
41 
42 #define PORTCORE_SEND_NORMAL (1)
43 #define PORTCORE_SEND_LOG (2)
44 
45 // some flags for restricting port behavior
46 #define PORTCORE_IS_NULL (0)
47 #define PORTCORE_IS_RPC (1)
48 #define PORTCORE_IS_INPUT (2)
49 #define PORTCORE_IS_OUTPUT (4)
50 
110 {
111 public:
113  outputModifier(nullptr),
114  inputModifier(nullptr)
115  {
116  }
117 
119  {
120  releaseOutModifier();
121  releaseInModifier();
122  }
123 
125  {
126  if (outputModifier != nullptr) {
127  outputModifier->close();
128  delete outputModifier;
129  outputModifier = nullptr;
130  }
131  }
132 
134  {
135  if (inputModifier != nullptr) {
136  inputModifier->close();
137  delete inputModifier;
138  inputModifier = nullptr;
139  }
140  }
141 
142 public:
145  std::mutex outputMutex;
146  std::mutex inputMutex;
147 };
148 
150  public ThreadImpl,
151  public yarp::os::PortReader
152 {
153 public:
158 
162  ~PortCore();
163 
173  bool addOutput(const std::string& dest,
174  void* id,
176  bool onlyIfNeeded = false);
177 
181  void addOutput(OutputProtocol* op);
182 
189  void removeInput(const std::string& src,
190  void* id,
192 
199  void removeOutput(const std::string& dest,
200  void* id,
202 
209  bool removeIO(const Route& route, bool synch = false);
210 
216  void describe(void* id, yarp::os::OutputStream* os);
217 
222  void describe(yarp::os::PortReport& reporter);
223 
230  bool readBlock(ConnectionReader& reader,
231  void* id,
233 
240  bool adminBlock(ConnectionReader& reader,
241  void* id);
242 
247  void setName(const std::string& name);
248 
252  std::string getName();
253 
259  void setEnvelope(const std::string& envelope);
260 
264  bool setEnvelope(yarp::os::PortWriter& envelope);
265 
266  std::string getEnvelope();
267 
271  bool getEnvelope(yarp::os::PortReader& envelope);
272 
279  void report(const yarp::os::PortInfo& info);
280 
288  void reportUnit(PortCoreUnit* unit, bool active);
289 
293  void setFlags(unsigned int flags)
294  {
295  this->m_flags = flags;
296  }
297 
298  void setContactable(Contactable* contactable)
299  {
300  this->m_contactable = contactable;
301  }
302 
306  unsigned int getFlags()
307  {
308  return m_flags;
309  }
310 
314  bool listen(const Contact& address, bool shouldAnnounce = true);
315 
319  bool isWriting();
320 
324  int getInputCount();
325 
329  int getOutputCount();
330 
334  void setReadHandler(yarp::os::PortReader& reader);
335 
339  void setAdminReadHandler(yarp::os::PortReader& reader);
340 
344  void setReadCreator(yarp::os::PortReaderCreator& creator);
345 
350  void setWaitBeforeSend(bool waitBeforeSend)
351  {
352  this->m_waitBeforeSend = waitBeforeSend;
353  }
354 
359  void setWaitAfterSend(bool waitAfterSend)
360  {
361  this->m_waitAfterSend = waitAfterSend;
362  }
363 
367  bool read(yarp::os::ConnectionReader& reader) override
368  {
369  // does nothing by default
370  YARP_UNUSED(reader);
371  return true;
372  }
373 
377  bool start() override;
378 
382  bool manualStart(const char* sourceName);
383 
390  bool send(const yarp::os::PortWriter& writer,
391  yarp::os::PortReader* reader = nullptr,
392  const yarp::os::PortWriter* callback = nullptr);
393 
400  bool sendHelper(const yarp::os::PortWriter& writer,
401  int mode,
402  yarp::os::PortReader* reader = nullptr,
403  const yarp::os::PortWriter* callback = nullptr);
404 
408  void close() override;
409 
413  void run() override;
414 
418  int getEventCount();
419 
423  const Contact& getAddress() const;
424 
425  void resetPortName(const std::string& str);
426 
430  yarp::os::PortReaderCreator* getReadCreator();
431 
435  void notifyCompletion(void* tracker);
436 
441  void setControlRegistration(bool flag);
442 
446  void interrupt();
447 
451  void resume();
452 
456  void setReportCallback(yarp::os::PortReport* reporter);
457 
461  void resetReportCallback();
462 
467  bool isListening() const;
468 
473  bool isManual() const;
474 
478  bool isInterrupted() const;
479 
480  void setTimeout(float timeout);
481 
485  Property* acquireProperties(bool readOnly);
486 
490  void releaseProperties(Property* prop);
491 
492 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
496  bool setCallbackLock(yarp::os::Mutex* mutex);
498 #endif // YARP_NO_DEPRECATED
499 
500  bool setCallbackLock(std::mutex* mutex = nullptr);
501 
502  bool removeCallbackLock();
503 
504  bool lockCallback();
505 
506  bool tryLockCallback();
507 
508  void unlockCallback();
509 
510  yarp::os::impl::PortDataModifier& getPortModifier();
511 
512  void checkType(PortReader& reader);
513 
514  yarp::os::Type getType();
515 
516  void promiseType(const Type& typ);
517 
518 private:
519  // main internal PortCore state and operations
520  std::vector<PortCoreUnit *> m_units;
521  std::mutex m_stateMutex;
522  std::condition_variable m_stateCv;
523  std::mutex m_packetMutex;
524  std::condition_variable m_connectionChangeCv;
525  Face* m_face {nullptr};
526  std::string m_name;
527  yarp::os::Contact m_address;
528  yarp::os::PortReader *m_reader {nullptr};
529  yarp::os::PortReader *m_adminReader {nullptr};
530  yarp::os::PortReaderCreator *m_readableCreator {nullptr};
531  yarp::os::PortReport *m_eventReporter {nullptr};
532  std::atomic<bool> m_listening {false};
533  std::atomic<bool> m_running {false};
534  std::atomic<bool> m_starting {false};
535  std::atomic<bool> m_closing {false};
536  std::atomic<bool> m_finished {false};
537  bool m_finishing {false};
538  bool m_waitBeforeSend {true};
539  bool m_waitAfterSend {true};
540  bool m_controlRegistration {true};
541  bool m_interruptable {true};
542  bool m_interrupted {false};
543  bool m_manual {false};
544  int m_events {0};
545  int m_connectionListeners {0};
546  int m_inputCount {0};
547  int m_outputCount {0};
548  int m_dataOutputCount {0};
549  unsigned int m_flags {PORTCORE_IS_INPUT | PORTCORE_IS_OUTPUT};
550  bool m_logNeeded {false};
551  PortCorePackets m_packets {};
552  std::string m_envelope;
553  float m_timeout {-1};
554  int m_counter {1};
555  yarp::os::Property *m_prop {nullptr};
556  yarp::os::Contactable *m_contactable {nullptr};
557  std::mutex* m_mutex {nullptr};
558 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
561  yarp::os::Mutex* m_old_mutex {nullptr};
563 #endif // YARP_NO_DEPRECATED
564  bool m_mutexOwned {false};
565  BufferedConnectionWriter m_envelopeWriter {true};
566 
567  std::mutex m_typeMutex;
568  bool m_checkedType {false};
569  Type m_type;
570 
571  // port data modifier
573 
574  // set IP packet TOS
575  bool setTypeOfService(PortCoreUnit* unit, int tos);
576 
577  // get IP packet TOS
578  int getTypeOfService(PortCoreUnit* unit);
579 
580  // set the scheduling properties of all threads
581  // within the process scope.
582  bool setProcessSchedulingParam(int priority = -1, int policy = -1);
583 
584  // attach a portmonitor plugin to the port or to a specific connection
585  bool attachPortMonitor(yarp::os::Property& prop, bool isOutput, std::string& errMsg);
586 
587  // detach the portmonitor from the port or specific connection
588  bool detachPortMonitor(bool isOutput);
589 
590  // set the parameter for the portmonitor of the port (if any)
591  bool setParamPortMonitor(const yarp::os::Property& param, bool isOutput, std::string& errMsg);
592 
593  // get the parameters from the portmonitor of the port (if any)
594  bool getParamPortMonitor(yarp::os::Property& param, bool isOutput, std::string& errMsg);
595 
596  void closeMain();
597 
598  bool isUnit(const Route& route, int index);
599 
600  // only called in "finished" phase
601  void closeUnits();
602 
603  // called anytime, garbage collects terminated units
604  void cleanUnits(bool blocking = true);
605 
606  // only called by the manager
607  void reapUnits();
608 
609  // only called in "running" phase
610  void addInput(InputProtocol* ip);
611 
612  bool removeUnit(const Route& route, bool synch = false, bool* except = nullptr);
613 
614  int getNextIndex();
615 };
616 
617 } // namespace impl
618 } // namespace os
619 } // namespace yarp
620 
621 #endif // YARP_OS_IMPL_PORTCORE_H
#define PORTCORE_IS_INPUT
Definition: PortCore.h:48
#define PORTCORE_IS_OUTPUT
Definition: PortCore.h:49
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:45
An interface for reading from a network connection.
Represents how to reach a part of a YARP network.
Definition: Contact.h:36
An abstract port.
Definition: Contactable.h:35
The initial point-of-contact with a port.
Definition: Face.h:21
Basic wrapper for mutual exclusion.
Definition: Mutex.h:32
The output side of an active connection between two ports.
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:22
Information about a port connection or event.
Definition: PortInfo.h:26
A creator for readers.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:25
A base class for objects that want information about port status changes.
Definition: PortReport.h:28
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:24
A class for storing options and configuration information.
Definition: Property.h:34
Information about a connection between two ports.
Definition: Route.h:29
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:27
void setFlags(unsigned int flags)
Configure the port to meet certain restrictions in behavior.
Definition: PortCore.h:293
void setWaitAfterSend(bool waitAfterSend)
After sending a message, should we wait for it to be sent to all destinations before returning?
Definition: PortCore.h:359
void setWaitBeforeSend(bool waitBeforeSend)
Upon being asked to send a message, should we wait for any existing message to be sent to all destina...
Definition: PortCore.h:350
void setContactable(Contactable *contactable)
Definition: PortCore.h:298
bool read(yarp::os::ConnectionReader &reader) override
Callback for data.
Definition: PortCore.h:367
unsigned int getFlags()
Check current configuration of port.
Definition: PortCore.h:306
This is the heart of a yarp port.
Definition: PortCore.h:110
yarp::os::Carrier * outputModifier
Definition: PortCore.h:143
yarp::os::Carrier * inputModifier
Definition: PortCore.h:144
An abstraction for a thread of execution.
Definition: ThreadImpl.h:23
#define YARP_DEPRECATED
Expands to either the standard [[deprecated]] attribute or a compiler-specific decorator such as __at...
Definition: compiler.h:2884
The main, catch-all namespace for YARP.
Definition: dirs.h:16
#define YARP_WARNING_POP
Ends a temporary alteration of the enabled warnings.
Definition: system.h:332
#define YARP_WARNING_PUSH
Starts a temporary alteration of the enabled warnings.
Definition: system.h:331
#define YARP_DISABLE_DEPRECATED_WARNING
Disable deprecated warnings in the following code.
Definition: system.h:333
#define YARP_UNUSED(var)
Definition: api.h:162
#define YARP_os_impl_API
Definition: api.h:46