YARP
Yet Another Robot Platform
MpiBcastCarrier.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
7 #include "MpiBcastCarrier.h"
8 
9 #include <yarp/os/Network.h>
10 #include <yarp/os/Log.h>
11 
12 using namespace yarp::os;
13 
15  yCTrace(MPI_CARRIER, "[MpiBcastCarrier @ %s] Destructor", name.c_str());
16 }
17 
19  yCDebug(MPI_CARRIER, "[MpiBcastCarrier @ %s] Closing carrier", name.c_str() );
20  if (electionMember) {
21  getCaster().remove(name, this);
22  MpiBcastCarrier* elect = getCaster().getElect(name);
23  if (elect == nullptr) {
24  delete comm;
25  }
26  } else {
27  delete comm;
28  }
29 }
30 
31 void MpiBcastCarrier::createStream(bool sender) {
32  if (sender) {
33  MpiBcastCarrier* elect = getCaster().getElect(name);
34  if (elect != nullptr) {
35  comm = elect->comm;
36  }
37  else {
38  comm = new MpiComm(name+"->bcast");
39  }
40  stream = new MpiBcastStream(name+"->bcast", comm);
41  auto* mpiStream = dynamic_cast<MpiBcastStream*> (stream);
42  if (mpiStream) {
43  mpiStream->startJoin();
44  }
45  getCaster().add(name, this);
46  electionMember = true;
47  } else {
48  comm = new MpiComm(route);
49  stream = new MpiBcastStream(route, comm);
50  }
51 
52 }
53 
55  comm->sema.wait();
56  yCDebug(MPI_CARRIER, "[MpiBcastCarrier @ %s] Disconnect : %s", name.c_str(), other.c_str());
57  int cmd = CMD_DISCONNECT;
58  MPI_Bcast(&cmd, 1, MPI_INT, 0,comm->comm);
59  int length = other.length() + name.length() + 3;
60  char* remote_c = new char[length];
61  strcpy(remote_c, (other+"<-"+name).c_str());
62  MPI_Bcast(&length, 1, MPI_INT, 0,comm->comm);
63  MPI_Bcast(remote_c, length, MPI_CHAR, 0,comm->comm);
64  delete [] remote_c;
65  comm->disconnect(false);
66  comm->sema.post();
67 
68  //dynamic_cast<MpiBcastStream*> (stream)->disconnect(other);
69  }
70 
71 
72 
73 /*
74  * Adopted from MCastCarrier
75  * ----------------------------
76  */
77 ElectionOf<yarp::os::PeerRecord<MpiBcastCarrier> > *MpiBcastCarrier::caster = nullptr;
78 
79 ElectionOf<yarp::os::PeerRecord<MpiBcastCarrier> >& MpiBcastCarrier::getCaster() {
81  if (caster==nullptr) {
84  if (caster==nullptr) {
85  yCError(MPI_CARRIER, "No memory for MpiBcastCarrier::caster");
86  std::exit(1);
87  }
88  } else {
90  }
91  return *caster;
92 }
94  MpiBcastCarrier *elect = getCaster().getElect(name);
95  return elect==this || elect==nullptr;
96 }
97 
99  return isElect();
100 }
101 
102 /*
103  * ----------------------------
104  */
#define CMD_DISCONNECT
const yarp::os::LogComponent & MPI_CARRIER()
Carrier for port communicating via MPI broadcast.
virtual ~MpiBcastCarrier()
void close() override
Close the carrier.
void createStream(bool sender) override
virtual bool isElect() const
void prepareDisconnect() override
Do cleanup and preparation for the coming disconnect, if necessary.
bool isActive() const override
Check if carrier is alive and error free.
Implements communication via MPI broadcast.
MpiComm * comm
Definition: MpiCarrier.h:29
Wrapper for MPI_Comm communicator.
Definition: MpiComm.h:50
Pick one of a set of peers to be "active".
Definition: Election.h:61
static void unlock()
Call post() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1464
static void lock()
Call wait() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1459
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCTrace(component,...)
Definition: LogComponent.h:85
#define yCDebug(component,...)
Definition: LogComponent.h:109
An interface to the operating system, including Port based communication.