YARP
Yet Another Robot Platform
MessageStack.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-License-Identifier: BSD-3-Clause
4  */
5 
6 #include <yarp/os/MessageStack.h>
7 
8 #include <yarp/conf/numeric.h>
9 
10 #include <yarp/os/Bottle.h>
11 #include <yarp/os/DummyConnector.h>
12 #include <yarp/os/Log.h>
13 #include <yarp/os/Thread.h>
14 
15 #include <condition_variable>
16 #include <deque>
17 #include <list>
18 #include <mutex>
19 
20 using namespace yarp::os;
21 
22 namespace {
23 
24 class MessageStackHelper;
25 
26 class MessageStackThread : public Thread
27 {
28 public:
29  MessageStackHelper& helper;
30 
31  explicit MessageStackThread(MessageStackHelper& helper) :
32  helper(helper)
33  {
34  }
35 
36  void run() override;
37 };
38 
39 class MessageStackHelper
40 {
41 private:
42  std::list<MessageStackThread*> threads;
43  std::deque<Bottle> msgs;
44  std::mutex mutex;
45  std::condition_variable cv;
46  size_t max_threads;
47  int available_threads;
48  PortReader& owner;
49  bool active;
50 
51 public:
52  MessageStackHelper(size_t max_threads, PortReader& owner) :
53  owner(owner)
54  {
55  this->max_threads = max_threads;
56  available_threads = 0;
57  active = true;
58  }
59 
60  void clear()
61  {
62  active = false;
63  cv.notify_all();
64  for (auto& thread : threads) {
65  thread->stop();
66  delete thread;
67  thread = nullptr;
68  }
69  threads.clear();
70  msgs.clear();
71  active = true;
72  }
73 
74  void stack(PortWriter& msg, const std::string& tag)
75  {
76  std::unique_lock<std::mutex> lock(mutex);
77  msgs.emplace_back();
78  if (!tag.empty()) {
79  Bottle b;
80  b.read(msg);
81  Bottle& back = msgs.back();
82  back.clear();
83  back.addString(tag);
84  back.append(b);
85  } else {
86  msgs.back().read(msg);
87  }
88  if (available_threads == 0) {
89  if (threads.size() < max_threads || max_threads == 0) {
90  available_threads++;
91  threads.push_back(new MessageStackThread(*this));
92  threads.back()->start();
93  }
94  }
95  available_threads--;
96  cv.notify_one();
97  }
98 
99  bool process()
100  {
101  std::unique_lock<std::mutex> lock(mutex);
102  cv.wait(lock, [&]{return !msgs.empty() || !active;});
103  if (!active) {
104  return false;
105  }
106  Bottle b = msgs.front();
107  msgs.pop_front();
108  lock.unlock();
109  DummyConnector con;
110  b.write(con.getWriter());
111  owner.read(con.getReader());
112  lock.lock();
113  available_threads++;
114  lock.unlock();
115  return active;
116  }
117 
118  bool isOwner(PortReader& owner)
119  {
120  return &(this->owner) == &owner;
121  }
122 };
123 
124 
125 void MessageStackThread::run()
126 {
127  while (helper.process()) {
128  // forever
129  }
130 }
131 
132 
133 } // namespace
134 
135 
136 
138 {
139 public:
140  size_t max_threads{0};
141  MessageStackHelper* helper = nullptr;
142 
143  explicit Private(size_t max_threads) :
145  {
146  }
147 
149  {
150  if (helper == nullptr) {
151  return;
152  }
153  helper->clear();
154  delete helper;
155  }
156 
157  void attach(PortReader& owner) {
158  if (helper != nullptr) {
159  if (helper->isOwner(owner)) {
160  return;
161  }
162  delete helper;
163  helper = nullptr;
164  }
165  helper = new MessageStackHelper(max_threads, owner);
166  }
167 
168  void stack(PortWriter& msg, const std::string& tag)
169  {
170  if (helper == nullptr) {
171  return;
172  }
173  helper->stack(msg, tag);
174  }
175 };
176 
177 
178 
179 MessageStack::MessageStack(size_t max_threads) :
180  mPriv(new Private(max_threads))
181 {
182 }
183 
185 {
186  delete mPriv;
187 }
188 
190 {
191  mPriv->attach(owner);
192 }
193 
194 void MessageStack::stack(PortWriter& msg, const std::string& tag)
195 {
196  mPriv->stack(msg, tag);
197 }
void attach(PortReader &owner)
void stack(PortWriter &msg, const std::string &tag)
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:74
void append(const Bottle &alt)
Append the content of the given bottle to the current list.
Definition: Bottle.cpp:380
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition: Bottle.cpp:240
void clear()
Empties the bottle of any objects it contains.
Definition: Bottle.cpp:121
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
Definition: Bottle.cpp:230
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:170
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written the ConnectionWriter since it was las...
ConnectionReader & getReader(ConnectionWriter *replyWriter=nullptr)
Get the dummy ConnectionReader loaded with whatever was written the ConnectionWriter since it was las...
MessageStack(size_t max_threads=0)
Constructor.
void attach(PortReader &owner)
void stack(PortWriter &msg, const std::string &tag="")
Add a message to the message stack, to be sent whenever the gods see fit.
virtual ~MessageStack()
Destructor.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:25
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:24
An abstraction for a thread of execution.
Definition: Thread.h:22
An interface to the operating system, including Port based communication.