YARP
Yet Another Robot Platform
MpiBcastCarrier.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #include "MpiBcastCarrier.h"
11 
12 #include <yarp/os/Network.h>
13 #include <yarp/os/Log.h>
14 
15 using namespace yarp::os;
16 
18  yCTrace(MPI_CARRIER, "[MpiBcastCarrier @ %s] Destructor", name.c_str());
19 }
20 
22  yCDebug(MPI_CARRIER, "[MpiBcastCarrier @ %s] Closing carrier", name.c_str() );
23  if (electionMember) {
24  getCaster().remove(name, this);
25  MpiBcastCarrier* elect = getCaster().getElect(name);
26  if (elect == nullptr) {
27  delete comm;
28  }
29  } else {
30  delete comm;
31  }
32 }
33 
/**
 * Create the MpiComm communicator and the MpiBcastStream used by this carrier.
 *
 * @param sender when true, the carrier is set up under the label
 *               name+"->bcast", registered with the election object returned
 *               by getCaster(), and flagged as an election member; when
 *               false, a communicator and stream are created for `route`
 *               and no election bookkeeping is done.
 */
34 void MpiBcastCarrier::createStream(bool sender) {
35  if (sender) {
 // Ask the election for the carrier currently elected for this name.
36  MpiBcastCarrier* elect = getCaster().getElect(name);
37  if (elect != nullptr) {
 // An elect already exists: reuse its communicator pointer (the
 // MpiComm object is shared between carriers, not copied).
38  comm = elect->comm;
39  }
40  else {
 // No elect yet: allocate a fresh communicator for this name.
41  comm = new MpiComm(name+"->bcast");
42  }
43  stream = new MpiBcastStream(name+"->bcast", comm);
 // `stream` was assigned an MpiBcastStream on the line above; the cast
 // recovers that type so startJoin() can be called on it.
44  auto* mpiStream = dynamic_cast<MpiBcastStream*> (stream);
45  if(mpiStream)
46  mpiStream->startJoin();
 // Register this carrier with the election only after the lookup above,
 // so the getElect() call did not see this carrier as a candidate added here.
47  getCaster().add(name, this);
48  electionMember = true;
49  } else {
 // Non-sender side: communicator and stream are both keyed by `route`;
 // electionMember is not touched on this path.
50  comm = new MpiComm(route);
51  stream = new MpiBcastStream(route, comm);
52  }
53 
54 }
55 
57  comm->sema.wait();
58  yCDebug(MPI_CARRIER, "[MpiBcastCarrier @ %s] Disconnect : %s", name.c_str(), other.c_str());
59  int cmd = CMD_DISCONNECT;
60  MPI_Bcast(&cmd, 1, MPI_INT, 0,comm->comm);
61  int length = other.length() + name.length() + 3;
62  char* remote_c = new char[length];
63  strcpy(remote_c, (other+"<-"+name).c_str());
64  MPI_Bcast(&length, 1, MPI_INT, 0,comm->comm);
65  MPI_Bcast(remote_c, length, MPI_CHAR, 0,comm->comm);
66  delete [] remote_c;
67  comm->disconnect(false);
68  comm->sema.post();
69 
70  //dynamic_cast<MpiBcastStream*> (stream)->disconnect(other);
71  }
72 
73 
74 
75 /*
76  * Adopted from MCastCarrier
77  * ----------------------------
78  */
79 ElectionOf<yarp::os::PeerRecord<MpiBcastCarrier> > *MpiBcastCarrier::caster = nullptr;
80 
/**
 * Accessor for the static `caster` election object.
 *
 * @return a reference obtained by dereferencing the static `caster` pointer.
 *
 * If `caster` is still null at the inner check, an error is logged and the
 * process terminates via std::exit(1).
 *
 * NOTE(review): this rendered listing jumps over its own lines 82, 84, 85
 * and 91, so statements are missing from the text below. Presumably they are
 * the global lock()/unlock() calls and the allocation of `caster` -- confirm
 * against the upstream file before relying on this description.
 */
81 ElectionOf<yarp::os::PeerRecord<MpiBcastCarrier> >& MpiBcastCarrier::getCaster() {
83  if (caster==nullptr) {
 // NOTE(review): listing lines 84-85 are absent here; as shown, this inner
 // check repeats the outer condition -- verify what sits between them.
86  if (caster==nullptr) {
87  yCError(MPI_CARRIER, "No memory for MpiBcastCarrier::caster");
 // Unrecoverable: terminate the whole process with exit status 1.
88  std::exit(1);
89  }
90  } else {
 // NOTE(review): listing line 91 is absent; the else branch appears empty
 // only because of that omission -- TODO confirm.
92  }
93  return *caster;
94 }
96  MpiBcastCarrier *elect = getCaster().getElect(name);
97  return elect==this || elect==nullptr;
98 }
99 
101  return isElect();
102 }
103 
104 /*
105  * ----------------------------
106  */
#define CMD_DISCONNECT
const yarp::os::LogComponent & MPI_CARRIER()
Carrier for port communicating via MPI broadcast.
virtual ~MpiBcastCarrier()
void close() override
Close the carrier.
void createStream(bool sender) override
virtual bool isElect() const
void prepareDisconnect() override
Do cleanup and preparation for the coming disconnect, if necessary.
bool isActive() const override
Check if carrier is alive and error free.
Implements communication via MPI broadcast.
MpiComm * comm
Definition: MpiCarrier.h:32
Wrapper for MPI_Comm communicator.
Definition: MpiComm.h:53
Pick one of a set of peers to be "active".
Definition: Election.h:64
static void unlock()
Call post() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1467
static void lock()
Call wait() on a global mutual-exclusion semaphore allocated by YARP.
Definition: Network.cpp:1462
#define yCError(component,...)
Definition: LogComponent.h:157
#define yCTrace(component,...)
Definition: LogComponent.h:88
#define yCDebug(component,...)
Definition: LogComponent.h:112
An interface to the operating system, including Port based communication.