YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
main.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-License-Identifier: LGPL-2.1-or-later
4 */
5
6#include <yarp/os/Network.h>
7#include <yarp/os/Port.h>
8#include <yarp/os/Bottle.h>
9#include <yarp/os/Time.h>
10#include <yarp/os/Vocab.h>
11#include <yarp/os/RFModule.h>
12#include <yarp/os/RpcClient.h>
13#include <yarp/os/WireLink.h>
14#include <yarp/dataplayer/YarpDataplayer.h>
16
17#include <string>
18#include <cstdio>
19#include <iostream>
20
21#define LOGO_MESSAGE "\
22__ _____ ________ ___/ /__ _/ /____ ____ / /__ ___ _____ ____ \n\
23/ // / _ `/ __/ _ \\/ _ / _ `/ __/ _ `/ _ \\/ / _ `/ // / -_) __/ \n\
24\\_, /\\_,_/_/ / .__/\\_,_/\\_,_/\\__/\\_,_/ .__/_/\\_,_/\\_, /\\__/_/ \n\
25/___/ /_/ /_/ /___/ "
26
27
28using namespace yarp::os;
29using namespace yarp::yarpDataplayer;
30
32{
33 DataplayerUtilities* utilities = nullptr;
34 std::vector<yarp::yarpDataplayer::RowInfo> rowInfoVec;
35 int subDirCnt;
36 yarp::os::RpcServer rpcPort;
37 bool verbose;
38 std::string dataset;
39 std::string status;
40
41 float progress;
42
43 /**********************************************************/
44 bool configure(yarp::os::ResourceFinder &rf) override
45 {
46 std::string moduleName = rf.check("module_name", Value("yarpdataplayer")).asString();
47 bool add_prefix = rf.check("add_prefix", Value(false)).asBool();
48 verbose = rf.check("verbose",Value(false)).asBool();
49 dataset = rf.check("dataset",Value("")).asString();
50
51 utilities = new DataplayerUtilities(moduleName, add_prefix, verbose);
52 if (rf.check("withExtraTimeCol"))
53 {
54 utilities->withExtraColumn = true;
55 utilities->column = rf.find("withExtraTimeCol").asInt32();
56 }
57
58 utilities->dataplayerEngine->stepfromCmd = false;
59 subDirCnt = 0;
60
61 rpcPort.open("/yarpdataplayer/rpc:i");
62 attach(rpcPort);
63
64 progress = 0.0;
65 status = "";
66
67 std::cout<<std::endl<<std::endl<<LOGO_MESSAGE<<std::endl<<std::endl;
68
69 if (!dataset.empty())
70 {
71 load(dataset);
72 std::cout << std::endl << std::endl;
73 }
74 return true;
75 }
76
77 /**********************************************************/
78 bool attach(Port& source) override
79 {
80 return this->yarp().attachAsServer(source);
81 }
82
83 /**********************************************************/
84 bool attach(RpcServer &source) override
85 {
86 return this->yarp().attachAsServer(source);
87 }
88
89 /**********************************************************/
90 bool interruptModule() override
91 {
92 rpcPort.interrupt();
93 return true;
94 }
95
96 /**********************************************************/
97 bool close() override
98 {
99 rpcPort.interrupt();
100 rpcPort.close();
101
102 if (utilities)
103 {
104 delete utilities;
105 utilities = nullptr;
106 }
107
108 return true;
109 }
110
111 /**********************************************************/
112 double getPeriod() override
113 {
114 return 1.0;
115 }
116
117 /**********************************************************/
118 bool updateModule() override
119 {
120 return true;
121 }
122
123 /**********************************************************/
124 bool load(const std::string &filename) override
125 {
126 utilities->resetMaxTimeStamp();
127 subDirCnt = utilities->getRecSubDirList(filename, rowInfoVec, 1);
128 if (verbose)
129 {
130 yInfo() << "the size of subDirs is: " << subDirCnt;
131 }
132 //reset totalSent to 0
133 utilities->totalSent = 0;
134 utilities->totalThreads = subDirCnt;
135 if (subDirCnt > 0)
136 {
137 utilities->partDetails = new yarp::yarpDataplayer::PartsData [subDirCnt];
138 }
139
140 //fill in parts with all data
141 for (int x=0; x < subDirCnt; x++)
142 {
143 utilities->partDetails[x].name = rowInfoVec[x].name;
144 utilities->partDetails[x].infoFile = rowInfoVec[x].info;
145 utilities->partDetails[x].logFile = rowInfoVec[x].log;
146 utilities->partDetails[x].path = rowInfoVec[x].path;
147
148 utilities->setupDataFromParts(utilities->partDetails[x]);
149
150 utilities->partDetails[x].worker = new yarp::yarpDataplayer::DataplayerWorker(x, subDirCnt);
151 utilities->partDetails[x].worker->setManager(utilities);
152 }
153
154 //get the max timestamp of all the parts for synchronization
155 if (subDirCnt > 0)
156 {
157 utilities->getMaxTimeStamp();
158 }
159
160 if (subDirCnt > 0){
161 utilities->getMinTimeStamp();
162 }
163
164
165 //set initial frames for all parts depending on first timestamps
166 for (int x=0; x < subDirCnt; x++)
167 {
168 utilities->initialFrame.push_back(utilities->partDetails[x].currFrame);
169
170 double totalTime = 0.0;
171 double final = utilities->partDetails[x].timestamp[utilities->partDetails[x].timestamp.length()-1];
172 double initial = utilities->partDetails[x].timestamp[utilities->partDetails[x].currFrame];
173
174 totalTime = final - initial;
175 if (verbose)
176 {
177 yInfo() << "The part " << utilities->partDetails[x].name << " should last for: " << totalTime
178 << " with " << utilities->partDetails[x].maxFrame << " frames";
179 }
180 utilities->configurePorts(utilities->partDetails[x]);
181 }
182
183 utilities->dataplayerEngine->setNumPart(subDirCnt);
184
185
186 if (subDirCnt > 0)
187 {
188 return true;
189 }
190 else
191 {
192 return false;
193 }
194 }
195
196 /**********************************************************/
197 bool play() override
198 {
199 if (subDirCnt > 0)
200 {
201 bool allPartsStatus = utilities->dataplayerEngine->getAllPartsStatus();
202 if (allPartsStatus)
203 {
204 if (verbose)
205 {
206 yInfo() << "asking the threads to stop...";
207 }
208 if (utilities->dataplayerEngine->isSuspended())
209 {
210 utilities->dataplayerEngine->resume();
211 }
212
213 utilities->dataplayerEngine->stop();
214 if (verbose)
215 {
216 yInfo() << "done stopping!";
217 }
218 for (int i = 0; i < subDirCnt; i++) {
219 utilities->partDetails[i].currFrame = 1;
220 }
221
222 if (verbose)
223 {
224 yInfo() << "done stopping the thread...";
225 }
226 utilities->dataplayerEngine->setAllPartsStatus(false);
227 }
228
229 if ( utilities->dataplayerEngine->isSuspended() )
230 {
231 if (verbose)
232 {
233 yInfo() << "asking the thread to resume";
234 }
235
236 for (int i = 0; i < subDirCnt; i++) {
237 utilities->partDetails[i].worker->resetTime();
238 }
239
240 utilities->dataplayerEngine->resume();
241 }
242 else if (!utilities->dataplayerEngine->isRunning())
243 {
244 if (verbose)
245 {
246 yInfo() << "asking the thread to start";
247 yInfo() <<"initializing the workers...";
248 }
249
250 for (int i = 0; i < subDirCnt; i++) {
251 utilities->partDetails[i].worker->init();
252 }
253
254 if (verbose)
255 {
256 yInfo() << "starting the master thread...";
257 }
258 utilities->dataplayerEngine->start();
259 }
260 status = "playing";
261 return true;
262 }
263 else
264 {
265 if (verbose)
266 {
267 yError() << "No dataset loaded";
268 }
269 return false;
270 }
271 }
272
273 /**********************************************************/
274 bool pause() override
275 {
276 if (subDirCnt > 0)
277 {
278 if (verbose)
279 {
280 yInfo() << "asking the threads to pause...";
281 }
282 utilities->dataplayerEngine->pause();
283 status = "paused";
284 return true;
285 }
286 else
287 {
288 if (verbose)
289 {
290 yError() << "No dataset loaded";
291 }
292 return false;
293 }
294 }
295
296 /**********************************************************/
297 bool resume() override
298 {
299 if (subDirCnt > 0)
300 {
301 if (verbose)
302 {
303 yInfo() << "asking the threads to resume...";
304 }
305 utilities->dataplayerEngine->resume();
306 return true;
307 }
308 else
309 {
310 if (verbose)
311 {
312 yError() << "No dataset loaded";
313 }
314 return false;
315 }
316 }
317
318 /**********************************************************/
319 bool stop() override
320 {
321 if (subDirCnt > 0)
322 {
323 if (verbose)
324 {
325 yInfo() << "asking the threads to stop...";
326 }
327 if (utilities->dataplayerEngine->isSuspended()){
328 utilities->dataplayerEngine->resume();
329 }
330
331 utilities->dataplayerEngine->stop();
332 if (verbose)
333 {
334 yInfo() << "done stopping!";
335 }
336 for (int i = 0; i < subDirCnt; i++) {
337 utilities->partDetails[i].currFrame = 1;
338 }
339
340 if (verbose)
341 {
342 yInfo() << "done stopping the thread...";
343 }
344 status = "stopped";
345 return true;
346 }
347 else
348 {
349 if (verbose)
350 {
351 yError() << "No dataset loaded";
352 }
353 return false;
354 }
355 }
356
357 /**********************************************************/
358 bool enable(const std::string &part) override
359 {
360 for (int i=0; i < subDirCnt; i++)
361 {
362 std::string partname = utilities->partDetails[i].name;
363 if (strcmp( partname.c_str() , part.c_str()) == 0)
364 {
365 if (verbose)
366 {
367 yInfo() << "Enabling" << partname;
368 }
369 utilities->dataplayerEngine->setPart(i, true);
370 return true;
371 break;
372 }
373 }
374 return false;
375 }
376
377
378 /**********************************************************/
379 bool disable(const std::string &part) override
380 {
381 for (int i=0; i < subDirCnt; i++)
382 {
383 std::string partname = utilities->partDetails[i].name;
384 if (strcmp( partname.c_str() , part.c_str()) == 0)
385 {
386 if (verbose)
387 {
388 yInfo() << "Disabling" << partname;
389 }
390 utilities->dataplayerEngine->setPart(i, false);
391 return true;
392 break;
393 }
394 }
395 return false;
396 }
397
398 /**********************************************************/
399 std::vector<std::string> getAllParts() override
400 {
401 std::vector<std::string> parts(subDirCnt, "");
402 if (subDirCnt > 0)
403 {
404 for (int i=0; i < subDirCnt; i++)
405 {
406 std::string partname = utilities->partDetails[i].name;
407 parts[i] = partname;
408 }
409 }
410 return parts;
411 }
412
413 /**********************************************************/
414 std::string getPortName(const std::string &part) override
415 {
416 std::string portname = "";
417 if (subDirCnt > 0)
418 {
419 for (int i=0; i < subDirCnt; i++)
420 {
421 if (utilities->partDetails[i].name == part)
422 {
423 portname = utilities->partDetails[i].portName;
424 }
425 }
426 }
427 return portname;
428 }
429
430 /**********************************************************/
431 bool setPortName(const std::string &part, const std::string &new_name) override
432 {
433 if (subDirCnt > 0)
434 {
435 for (int i=0; i < subDirCnt; i++)
436 {
437 std::string partname = utilities->partDetails[i].name;
438 if (partname == part)
439 {
440 utilities->partDetails[i].portName = new_name;
441 utilities->configurePorts(utilities->partDetails[i]);
442 return true;
443 }
444 }
445 }
446 return false;
447 }
448
449 /**********************************************************/
450 int getFrame(const std::string &partname) override
451 {
452 if (subDirCnt > 0)
453 {
454 for (int i=0; i < subDirCnt; i++)
455 {
456 if (strcmp (partname.c_str(), utilities->partDetails[i].name.c_str()) == 0)
457 {
458 return utilities->partDetails[i].currFrame;
459 }
460 }
461 }
462 return -1;
463 }
464
465 /**********************************************************/
466 bool setFrame(const int frameNum) override
467 {
468 if (subDirCnt > 0)
469 {
470 if (verbose)
471 {
472 yInfo() << "setting initial frame to " << frameNum;
473 }
474 for (int i=0; i < subDirCnt; i++)
475 {
476 utilities->dataplayerEngine->virtualTime = utilities->partDetails[i].timestamp[utilities->partDetails[i].currFrame];
477 utilities->partDetails[i].currFrame = frameNum;
478 }
479 utilities->dataplayerEngine->virtualTime = utilities->partDetails[0].timestamp[utilities->partDetails[0].currFrame];
480 return true;
481 }
482
483 return false;
484 }
485
486 /**********************************************************/
487 bool setSpeed(const double speed) override
488 {
489 if (verbose)
490 {
491 yInfo() << "Setting speed to" << speed;
492 }
493 utilities->speed = speed;
494 return true;
495 }
496
497 /**********************************************************/
498 double getSpeed() override
499 {
500 return utilities->speed;
501 }
502
503 /**********************************************************/
504 bool step() override
505 {
506 if (subDirCnt > 0)
507 {
508 utilities->stepThread();
509 return true;
510 }
511 return false;
512 }
513
514 /**********************************************************/
515 bool repeat(const bool val=false) override
516 {
517 if (verbose)
518 {
519 yInfo() << "Setting repeat mode to" << val;
520 }
521 utilities->repeat = val;
522 return true;
523 }
524
525 /**********************************************************/
526 bool setStrict(const bool val=false) override
527 {
528 if (verbose)
529 {
530 yInfo() << "Setting strict mode to" << val;
531 }
532 utilities->sendStrict = val;
533 return true;
534 }
535
536 /**********************************************************/
537 bool forward(const int steps=5) override
538 {
539 if (subDirCnt > 0)
540 {
541 if (verbose)
542 {
543 yInfo() << "Going forward of" << steps << "steps";
544 }
545 utilities->dataplayerEngine->forward(steps);
546 return true;
547 }
548 return false;
549 }
550
551 /**********************************************************/
552 bool backward(const int steps=5) override
553 {
554 if (subDirCnt > 0)
555 {
556 if (verbose)
557 {
558 yInfo() << "Going backward of" << steps << "steps";
559 }
560 utilities->dataplayerEngine->backward(steps);
561 return true;
562 }
563 return false;
564 }
565
566 /**********************************************************/
567 double getProgress() override
568 {
569 if (subDirCnt > 0)
570 {
571 double prog = 0.0;
572 int nactivParts = 0;
573 for (int x=0; x < subDirCnt; x++)
574 {
575 if (utilities->dataplayerEngine->isPartActive[x])
576 {
577 prog += ((utilities->partDetails[x].currFrame * 100) / utilities->partDetails[x].maxFrame);
578 nactivParts++;
579 }
580 }
581
582 prog /= nactivParts;
583 return prog;
584 }
585
586 if (verbose)
587 {
588 yError() << "Dataset not loaded";
589 }
590 return -1.0;
591 }
592
593 /**********************************************************/
594 std::string getStatus() override
595 {
596 return status;
597 }
598
599 /**********************************************************/
600 bool quit() override
601 {
602 if(utilities)
603 {
604 if (verbose)
605 {
606 yInfo() << "asking the threads to stop...";
607 }
608 if (utilities->dataplayerEngine->isSuspended()){
609 utilities->dataplayerEngine->resume();
610
611 }
612 utilities->dataplayerEngine->stop();
613 if (verbose)
614 {
615 yInfo() << "done stopping!";
616 }
617 for (int i = 0; i < subDirCnt; i++) {
618 utilities->partDetails[i].currFrame = 1;
619 }
620
621 if (verbose)
622 {
623 yInfo() << "Module closing...";
624 yInfo() << "Cleaning up...";
625 }
626 for (int x=0; x < subDirCnt; x++)
627 {
628 utilities->partDetails[x].worker->release();
629 }
630 if (verbose)
631 {
632 yInfo() << "Attempt to interrupt ports";
633 }
634 for (int x=0; x < subDirCnt; x++)
635 {
636 utilities->interruptPorts(utilities->partDetails[x]);
637 }
638 if (verbose)
639 {
640 yInfo() << "Attempt to close ports";
641 }
642 for (int x=0; x < subDirCnt; x++)
643 {
644 utilities->closePorts(utilities->partDetails[x]);
645 }
646
647 if(utilities)
648 {
649 delete utilities;
650 utilities = nullptr;
651 }
652 if (verbose)
653 {
654 yInfo() << "Done!...";
655 }
656 }
657 return true;
658 }
659
660};
661
662int main(int argc, char *argv[])
663{
665 if (!yarp.checkNetwork())
666 {
667 fprintf(stderr,"ERROR: check YARP network.\n");
668 return -1;
669 }
670
672 rf.configure(argc,argv);
673
674 dataplayer_module module;
675 return module.runModule(rf);
676}
#define yInfo(...)
Definition Log.h:319
#define yError(...)
Definition Log.h:361
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
void close() override
Stop port activity.
void interrupt() override
Interrupt any current reads or writes attached to the port.
A mini-server for performing network communication in the background.
void resume() override
Put the port back in an operative state after interrupt() has been called.
void release(void *handle) override
Return control to YARP of an object previously taken control of with the acquire() method.
Utilities for manipulating the YARP network, including initialization and shutdown.
Definition Network.h:706
A mini-server for network communication.
Definition Port.h:46
A base-class for standard YARP modules that supports ResourceFinder.
Definition RFModule.h:20
Helper class for finding config files and other external resources.
bool check(const std::string &key) const override
Check if there exists a property of the given name.
bool configure(int argc, char *argv[], bool skipFirstArgument=true)
Sets up the ResourceFinder.
Value & find(const std::string &key) const override
Gets a value corresponding to a given keyword.
A port that is specialized as an RPC server.
Definition RpcServer.h:23
A single value (typically within a Bottle).
Definition Value.h:43
virtual std::int32_t asInt32() const
Get 32-bit integer value.
Definition Value.cpp:204
yarp::os::WireLink & yarp()
Get YARP state associated with this object.
Definition Wire.h:28
yarpdataplayer_console_IDL Interface.
int main(int argc, char *argv[])
Definition main.cpp:662
#define LOGO_MESSAGE
Definition main.cpp:21
An interface to the operating system, including Port based communication.
@ YARP_CLOCK_SYSTEM
Definition Time.h:28
The main, catch-all namespace for YARP.
Definition dirs.h:16