YARP
Yet Another Robot Platform
PortCore.h
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#ifndef YARP_OS_IMPL_PORTCORE_H
8#define YARP_OS_IMPL_PORTCORE_H
9
10#include <yarp/os/Carriers.h>
11#include <yarp/os/Contact.h>
12#include <yarp/os/Contactable.h>
14#include <yarp/os/PortReader.h>
16#include <yarp/os/PortReport.h>
17#include <yarp/os/PortWriter.h>
18#include <yarp/os/Property.h>
19#include <yarp/os/Type.h>
20#include <yarp/os/Vocab.h>
24
25#ifndef YARP_NO_DEPRECATED // since YARP 3.3
26#define YARP_INCLUDING_DEPRECATED_HEADER_ON_PURPOSE
27#include <yarp/os/Mutex.h>
28#undef YARP_INCLUDING_DEPRECATED_HEADER_ON_PURPOSE
29#endif
30
31#include <atomic>
32#include <condition_variable>
33#include <mutex>
34#include <vector>
35
36namespace yarp::os::impl {
37
38class PortCoreUnit;
39
// Send-mode selectors.
// NOTE(review): presumably these are the values accepted by the `mode`
// argument of PortCore::sendHelper() -- confirm against PortCore.cpp.
40#define PORTCORE_SEND_NORMAL (1)
41#define PORTCORE_SEND_LOG (2)
42
43// some flags for restricting port behavior
// Apart from PORTCORE_IS_NULL (0, no flag set), each value is a distinct bit,
// so flags can be combined with bitwise OR; PortCore::m_flags is initialised
// to PORTCORE_IS_INPUT | PORTCORE_IS_OUTPUT.
44#define PORTCORE_IS_NULL (0)
45#define PORTCORE_IS_RPC (1)
46#define PORTCORE_IS_INPUT (2)
47#define PORTCORE_IS_OUTPUT (4)
48
// Holder for an optional output modifier and an optional input modifier,
// each paired with its own mutex.
// NOTE(review): this listing omits the class header (listing line 107), the
// signature lines of the member functions below (110, 116, 122, 131) and the
// declarations of the two modifier pointers (141-142). The names used in the
// comments below are taken from the calls visible in the destructor body;
// confirm against the real header.
108{
109public:
// Constructor initialiser list: both modifier pointers start out null.
111 outputModifier(nullptr),
112 inputModifier(nullptr)
113 {
114 }
115
// Destructor body (presumably): releases both modifiers.
117 {
118 releaseOutModifier();
119 releaseInModifier();
120 }
121
// Body of the output-release function (presumably releaseOutModifier()):
// if an output modifier is set, close it, delete it and reset the pointer
// to nullptr, so calling this again is a no-op.
123 {
124 if (outputModifier != nullptr) {
125 outputModifier->close();
126 delete outputModifier;
127 outputModifier = nullptr;
128 }
129 }
130
// Body of the input-release function (presumably releaseInModifier()):
// same close / delete / reset sequence for the input modifier.
132 {
133 if (inputModifier != nullptr) {
134 inputModifier->close();
135 delete inputModifier;
136 inputModifier = nullptr;
137 }
138 }
139
140public:
// One mutex per direction.
// NOTE(review): presumably each guards access to the matching modifier
// pointer -- no locking is visible in this block; verify at the call sites.
143 std::mutex outputMutex;
144 std::mutex inputMutex;
145};
146
148 public ThreadImpl,
150{
151public:
156
160 ~PortCore();
161
171 bool addOutput(const std::string& dest,
172 void* id,
174 bool onlyIfNeeded = false);
175
179 void addOutput(OutputProtocol* op);
180
187 void removeInput(const std::string& src,
188 void* id,
190
197 void removeOutput(const std::string& dest,
198 void* id,
200
207 bool removeIO(const Route& route, bool synch = false);
208
214 void describe(void* id, yarp::os::OutputStream* os);
215
220 void describe(yarp::os::PortReport& reporter);
221
228 bool readBlock(ConnectionReader& reader,
229 void* id,
231
238 bool adminBlock(ConnectionReader& reader,
239 void* id);
240
245 void setName(const std::string& name);
246
250 std::string getName();
251
257 void setEnvelope(const std::string& envelope);
258
262 bool setEnvelope(yarp::os::PortWriter& envelope);
263
264 std::string getEnvelope();
265
269 bool getEnvelope(yarp::os::PortReader& envelope);
270
277 void report(const yarp::os::PortInfo& info);
278
286 void reportUnit(PortCoreUnit* unit, bool active);
287
// Configure the port to meet certain restrictions in behavior.
// Overwrites m_flags with `flags`; the stored value is what getFlags() returns.
// m_flags defaults to PORTCORE_IS_INPUT | PORTCORE_IS_OUTPUT.
291 void setFlags(unsigned int flags)
292 {
293 this->m_flags = flags;
294 }
295
// Store `contactable` in m_contactable (initially nullptr).
// Only the pointer is copied; nothing else is done with it here.
296 void setContactable(Contactable* contactable)
297 {
298 this->m_contactable = contactable;
299 }
300
// Check current configuration of port.
// Returns the current value of m_flags (see setFlags()).
304 unsigned int getFlags()
305 {
306 return m_flags;
307 }
308
312 bool listen(const Contact& address, bool shouldAnnounce = true);
313
317 bool isWriting();
318
322 int getInputCount();
323
327 int getOutputCount();
328
332 void setReadHandler(yarp::os::PortReader& reader);
333
337 void setAdminReadHandler(yarp::os::PortReader& reader);
338
342 void setReadCreator(yarp::os::PortReaderCreator& creator);
343
// Upon being asked to send a message, should we wait for any existing
// message to be sent to all destinations?
// Stores the choice in m_waitBeforeSend, which is initialised to true.
348 void setWaitBeforeSend(bool waitBeforeSend)
349 {
350 this->m_waitBeforeSend = waitBeforeSend;
351 }
352
// After sending a message, should we wait for it to be sent to all
// destinations before returning?
// Stores the choice in m_waitAfterSend, which is initialised to true.
357 void setWaitAfterSend(bool waitAfterSend)
358 {
359 this->m_waitAfterSend = waitAfterSend;
360 }
361
// Callback for data.
// This override ignores `reader` entirely and always returns true.
365 bool read(yarp::os::ConnectionReader& reader) override
366 {
367 // does nothing by default
368 YARP_UNUSED(reader);
369 return true;
370 }
371
375 bool start() override;
376
380 bool manualStart(const char* sourceName);
381
388 bool send(const yarp::os::PortWriter& writer,
389 yarp::os::PortReader* reader = nullptr,
390 const yarp::os::PortWriter* callback = nullptr);
391
398 bool sendHelper(const yarp::os::PortWriter& writer,
399 int mode,
400 yarp::os::PortReader* reader = nullptr,
401 const yarp::os::PortWriter* callback = nullptr);
402
406 void close() override;
407
411 void run() override;
412
416 int getEventCount();
417
421 const Contact& getAddress() const;
422
423 void resetPortName(const std::string& str);
424
428 yarp::os::PortReaderCreator* getReadCreator();
429
433 void notifyCompletion(void* tracker);
434
439 void setControlRegistration(bool flag);
440
444 void interrupt();
445
449 void resume();
450
454 void setReportCallback(yarp::os::PortReport* reporter);
455
459 void resetReportCallback();
460
465 bool isListening() const;
466
471 bool isManual() const;
472
476 bool isInterrupted() const;
477
478 void setTimeout(float timeout);
479
483 Property* acquireProperties(bool readOnly);
484
488 void releaseProperties(Property* prop);
489
490#ifndef YARP_NO_DEPRECATED // since YARP 3.3
494 bool setCallbackLock(yarp::os::Mutex* mutex);
496#endif // YARP_NO_DEPRECATED
497
498 bool setCallbackLock(std::mutex* mutex = nullptr);
499
500 bool removeCallbackLock();
501
502 bool lockCallback();
503
504 bool tryLockCallback();
505
506 void unlockCallback();
507
508 yarp::os::impl::PortDataModifier& getPortModifier();
509
510 void checkType(PortReader& reader);
511
512 yarp::os::Type getType();
513
514 void promiseType(const Type& typ);
515
516private:
517 // main internal PortCore state and operations
518 std::vector<PortCoreUnit *> m_units;
519 std::mutex m_stateMutex;
520 std::condition_variable m_stateCv;
521 std::mutex m_packetMutex;
522 std::condition_variable m_connectionChangeCv;
523 Face* m_face {nullptr};
524 std::string m_name;
525 yarp::os::Contact m_address;
526 yarp::os::PortReader *m_reader {nullptr};
527 yarp::os::PortReader *m_adminReader {nullptr};
528 yarp::os::PortReaderCreator *m_readableCreator {nullptr};
529 yarp::os::PortReport *m_eventReporter {nullptr};
530 std::atomic<bool> m_listening {false};
531 std::atomic<bool> m_running {false};
532 std::atomic<bool> m_starting {false};
533 std::atomic<bool> m_closing {false};
534 std::atomic<bool> m_finished {false};
535 bool m_finishing {false};
536 bool m_waitBeforeSend {true};
537 bool m_waitAfterSend {true};
538 bool m_controlRegistration {true};
539 bool m_interruptable {true};
540 bool m_interrupted {false};
541 bool m_manual {false};
542 int m_events {0};
543 int m_connectionListeners {0};
544 int m_inputCount {0};
545 int m_outputCount {0};
546 int m_dataOutputCount {0};
547 unsigned int m_flags {PORTCORE_IS_INPUT | PORTCORE_IS_OUTPUT};
548 bool m_logNeeded {false};
549 PortCorePackets m_packets {};
550 std::string m_envelope;
551 float m_timeout {-1};
552 int m_counter {1};
553 yarp::os::Property *m_prop {nullptr};
554 yarp::os::Contactable *m_contactable {nullptr};
555 std::mutex* m_mutex {nullptr};
556#ifndef YARP_NO_DEPRECATED // since YARP 3.3
559 yarp::os::Mutex* m_old_mutex {nullptr};
561#endif // YARP_NO_DEPRECATED
562 bool m_mutexOwned {false};
563 BufferedConnectionWriter m_envelopeWriter {true};
564
565 std::mutex m_typeMutex;
566 bool m_checkedType {false};
567 Type m_type;
568
569 // port data modifier
571
572 // set IP packet TOS
573 bool setTypeOfService(PortCoreUnit* unit, int tos);
574
575 // get IP packet TOS
576 int getTypeOfService(PortCoreUnit* unit);
577
578 // set the scheduling properties of all threads
579 // within the process scope.
580 bool setProcessSchedulingParam(int priority = -1, int policy = -1);
581
582 // attach a portmonitor plugin to the port or to a specific connection
583 bool attachPortMonitor(yarp::os::Property& prop, bool isOutput, std::string& errMsg);
584
585 // detach the portmonitor from the port or specific connection
586 bool detachPortMonitor(bool isOutput);
587
588 // set the parameter for the portmonitor of the port (if any)
589 bool setParamPortMonitor(const yarp::os::Property& param, bool isOutput, std::string& errMsg);
590
591 // get the parameters from the portmonitor of the port (if any)
592 bool getParamPortMonitor(yarp::os::Property& param, bool isOutput, std::string& errMsg);
593
594 void closeMain();
595
596 bool isUnit(const Route& route, int index);
597
598 // only called in "finished" phase
599 void closeUnits();
600
601 // called anytime, garbage collects terminated units
602 void cleanUnits(bool blocking = true);
603
604 // only called by the manager
605 void reapUnits();
606
607 // only called in "running" phase
608 void addInput(InputProtocol* ip);
609
610 bool removeUnit(const Route& route, bool synch = false, bool* except = nullptr);
611
612 int getNextIndex();
613};
614
615} // namespace yarp::os::impl
616
617#endif // YARP_OS_IMPL_PORTCORE_H
#define PORTCORE_IS_INPUT
Definition: PortCore.h:46
#define PORTCORE_IS_OUTPUT
Definition: PortCore.h:47
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:44
An interface for reading from a network connection.
Represents how to reach a part of a YARP network.
Definition: Contact.h:33
An abstract port.
Definition: Contactable.h:34
The initial point-of-contact with a port.
Definition: Face.h:20
Basic wrapper for mutual exclusion.
Definition: Mutex.h:31
The output side of an active connection between two ports.
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:21
Information about a port connection or event.
Definition: PortInfo.h:25
A creator for readers.
Interface implemented by all objects that can read themselves from the network, such as Bottle objects.
Definition: PortReader.h:24
A base class for objects that want information about port status changes.
Definition: PortReport.h:25
Interface implemented by all objects that can write themselves to the network, such as Bottle objects.
Definition: PortWriter.h:23
A class for storing options and configuration information.
Definition: Property.h:33
Information about a connection between two ports.
Definition: Route.h:28
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:25
void setFlags(unsigned int flags)
Configure the port to meet certain restrictions in behavior.
Definition: PortCore.h:291
void setWaitAfterSend(bool waitAfterSend)
After sending a message, should we wait for it to be sent to all destinations before returning?
Definition: PortCore.h:357
void setWaitBeforeSend(bool waitBeforeSend)
Upon being asked to send a message, should we wait for any existing message to be sent to all destinations?
Definition: PortCore.h:348
void setContactable(Contactable *contactable)
Definition: PortCore.h:296
bool read(yarp::os::ConnectionReader &reader) override
Callback for data.
Definition: PortCore.h:365
unsigned int getFlags()
Check current configuration of port.
Definition: PortCore.h:304
This is the heart of a yarp port.
Definition: PortCore.h:108
yarp::os::Carrier * outputModifier
Definition: PortCore.h:141
yarp::os::Carrier * inputModifier
Definition: PortCore.h:142
An abstraction for a thread of execution.
Definition: ThreadImpl.h:21
#define YARP_DEPRECATED
Expands to either the standard [[deprecated]] attribute or a compiler-specific decorator such as __attribute__((__deprecated__)) used by GNU compilers.
Definition: compiler.h:2894
The components from which ports and connections are built.
#define YARP_WARNING_POP
Ends a temporary alteration of the enabled warnings.
Definition: system.h:334
#define YARP_WARNING_PUSH
Starts a temporary alteration of the enabled warnings.
Definition: system.h:333
#define YARP_DISABLE_DEPRECATED_WARNING
Disable deprecated warnings in the following code.
Definition: system.h:335
#define YARP_UNUSED(var)
Definition: api.h:162
#define YARP_os_impl_API
Definition: api.h:46