YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
MessageStack.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-License-Identifier: BSD-3-Clause
4 */
5
7
8#include <yarp/conf/numeric.h>
9
10#include <yarp/os/Bottle.h>
12#include <yarp/os/Log.h>
13#include <yarp/os/Thread.h>
14
15#include <condition_variable>
16#include <deque>
17#include <list>
18#include <mutex>
19
20using namespace yarp::os;
21
22namespace {
23
24class MessageStackHelper;
25
26class MessageStackThread : public Thread
27{
28public:
29 MessageStackHelper& helper;
30
31 explicit MessageStackThread(MessageStackHelper& helper) :
32 helper(helper)
33 {
34 }
35
36 void run() override;
37};
38
39class MessageStackHelper
40{
41private:
42 std::list<MessageStackThread*> threads;
43 std::deque<Bottle> msgs;
44 std::mutex mutex;
45 std::condition_variable cv;
46 size_t max_threads;
47 int available_threads;
48 PortReader& owner;
49 bool active;
50
51public:
52 MessageStackHelper(size_t max_threads, PortReader& owner) :
53 owner(owner)
54 {
55 this->max_threads = max_threads;
56 available_threads = 0;
57 active = true;
58 }
59
60 void clear()
61 {
62 active = false;
63 cv.notify_all();
64 for (auto& thread : threads) {
65 thread->stop();
66 delete thread;
67 thread = nullptr;
68 }
69 threads.clear();
70 msgs.clear();
71 active = true;
72 }
73
74 void stack(PortWriter& msg, const std::string& tag)
75 {
76 std::unique_lock<std::mutex> lock(mutex);
77 msgs.emplace_back();
78 if (!tag.empty()) {
79 Bottle b;
80 b.read(msg);
81 Bottle& back = msgs.back();
82 back.clear();
83 back.addString(tag);
84 back.append(b);
85 } else {
86 msgs.back().read(msg);
87 }
88 if (available_threads == 0) {
89 if (threads.size() < max_threads || max_threads == 0) {
90 available_threads++;
91 threads.push_back(new MessageStackThread(*this));
92 threads.back()->start();
93 }
94 }
95 available_threads--;
96 cv.notify_one();
97 }
98
99 bool process()
100 {
101 std::unique_lock<std::mutex> lock(mutex);
102 cv.wait(lock, [&]{return !msgs.empty() || !active;});
103 if (!active) {
104 return false;
105 }
106 Bottle b = msgs.front();
107 msgs.pop_front();
108 lock.unlock();
109 DummyConnector con;
110 b.write(con.getWriter());
111 owner.read(con.getReader());
112 lock.lock();
113 available_threads++;
114 lock.unlock();
115 return active;
116 }
117
118 bool isOwner(PortReader& owner)
119 {
120 return &(this->owner) == &owner;
121 }
122};
123
124
125void MessageStackThread::run()
126{
127 while (helper.process()) {
128 // forever
129 }
130}
131
132
133} // namespace
134
135
136
138{
139public:
140 size_t max_threads{0};
141 MessageStackHelper* helper = nullptr;
142
143 explicit Private(size_t max_threads) :
145 {
146 }
147
149 {
150 if (helper == nullptr) {
151 return;
152 }
153 helper->clear();
154 delete helper;
155 }
156
157 void attach(PortReader& owner) {
158 if (helper != nullptr) {
159 if (helper->isOwner(owner)) {
160 return;
161 }
162 delete helper;
163 helper = nullptr;
164 }
165 helper = new MessageStackHelper(max_threads, owner);
166 }
167
168 void stack(PortWriter& msg, const std::string& tag)
169 {
170 if (helper == nullptr) {
171 return;
172 }
173 helper->stack(msg, tag);
174 }
175};
176
177
178
179MessageStack::MessageStack(size_t max_threads) :
180 mPriv(new Private(max_threads))
181{
182}
183
185{
186 delete mPriv;
187}
188
190{
191 mPriv->attach(owner);
192}
193
194void MessageStack::stack(PortWriter& msg, const std::string& tag)
195{
196 mPriv->stack(msg, tag);
197}
void attach(PortReader &owner)
void stack(PortWriter &msg, const std::string &tag)
A simple collection of objects that can be described and transmitted in a portable way.
Definition Bottle.h:64
void append(const Bottle &alt)
Append the content of the given bottle to the current list.
Definition Bottle.cpp:353
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition Bottle.cpp:240
void clear()
Empties the bottle of any objects it contains.
Definition Bottle.cpp:121
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
Definition Bottle.cpp:230
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition Bottle.cpp:170
A mini-server for performing network communication in the background.
T * read(bool shouldWait=true) override
Read an available object from the port.
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written to the ConnectionWriter since it was las...
ConnectionReader & getReader(ConnectionWriter *replyWriter=nullptr)
Get the dummy ConnectionReader loaded with whatever was written to the ConnectionWriter since it was las...
MessageStack(size_t max_threads=0)
Constructor.
void attach(PortReader &owner)
void stack(PortWriter &msg, const std::string &tag="")
Add a message to the message stack, to be sent whenever the gods see fit.
virtual ~MessageStack()
Destructor.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition PortReader.h:24
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition PortWriter.h:23
An abstraction for a thread of execution.
Definition Thread.h:21
virtual void run()=0
Main body of the new thread.
An interface to the operating system, including Port based communication.