YARP
Yet Another Robot Platform
PriorityCarrier.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * All rights reserved.
4  *
5  * This software may be modified and distributed under the terms of the
6  * BSD-3-Clause license. See the accompanying LICENSE file for details.
7  */
8 
9 #include "PriorityCarrier.h"
10 
11 #include <yarp/os/Log.h>
12 #include <yarp/os/LogComponent.h>
14 #include <yarp/os/Route.h>
15 
16 #include <yarp/math/Math.h>
17 #include <yarp/math/SVD.h>
18 #include <string>
19 
20 
21 using namespace yarp::os;
22 using namespace yarp::math;
23 
24 namespace {
25 YARP_LOG_COMPONENT(PRIORITYCARRIER,
26  "yarp.carrier.priority",
30  nullptr)
31 }
32 
33 
38 ElectionOf<PriorityGroup> *PriorityCarrier::peers = nullptr;
39 
40 // Make a singleton manager for finding peer carriers.
41 ElectionOf<PriorityGroup>& PriorityCarrier::getPeers() {
43  if (peers==nullptr) {
44  peers = new ElectionOf<PriorityGroup>;
46  yCAssert(PRIORITYCARRIER, peers);
47  } else {
49  }
50  return *peers;
51 }
52 
53 // Decide whether data should be accepted.
55  getPeers().lock();
56  yCAssert(PRIORITYCARRIER, group);
57  bool result = group->acceptIncomingData(reader,this);
58  getPeers().unlock();
59  return result;
60 }
61 
62 // Read connection settings.
64  portName = proto.getRoute().getToName();
65  sourceName = proto.getRoute().getFromName();
66  group = getPeers().add(portName,this);
67  if (!group) return false;
68 
69  Property options;
70  options.fromString(proto.getSenderSpecifier());
71 
72  timeConstant = fabs(options.check("tc",Value(1.0)).asFloat64());
73  timeResting = fabs(options.check("tr",Value(0.0)).asFloat64());
74  stimulation = fabs(options.check("st",Value(STIMUL_THRESHOLD*10)).asFloat64());
75  // Zero stimulation is undefined and will be interpreted as S=Threshold.
76  if(stimulation == 0)
77  stimulation = STIMUL_THRESHOLD*10;
78  stimulation /= 10.0;
79 
80  bias = options.check("bs",Value(STIMUL_THRESHOLD*10)).asFloat64();
81  bias /= 10.0;
82 
83  excitation = options.findGroup("ex");
84  isVirtual = options.check("virtual");
85 
86 #ifdef WITH_PRIORITY_DEBUG
87  if(options.check("debug"))
88  {
89  std::string msg;
90  char dummy[1024];
91  std::snprintf(dummy, 1024, "\n%s:\n", sourceName.c_str());
92  msg+= dummy;
93  std::snprintf(dummy, 1024, " stimulation: %.2f\n", stimulation);
94  msg+= dummy;
95  std::snprintf(dummy, 1024, " bias: %.2f\n", bias);
96  msg+= dummy;
97  std::snprintf(dummy, 1024, " tc: %.2fs\n", timeConstant);
98  msg+= dummy;
99  std::snprintf(dummy, 1024, " tr: %.2fs\n", timeResting);
100  msg+= dummy;
101  std::snprintf(dummy, 1024, " ex: ");
102  msg+= dummy;
103  for(size_t i=0; i<excitation.size(); i++)
104  {
105  Value v = excitation.get(i);
106  if(v.isList() && (v.asList()->size()>=2))
107  {
108  Bottle* b = v.asList();
109  std::snprintf(dummy, 1024, "(%s, %.2f) ",
110  b->get(0).asString().c_str(),
111  b->get(1).asFloat64()/10.0 );
112  msg+= dummy;
113  }
114  }
115  //std::snprintf(dummy, 1024, "\n");
116  msg+= "\n";
117  std::snprintf(dummy, 1024, " virtual: %s\n",
118  (isVirtual)?"yes":"no");
119  msg+= dummy;
120  double rate = options.check("rate", Value(10)).asInt32() / 1000.0;
121  std::snprintf(dummy, 1024, " db.rate: %fs\n", rate);
122  msg+= dummy;
123  yCInfo(PRIORITYCARRIER, "%s", msg.c_str());
124  debugger.stop();
125  debugger.setPeriod(rate);
126  debugger.start();
127  }
128 #endif
129  return true;
130 }
131 
132 
134 {
135  //
136  // +P| ---___
137  // | | -__
138  // | | -_
139  // 0|------------------> (Active state)
140  // Ta Tc
141  //
142  // Ta Tr
143  // 0|-------------_----> (Resting state && P<0)
144  // | | __-
145  // | | ___-
146  // -P| ---
147  //
148  //
149  // P(t) = Pi * (1-exp((t-Tc-Ta)/Tc*5) + exp(-5))
150  // t:time, Pi: temporal Priority level
151  // Tc: reset time, Ta: arrival time
152  //
153  // we do not consider ports from which no message has been
154  // received yet.
155  //if(timeArrival == 0)
156  // return 0;
157 
158  double dt = t - timeArrival;
159  // Temporal priority is inverted if this is a neuron model and the temporal
160  // stimulation has already reached STIMUL_THRESHOLD and waited for Tc.
161  if((timeResting > 0)
162  && (dt >= fabs(timeConstant))
163  && (temporalStimulation >= STIMUL_THRESHOLD))
164  temporalStimulation = -temporalStimulation;
165 
166  double actualStimulation;
167  if(!isResting(temporalStimulation)) // behavior is in stimulation state
168  {
169  // After a gap bigger than Tc, the
170  // priority is set to zero to avoid redundant calculation.
171  if(dt > fabs(timeConstant))
172  actualStimulation = 0;
173  else
174  actualStimulation = temporalStimulation *
175  (1.0 - exp((dt-timeConstant)/timeConstant*5.0) + exp(-5.0));
176  }
177  else // behavior is in resting state
178  {
179  // it is in waiting state for Tc
180  if(temporalStimulation > 0)
181  actualStimulation = temporalStimulation;
182  else
183  {
184  dt -= fabs(timeConstant);
185  // After a gap bigger than Tr, the
186  // priority is set to zero to avoid redundant calculation.
187  if(dt > fabs(timeResting))
188  actualStimulation = 0;
189  else
190  actualStimulation = temporalStimulation *
191  (1.0 - exp((dt-timeResting)/timeResting*5.0) + exp(-5.0));
192  }
193  }
194 
195  if(actualStimulation <= 0)
196  isActive = false;
197 
198  return actualStimulation;
199 }
200 
202 {
203  // calculating E(t) = Sum(e.I(t)) + b
204  if(!isActive)
205  return 0.0;
206 
207  double E = 0;
208  for (auto& it : group->peerSet)
209  {
210  PriorityCarrier *peer = it.first;
211  if(peer != this)
212  {
213  for(size_t i=0; i<peer->excitation.size(); i++)
214  {
215  Value v = peer->excitation.get(i);
216  if(v.isList() && (v.asList()->size()>=2))
217  {
218  Bottle* b = v.asList();
219  // an excitatory link to this priority carrier
220  if(sourceName == b->get(0).asString())
221  E += peer->getActualInput(t) * (b->get(1).asFloat64()/10.0);
222  }
223  }
224 
225  }
226  }
227  E += bias;
228  double I = E * getActualStimulation(t);
229  return ((I<0) ? 0 : I); //I'(t)
230 }
231 
232 
238 {
239  //TODO: find the correct way to get the size of peerSet
240  int nConnections = 0;
241  for(auto it=peerSet.begin(); it!=peerSet.end(); it++)
242  nConnections++;
243 
244  // calculate matrices X, B, InvA and Y
245  X.resize(nConnections, 1);
246  B.resize(nConnections, 1);
247  Y.resize(nConnections, 1);
248  InvA.resize(nConnections, nConnections);
249  InvA.eye();
250 
251  int row = 0;
252  for(auto& rowItr : peerSet)
253  {
254  PriorityCarrier* peer = rowItr.first;
255  // call 'getActualStimulation' to update 'isActive'
256  peer->getActualStimulation(t);
257  double xi = (peer->isActive) ? STIMUL_THRESHOLD : 0.0;
258  B(row,0) = peer->bias * xi;
259  X(row,0) = xi;
260 
261  int col = 0;
262  for(auto& it : peerSet)
263  {
264  PriorityCarrier *peerCol = it.first;
265  for(size_t i=0; i<peerCol->excitation.size(); i++)
266  {
267  Value v = peerCol->excitation.get(i);
268  if(v.isList() && (v.asList()->size()>=2))
269  {
270  Bottle* b = v.asList();
271  // an excitatory link to this connection
272  if(peer->sourceName == b->get(0).asString())
273  InvA(row,col) = -(b->get(1).asFloat64()/10.0)*xi;
274  }
275  }
276  col++;
277  }
278  row++;
279  }
280 
281  yCTrace(PRIORITYCARRIER, "A:\n %s", InvA.toString(1).c_str());
282 
283  // calculating the determinant
284  double determinant = yarp::math::det(InvA);
285  if(determinant == 0)
286  {
287  yCError(PRIORITYCARRIER, "Inconsistent regulation! non-invertible weight matrix");
288  return false;
289  }
290 
291  // inverting the weight matrix
292  InvA = yarp::math::luinv(InvA);
293  Y = InvA * B;
294 
295  yCTrace(PRIORITYCARRIER, "X:\n %s", X.toString(1).c_str());
296  yCTrace(PRIORITYCARRIER, "B:\n %s", B.toString(1).c_str());
297  yCTrace(PRIORITYCARRIER, "Y:\n %s", Y.toString(1).c_str());
298 
299  return true;
300 }
301 
302 // Decide whether data should be accepted, for real.
304  PriorityCarrier *source) {
305 
306  bool accept;
307  // updates message's arrival time
308  double tNow = yarp::os::Time::now();
309  source->stimulate(tNow);
310 
311  if(!recalculate(tNow))
312  return false;
313 
314  int row = 0;
315  PriorityCarrier *maxPeer = nullptr;
316  double maxStimuli = 0.0;
317  for(auto& it : peerSet)
318  {
319  PriorityCarrier *peer = it.first;
320  double output = Y(row,0) * X(row,0);
321  peer->yi = output; // only for debug purpose
322 
323  if(!peer->isVirtual)
324  {
325  if(output > maxStimuli)
326  {
327  maxStimuli = output;
328  maxPeer = peer;
329  }
330  }
331  row++;
332  }
333  accept = (maxPeer == source);
334 
335  // a virtual message will never be delivered. It will be only
336  // used for the coordination
337  if(source->isVirtual)
338  accept = false;
339 
340  return accept;
341 }
342 
343 
348 #ifdef WITH_PRIORITY_DEBUG
350 {
351  pcarrier = carrier;
352  count = 0;
353 }
354 
356 {
357  if(isRunning()) stop();
358 }
359 
361 {
363  v.resize(4);
364  // a vector of [t, S(t), S'(t), I'(t)]
365  double t = yarp::os::Time::now();
366  v[0] = t;
368  v[2] = pcarrier->yi;
369  debugPort.write();
370 }
371 
373 {
374  debugPortName = pcarrier->portName + pcarrier->sourceName + std::string(":debug");
375  return debugPort.open(debugPortName);
376 }
377 
379 {
380  debugPort.close();
381 }
382 
383 #endif //WITH_PRIORITY_DEBUG
float t
#define STIMUL_THRESHOLD
Allow priority-based message selection.
std::string sourceName
bool configure(yarp::os::ConnectionState &proto) override
Give carrier a shot at looking at how the connection is set up.
double getActualInput(double t)
void stimulate(double t)
bool acceptIncomingData(yarp::os::ConnectionReader &reader) override
Determine whether incoming data should be accepted.
yarp::os::Bottle excitation
double getActualStimulation(double t)
yarp::os::BufferedPort< yarp::sig::Vector > debugPort
~PriorityDebugThread() override
void threadRelease() override
Release method.
void run() override
Loop function.
PriorityCarrier * pcarrier
bool threadInit() override
Initialization method.
PriorityDebugThread(PriorityCarrier *carrier)
Class PriorityDebugThread.
std::string debugPortName
virtual bool acceptIncomingData(yarp::os::ConnectionReader &reader, PriorityCarrier *source)
bool recalculate(double t)
Class PriorityGroup.
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:73
size_type size() const
Gets the number of elements in the bottle.
Definition: Bottle.cpp:254
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:249
void close() override
Stop port activity.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
T & prepare()
Access the object which will be transmitted by the next call to yarp::os::BufferedPort::write.
An interface for reading from a network connection.
The basic state of a connection - route, streams in use, etc.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
virtual std::string getSenderSpecifier() const =0
Extract a name for the sender, if the connection type supports that.
static LogCallback printCallback()
Get current print callback.
Definition: Log.cpp:852
static LogType minimumPrintLevel()
Get current minimum print level.
Definition: Log.cpp:805
@ LogTypeReserved
Definition: Log.h:83
static void unlock()
Call post() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1467
static void lock()
Call wait() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1462
An abstraction for a periodic thread.
bool isRunning() const
Returns true when the thread is started, false otherwise.
void stop()
Call this to stop the thread, this call blocks until the thread is terminated (and releaseThread() ca...
A class for storing options and configuration information.
Definition: Property.h:37
void fromString(const std::string &txt, bool wipe=true)
Interprets a string as a list of properties.
Definition: Property.cpp:1046
bool check(const std::string &key) const override
Check if there exists a property of the given name.
Definition: Property.cpp:1024
Bottle & findGroup(const std::string &key) const override
Gets a list corresponding to a given keyword.
Definition: Property.cpp:1125
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:106
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:96
A single value (typically within a Bottle).
Definition: Value.h:47
virtual yarp::conf::float64_t asFloat64() const
Get 64-bit floating point value.
Definition: Value.cpp:225
virtual bool isList() const
Checks if value is a list.
Definition: Value.cpp:165
virtual Bottle * asList() const
Get list value.
Definition: Value.cpp:243
virtual std::string asString() const
Get string value.
Definition: Value.cpp:237
void resize(size_t size) override
Resize the vector.
Definition: Vector.h:254
#define yCInfo(component,...)
Definition: LogComponent.h:135
#define yCError(component,...)
Definition: LogComponent.h:157
#define yCAssert(component, x)
Definition: LogComponent.h:172
#define yCTrace(component,...)
Definition: LogComponent.h:88
#define YARP_LOG_COMPONENT(name,...)
Definition: LogComponent.h:80
double det(const yarp::sig::Matrix &in)
Computes the determinant of a matrix (defined in Math.h).
Definition: math.cpp:583
yarp::sig::Matrix luinv(const yarp::sig::Matrix &in)
Invert a square matrix using LU-decomposition (defined in Math.h).
Definition: math.cpp:588
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition: Time.cpp:124
An interface to the operating system, including Port based communication.