YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
PriorityCarrier.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-License-Identifier: BSD-3-Clause
4 */
5
6#include "PriorityCarrier.h"
7
8#include <yarp/os/Log.h>
11#include <yarp/os/Route.h>
12
13#include <yarp/math/Math.h>
14#include <yarp/math/SVD.h>
15#include <string>
16
17
18using namespace yarp::os;
19using namespace yarp::math;
20
21namespace {
23 "yarp.carrier.priority",
27 nullptr)
28}
29
30
35ElectionOf<PriorityGroup> *PriorityCarrier::peers = nullptr;
36
37// Make a singleton manager for finding peer carriers.
// Returns a reference to the static `peers` election object, allocating it
// with `new` the first time the pointer is found to be null.
// NOTE(review): this listing skips lines 39, 42-43 and 45, so any statements
// there (including the body of the else branch) are not visible here.
38ElectionOf<PriorityGroup>& PriorityCarrier::getPeers() {
40 if (peers==nullptr) {
// First call: create the shared election object.
41 peers = new ElectionOf<PriorityGroup>;
44 } else {
46 }
47 return *peers;
48}
49
50// Decide whether data should be accepted.
// The decision is delegated to this carrier's group, which is given the
// reader and this carrier as the message source. The delegation is bracketed
// by getPeers().lock() / getPeers().unlock().
// NOTE(review): the signature (line 51) and line 53 are absent from this
// listing, so whatever happens between lock() and the group call is not shown.
52 getPeers().lock();
54 bool result = group->acceptIncomingData(reader,this);
55 getPeers().unlock();
// Return the group's verdict unchanged.
56 return result;
57}
58
59// Read connection settings.
// Takes the port and source names from the connection route, registers this
// carrier with the peer election under the port name, then parses the sender
// specifier as a Property and reads the following keys:
//   "tc"      -> timeConstant, default 1.0, absolute value taken
//   "tr"      -> timeResting,  default 0.0, absolute value taken
//   "st"      -> stimulation,  default STIMUL_THRESHOLD*10, absolute value
//                taken, then divided by 10
//   "bs"      -> bias, default STIMUL_THRESHOLD*10, divided by 10 (sign kept)
//   "ex"      -> excitation, the group found under that key
//   "virtual" -> isVirtual, true when the key is present
// Returns false when the election does not hand back a group, true otherwise.
// NOTE(review): the signature (line 60) is absent from this listing.
61 portName = proto.getRoute().getToName();
62 sourceName = proto.getRoute().getFromName();
63 group = getPeers().add(portName,this);
64 if (!group) {
65 return false;
66 }
67
68 Property options;
69 options.fromString(proto.getSenderSpecifier());
70
71 timeConstant = fabs(options.check("tc",Value(1.0)).asFloat64());
72 timeResting = fabs(options.check("tr",Value(0.0)).asFloat64());
73 stimulation = fabs(options.check("st",Value(STIMUL_THRESHOLD*10)).asFloat64());
74 // Zero stimulation is undefined and will be interpreted as S=Threshold.
// NOTE(review): the statement inside this branch (line 76) is missing from
// this listing.
75 if (stimulation == 0) {
77 }
// User-facing values are given on a scale ten times the internal one.
78 stimulation /= 10.0;
79
80 bias = options.check("bs",Value(STIMUL_THRESHOLD*10)).asFloat64();
81 bias /= 10.0;
82
83 excitation = options.findGroup("ex");
84 isVirtual = options.check("virtual");
85
86#ifdef WITH_PRIORITY_DEBUG
// When the "debug" key is present, log a summary of the parsed settings and
// restart the debugger thread with the requested period.
87 if(options.check("debug"))
88 {
89 std::string msg;
90 char dummy[1024];
91 std::snprintf(dummy, 1024, "\n%s:\n", sourceName.c_str());
92 msg+= dummy;
93 std::snprintf(dummy, 1024, " stimulation: %.2f\n", stimulation);
94 msg+= dummy;
95 std::snprintf(dummy, 1024, " bias: %.2f\n", bias);
96 msg+= dummy;
97 std::snprintf(dummy, 1024, " tc: %.2fs\n", timeConstant);
98 msg+= dummy;
99 std::snprintf(dummy, 1024, " tr: %.2fs\n", timeResting);
100 msg+= dummy;
101 std::snprintf(dummy, 1024, " ex: ");
102 msg+= dummy;
// List every excitation entry that is a list of at least two elements as
// (name, weight), with the weight divided by 10 like the other settings.
103 for(size_t i=0; i<excitation.size(); i++)
104 {
105 Value v = excitation.get(i);
106 if(v.isList() && (v.asList()->size()>=2))
107 {
108 Bottle* b = v.asList();
109 std::snprintf(dummy, 1024, "(%s, %.2f) ",
110 b->get(0).asString().c_str(),
111 b->get(1).asFloat64()/10.0 );
112 msg+= dummy;
113 }
114 }
115 //std::snprintf(dummy, 1024, "\n");
116 msg+= "\n";
117 std::snprintf(dummy, 1024, " virtual: %s\n",
118 (isVirtual)?"yes":"no");
119 msg+= dummy;
// "rate" is read as an integer (default 10) and divided by 1000; presumably
// milliseconds converted to seconds, since it is printed with an "s" suffix
// and passed to setPeriod() - TODO confirm.
120 double rate = options.check("rate", Value(10)).asInt32() / 1000.0;
121 std::snprintf(dummy, 1024, " db.rate: %fs\n", rate);
122 msg+= dummy;
123 yCInfo(PRIORITYCARRIER, "%s", msg.c_str());
// Stop before changing the period, then start again.
124 debugger.stop();
125 debugger.setPeriod(rate);
126 debugger.start();
127 }
128#endif
129 return true;
130}
131
132
134{
135 //
136 // +P| ---___
137 // | | -__
138 // | | -_
139 // 0|------------------> (Active state)
140 // Ta Tc
141 //
142 // Ta Tr
143 // 0|-------------_----> (Resting state && P<0)
144 // | | __-
145 // | | ___-
146 // -P| ---
147 //
148 //
149 // P(t) = Pi * (1-exp((t-Tc-Ta)/Tc*5) + exp(-5))
150 // t:time, Pi: temporal Priority level
151 // Tc: reset time, Ta: arrival time
152 //
153 // we do not consider ports from which no message has been
154 // received yet.
155 //if(timeArrival == 0)
156 // return 0;
157
158 double dt = t - timeArrival;
159 // Temporal priority is inverted if this is a neuron model and the temporal
160 // stimulation has already reached STIMUL_THRESHOLD and waited for Tc.
161 if ((timeResting > 0)
162 && (dt >= fabs(timeConstant))
165 }
166
167 double actualStimulation;
168 if(!isResting(temporalStimulation)) // behavior is in stimulation state
169 {
170 // After a gap bigger than Tc, the
171 // priority is set to zero to avoid redundant calculation.
172 if (dt > fabs(timeConstant)) {
174 } else {
176 }
177 }
178 else // behavior is in resting state
179 {
180 // it is in waiting state for Tc
181 if (temporalStimulation > 0) {
183 } else {
184 dt -= fabs(timeConstant);
185 // After a gap bigger than Tr, the
186 // priority is set to zero to avoid redundant calculation.
187 if (dt > fabs(timeResting)) {
189 } else {
190 actualStimulation = temporalStimulation * (1.0 - exp((dt - timeResting) / timeResting * 5.0) + exp(-5.0));
191 }
192 }
193 }
194
195 if (actualStimulation <= 0) {
196 isActive = false;
197 }
198
199 return actualStimulation;
200}
201
203{
// Computes this carrier's input at time t. An inactive carrier yields 0.0.
// Otherwise the weighted inputs of every other peer in the group that lists
// this carrier's sourceName in its excitation entries are summed, the bias is
// added, and the total is multiplied by getActualStimulation(t). A negative
// product is returned as 0.
// NOTE(review): each contributing peer is evaluated through a recursive
// getActualInput(t) call; a cycle of excitation links between active carriers
// would appear to recurse with no visible bound - confirm.
204 // calculating E(t) = Sum(e.I(t)) + b
205 if (!isActive) {
206 return 0.0;
207 }
208
209 double E = 0;
210 for (auto& it : group->peerSet)
211 {
212 PriorityCarrier *peer = it.first;
// A carrier does not excite itself.
213 if(peer != this)
214 {
215 for(size_t i=0; i<peer->excitation.size(); i++)
216 {
217 Value v = peer->excitation.get(i);
// Only entries that are lists of at least (name, weight) are considered.
218 if(v.isList() && (v.asList()->size()>=2))
219 {
220 Bottle* b = v.asList();
221 // an excitatory link to this priority carrier
222 if (sourceName == b->get(0).asString()) {
// The configured weight is divided by 10 before use.
223 E += peer->getActualInput(t) * (b->get(1).asFloat64() / 10.0);
224 }
225 }
226 }
227
228 }
229 }
230 E += bias;
231 double I = E * getActualStimulation(t);
// Clamp negative input to zero.
232 return ((I<0) ? 0 : I); //I'(t)
233}
234
235
241{
242 //TODO: find the correct way to get the size of peerSet
243 int nConnections = 0;
244 for (auto it = peerSet.begin(); it != peerSet.end(); it++) {
245 nConnections++;
246 }
247
248 // calculate matrices X, B, InvA and Y
253 InvA.eye();
254
255 int row = 0;
256 for(auto& rowItr : peerSet)
257 {
258 PriorityCarrier* peer = rowItr.first;
259 // call 'getActualStimulation' to update 'isActive'
260 peer->getActualStimulation(t);
261 double xi = (peer->isActive) ? STIMUL_THRESHOLD : 0.0;
262 B(row,0) = peer->bias * xi;
263 X(row,0) = xi;
264
265 int col = 0;
266 for(auto& it : peerSet)
267 {
268 PriorityCarrier *peerCol = it.first;
269 for(size_t i=0; i<peerCol->excitation.size(); i++)
270 {
271 Value v = peerCol->excitation.get(i);
272 if(v.isList() && (v.asList()->size()>=2))
273 {
274 Bottle* b = v.asList();
275 // an excitatory link to this connection
276 if (peer->sourceName == b->get(0).asString()) {
277 InvA(row, col) = -(b->get(1).asFloat64() / 10.0) * xi;
278 }
279 }
280 }
281 col++;
282 }
283 row++;
284 }
285
286 yCTrace(PRIORITYCARRIER, "A:\n %s", InvA.toString(1).c_str());
287
288 // calculating the determinant
290 if(determinant == 0)
291 {
292 yCError(PRIORITYCARRIER, "Inconsistent regulation! non-invertible weight matrix");
293 return false;
294 }
295
296 // inverting the weight matrix
298 Y = InvA * B;
299
300 yCTrace(PRIORITYCARRIER, "X:\n %s", X.toString(1).c_str());
301 yCTrace(PRIORITYCARRIER, "B:\n %s", B.toString(1).c_str());
302 yCTrace(PRIORITYCARRIER, "Y:\n %s", Y.toString(1).c_str());
303
304 return true;
305}
306
307// Decide whether data should be accepted, for real.
// Stimulates the source carrier with the current time, recalculates the group
// matrices, and accepts the message only when the source is the non-virtual
// peer with the largest strictly positive output Y(row,0) * X(row,0).
// Returns false when recalculate() fails, when no non-virtual peer has a
// positive output, or when the source itself is virtual.
// NOTE(review): the first half of the signature (line 308) is absent from
// this listing.
309 PriorityCarrier *source) {
310
311 bool accept;
312 // updates message's arrival time
313 double tNow = yarp::os::Time::now();
314 source->stimulate(tNow);
315
316 if (!recalculate(tNow)) {
317 return false;
318 }
319
// Rows of X and Y are indexed in peerSet iteration order, the same order
// recalculate() uses when it fills them.
320 int row = 0;
321 PriorityCarrier *maxPeer = nullptr;
// Starting at 0.0 with a strict '>' below means a peer whose output is zero
// or negative is never selected.
322 double maxStimuli = 0.0;
323 for(auto& it : peerSet)
324 {
325 PriorityCarrier *peer = it.first;
326 double output = Y(row,0) * X(row,0);
327 peer->yi = output; // only for debug purpose
328
// Virtual peers get their yi recorded but cannot win the selection.
329 if(!peer->isVirtual)
330 {
331 if(output > maxStimuli)
332 {
333 maxStimuli = output;
334 maxPeer = peer;
335 }
336 }
337 row++;
338 }
// maxPeer stays nullptr when nothing qualified, so the comparison is false.
339 accept = (maxPeer == source);
340
341 // a virtual message will never be delivered. It will be only
342 // used for the coordination
343 if (source->isVirtual) {
344 accept = false;
345 }
346
347 return accept;
348}
349
350
355#ifdef WITH_PRIORITY_DEBUG
361
363{
364 if (isRunning()) {
365 stop();
366 }
367}
368
370{
372 v.resize(4);
373 // a vector of [t, S(t), S'(t), I'(t)]
374 double t = yarp::os::Time::now();
375 v[0] = t;
377 v[2] = pcarrier->yi;
379}
380
382{
383 debugPortName = pcarrier->portName + pcarrier->sourceName + std::string(":debug");
385}
386
391
392#endif //WITH_PRIORITY_DEBUG
#define STIMUL_THRESHOLD
Allow priority-based message selection.
std::string sourceName
bool configure(yarp::os::ConnectionState &proto) override
Give carrier a shot at looking at how the connection is set up.
double getActualInput(double t)
void stimulate(double t)
bool isResting(double priority)
bool acceptIncomingData(yarp::os::ConnectionReader &reader) override
Determine whether incoming data should be accepted.
yarp::os::Bottle excitation
double getActualStimulation(double t)
yarp::os::BufferedPort< yarp::sig::Vector > debugPort
void threadRelease() override
Release method.
void run() override
Loop function.
PriorityCarrier * pcarrier
bool threadInit() override
Initialization method.
PriorityDebugThread(PriorityCarrier *carrier)
Class PriorityDebugThread.
std::string debugPortName
yarp::sig::Matrix X
virtual bool acceptIncomingData(yarp::os::ConnectionReader &reader, PriorityCarrier *source)
bool recalculate(double t)
Class PriorityGroup.
yarp::sig::Matrix Y
yarp::sig::Matrix InvA
yarp::sig::Matrix B
A simple collection of objects that can be described and transmitted in a portable way.
Definition Bottle.h:64
size_type size() const
Gets the number of elements in the bottle.
Definition Bottle.cpp:251
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition Bottle.cpp:246
A mini-server for performing network communication in the background.
void close() override
Stop port activity.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
T & prepare()
Access the object which will be transmitted by the next call to yarp::os::BufferedPort::write.
An interface for reading from a network connection.
The basic state of a connection - route, streams in use, etc.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
virtual std::string getSenderSpecifier() const =0
Extract a name for the sender, if the connection type supports that.
static LogCallback printCallback()
Get current print callback.
Definition Log.cpp:873
static LogType minimumPrintLevel()
Get current minimum print level.
Definition Log.cpp:833
@ LogTypeReserved
Definition Log.h:98
static void unlock()
Call post() on a global mutual-exclusion semaphore allocated by YARP.
Definition Network.cpp:1423
static void lock()
Call wait() on a global mutual-exclusion semaphore allocated by YARP.
Definition Network.cpp:1418
An abstraction for a periodic thread.
bool setPeriod(double period)
Set the (new) period of the thread.
bool isRunning() const
Returns true when the thread is started, false otherwise.
bool start()
Call this to start the thread.
void stop()
Call this to stop the thread, this call blocks until the thread is terminated (and releaseThread() ca...
A class for storing options and configuration information.
Definition Property.h:33
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
A single value (typically within a Bottle).
Definition Value.h:43
virtual yarp::conf::float64_t asFloat64() const
Get 64-bit floating point value.
Definition Value.cpp:222
virtual bool isList() const
Checks if value is a list.
Definition Value.cpp:162
virtual Bottle * asList() const
Get list value.
Definition Value.cpp:240
virtual std::string asString() const
Get string value.
Definition Value.cpp:234
void resize(size_t r, size_t c)
Resize the matrix, if matrix is not empty preserve old content.
Definition Matrix.cpp:265
std::string toString(int precision=-1, int width=-1, const char *endRowStr="\n") const
Print matrix to a string.
Definition Matrix.cpp:171
const Matrix & eye()
Build an identity matrix, don't resize.
Definition Matrix.cpp:456
void resize(size_t size) override
Resize the vector.
Definition Vector.h:221
#define yCInfo(component,...)
#define yCError(component,...)
#define yCAssert(component, x)
#define yCTrace(component,...)
#define YARP_LOG_COMPONENT(name,...)
double det(const yarp::sig::Matrix &in)
Computes the determinant of a matrix (defined in Math.h).
Definition math.cpp:607
yarp::sig::Matrix luinv(const yarp::sig::Matrix &in)
Invert a square matrix using LU-decomposition (defined in Math.h).
Definition math.cpp:612
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition Time.cpp:121
An interface to the operating system, including Port based communication.