YARP
Yet Another Robot Platform
PortCore.h
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#ifndef YARP_OS_IMPL_PORTCORE_H
8#define YARP_OS_IMPL_PORTCORE_H
9
10#include <yarp/os/Carriers.h>
11#include <yarp/os/Contact.h>
12#include <yarp/os/Contactable.h>
14#include <yarp/os/PortReader.h>
16#include <yarp/os/PortReport.h>
17#include <yarp/os/PortWriter.h>
18#include <yarp/os/Property.h>
19#include <yarp/os/Type.h>
20#include <yarp/os/Vocab.h>
24
25#include <atomic>
26#include <condition_variable>
27#include <mutex>
28#include <vector>
29
30namespace yarp::os::impl {
31
32class PortCoreUnit;
33
34#define PORTCORE_SEND_NORMAL (1)
35#define PORTCORE_SEND_LOG (2)
36
37// some flags for restricting port behavior
38#define PORTCORE_IS_NULL (0)
39#define PORTCORE_IS_RPC (1)
40#define PORTCORE_IS_INPUT (2)
41#define PORTCORE_IS_OUTPUT (4)
42
102{
103public:
105 outputModifier(nullptr),
106 inputModifier(nullptr)
107 {
108 }
109
111 {
112 releaseOutModifier();
113 releaseInModifier();
114 }
115
117 {
118 if (outputModifier != nullptr) {
119 outputModifier->close();
120 delete outputModifier;
121 outputModifier = nullptr;
122 }
123 }
124
126 {
127 if (inputModifier != nullptr) {
128 inputModifier->close();
129 delete inputModifier;
130 inputModifier = nullptr;
131 }
132 }
133
134public:
137 std::mutex outputMutex;
138 std::mutex inputMutex;
139};
140
142 public ThreadImpl,
144{
145public:
150
154 ~PortCore();
155
165 bool addOutput(const std::string& dest,
166 void* id,
168 bool onlyIfNeeded = false);
169
173 void addOutput(OutputProtocol* op);
174
181 void removeInput(const std::string& src,
182 void* id,
184
191 void removeOutput(const std::string& dest,
192 void* id,
194
201 bool removeIO(const Route& route, bool synch = false);
202
208 void describe(void* id, yarp::os::OutputStream* os);
209
214 void describe(yarp::os::PortReport& reporter);
215
222 bool readBlock(ConnectionReader& reader,
223 void* id,
225
232 bool adminBlock(ConnectionReader& reader,
233 void* id);
234
239 void setName(const std::string& name);
240
244 std::string getName();
245
251 void setEnvelope(const std::string& envelope);
252
256 bool setEnvelope(yarp::os::PortWriter& envelope);
257
258 std::string getEnvelope();
259
263 bool getEnvelope(yarp::os::PortReader& envelope);
264
271 void report(const yarp::os::PortInfo& info);
272
280 void reportUnit(PortCoreUnit* unit, bool active);
281
285 void setFlags(unsigned int flags)
286 {
287 this->m_flags = flags;
288 }
289
290 void setContactable(Contactable* contactable)
291 {
292 this->m_contactable = contactable;
293 }
294
298 unsigned int getFlags()
299 {
300 return m_flags;
301 }
302
306 bool listen(const Contact& address, bool shouldAnnounce = true);
307
311 bool isWriting();
312
316 int getInputCount();
317
321 int getOutputCount();
322
326 void setReadHandler(yarp::os::PortReader& reader);
327
331 void setAdminReadHandler(yarp::os::PortReader& reader);
332
336 void setReadCreator(yarp::os::PortReaderCreator& creator);
337
342 void setWaitBeforeSend(bool waitBeforeSend)
343 {
344 this->m_waitBeforeSend = waitBeforeSend;
345 }
346
351 void setWaitAfterSend(bool waitAfterSend)
352 {
353 this->m_waitAfterSend = waitAfterSend;
354 }
355
359 bool read(yarp::os::ConnectionReader& reader) override
360 {
361 // does nothing by default
362 YARP_UNUSED(reader);
363 return true;
364 }
365
369 bool start() override;
370
374 bool manualStart(const char* sourceName);
375
382 bool send(const yarp::os::PortWriter& writer,
383 yarp::os::PortReader* reader = nullptr,
384 const yarp::os::PortWriter* callback = nullptr);
385
392 bool sendHelper(const yarp::os::PortWriter& writer,
393 int mode,
394 yarp::os::PortReader* reader = nullptr,
395 const yarp::os::PortWriter* callback = nullptr);
396
400 void close() override;
401
405 void run() override;
406
410 int getEventCount();
411
415 const Contact& getAddress() const;
416
417 void resetPortName(const std::string& str);
418
422 yarp::os::PortReaderCreator* getReadCreator();
423
427 void notifyCompletion(void* tracker);
428
433 void setControlRegistration(bool flag);
434
438 void interrupt();
439
443 void resume();
444
448 void setReportCallback(yarp::os::PortReport* reporter);
449
453 void resetReportCallback();
454
459 bool isListening() const;
460
465 bool isManual() const;
466
470 bool isInterrupted() const;
471
472 void setTimeout(float timeout);
473
477 Property* acquireProperties(bool readOnly);
478
482 void releaseProperties(Property* prop);
483
484 bool setCallbackLock(std::mutex* mutex = nullptr);
485
486 bool removeCallbackLock();
487
488 bool lockCallback();
489
490 bool tryLockCallback();
491
492 void unlockCallback();
493
494 yarp::os::impl::PortDataModifier& getPortModifier();
495
496 void checkType(PortReader& reader);
497
498 yarp::os::Type getType();
499
500 void promiseType(const Type& typ);
501
502private:
503 // main internal PortCore state and operations
504 std::vector<PortCoreUnit *> m_units;
505 std::mutex m_stateMutex;
506 std::condition_variable m_stateCv;
507 std::mutex m_packetMutex;
508 std::condition_variable m_connectionChangeCv;
509 Face* m_face {nullptr};
510 std::string m_name;
511 yarp::os::Contact m_address;
512 yarp::os::PortReader *m_reader {nullptr};
513 yarp::os::PortReader *m_adminReader {nullptr};
514 yarp::os::PortReaderCreator *m_readableCreator {nullptr};
515 yarp::os::PortReport *m_eventReporter {nullptr};
516 std::atomic<bool> m_listening {false};
517 std::atomic<bool> m_running {false};
518 std::atomic<bool> m_starting {false};
519 std::atomic<bool> m_closing {false};
520 std::atomic<bool> m_finished {false};
521 bool m_finishing {false};
522 bool m_waitBeforeSend {true};
523 bool m_waitAfterSend {true};
524 bool m_controlRegistration {true};
525 bool m_interruptable {true};
526 bool m_interrupted {false};
527 bool m_manual {false};
528 int m_events {0};
529 int m_connectionListeners {0};
530 int m_inputCount {0};
531 int m_outputCount {0};
532 int m_dataOutputCount {0};
533 unsigned int m_flags {PORTCORE_IS_INPUT | PORTCORE_IS_OUTPUT};
534 bool m_logNeeded {false};
535 PortCorePackets m_packets {};
536 std::string m_envelope;
537 float m_timeout {-1};
538 int m_counter {1};
539 yarp::os::Property *m_prop {nullptr};
540 yarp::os::Contactable *m_contactable {nullptr};
541 std::mutex* m_mutex {nullptr};
542 bool m_mutexOwned {false};
543 BufferedConnectionWriter m_envelopeWriter {true};
544
545 std::mutex m_typeMutex;
546 bool m_checkedType {false};
547 Type m_type;
548
549 // port data modifier
551
552 // set IP packet TOS
553 bool setTypeOfService(PortCoreUnit* unit, int tos);
554
555 // get IP packet TOS
556 int getTypeOfService(PortCoreUnit* unit);
557
558 // set the scheduling properties of all threads
559 // within the process scope.
560 bool setProcessSchedulingParam(int priority = -1, int policy = -1);
561
562 // attach a portmonitor plugin to the port or to a specific connection
563 bool attachPortMonitor(yarp::os::Property& prop, bool isOutput, std::string& errMsg);
564
565 // detach the portmonitor from the port or specific connection
566 bool detachPortMonitor(bool isOutput);
567
568 // set the parameter for the portmonitor of the port (if any)
569 bool setParamPortMonitor(const yarp::os::Property& param, bool isOutput, std::string& errMsg);
570
571 // get the parameters from the portmonitor of the port (if any)
572 bool getParamPortMonitor(yarp::os::Property& param, bool isOutput, std::string& errMsg);
573
574 void closeMain();
575
576 bool isUnit(const Route& route, int index);
577
578 // only called in "finished" phase
579 void closeUnits();
580
581 // called anytime, garbage collects terminated units
582 void cleanUnits(bool blocking = true);
583
584 // only called by the manager
585 void reapUnits();
586
587 // only called in "running" phase
588 void addInput(InputProtocol* ip);
589
590 bool removeUnit(const Route& route, bool synch = false, bool* except = nullptr);
591
592 int getNextIndex();
593};
594
595} // namespace yarp::os::impl
596
597#endif // YARP_OS_IMPL_PORTCORE_H
#define PORTCORE_IS_INPUT
Definition: PortCore.h:40
#define PORTCORE_IS_OUTPUT
Definition: PortCore.h:41
A base class for connection types (tcp, mcast, shmem, ...) which are called carriers in YARP.
Definition: Carrier.h:44
An interface for reading from a network connection.
Represents how to reach a part of a YARP network.
Definition: Contact.h:33
An abstract port.
Definition: Contactable.h:28
The initial point-of-contact with a port.
Definition: Face.h:20
The output side of an active connection between two ports.
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:21
Information about a port connection or event.
Definition: PortInfo.h:25
A creator for readers.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:24
A base class for objects that want information about port status changes.
Definition: PortReport.h:25
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:23
A class for storing options and configuration information.
Definition: Property.h:33
Information about a connection between two ports.
Definition: Route.h:28
This manages a single threaded resource related to a single input or output connection.
Definition: PortCoreUnit.h:25
void setFlags(unsigned int flags)
Configure the port to meet certain restrictions in behavior.
Definition: PortCore.h:285
void setWaitAfterSend(bool waitAfterSend)
After sending a message, should we wait for it to be sent to all destinations before returning?
Definition: PortCore.h:351
void setWaitBeforeSend(bool waitBeforeSend)
Upon being asked to send a message, should we wait for any existing message to be sent to all destina...
Definition: PortCore.h:342
void setContactable(Contactable *contactable)
Definition: PortCore.h:290
bool read(yarp::os::ConnectionReader &reader) override
Callback for data.
Definition: PortCore.h:359
unsigned int getFlags()
Check current configuration of port.
Definition: PortCore.h:298
This is the heart of a yarp port.
Definition: PortCore.h:102
yarp::os::Carrier * outputModifier
Definition: PortCore.h:135
yarp::os::Carrier * inputModifier
Definition: PortCore.h:136
An abstraction for a thread of execution.
Definition: ThreadImpl.h:21
The components from which ports and connections are built.
#define YARP_UNUSED(var)
Definition: api.h:162
#define YARP_os_impl_API
Definition: api.h:46