YARP
Yet Another Robot Platform
MpiComm.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
7 #include "MpiComm.h"
8 
9 #include <yarp/os/Log.h>
10 #include <yarp/os/NetType.h>
11 #include <mpi.h>
12 
13 #include <cstdlib>
14 #include <utility>
15 #include <unistd.h>
16 
17 using namespace yarp::os;
18 
19 /* --------------------------------------- */
20 /* MpiControlThread */
21 
23 
24 void finalizeMPI() {
25  if (MpiControl) {
27  delete MpiControl;
28  MpiControl = nullptr;
29  }
30  int ct = 0;
31  int finalized;
32  while (ct < 5) {
33  sleep(1);
34  MPI_Finalized(&finalized);
35  if (finalized) {
36  return;
37  }
38  ct++;
39  }
40  yCError(MPI_CARRIER, "MpiControlThread: Finalizing MPI failed! Calling MPI_Abort");
41  MPI_Abort(MPI_COMM_WORLD,1);
42 }
43 
// NOTE(review): the signature line of this function (listing line 44) is not
// present in this view; the symbol index lists `void threadRelease() override`
// at MpiComm.cpp:44, so this is presumably its body - confirm.
// Logs the attempt, calls MPI_Finalize(), then logs success. The second log
// line is only reached once MPI_Finalize() has returned.
45  yCInfo(MPI_CARRIER, "MpiControlThread: Trying to finalize MPI...");
46  MPI_Finalize();
47  yCInfo(MPI_CARRIER,"MpiControlThread: Successfully finalized MPI...");
48 }
49 
// NOTE(review): the signature line of this function (listing line 50) is not
// present in this view; the symbol index lists `bool threadInit() override`
// at MpiComm.cpp:50, so this is presumably its body - confirm.
// Initializes MPI with MPI_THREAD_MULTIPLE support.
// Returns true when MPI_Init_thread() succeeds and grants at least the
// requested thread level. Returns false when initialization fails, or when the
// granted level is too low (in that case MPI is finalized again first).
51  // We have to finalize MPI at process termination
// The exit hook is registered before MPI_Init_thread() is attempted, so it
// stays registered even when initialization below fails.
52  atexit(finalizeMPI);
53 
54  yCDebug(MPI_CARRIER,"[MpiControl] Initialize");
55 
56  int provided;
57  // We need full multithread support for MPI
58  int requested = MPI_THREAD_MULTIPLE;
59  // Passing NULL for argc/argv pointers is fine for MPI-2
60  int err = MPI_Init_thread(nullptr, nullptr, requested , &provided);
61  if (err != MPI_SUCCESS ) {
62  yCError(MPI_CARRIER, "MpiControlThread: Couldn't initialize MPI");
63  return false;
64  }
65 
// The thread-support levels are compared numerically: a granted level equal to
// or above the requested one is accepted.
66  if (provided >= requested) {
67  return true;
68  }
69  else {
// Insufficient thread support: shut MPI down again before reporting failure.
70  MPI_Finalize();
71  yCError(MPI_CARRIER, "MpiControlThread: MPI implementation doesn't provide required thread safety: requested %s, provided %s", yarp::conf::numeric::to_string(requested).c_str(), yarp::conf::numeric::to_string(provided).c_str());
72  return false;
73  }
74 }
75 
76 
77 /* --------------------------------------- */
78 /* MpiComm */
79 
80 MpiComm::MpiComm(std::string name) :
81  name(std::move(name))
82 {
83  if (MpiControl == nullptr) {
85  }
86  if (! MpiControl->isRunning()) {
87  MpiControl->start();
88  }
89 
90  // Complicated way of doing comm = MPI_COMM_SELF;
91  // but safer
92  MPI_Group self_group;
93  MPI_Comm_group( MPI_COMM_SELF, &self_group );
94  MPI_Comm_create( MPI_COMM_SELF, self_group, &comm );
95 
96 
97  // Create a unique identifier to prevent intra-process use of MPI
98  int length = 0;
99  MPI_Get_processor_name(unique_id, &length);
100  sprintf(unique_id + length, "____pid____%d", getpid());
101  yCDebug(MPI_CARRIER, "[MpiComm @ %s] Unique id: %s", name.c_str(), unique_id);
102 }
103 
104 //TODO: replace by static variable check??!?
105 bool MpiComm::notLocal(std::string other) {
106  if (other == std::string(unique_id)) {
107  yCError(MPI_CARRIER, "MPI does not support process local communication");
108  return false;
109  }
110  return true;
111 }
112 
113 bool MpiComm::connect(std::string port) {
114 
115  char* port_name = new char[port.length()+1];
116  memcpy(port_name, port.c_str(), port.length());
117  port_name[port.length()] = '\0';
118 
119  yCDebug(MPI_CARRIER, "[MpiComm @ %s] Waiting for accept", name.c_str());
120 
121  MPI_Comm intercomm;
122  MPI_Comm_set_errhandler(comm, MPI_ERRORS_RETURN);
123  int err = MPI_Comm_connect( port_name, MPI_INFO_NULL, 0, comm, &intercomm );
124  MPI_Comm_set_errhandler(comm, MPI_ERRORS_ARE_FATAL);
125 
126  if (err != MPI_SUCCESS ) {
127  yCError(MPI_CARRIER, "MpiCarrier: Couldn't create connection");
128  return false;
129  }
130 
131  yCDebug(MPI_CARRIER, "[MpiComm @ %s] Connection established", name.c_str());
132 
133  bool high = true;
134  MPI_Intercomm_merge(intercomm, high, &comm);
135  MPI_Comm_disconnect(&intercomm);
136 
137  yCDebug(MPI_CARRIER, "[MpiComm @ %s] Comms merged", name.c_str());
138 
139  delete[] port_name;
140 
141  return true;
142 }
// NOTE(review): the signature line of this function (listing line 143) is not
// present in this view; the symbol index lists `bool accept()` at
// MpiComm.cpp:143, so this is presumably its body - confirm.
// Accepts a connection on port_name, merges the resulting inter-communicator
// into a new intra-communicator, and makes that the current `comm`.
// Always returns true.
144  yCDebug(MPI_CARRIER, "[MpiComm @ %s] Waiting for connect", name.c_str());
145 
146  MPI_Comm intercomm, newintra;
// port_name is not declared in this body; the symbol index lists a
// `char port_name[MPI_MAX_PORT_NAME]` in MpiComm.h - presumably that member.
// The return value of MPI_Comm_accept() is not checked.
147  MPI_Comm_accept( port_name, MPI_INFO_NULL, 0, comm, &intercomm );
148 
149  yCDebug(MPI_CARRIER, "[MpiComm @ %s] Connection accepted", name.c_str());
150 
// The accepting side merges with high = false; connect() passes true.
151  bool high = false;
152  // Complicated way of doing comm = Merge(intercomm)
153  // but necessary
154  MPI_Intercomm_merge(intercomm, high, &newintra);
// Both the temporary inter-communicator and the previous `comm` are
// disconnected before `comm` is replaced by the merged communicator.
155  MPI_Comm_disconnect(&intercomm);
156  MPI_Comm_disconnect(&comm);
157  comm = newintra;
158 
159  yCDebug(MPI_CARRIER, "[MpiComm @ %s] Comms merged", name.c_str());
160 
161  return true;
162 }
163 
164 
165 void MpiComm::disconnect(bool disconn) {
166  yCDebug(MPI_CARRIER, "[MpiComm @ %s] split from group : %d", name.c_str(), disconn);
167  MPI_Comm new_comm;
168  MPI_Comm_split(comm, disconn, rank(), &new_comm);
169  MPI_Comm_disconnect(&comm);
170  comm = new_comm;
171  yCDebug(MPI_CARRIER, "[MpiComm @ %s] new rank : %d", name.c_str(), rank());
172 }
MpiControlThread * MpiControl
Definition: MpiComm.cpp:22
void finalizeMPI()
Definition: MpiComm.cpp:24
const yarp::os::LogComponent & MPI_CARRIER()
bool accept()
Definition: MpiComm.cpp:143
bool connect(std::string port)
Definition: MpiComm.cpp:113
int rank()
Definition: MpiComm.h:76
MPI_Comm comm
Definition: MpiComm.h:56
bool notLocal(std::string other)
Definition: MpiComm.cpp:105
char unique_id[10+MPI_MAX_PROCESSOR_NAME]
Definition: MpiComm.h:55
void disconnect(bool disconn)
Definition: MpiComm.cpp:165
MpiComm(std::string name)
Definition: MpiComm.cpp:80
char port_name[MPI_MAX_PORT_NAME]
Definition: MpiComm.h:54
void finalize()
Definition: MpiComm.h:28
bool threadInit() override
Initialization method.
Definition: MpiComm.cpp:50
void threadRelease() override
Release method.
Definition: MpiComm.cpp:44
bool isRunning()
Returns true if the thread is running (Thread::start has been called successfully and the thread has ...
Definition: Thread.cpp:105
bool start()
Start the new thread running.
Definition: Thread.cpp:93
#define yCInfo(component,...)
Definition: LogComponent.h:132
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCDebug(component,...)
Definition: LogComponent.h:109
std::string to_string(IntegerType x)
Definition: numeric.h:115
An interface to the operating system, including Port based communication.
int getpid()
Portable wrapper for the getpid() function.
Definition: Os.cpp:91