YARP
Yet Another Robot Platform
MpiComm.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#include "MpiComm.h"
8
9#include <yarp/os/Log.h>
10#include <yarp/os/NetType.h>
11#include <mpi.h>
12
13#include <cstdlib>
14#include <utility>
15#include <unistd.h>
16
17using namespace yarp::os;
18
19/* --------------------------------------- */
20/* MpiControlThread */
21
23
25 if (MpiControl) {
27 delete MpiControl;
28 MpiControl = nullptr;
29 }
30 int ct = 0;
31 int finalized;
32 while (ct < 5) {
33 sleep(1);
34 MPI_Finalized(&finalized);
35 if (finalized) {
36 return;
37 }
38 ct++;
39 }
40 yCError(MPI_CARRIER, "MpiControlThread: Finalizing MPI failed! Calling MPI_Abort");
41 MPI_Abort(MPI_COMM_WORLD,1);
42}
43
45 yCInfo(MPI_CARRIER, "MpiControlThread: Trying to finalize MPI...");
46 MPI_Finalize();
47 yCInfo(MPI_CARRIER,"MpiControlThread: Successfully finalized MPI...");
48}
49
51 // We have to finalize MPI at process termination
52 atexit(finalizeMPI);
53
54 yCDebug(MPI_CARRIER,"[MpiControl] Initialize");
55
56 int provided;
57 // We need full multithread support for MPI
58 int requested = MPI_THREAD_MULTIPLE;
59 // Passing NULL for argc/argv pointers is fine for MPI-2
60 int err = MPI_Init_thread(nullptr, nullptr, requested , &provided);
61 if (err != MPI_SUCCESS ) {
62 yCError(MPI_CARRIER, "MpiControlThread: Couldn't initialize MPI");
63 return false;
64 }
65
66 if (provided >= requested) {
67 return true;
68 }
69 else {
70 MPI_Finalize();
71 yCError(MPI_CARRIER, "MpiControlThread: MPI implementation doesn't provide required thread safety: requested %s, provided %s", yarp::conf::numeric::to_string(requested).c_str(), yarp::conf::numeric::to_string(provided).c_str());
72 return false;
73 }
74}
75
76
77/* --------------------------------------- */
78/* MpiComm */
79
80MpiComm::MpiComm(std::string name) :
81 name(std::move(name))
82{
83 if (MpiControl == nullptr) {
85 }
86 if (! MpiControl->isRunning()) {
88 }
89
90 // Complicated way of doing comm = MPI_COMM_SELF;
91 // but safer
92 MPI_Group self_group;
93 MPI_Comm_group( MPI_COMM_SELF, &self_group );
94 MPI_Comm_create( MPI_COMM_SELF, self_group, &comm );
95
96
97 // Create a unique identifier to prevent intra-process use of MPI
98 int length = 0;
99 MPI_Get_processor_name(unique_id, &length);
100 sprintf(unique_id + length, "____pid____%d", getpid());
101 yCDebug(MPI_CARRIER, "[MpiComm @ %s] Unique id: %s", name.c_str(), unique_id);
102}
103
104//TODO: replace by static variable check??!?
105bool MpiComm::notLocal(std::string other) {
106 if (other == std::string(unique_id)) {
107 yCError(MPI_CARRIER, "MPI does not support process local communication");
108 return false;
109 }
110 return true;
111}
112
113bool MpiComm::connect(std::string port) {
114
115 char* port_name = new char[port.length()+1];
116 memcpy(port_name, port.c_str(), port.length());
117 port_name[port.length()] = '\0';
118
119 yCDebug(MPI_CARRIER, "[MpiComm @ %s] Waiting for accept", name.c_str());
120
121 MPI_Comm intercomm;
122 MPI_Comm_set_errhandler(comm, MPI_ERRORS_RETURN);
123 int err = MPI_Comm_connect( port_name, MPI_INFO_NULL, 0, comm, &intercomm );
124 MPI_Comm_set_errhandler(comm, MPI_ERRORS_ARE_FATAL);
125
126 if (err != MPI_SUCCESS ) {
127 yCError(MPI_CARRIER, "MpiCarrier: Couldn't create connection");
128 return false;
129 }
130
131 yCDebug(MPI_CARRIER, "[MpiComm @ %s] Connection established", name.c_str());
132
133 bool high = true;
134 MPI_Intercomm_merge(intercomm, high, &comm);
135 MPI_Comm_disconnect(&intercomm);
136
137 yCDebug(MPI_CARRIER, "[MpiComm @ %s] Comms merged", name.c_str());
138
139 delete[] port_name;
140
141 return true;
142}
144 yCDebug(MPI_CARRIER, "[MpiComm @ %s] Waiting for connect", name.c_str());
145
146 MPI_Comm intercomm, newintra;
147 MPI_Comm_accept( port_name, MPI_INFO_NULL, 0, comm, &intercomm );
148
149 yCDebug(MPI_CARRIER, "[MpiComm @ %s] Connection accepted", name.c_str());
150
151 bool high = false;
152 // Complicated way of doing comm = Merge(intercomm)
153 // but necessary
154 MPI_Intercomm_merge(intercomm, high, &newintra);
155 MPI_Comm_disconnect(&intercomm);
156 MPI_Comm_disconnect(&comm);
157 comm = newintra;
158
159 yCDebug(MPI_CARRIER, "[MpiComm @ %s] Comms merged", name.c_str());
160
161 return true;
162}
163
164
165void MpiComm::disconnect(bool disconn) {
166 yCDebug(MPI_CARRIER, "[MpiComm @ %s] split from group : %d", name.c_str(), disconn);
167 MPI_Comm new_comm;
168 MPI_Comm_split(comm, disconn, rank(), &new_comm);
169 MPI_Comm_disconnect(&comm);
170 comm = new_comm;
171 yCDebug(MPI_CARRIER, "[MpiComm @ %s] new rank : %d", name.c_str(), rank());
172}
MpiControlThread * MpiControl
Definition: MpiComm.cpp:22
void finalizeMPI()
Definition: MpiComm.cpp:24
const yarp::os::LogComponent & MPI_CARRIER()
bool accept()
Definition: MpiComm.cpp:143
bool connect(std::string port)
Definition: MpiComm.cpp:113
int rank()
Definition: MpiComm.h:76
MPI_Comm comm
Definition: MpiComm.h:56
bool notLocal(std::string other)
Definition: MpiComm.cpp:105
char unique_id[10+MPI_MAX_PROCESSOR_NAME]
Definition: MpiComm.h:55
void disconnect(bool disconn)
Definition: MpiComm.cpp:165
MpiComm(std::string name)
Definition: MpiComm.cpp:80
char port_name[MPI_MAX_PORT_NAME]
Definition: MpiComm.h:54
void finalize()
Definition: MpiComm.h:28
bool threadInit() override
Initialization method.
Definition: MpiComm.cpp:50
void threadRelease() override
Release method.
Definition: MpiComm.cpp:44
bool isRunning()
Returns true if the thread is running (Thread::start has been called successfully and the thread has ...
Definition: Thread.cpp:105
bool start()
Start the new thread running.
Definition: Thread.cpp:93
#define yCInfo(component,...)
Definition: LogComponent.h:171
#define yCError(component,...)
Definition: LogComponent.h:213
#define yCDebug(component,...)
Definition: LogComponent.h:128
STL namespace.
std::string to_string(IntegerType x)
Definition: numeric.h:115
An interface to the operating system, including Port based communication.
int getpid()
Portable wrapper for the getpid() function.
Definition: Os.cpp:91