YARP
Yet Another Robot Platform
MpiBcastCarrier.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#include "MpiBcastCarrier.h"
8
9#include <yarp/os/Network.h>
10#include <yarp/os/Log.h>
11
12using namespace yarp::os;
13
// NOTE(review): the signature line (listing line 14) is absent from this extract.
// The log text indicates this is the body of the MpiBcastCarrier destructor.
// The only visible action is a trace-level log entry tagged with the carrier's name.
15 yCTrace(MPI_CARRIER, "[MpiBcastCarrier @ %s] Destructor", name.c_str());
16}
17
// NOTE(review): the signature line (listing line 18) is absent from this extract;
// presumably this is the body of MpiBcastCarrier::close(), per the log message.
//
// Releases the communicator object held in `comm`:
//  - a carrier that joined the election first removes itself, then deletes
//    `comm` only when no carrier remains elected for `name`;
//  - a carrier that never joined the election deletes `comm` unconditionally.
// `comm` is not reset after deletion.
19 yCDebug(MPI_CARRIER, "[MpiBcastCarrier @ %s] Closing carrier", name.c_str() );
20 if (electionMember) {
// Withdraw this carrier from the election keyed by `name`.
21 getCaster().remove(name, this);
// Ask who is elected for `name` now that this carrier has been removed.
22 MpiBcastCarrier* elect = getCaster().getElect(name);
// Nobody is elected any more, so the communicator is deleted here.
// NOTE(review): createStream() copies `elect->comm` into other members, which
// is presumably why deletion is deferred while an elect exists - confirm.
23 if (elect == nullptr) {
24 delete comm;
25 }
26 } else {
// Not an election member: delete the communicator directly.
27 delete comm;
28 }
29}
30
// NOTE(review): the signature line (listing line 31) is absent from this extract;
// presumably this is the body of MpiBcastCarrier::createStream(bool sender).
//
// Sets up `comm` (and, on the sender side, `stream`) for this carrier.
// `sender` selects between the two branches below.
32 if (sender) {
// Reuse the communicator of the carrier currently elected for `name`, if any;
// otherwise create a fresh one labelled "<name>->bcast".
33 MpiBcastCarrier* elect = getCaster().getElect(name);
34 if (elect != nullptr) {
35 comm = elect->comm;
36 }
37 else {
38 comm = new MpiComm(name+"->bcast");
39 }
// Build the broadcast stream on top of the chosen communicator.
40 stream = new MpiBcastStream(name+"->bcast", comm);
// `stream` is declared as MpiStream*, so it is cast back to call startJoin();
// the call is skipped if the cast yields null.
41 auto* mpiStream = dynamic_cast<MpiBcastStream*> (stream);
42 if (mpiStream) {
43 mpiStream->startJoin();
44 }
// Register in the election for `name` and record membership so that the
// close path knows to remove this carrier again.
45 getCaster().add(name, this);
46 electionMember = true;
47 } else {
// Non-sender side: create a communicator identified by `route`.
48 comm = new MpiComm(route);
// NOTE(review): listing line 49 is missing from this extract - presumably the
// statement that creates `stream` for this branch; confirm against the real file.
50 }
51
52}
53
// NOTE(review): the signature line (listing line 54) is absent from this extract;
// presumably this is the body of MpiBcastCarrier::prepareDisconnect().
//
// Broadcasts a disconnect notice over `comm->comm` and then calls
// comm->disconnect(false). The whole sequence runs between
// comm->sema.wait() and comm->sema.post().
55 comm->sema.wait();
56 yCDebug(MPI_CARRIER, "[MpiBcastCarrier @ %s] Disconnect : %s", name.c_str(), other.c_str());
// Step 1: broadcast the command code, with rank 0 as the broadcast root.
57 int cmd = CMD_DISCONNECT;
58 MPI_Bcast(&cmd, 1, MPI_INT, 0,comm->comm);
// Step 2: build the payload "<other><-<name>". The +3 covers the two
// characters of "<-" plus the terminating NUL written by strcpy.
59 int length = other.length() + name.length() + 3;
60 char* remote_c = new char[length];
61 strcpy(remote_c, (other+"<-"+name).c_str());
// Step 3: broadcast the payload length first, then the payload bytes.
62 MPI_Bcast(&length, 1, MPI_INT, 0,comm->comm);
63 MPI_Bcast(remote_c, length, MPI_CHAR, 0,comm->comm);
64 delete [] remote_c;
65 comm->disconnect(false);
66 comm->sema.post();
67
68 //dynamic_cast<MpiBcastStream*> (stream)->disconnect(other);
69 }
70
71
72
73/*
74 * Adopted from MCastCarrier
75 * ----------------------------
76 */
// Definition of the static election object pointer shared by all
// MpiBcastCarrier instances. It starts out null; getCaster() checks it
// against nullptr before handing out a reference.
77ElectionOf<yarp::os::PeerRecord<MpiBcastCarrier> > *MpiBcastCarrier::caster = nullptr;
78
// Returns a reference to the static election object `caster`.
//
// NOTE(review): listing lines 80, 82, 83 and 89 are missing from this extract.
// Presumably they hold the NetworkBase::lock()/unlock() calls and the
// allocation of `caster` - confirm against the real file. As shown here, the
// inner null check would always be true when reached.
79ElectionOf<yarp::os::PeerRecord<MpiBcastCarrier> >& MpiBcastCarrier::getCaster() {
81 if (caster==nullptr) {
// Still null at this point: log the failure and terminate the process
// with exit status 1.
84 if (caster==nullptr) {
85 yCError(MPI_CARRIER, "No memory for MpiBcastCarrier::caster");
86 std::exit(1);
87 }
88 } else {
90 }
91 return *caster;
92}
// NOTE(review): the signature line (listing line 93) is absent from this extract;
// presumably this is the body of MpiBcastCarrier::isElect() const.
//
// Returns true when this carrier is the one elected for `name`, and also when
// no carrier is currently elected for `name`.
94 MpiBcastCarrier *elect = getCaster().getElect(name);
95 return elect==this || elect==nullptr;
96}
97
// NOTE(review): the signature line (listing line 98) is absent from this extract;
// presumably this is the body of MpiBcastCarrier::isActive() const.
//
// Returns the result of isElect() unchanged.
99 return isElect();
100}
101
102/*
103 * ----------------------------
104 */
#define CMD_DISCONNECT
const yarp::os::LogComponent & MPI_CARRIER()
Carrier for port communicating via MPI broadcast.
virtual ~MpiBcastCarrier()
void close() override
Close the carrier.
void createStream(bool sender) override
virtual bool isElect() const
void prepareDisconnect() override
Do cleanup and preparation for the coming disconnect, if necessary.
bool isActive() const override
Check if carrier is alive and error free.
Implements communication via MPI broadcast.
MpiStream * stream
Definition: MpiCarrier.h:28
MpiComm * comm
Definition: MpiCarrier.h:29
std::string name
Definition: MpiCarrier.h:31
std::string route
Definition: MpiCarrier.h:31
std::string other
Definition: MpiCarrier.h:31
Wrapper for MPI_Comm communicator.
Definition: MpiComm.h:50
yarp::os::Semaphore sema
Definition: MpiComm.h:57
MPI_Comm comm
Definition: MpiComm.h:56
void disconnect(bool disconn)
Definition: MpiComm.cpp:165
Pick one of a set of peers to be "active".
Definition: Election.h:60
PR * add(const std::string &key, typename PR::peer_type *entity)
Definition: Election.h:87
PR::peer_type * getElect(const std::string &key)
Definition: Election.h:108
void remove(const std::string &key, typename PR::peer_type *entity)
Definition: Election.h:98
static void unlock()
Call post() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1464
static void lock()
Call wait() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1459
void wait()
Decrement the counter, even if we must wait to do that.
Definition: Semaphore.cpp:96
void post()
Increment the counter.
Definition: Semaphore.cpp:111
#define yCError(component,...)
Definition: LogComponent.h:213
#define yCTrace(component,...)
Definition: LogComponent.h:84
#define yCDebug(component,...)
Definition: LogComponent.h:128
An interface to the operating system, including Port based communication.