YARP
Yet Another Robot Platform
PriorityCarrier.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-License-Identifier: BSD-3-Clause
4  */
5 
6 #include "PriorityCarrier.h"
7 
8 #include <yarp/os/Log.h>
9 #include <yarp/os/LogComponent.h>
11 #include <yarp/os/Route.h>
12 
13 #include <yarp/math/Math.h>
14 #include <yarp/math/SVD.h>
15 #include <string>
16 
17 
18 using namespace yarp::os;
19 using namespace yarp::math;
20 
21 namespace {
22 YARP_LOG_COMPONENT(PRIORITYCARRIER,
23  "yarp.carrier.priority",
27  nullptr)
28 }
29 
30 
35 ElectionOf<PriorityGroup> *PriorityCarrier::peers = nullptr;
36 
37 // Make a singleton manager for finding peer carriers.
38 ElectionOf<PriorityGroup>& PriorityCarrier::getPeers() {
40  if (peers==nullptr) {
41  peers = new ElectionOf<PriorityGroup>;
43  yCAssert(PRIORITYCARRIER, peers);
44  } else {
46  }
47  return *peers;
48 }
49 
50 // Decide whether data should be accepted.
52  getPeers().lock();
53  yCAssert(PRIORITYCARRIER, group);
54  bool result = group->acceptIncomingData(reader,this);
55  getPeers().unlock();
56  return result;
57 }
58 
59 // Read connection settings.
61  portName = proto.getRoute().getToName();
62  sourceName = proto.getRoute().getFromName();
63  group = getPeers().add(portName,this);
64  if (!group) {
65  return false;
66  }
67 
68  Property options;
69  options.fromString(proto.getSenderSpecifier());
70 
71  timeConstant = fabs(options.check("tc",Value(1.0)).asFloat64());
72  timeResting = fabs(options.check("tr",Value(0.0)).asFloat64());
73  stimulation = fabs(options.check("st",Value(STIMUL_THRESHOLD*10)).asFloat64());
74  // Zero stimulation is undefined and will be interpreted as S=Thresould.
75  if (stimulation == 0) {
76  stimulation = STIMUL_THRESHOLD * 10;
77  }
78  stimulation /= 10.0;
79 
80  bias = options.check("bs",Value(STIMUL_THRESHOLD*10)).asFloat64();
81  bias /= 10.0;
82 
83  excitation = options.findGroup("ex");
84  isVirtual = options.check("virtual");
85 
86 #ifdef WITH_PRIORITY_DEBUG
87  if(options.check("debug"))
88  {
89  std::string msg;
90  char dummy[1024];
91  std::snprintf(dummy, 1024, "\n%s:\n", sourceName.c_str());
92  msg+= dummy;
93  std::snprintf(dummy, 1024, " stimulation: %.2f\n", stimulation);
94  msg+= dummy;
95  std::snprintf(dummy, 1024, " bias: %.2f\n", bias);
96  msg+= dummy;
97  std::snprintf(dummy, 1024, " tc: %.2fs\n", timeConstant);
98  msg+= dummy;
99  std::snprintf(dummy, 1024, " tr: %.2fs\n", timeResting);
100  msg+= dummy;
101  std::snprintf(dummy, 1024, " ex: ");
102  msg+= dummy;
103  for(size_t i=0; i<excitation.size(); i++)
104  {
105  Value v = excitation.get(i);
106  if(v.isList() && (v.asList()->size()>=2))
107  {
108  Bottle* b = v.asList();
109  std::snprintf(dummy, 1024, "(%s, %.2f) ",
110  b->get(0).asString().c_str(),
111  b->get(1).asFloat64()/10.0 );
112  msg+= dummy;
113  }
114  }
115  //std::snprintf(dummy, 1024, "\n");
116  msg+= "\n";
117  std::snprintf(dummy, 1024, " virtual: %s\n",
118  (isVirtual)?"yes":"no");
119  msg+= dummy;
120  double rate = options.check("rate", Value(10)).asInt32() / 1000.0;
121  std::snprintf(dummy, 1024, " db.rate: %fs\n", rate);
122  msg+= dummy;
123  yCInfo(PRIORITYCARRIER, "%s", msg.c_str());
124  debugger.stop();
125  debugger.setPeriod(rate);
126  debugger.start();
127  }
128 #endif
129  return true;
130 }
131 
132 
134 {
135  //
136  // +P| ---___
137  // | | -__
138  // | | -_
139  // 0|------------------> (Active state)
140  // Ta Tc
141  //
142  // Ta Tr
143  // 0|-------------_----> (Resting state && P<0)
144  // | | __-
145  // | | ___-
146  // -P| ---
147  //
148  //
149  // P(t) = Pi * (1-exp((t-Tc-Ta)/Tc*5) + exp(-5))
150  // t:time, Pi: temporal Priority level
151  // Tc: reset time, Ta: arrival time
152  //
153  // we do not consider ports which has not seen any message
154  // from them yet.
155  //if(timeArrival == 0)
156  // return 0;
157 
158  double dt = t - timeArrival;
159  // Temporal priority is inverted if this is a neuron model and the temporal
160  // stimulation has already reached to STIMUL_THRESHOLD and waited for Tc.
161  if ((timeResting > 0)
162  && (dt >= fabs(timeConstant))
163  && (temporalStimulation >= STIMUL_THRESHOLD)) {
164  temporalStimulation = -temporalStimulation;
165  }
166 
167  double actualStimulation;
168  if(!isResting(temporalStimulation)) // behavior is in stimulation state
169  {
170  // After a gap bigger than Tc, the
171  // priority is set to zero to avoid redundant calculation.
172  if (dt > fabs(timeConstant)) {
173  actualStimulation = 0;
174  } else {
175  actualStimulation = temporalStimulation * (1.0 - exp((dt - timeConstant) / timeConstant * 5.0) + exp(-5.0));
176  }
177  }
178  else // behavior is in resting state
179  {
180  // it is in waiting state for Tc
181  if (temporalStimulation > 0) {
182  actualStimulation = temporalStimulation;
183  } else {
184  dt -= fabs(timeConstant);
185  // After a gap bigger than Tr, the
186  // priority is set to zero to avoid redundant calculation.
187  if (dt > fabs(timeResting)) {
188  actualStimulation = 0;
189  } else {
190  actualStimulation = temporalStimulation * (1.0 - exp((dt - timeResting) / timeResting * 5.0) + exp(-5.0));
191  }
192  }
193  }
194 
195  if (actualStimulation <= 0) {
196  isActive = false;
197  }
198 
199  return actualStimulation;
200 }
201 
203 {
204  // calculating E(t) = Sum(e.I(t)) + b
205  if (!isActive) {
206  return 0.0;
207  }
208 
209  double E = 0;
210  for (auto& it : group->peerSet)
211  {
212  PriorityCarrier *peer = it.first;
213  if(peer != this)
214  {
215  for(size_t i=0; i<peer->excitation.size(); i++)
216  {
217  Value v = peer->excitation.get(i);
218  if(v.isList() && (v.asList()->size()>=2))
219  {
220  Bottle* b = v.asList();
221  // an exitatory to this priority carrier
222  if (sourceName == b->get(0).asString()) {
223  E += peer->getActualInput(t) * (b->get(1).asFloat64() / 10.0);
224  }
225  }
226  }
227 
228  }
229  }
230  E += bias;
231  double I = E * getActualStimulation(t);
232  return ((I<0) ? 0 : I); //I'(t)
233 }
234 
235 
241 {
242  //TODO: find the correct way to get the size of peerSet
243  int nConnections = 0;
244  for (auto it = peerSet.begin(); it != peerSet.end(); it++) {
245  nConnections++;
246  }
247 
248  // calculate matrices X, B, InvA and Y
249  X.resize(nConnections, 1);
250  B.resize(nConnections, 1);
251  Y.resize(nConnections, 1);
252  InvA.resize(nConnections, nConnections);
253  InvA.eye();
254 
255  int row = 0;
256  for(auto& rowItr : peerSet)
257  {
258  PriorityCarrier* peer = rowItr.first;
259  // call 'getActualStimulation' to update 'isActive'
260  peer->getActualStimulation(t);
261  double xi = (peer->isActive) ? STIMUL_THRESHOLD : 0.0;
262  B(row,0) = peer->bias * xi;
263  X(row,0) = xi;
264 
265  int col = 0;
266  for(auto& it : peerSet)
267  {
268  PriorityCarrier *peerCol = it.first;
269  for(size_t i=0; i<peerCol->excitation.size(); i++)
270  {
271  Value v = peerCol->excitation.get(i);
272  if(v.isList() && (v.asList()->size()>=2))
273  {
274  Bottle* b = v.asList();
275  // an exitatory link to this connection
276  if (peer->sourceName == b->get(0).asString()) {
277  InvA(row, col) = -(b->get(1).asFloat64() / 10.0) * xi;
278  }
279  }
280  }
281  col++;
282  }
283  row++;
284  }
285 
286  yCTrace(PRIORITYCARRIER, "A:\n %s", InvA.toString(1).c_str());
287 
288  // calclulating the determinant
289  double determinant = yarp::math::det(InvA);
290  if(determinant == 0)
291  {
292  yCError(PRIORITYCARRIER, "Inconsistent regulation! non-invertible weight matrix");
293  return false;
294  }
295 
296  // inverting the weight matrix
297  InvA = yarp::math::luinv(InvA);
298  Y = InvA * B;
299 
300  yCTrace(PRIORITYCARRIER, "X:\n %s", X.toString(1).c_str());
301  yCTrace(PRIORITYCARRIER, "B:\n %s", B.toString(1).c_str());
302  yCTrace(PRIORITYCARRIER, "Y:\n %s", Y.toString(1).c_str());
303 
304  return true;
305 }
306 
307 // Decide whether data should be accepted, for real.
309  PriorityCarrier *source) {
310 
311  bool accept;
312  // updates message's arrival time
313  double tNow = yarp::os::Time::now();
314  source->stimulate(tNow);
315 
316  if (!recalculate(tNow)) {
317  return false;
318  }
319 
320  int row = 0;
321  PriorityCarrier *maxPeer = nullptr;
322  double maxStimuli = 0.0;
323  for(auto& it : peerSet)
324  {
325  PriorityCarrier *peer = it.first;
326  double output = Y(row,0) * X(row,0);
327  peer->yi = output; // only for debug purpose
328 
329  if(!peer->isVirtual)
330  {
331  if(output > maxStimuli)
332  {
333  maxStimuli = output;
334  maxPeer = peer;
335  }
336  }
337  row++;
338  }
339  accept = (maxPeer == source);
340 
341  // a virtual message will never be delivered. It will be only
342  // used for the coordination
343  if (source->isVirtual) {
344  accept = false;
345  }
346 
347  return accept;
348 }
349 
350 
355 #ifdef WITH_PRIORITY_DEBUG
357 {
358  pcarrier = carrier;
359  count = 0;
360 }
361 
363 {
364  if (isRunning()) {
365  stop();
366  }
367 }
368 
370 {
372  v.resize(4);
373  // a vector of [t, S(t), S'(t), I'(t)]
374  double t = yarp::os::Time::now();
375  v[0] = t;
377  v[2] = pcarrier->yi;
378  debugPort.write();
379 }
380 
382 {
383  debugPortName = pcarrier->portName + pcarrier->sourceName + std::string(":debug");
384  return debugPort.open(debugPortName);
385 }
386 
388 {
389  debugPort.close();
390 }
391 
392 #endif //WITH_PRIORITY_DEBUG
float t
#define STIMUL_THRESHOLD
Allow priority-based message selection.
std::string sourceName
bool configure(yarp::os::ConnectionState &proto) override
Give carrier a shot at looking at how the connection is set up.
double getActualInput(double t)
void stimulate(double t)
bool acceptIncomingData(yarp::os::ConnectionReader &reader) override
Determine whether incoming data should be accepted.
yarp::os::Bottle excitation
double getActualStimulation(double t)
yarp::os::BufferedPort< yarp::sig::Vector > debugPort
~PriorityDebugThread() override
void threadRelease() override
Release method.
void run() override
Loop function.
PriorityCarrier * pcarrier
bool threadInit() override
Initialization method.
PriorityDebugThread(PriorityCarrier *carrier)
Class PriorityDebugThread.
std::string debugPortName
virtual bool acceptIncomingData(yarp::os::ConnectionReader &reader, PriorityCarrier *source)
bool recalculate(double t)
Class PriorityGroup.
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:73
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:251
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:246
void close() override
Stop port activity.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
T & prepare()
Access the object which will be transmitted by the next call to yarp::os::BufferedPort::write.
An interface for reading from a network connection.
The basic state of a connection - route, streams in use, etc.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
virtual std::string getSenderSpecifier() const =0
Extract a name for the sender, if the connection type supports that.
static LogCallback printCallback()
Get current print callback.
Definition: Log.cpp:880
static LogType minimumPrintLevel()
Get current minimum print level.
Definition: Log.cpp:833
@ LogTypeReserved
Definition: Log.h:97
static void unlock()
Call post() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1464
static void lock()
Call wait() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1459
An abstraction for a periodic thread.
bool isRunning() const
Returns true when the thread is started, false otherwise.
void stop()
Call this to stop the thread, this call blocks until the thread is terminated (and releaseThread() ca...
A class for storing options and configuration information.
Definition: Property.h:33
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
Definition: Property.cpp:1063
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Property.cpp:1041
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Definition: Property.cpp:1142
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:103
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:93
A single value (typically within a Bottle).
Definition: Value.h:43
virtual yarp::conf::float64_t asFloat64() const
Get 64-bit floating point value.
Definition: Value.cpp:222
virtual bool isList() const
Checks if value is a list.
Definition: Value.cpp:162
virtual Bottle * asList() const
Get list value.
Definition: Value.cpp:240
virtual std::string asString() const
Get string value.
Definition: Value.cpp:234
void resize(size_t size) override
Resize the vector.
Definition: Vector.h:220
#define yCInfo(component,...)
Definition: LogComponent.h:171
#define yCError(component,...)
Definition: LogComponent.h:213
#define yCAssert(component, x)
Definition: LogComponent.h:240
#define yCTrace(component,...)
Definition: LogComponent.h:84
#define YARP_LOG_COMPONENT(name,...)
Definition: LogComponent.h:76
double det(const yarp::sig::Matrix &in)
Computes the determinant of a matrix (defined in Math.h).
Definition: math.cpp:607
yarp::sig::Matrix luinv(const yarp::sig::Matrix &in)
Invert a square matrix using LU-decomposition (defined in Math.h).
Definition: math.cpp:612
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition: Time.cpp:121
An interface to the operating system, including Port based communication.