YARP
Yet Another Robot Platform
NetworkClock.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-License-Identifier: BSD-3-Clause
4  */
5 
6 #include <yarp/os/NetworkClock.h>
7 
8 #include <yarp/conf/numeric.h>
9 #include <yarp/conf/system.h>
10 
11 #include <yarp/os/NestedContact.h>
12 #include <yarp/os/NetInt32.h>
13 #include <yarp/os/Network.h>
14 #include <yarp/os/Os.h>
15 #include <yarp/os/Port.h>
16 #include <yarp/os/PortReader.h>
17 #include <yarp/os/Semaphore.h>
18 #include <yarp/os/SystemClock.h>
19 #include <yarp/os/SystemInfo.h>
21 
22 #include <cstring>
23 #include <list>
24 #include <mutex>
25 #include <utility>
26 
27 
28 using namespace yarp::os;
29 using namespace yarp::os::impl;
30 
31 namespace {
32 YARP_OS_LOG_COMPONENT(NETWORKCLOCK, "yarp.os.NetworkClock")
33 }
34 
36 {
37 public:
38  Private();
39  ~Private() override;
40 
41  bool read(ConnectionReader& reader) override;
42 
43  std::string clockName;
44 
45  using Waiters = std::list<std::pair<double, Semaphore*>>;
48 
49  std::mutex listMutex;
50  std::mutex timeMutex;
51 
52  std::int32_t sec{0};
53  std::int32_t nsec{0};
54  double _time{0};
55  bool closing{false};
56  bool initted{false};
57 };
58 
60  clockName("/clock"),
61  waiters(new Waiters())
62 {
63 }
64 
66 {
67  listMutex.lock();
68  closing = true;
69  port.interrupt();
70 
71  auto waiter_it = waiters->begin();
72  while (waiter_it != waiters->end()) {
73  Semaphore* waiterSemaphore = waiter_it->second;
74  waiter_it = waiters->erase(waiter_it);
75  if (waiterSemaphore != nullptr) {
76  waiterSemaphore->post();
77  }
78  }
79  delete waiters;
80  listMutex.unlock();
81 
83  style.persistent = true;
84  NetworkBase::disconnect(clockName, port.getName(), style);
85 }
86 
88 {
89  Bottle bot;
90  bool ok = bot.read(reader);
91 
92  if (closing) {
93  _time = -1;
94  return false;
95  }
96 
97  if (!ok && !closing) {
98  yCError(NETWORKCLOCK, "Error reading clock port");
99  return false;
100  }
101 
102  timeMutex.lock();
103  double oldTime = _time;
104  sec = bot.get(0).asInt32();
105  nsec = bot.get(1).asInt32();
106  _time = sec + (nsec * 1e-9);
107  initted = true;
108  timeMutex.unlock();
109 
110  listMutex.lock();
111  auto waiter_it = waiters->begin();
112  if (oldTime > _time) {
113  // Update the wake-up time. In case of a time reset it closes the gap
114  // between the waiter and _time.
115  waiter_it->first = _time + (waiter_it->first - oldTime);
116  }
117 
118  while (waiter_it != waiters->end())
119  {
120  if (waiter_it->first - _time < 1E-12) {
121  Semaphore* waiterSemaphore = waiter_it->second;
122  waiter_it = waiters->erase(waiter_it);
123  if (waiterSemaphore != nullptr) {
124  waiterSemaphore->post();
125  }
126  } else {
127  ++waiter_it;
128  }
129  }
130  listMutex.unlock();
131  return true;
132 }
133 
134 
135 NetworkClock::NetworkClock() :
136  mPriv(new Private)
137 {
138 }
139 
141 {
142  yCWarning(NETWORKCLOCK, "Destroying network clock");
143  delete mPriv;
144 }
145 
146 bool NetworkClock::open(const std::string& clockSourcePortName, std::string localPortName)
147 {
148  mPriv->port.setReadOnly();
149  mPriv->port.setReader(*mPriv);
150  if (!clockSourcePortName.empty()) {
151  mPriv->clockName = clockSourcePortName;
152  }
153  NestedContact nc(mPriv->clockName);
154 
156  style.persistent = true;
157 
158  if (localPortName.empty()) {
159  const int MAX_STRING_SIZE = 255;
160  char hostName[MAX_STRING_SIZE];
161  yarp::os::gethostname(hostName, MAX_STRING_SIZE);
162 
164 
165  localPortName = "/";
166  // Ports may be anonymous to not pollute the yarp name list
167  localPortName += std::string(hostName) + "/" + processInfo.name + "/" + std::string(std::to_string(processInfo.pid)) + "/clock:i";
168  }
169 
170  // if receiving port cannot be opened, return false.
171  bool ret = mPriv->port.open(localPortName);
172  if (!ret) {
173  return false;
174  }
175 
176  if (nc.getNestedName().empty()) {
177  Contact src = NetworkBase::queryName(mPriv->clockName);
178 
179  ret = NetworkBase::connect(mPriv->clockName, mPriv->port.getName(), style);
180 
181  if (!src.isValid()) {
182  yCError(NETWORKCLOCK, "Cannot find time port \"%s\" or a time topic \"%s@\"\n", mPriv->clockName.c_str(), mPriv->clockName.c_str());
183  }
184  }
185 
186  return ret;
187 }
188 
190 {
191  mPriv->timeMutex.lock();
192  double result = mPriv->_time;
193  mPriv->timeMutex.unlock();
194  return result;
195 }
196 
197 void NetworkClock::delay(double seconds)
198 {
199  if (seconds <= 1E-12) {
200  return;
201  }
202 
203  mPriv->listMutex.lock();
204  if (mPriv->closing) {
205  // We are shutting down. The time signal is no longer available.
206  // Make a short delay and return.
207  mPriv->listMutex.unlock();
208  SystemClock::delaySystem(seconds);
209  return;
210  }
211 
212  std::pair<double, Semaphore*> waiter(now() + seconds, new Semaphore(0));
213  mPriv->waiters->push_back(waiter);
214  mPriv->listMutex.unlock();
215 
216  waiter.second->wait();
217  delete waiter.second;
218  waiter.second = nullptr;
219 }
220 
222 {
223  return mPriv->initted;
224 }
bool ret
std::list< std::pair< double, Semaphore * > > Waiters
bool read(ConnectionReader &reader) override
Read this object from a network connection.
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:74
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition: Bottle.cpp:240
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:246
An interface for reading from a network connection.
Preferences for how to communicate with a contact.
Definition: ContactStyle.h:24
bool persistent
Specify whether a requested connection should be persistent.
Definition: ContactStyle.h:63
Represents how to reach a part of a YARP network.
Definition: Contact.h:36
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:298
A placeholder for rich contact information.
Definition: NestedContact.h:24
std::string getNestedName() const
static bool connect(const std::string &src, const std::string &dest, const std::string &carrier="", bool quiet=true)
Request that an output port connect to an input port.
Definition: Network.cpp:682
static Contact queryName(const std::string &name)
Find out information about a registered name.
Definition: Network.cpp:995
void delay(double seconds) override
Wait for a certain number of seconds.
bool isValid() const override
Check if time is valid (non-zero).
bool open(const std::string &clockSourcePortName, std::string localPortName="")
double now() override
Return the current time in seconds, relative to an arbitrary starting point.
Interface implemented by all objects that can read themselves from the network, such as Bottle objects.
Definition: PortReader.h:25
A mini-server for network communication.
Definition: Port.h:47
A class for thread synchronization and mutual exclusion.
Definition: Semaphore.h:26
void post()
Increment the counter.
Definition: Semaphore.cpp:111
static void delaySystem(double seconds)
Definition: SystemClock.cpp:29
static ProcessInfo getProcessInfo(int pid=0)
gets the operating system process information given by its PID.
Definition: SystemInfo.cpp:805
virtual std::int32_t asInt32() const
Get 32-bit integer value.
Definition: Value.cpp:204
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCWarning(component,...)
Definition: LogComponent.h:143
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:34
std::string to_string(IntegerType x)
Definition: numeric.h:115
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
void gethostname(char *hostname, size_t size)
Portable wrapper for the gethostname() function.
Definition: Os.cpp:97
bool read(ImageOf< PixelRgb > &dest, const std::string &src, image_fileformat format=FORMAT_ANY)
Definition: ImageFile.cpp:923
The ProcessInfo struct provides the operating system process information.
Definition: SystemInfo.h:113