YARP
Yet Another Robot Platform
PortCore.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #ifndef YARP_OS_IMPL_PORTCORE_H
11 #define YARP_OS_IMPL_PORTCORE_H
12 
13 #include <yarp/os/Carriers.h>
14 #include <yarp/os/Contact.h>
15 #include <yarp/os/Contactable.h>
17 #include <yarp/os/PortReader.h>
19 #include <yarp/os/PortReport.h>
20 #include <yarp/os/PortWriter.h>
21 #include <yarp/os/Property.h>
22 #include <yarp/os/Semaphore.h>
23 #include <yarp/os/Type.h>
24 #include <yarp/os/Vocab.h>
28 
29 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
30 #define YARP_INCLUDING_DEPRECATED_HEADER_ON_PURPOSE
31 #include <yarp/os/Mutex.h>
32 #undef YARP_INCLUDING_DEPRECATED_HEADER_ON_PURPOSE
33 #endif
34 
35 #include <mutex>
36 #include <vector>
37 
38 namespace yarp {
39 namespace os {
40 namespace impl {
41 
42 class PortCoreUnit;
43 
44 #define PORTCORE_SEND_NORMAL (1)
45 #define PORTCORE_SEND_LOG (2)
46 
47 // some flags for restricting port behavior
48 #define PORTCORE_IS_NULL (0)
49 #define PORTCORE_IS_RPC (1)
50 #define PORTCORE_IS_INPUT (2)
51 #define PORTCORE_IS_OUTPUT (4)
52 
112 {
113 public:
115  outputModifier(nullptr),
116  inputModifier(nullptr)
117  {
118  }
119 
121  {
122  releaseOutModifier();
123  releaseInModifier();
124  }
125 
127  {
128  if (outputModifier != nullptr) {
129  outputModifier->close();
130  delete outputModifier;
131  outputModifier = nullptr;
132  }
133  }
134 
136  {
137  if (inputModifier != nullptr) {
138  inputModifier->close();
139  delete inputModifier;
140  inputModifier = nullptr;
141  }
142  }
143 
144 public:
147  std::mutex outputMutex;
148  std::mutex inputMutex;
149 };
150 
152  public ThreadImpl,
153  public yarp::os::PortReader
154 {
155 public:
160 
164  ~PortCore();
165 
175  bool addOutput(const std::string& dest,
176  void* id,
178  bool onlyIfNeeded = false);
179 
183  void addOutput(OutputProtocol* op);
184 
191  void removeInput(const std::string& src,
192  void* id,
194 
201  void removeOutput(const std::string& dest,
202  void* id,
204 
211  bool removeIO(const Route& route, bool synch = false);
212 
218  void describe(void* id, yarp::os::OutputStream* os);
219 
224  void describe(yarp::os::PortReport& reporter);
225 
232  bool readBlock(ConnectionReader& reader,
233  void* id,
235 
242  bool adminBlock(ConnectionReader& reader,
243  void* id);
244 
249  void setName(const std::string& name);
250 
254  std::string getName();
255 
261  void setEnvelope(const std::string& envelope);
262 
266  bool setEnvelope(yarp::os::PortWriter& envelope);
267 
268  std::string getEnvelope();
269 
273  bool getEnvelope(yarp::os::PortReader& envelope);
274 
281  void report(const yarp::os::PortInfo& info);
282 
290  void reportUnit(PortCoreUnit* unit, bool active);
291 
/**
 * Configure the port to meet certain restrictions in behavior.
 *
 * @param flags value stored verbatim in m_flags; presumably a bitwise
 *              combination of the PORTCORE_IS_* macros defined earlier
 *              in this header (NOTE(review): confirm against callers).
 */
295  void setFlags(unsigned int flags)
296  {
297  this->m_flags = flags;
298  }
299 
/**
 * Record the Contactable associated with this port core.
 *
 * Only the pointer is stored (in m_contactable); nothing is copied here.
 * NOTE(review): ownership and lifetime of the pointee are not visible in
 * this header -- confirm against the implementation file.
 *
 * @param contactable pointer to remember; may presumably be nullptr,
 *                    which is also the member's initial value.
 */
300  void setContactable(Contactable* contactable)
301  {
302  this->m_contactable = contactable;
303  }
304 
/**
 * Check current configuration of port.
 *
 * @return the current value of m_flags, i.e. whatever was last passed to
 *         setFlags(), or the member's in-class initializer if it was
 *         never called.
 */
308  unsigned int getFlags()
309  {
310  return m_flags;
311  }
312 
316  bool listen(const Contact& address, bool shouldAnnounce = true);
317 
321  bool isWriting();
322 
326  int getInputCount();
327 
331  int getOutputCount();
332 
336  void setReadHandler(yarp::os::PortReader& reader);
337 
341  void setAdminReadHandler(yarp::os::PortReader& reader);
342 
346  void setReadCreator(yarp::os::PortReaderCreator& creator);
347 
/**
 * Upon being asked to send a message, should we wait for any existing
 * message to be sent to all destinations?
 *
 * This setter only records the choice in m_waitBeforeSend (initialized
 * to true in the class); the waiting itself happens elsewhere.
 *
 * @param waitBeforeSend new value for the flag.
 */
352  void setWaitBeforeSend(bool waitBeforeSend)
353  {
354  this->m_waitBeforeSend = waitBeforeSend;
355  }
356 
/**
 * After sending a message, should we wait for it to be sent to all
 * destinations before returning?
 *
 * This setter only records the choice in m_waitAfterSend (initialized
 * to true in the class); the waiting itself happens elsewhere.
 *
 * @param waitAfterSend new value for the flag.
 */
361  void setWaitAfterSend(bool waitAfterSend)
362  {
363  this->m_waitAfterSend = waitAfterSend;
364  }
365 
/**
 * Callback for data.
 *
 * Default implementation: the reader is ignored and nothing is consumed
 * from it.
 *
 * @param reader the incoming connection reader (unused here).
 * @return always true.
 */
369  bool read(yarp::os::ConnectionReader& reader) override
370  {
371  // does nothing by default
372  YARP_UNUSED(reader);
373  return true;
374  }
375 
379  bool start() override;
380 
384  bool manualStart(const char* sourceName);
385 
392  bool send(const yarp::os::PortWriter& writer,
393  yarp::os::PortReader* reader = nullptr,
394  const yarp::os::PortWriter* callback = nullptr);
395 
402  bool sendHelper(const yarp::os::PortWriter& writer,
403  int mode,
404  yarp::os::PortReader* reader = nullptr,
405  const yarp::os::PortWriter* callback = nullptr);
406 
410  void close() override;
411 
415  void run() override;
416 
420  int getEventCount();
421 
425  const Contact& getAddress() const;
426 
427  void resetPortName(const std::string& str);
428 
432  yarp::os::PortReaderCreator* getReadCreator();
433 
437  void notifyCompletion(void* tracker);
438 
443  void setControlRegistration(bool flag);
444 
448  void interrupt();
449 
453  void resume();
454 
458  void setReportCallback(yarp::os::PortReport* reporter);
459 
463  void resetReportCallback();
464 
469  bool isListening() const;
470 
475  bool isManual() const;
476 
480  bool isInterrupted() const;
481 
482  void setTimeout(float timeout);
483 
484  Property* acquireProperties(bool readOnly);
485  void releaseProperties(Property* prop);
486 
487 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
491  bool setCallbackLock(yarp::os::Mutex* mutex);
493 #endif // YARP_NO_DEPRECATED
494 
495  bool setCallbackLock(std::mutex* mutex = nullptr);
496 
497  bool removeCallbackLock();
498 
499  bool lockCallback();
500 
501  bool tryLockCallback();
502 
503  void unlockCallback();
504 
505  yarp::os::impl::PortDataModifier& getPortModifier();
506 
507  void checkType(PortReader& reader);
508 
509  yarp::os::Type getType();
510 
511  void promiseType(const Type& typ);
512 
513 private:
514  // main internal PortCore state and operations
515  std::vector<PortCoreUnit *> m_units;
516  yarp::os::Semaphore m_stateSemaphore {1};
517  std::mutex m_packetMutex;
518  yarp::os::Semaphore m_connectionChangeSemaphore {1};
519  Face* m_face {nullptr};
520  std::string m_name;
521  yarp::os::Contact m_address;
522  yarp::os::PortReader *m_reader {nullptr};
523  yarp::os::PortReader *m_adminReader {nullptr};
524  yarp::os::PortReaderCreator *m_readableCreator {nullptr};
525  yarp::os::PortReport *m_eventReporter {nullptr};
526  bool m_listening {false};
527  bool m_running {false};
528  bool m_starting {false};
529  bool m_closing {false};
530  bool m_finished {false};
531  bool m_finishing {false};
532  bool m_waitBeforeSend {true};
533  bool m_waitAfterSend {true};
534  bool m_controlRegistration {true};
535  bool m_interruptable {true};
536  bool m_interrupted {false};
537  bool m_manual {false};
538  int m_events {0};
539  int m_connectionListeners {0};
540  int m_inputCount {0};
541  int m_outputCount {0};
542  int m_dataOutputCount {0};
543  unsigned int m_flags {PORTCORE_IS_INPUT | PORTCORE_IS_OUTPUT};
544  bool m_logNeeded {false};
545  PortCorePackets m_packets {};
546  std::string m_envelope;
547  float m_timeout {-1};
548  int m_counter {1};
549  yarp::os::Property *m_prop {nullptr};
550  yarp::os::Contactable *m_contactable {nullptr};
551  std::mutex* m_mutex {nullptr};
552 #ifndef YARP_NO_DEPRECATED // since YARP 3.3
555  yarp::os::Mutex* m_old_mutex {nullptr};
557 #endif // YARP_NO_DEPRECATED
558  bool m_mutexOwned {false};
559  BufferedConnectionWriter m_envelopeWriter {true};
560 
561  std::mutex m_typeMutex;
562  bool m_checkedType {false};
563  Type m_type;
564 
565  // port data modifier
567 
568  // set IP packet TOS
569  bool setTypeOfService(PortCoreUnit* unit, int tos);
570 
571  // get IP packet TOS
572  int getTypeOfService(PortCoreUnit* unit);
573 
574  // set the scheduling properties of all threads
575  // within the process scope.
576  bool setProcessSchedulingParam(int priority = -1, int policy = -1);
577 
578  // attach a portmonitor plugin to the port or to a specific connection
579  bool attachPortMonitor(yarp::os::Property& prop, bool isOutput, std::string& errMsg);
580 
581  // detach the portmonitor from the port or specific connection
582  bool detachPortMonitor(bool isOutput);
583 
584  // set the parameter for the portmonitor of the port (if any)
585  bool setParamPortMonitor(const yarp::os::Property& param, bool isOutput, std::string& errMsg);
586 
587  // get the parameters from the portmonitor of the port (if any)
588  bool getParamPortMonitor(yarp::os::Property& param, bool isOutput, std::string& errMsg);
589 
590  void closeMain();
591 
592  bool isUnit(const Route& route, int index);
593 
594  // only called in "finished" phase
595  void closeUnits();
596 
597  // called anytime, garbage collects terminated units
598  void cleanUnits(bool blocking = true);
599 
600  // only called by the manager
601  void reapUnits();
602 
603  // only called in "running" phase
604  void addInput(InputProtocol* ip);
605 
606  bool removeUnit(const Route& route, bool synch = false, bool* except = nullptr);
607 
608  int getNextIndex();
609 };
610 
611 } // namespace impl
612 } // namespace os
613 } // namespace yarp
614 
615 #endif // YARP_OS_IMPL_PORTCORE_H
#define PORTCORE_IS_INPUT
Definition: PortCore.h:50
#define PORTCORE_IS_OUTPUT
Definition: PortCore.h:51
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:48
An interface for reading from a network connection.
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
An abstract port.
Definition: Contactable.h:38
Basic wrapper for mutual exclusion.
Definition: Mutex.h:35
The output side of an active connection between two ports.
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:25
Information about a port connection or event.
Definition: PortInfo.h:29
A creator for readers.
Interface implemented by all objects that can read themselves from the network, such as Bottle objects.
Definition: PortReader.h:28
A base class for objects that want information about port status changes.
Definition: PortReport.h:31
Interface implemented by all objects that can write themselves to the network, such as Bottle objects.
Definition: PortWriter.h:27
A class for storing options and configuration information.
Definition: Property.h:37
Information about a connection between two ports.
Definition: Route.h:32
A class for thread synchronization and mutual exclusion.
Definition: Semaphore.h:29
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:30
void setFlags(unsigned int flags)
Configure the port to meet certain restrictions in behavior.
Definition: PortCore.h:295
void setWaitAfterSend(bool waitAfterSend)
After sending a message, should we wait for it to be sent to all destinations before returning?
Definition: PortCore.h:361
void setWaitBeforeSend(bool waitBeforeSend)
Upon being asked to send a message, should we wait for any existing message to be sent to all destinations?
Definition: PortCore.h:352
void setContactable(Contactable *contactable)
Definition: PortCore.h:300
bool read(yarp::os::ConnectionReader &reader) override
Callback for data.
Definition: PortCore.h:369
unsigned int getFlags()
Check current configuration of port.
Definition: PortCore.h:308
This is the heart of a yarp port.
Definition: PortCore.h:112
yarp::os::Carrier * outputModifier
Definition: PortCore.h:145
yarp::os::Carrier * inputModifier
Definition: PortCore.h:146
An abstraction for a thread of execution.
Definition: ThreadImpl.h:26
#define YARP_DEPRECATED
Expands to either the standard [[deprecated]] attribute or a compiler-specific decorator such as __attribute__((__deprecated__)) used by GNU compilers.
Definition: compiler.h:2882
The main, catch-all namespace for YARP.
Definition: environment.h:18
#define YARP_WARNING_POP
Ends a temporary alteration of the enabled warnings.
Definition: system.h:335
#define YARP_WARNING_PUSH
Starts a temporary alteration of the enabled warnings.
Definition: system.h:334
#define YARP_DISABLE_DEPRECATED_WARNING
Disable deprecated warnings in the following code.
Definition: system.h:336
#define YARP_UNUSED(var)
Definition: api.h:159
#define YARP_os_impl_API
Definition: api.h:45