YARP
Yet Another Robot Platform
MessageStack.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * All rights reserved.
4  *
5  * This software may be modified and distributed under the terms of the
6  * BSD-3-Clause license. See the accompanying LICENSE file for details.
7  */
8 
9 #include <yarp/os/MessageStack.h>
10 
11 #include <yarp/conf/numeric.h>
12 
13 #include <yarp/os/Bottle.h>
14 #include <yarp/os/DummyConnector.h>
15 #include <yarp/os/Log.h>
16 #include <yarp/os/Semaphore.h>
17 #include <yarp/os/Thread.h>
18 
19 #include <condition_variable>
20 #include <deque>
21 #include <list>
22 #include <mutex>
23 
24 using namespace yarp::os;
25 
26 namespace {
27 
28 class MessageStackHelper;
29 
30 class MessageStackThread : public Thread
31 {
32 public:
33  MessageStackHelper& helper;
34 
35  explicit MessageStackThread(MessageStackHelper& helper) :
36  helper(helper)
37  {
38  }
39 
40  void run() override;
41 };
42 
43 class MessageStackHelper
44 {
45 private:
46  std::list<MessageStackThread*> threads;
47  std::deque<Bottle> msgs;
48  std::mutex mutex;
49  std::condition_variable cv;
50  size_t max_threads;
51  int available_threads;
52  PortReader& owner;
53  bool active;
54 
55 public:
56  MessageStackHelper(size_t max_threads, PortReader& owner) :
57  owner(owner)
58  {
59  this->max_threads = max_threads;
60  available_threads = 0;
61  active = true;
62  }
63 
64  void clear()
65  {
66  active = false;
67  cv.notify_all();
68  for (auto& thread : threads) {
69  thread->stop();
70  delete thread;
71  thread = nullptr;
72  }
73  threads.clear();
74  msgs.clear();
75  active = true;
76  }
77 
78  void stack(PortWriter& msg, const std::string& tag)
79  {
80  std::unique_lock<std::mutex> lock(mutex);
81  msgs.emplace_back();
82  if (!tag.empty()) {
83  Bottle b;
84  b.read(msg);
85  Bottle& back = msgs.back();
86  back.clear();
87  back.addString(tag);
88  back.append(b);
89  } else {
90  msgs.back().read(msg);
91  }
92  if (available_threads == 0) {
93  if (threads.size() < max_threads || max_threads == 0) {
94  available_threads++;
95  threads.push_back(new MessageStackThread(*this));
96  threads.back()->start();
97  }
98  }
99  available_threads--;
100  cv.notify_one();
101  }
102 
103  bool process()
104  {
105  std::unique_lock<std::mutex> lock(mutex);
106  cv.wait(lock, [&]{return !msgs.empty() || !active;});
107  if (!active) {
108  return false;
109  }
110  Bottle b = msgs.front();
111  msgs.pop_front();
112  lock.unlock();
113  DummyConnector con;
114  b.write(con.getWriter());
115  owner.read(con.getReader());
116  lock.lock();
117  available_threads++;
118  lock.unlock();
119  return active;
120  }
121 
122  bool isOwner(PortReader& owner)
123  {
124  return &(this->owner) == &owner;
125  }
126 };
127 
128 
129 void MessageStackThread::run()
130 {
131  while (helper.process()) {
132  // forever
133  }
134 }
135 
136 
137 } // namespace
138 
139 
140 
142 {
143 public:
144  size_t max_threads{0};
145  MessageStackHelper* helper = nullptr;
146 
147  explicit Private(size_t max_threads) :
149  {
150  }
151 
153  {
154  if (helper == nullptr) {
155  return;
156  }
157  helper->clear();
158  delete helper;
159  }
160 
161  void attach(PortReader& owner) {
162  if (helper != nullptr) {
163  if (helper->isOwner(owner)) {
164  return;
165  }
166  delete helper;
167  helper = nullptr;
168  }
169  helper = new MessageStackHelper(max_threads, owner);
170  }
171 
172  void stack(PortWriter& msg, const std::string& tag)
173  {
174  if (helper == nullptr) {
175  return;
176  }
177  helper->stack(msg, tag);
178  }
179 };
180 
181 
182 
183 MessageStack::MessageStack(size_t max_threads) :
184  mPriv(new Private(max_threads))
185 {
186 }
187 
189 {
190  delete mPriv;
191 }
192 
194 {
195  mPriv->attach(owner);
196 }
197 
198 void MessageStack::stack(PortWriter& msg, const std::string& tag)
199 {
200  mPriv->stack(msg, tag);
201 }
void attach(PortReader &owner)
void stack(PortWriter &msg, const std::string &tag)
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:73
void append(const Bottle &alt)
Append the content of the given bottle to the current list.
Definition: Bottle.cpp:383
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition: Bottle.cpp:243
void clear()
Empties the bottle of any objects it contains.
Definition: Bottle.cpp:124
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
Definition: Bottle.cpp:233
void addString(const char *str)
Places a string in the bottle, at the end of the list.
Definition: Bottle.cpp:173
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written the ConnectionWriter since it was las...
ConnectionReader & getReader()
Get the dummy ConnectionReader loaded with whatever was written the ConnectionWriter since it was las...
MessageStack(size_t max_threads=0)
Constructor.
void attach(PortReader &owner)
void stack(PortWriter &msg, const std::string &tag="")
Add a message to the message stack, to be sent whenever the gods see fit.
virtual ~MessageStack()
Destructor.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:28
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
Definition: PortWriter.h:27
An abstraction for a thread of execution.
Definition: Thread.h:25
An interface to the operating system, including Port based communication.