YARP
Yet Another Robot Platform
NetworkClock.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * All rights reserved.
4  *
5  * This software may be modified and distributed under the terms of the
6  * BSD-3-Clause license. See the accompanying LICENSE file for details.
7  */
8 
9 #include <yarp/os/NetworkClock.h>
10 
11 #include <yarp/conf/numeric.h>
12 #include <yarp/conf/system.h>
13 
14 #include <yarp/os/NestedContact.h>
15 #include <yarp/os/NetInt32.h>
16 #include <yarp/os/Network.h>
17 #include <yarp/os/Os.h>
18 #include <yarp/os/Port.h>
19 #include <yarp/os/PortReader.h>
20 #include <yarp/os/Semaphore.h>
21 #include <yarp/os/SystemClock.h>
22 #include <yarp/os/SystemInfo.h>
24 
25 #include <cstring>
26 #include <list>
27 #include <mutex>
28 #include <utility>
29 
30 
31 using namespace yarp::os;
32 using namespace yarp::os::impl;
33 
34 namespace {
35 YARP_OS_LOG_COMPONENT(NETWORKCLOCK, "yarp.os.NetworkClock")
36 }
37 
39 {
40 public:
41  Private();
42  ~Private() override;
43 
44  bool read(ConnectionReader& reader) override;
45 
46  std::string clockName;
47 
48  using Waiters = std::list<std::pair<double, Semaphore*>>;
51 
52  std::mutex listMutex;
53  std::mutex timeMutex;
54 
55  std::int32_t sec{0};
56  std::int32_t nsec{0};
57  double _time{0};
58  bool closing{false};
59  bool initted{false};
60 };
61 
63  clockName("/clock"),
64  waiters(new Waiters())
65 {
66 }
67 
69 {
70  listMutex.lock();
71  closing = true;
72  port.interrupt();
73 
74  auto waiter_it = waiters->begin();
75  while (waiter_it != waiters->end()) {
76  Semaphore* waiterSemaphore = waiter_it->second;
77  waiter_it = waiters->erase(waiter_it);
78  if (waiterSemaphore != nullptr) {
79  waiterSemaphore->post();
80  }
81  }
82  delete waiters;
83  listMutex.unlock();
84 
86  style.persistent = true;
87  NetworkBase::disconnect(clockName, port.getName(), style);
88 }
89 
91 {
92  Bottle bot;
93  bool ok = bot.read(reader);
94 
95  if (closing) {
96  _time = -1;
97  return false;
98  }
99 
100  if (!ok && !closing) {
101  yCError(NETWORKCLOCK, "Error reading clock port");
102  return false;
103  }
104 
105  timeMutex.lock();
106  double oldTime = _time;
107  sec = bot.get(0).asInt32();
108  nsec = bot.get(1).asInt32();
109  _time = sec + (nsec * 1e-9);
110  initted = true;
111  timeMutex.unlock();
112 
113  listMutex.lock();
114  auto waiter_it = waiters->begin();
115  if (oldTime > _time) {
116  // Update the wake-up time. In case of a time reset it closes the gap
117  // between the waiter and _time.
118  waiter_it->first = _time + (waiter_it->first - oldTime);
119  }
120 
121  while (waiter_it != waiters->end())
122  {
123  if (waiter_it->first - _time < 1E-12) {
124  Semaphore* waiterSemaphore = waiter_it->second;
125  waiter_it = waiters->erase(waiter_it);
126  if (waiterSemaphore != nullptr) {
127  waiterSemaphore->post();
128  }
129  } else {
130  ++waiter_it;
131  }
132  }
133  listMutex.unlock();
134  return true;
135 }
136 
137 
138 NetworkClock::NetworkClock() :
139  mPriv(new Private)
140 {
141 }
142 
144 {
145  yCWarning(NETWORKCLOCK, "Destroying network clock");
146  delete mPriv;
147 }
148 
149 bool NetworkClock::open(const std::string& clockSourcePortName, std::string localPortName)
150 {
151  mPriv->port.setReadOnly();
152  mPriv->port.setReader(*mPriv);
153  if (!clockSourcePortName.empty()) {
154  mPriv->clockName = clockSourcePortName;
155  }
156  NestedContact nc(mPriv->clockName);
157 
159  style.persistent = true;
160 
161  if (localPortName.empty()) {
162  const int MAX_STRING_SIZE = 255;
163  char hostName[MAX_STRING_SIZE];
164  yarp::os::gethostname(hostName, MAX_STRING_SIZE);
165 
167 
168  localPortName = "/";
169  // Ports may be anonymous to not pollute the yarp name list
170  localPortName += std::string(hostName) + "/" + processInfo.name + "/" + std::string(std::to_string(processInfo.pid)) + "/clock:i";
171  }
172 
173  // if receiving port cannot be opened, return false.
174  bool ret = mPriv->port.open(localPortName);
175  if (!ret) {
176  return false;
177  }
178 
179  if (nc.getNestedName().empty()) {
180  Contact src = NetworkBase::queryName(mPriv->clockName);
181 
182  ret = NetworkBase::connect(mPriv->clockName, mPriv->port.getName(), style);
183 
184  if (!src.isValid()) {
185  yCError(NETWORKCLOCK, "Cannot find time port \"%s\" or a time topic \"%s@\"\n", mPriv->clockName.c_str(), mPriv->clockName.c_str());
186  }
187  }
188 
189  return ret;
190 }
191 
193 {
194  mPriv->timeMutex.lock();
195  double result = mPriv->_time;
196  mPriv->timeMutex.unlock();
197  return result;
198 }
199 
200 void NetworkClock::delay(double seconds)
201 {
202  if (seconds <= 1E-12) {
203  return;
204  }
205 
206  mPriv->listMutex.lock();
207  if (mPriv->closing) {
208  // We are shutting down. The time signal is no longer available.
209  // Make a short delay and return.
210  mPriv->listMutex.unlock();
211  SystemClock::delaySystem(seconds);
212  return;
213  }
214 
215  std::pair<double, Semaphore*> waiter(now() + seconds, new Semaphore(0));
216  mPriv->waiters->push_back(waiter);
217  mPriv->listMutex.unlock();
218 
219  waiter.second->wait();
220  delete waiter.second;
221  waiter.second = nullptr;
222 }
223 
225 {
226  return mPriv->initted;
227 }
bool ret
std::list< std::pair< double, Semaphore * > > Waiters
bool read(ConnectionReader &reader) override
Read this object from a network connection.
A simple collection of objects that can be described and transmitted in a portable way.
Definition: Bottle.h:73
bool read(ConnectionReader &reader) override
Set the bottle's value based on input from a network connection.
Definition: Bottle.cpp:243
Value & get(size_type index) const
Reads a Value v from a certain part of the list.
Definition: Bottle.cpp:249
An interface for reading from a network connection.
Preferences for how to communicate with a contact.
Definition: ContactStyle.h:27
bool persistent
Specify whether a requested connection should be persistent.
Definition: ContactStyle.h:66
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:301
A placeholder for rich contact information.
Definition: NestedContact.h:27
std::string getNestedName() const
static bool connect(const std::string &src, const std::string &dest, const std::string &carrier="", bool quiet=true)
Request that an output port connect to an input port.
Definition: Network.cpp:685
static Contact queryName(const std::string &name)
Find out information about a registered name.
Definition: Network.cpp:998
void delay(double seconds) override
Wait for a certain number of seconds.
bool isValid() const override
Check if time is valid (non-zero).
bool open(const std::string &clockSourcePortName, std::string localPortName="")
double now() override
Return the current time in seconds, relative to an arbitrary starting point.
Interface implemented by all objects that can read themselves from the network, such as Bottle object...
Definition: PortReader.h:28
A mini-server for network communication.
Definition: Port.h:50
A class for thread synchronization and mutual exclusion.
Definition: Semaphore.h:29
void post()
Increment the counter.
Definition: Semaphore.cpp:114
static void delaySystem(double seconds)
Definition: SystemClock.cpp:32
static ProcessInfo getProcessInfo(int pid=0)
gets the operating system process information given by its PID.
Definition: SystemInfo.cpp:808
virtual std::int32_t asInt32() const
Get 32-bit integer value.
Definition: Value.cpp:207
#define yCError(component,...)
Definition: LogComponent.h:157
#define yCWarning(component,...)
Definition: LogComponent.h:146
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:37
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
void gethostname(char *hostname, size_t size)
Portable wrapper for the gethostname() function.
Definition: Os.cpp:100
bool read(ImageOf< PixelRgb > &dest, const std::string &src, image_fileformat format=FORMAT_ANY)
Definition: ImageFile.cpp:656
The ProcessInfo struct provides the operating system process information.
Definition: SystemInfo.h:116