19 #include <condition_variable>
28 class MessageStackHelper;
30 class MessageStackThread :
public Thread
33 MessageStackHelper& helper;
35 explicit MessageStackThread(MessageStackHelper& helper) :
43 class MessageStackHelper
46 std::list<MessageStackThread*> threads;
47 std::deque<Bottle> msgs;
49 std::condition_variable cv;
51 int available_threads;
56 MessageStackHelper(
size_t max_threads,
PortReader& owner) :
59 this->max_threads = max_threads;
60 available_threads = 0;
68 for (
auto& thread : threads) {
78 void stack(
PortWriter& msg,
const std::string& tag)
80 std::unique_lock<std::mutex> lock(mutex);
85 Bottle& back = msgs.back();
90 msgs.back().read(msg);
92 if (available_threads == 0) {
93 if (threads.size() < max_threads || max_threads == 0) {
95 threads.push_back(
new MessageStackThread(*
this));
96 threads.back()->start();
105 std::unique_lock<std::mutex> lock(mutex);
106 cv.wait(lock, [&]{
return !msgs.empty() || !active;});
124 return &(this->owner) == &owner;
129 void MessageStackThread::run()
131 while (helper.process()) {
163 if (
helper->isOwner(owner)) {
184 mPriv(new
Private(max_threads))
195 mPriv->attach(owner);
200 mPriv->stack(msg, tag);
MessageStackHelper * helper
void attach(PortReader &owner)
Private(size_t max_threads)
void stack(PortWriter &msg, const std::string &tag)
A simple collection of objects that can be described and transmitted in a portable way.
void append(const Bottle &alt)
Append the content of the given bottle to the current list.
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
void clear()
Empties the bottle of any objects it contains.
bool write(ConnectionWriter &writer) const override
Output a representation of the bottle to a network connection.
void addString(const char *str)
Places a string in the bottle, at the end of the list.
A dummy connection to test yarp::os::Portable implementations.
ConnectionWriter & getWriter()
Get the dummy ConnectionWriter loaded with whatever was written to the ConnectionWriter since it was las...
ConnectionReader & getReader()
Get the dummy ConnectionReader loaded with whatever was written to the ConnectionWriter since it was las...
MessageStack(size_t max_threads=0)
Constructor.
void attach(PortReader &owner)
void stack(PortWriter &msg, const std::string &tag="")
Add a message to the message stack, to be sent whenever the gods see fit.
virtual ~MessageStack()
Destructor.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
virtual bool read(ConnectionReader &reader)=0
Read this object from a network connection.
Interface implemented by all objects that can write themselves to the network, such as Bottle objects...
An abstraction for a thread of execution.
An interface to the operating system, including Port based communication.