YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
MpiComm.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#include "MpiComm.h"
8
9#include <yarp/os/Log.h>
10#include <yarp/os/NetType.h>
11#include <mpi.h>
12
13#include <cstdlib>
14#include <utility>
15#include <unistd.h>
16
17using namespace yarp::os;
18
19/* --------------------------------------- */
20/* MpiControlThread */
21
23
25 if (MpiControl) {
27 delete MpiControl;
28 MpiControl = nullptr;
29 }
30 int ct = 0;
31 int finalized;
32 while (ct < 5) {
33 sleep(1);
35 if (finalized) {
36 return;
37 }
38 ct++;
39 }
40 yCError(MPI_CARRIER, "MpiControlThread: Finalizing MPI failed! Calling MPI_Abort");
42}
43
45 yCInfo(MPI_CARRIER, "MpiControlThread: Trying to finalize MPI...");
47 yCInfo(MPI_CARRIER,"MpiControlThread: Successfully finalized MPI...");
48}
49
51 // We have to finalize MPI at process termination
53
54 yCDebug(MPI_CARRIER,"[MpiControl] Initialize");
55
56 int provided;
57 // We need full multithread support for MPI
59 // Passing NULL for argc/argv pointers is fine for MPI-2
60 int err = MPI_Init_thread(nullptr, nullptr, requested , &provided);
61 if (err != MPI_SUCCESS ) {
62 yCError(MPI_CARRIER, "MpiControlThread: Couldn't initialize MPI");
63 return false;
64 }
65
66 if (provided >= requested) {
67 return true;
68 }
69 else {
71 yCError(MPI_CARRIER, "MpiControlThread: MPI implementation doesn't provide required thread safety: requested %s, provided %s", yarp::conf::numeric::to_string(requested).c_str(), yarp::conf::numeric::to_string(provided).c_str());
72 return false;
73 }
74}
75
76
77/* --------------------------------------- */
78/* MpiComm */
79
// Constructs an MpiComm, moving the `name` argument into the member of the
// same name, and writes a "____pid____<pid>" suffix into unique_id.
// NOTE(review): several statements of this body are absent from this listing
// (the embedded numbering jumps 83->85, 86->88, 91->95 and 98->100), so the
// comments below describe only the lines that are visible.
80MpiComm::MpiComm(std::string name) :
81 name(std::move(name))
82{
// Guards on the global MpiControl pointer: first for nullptr, then for a
// control thread that is not running. The guarded statements are not visible.
83 if (MpiControl == nullptr) {
85 }
86 if (! MpiControl->isRunning()) {
88 }
89
90 // Complicated way of doing comm = MPI_COMM_SELF;
91 // but safer
95
96
97 // Create a unique identifier to prevent intra-process use of MPI
98 int length = 0;
// Appends the pid suffix to unique_id starting at offset `length`.
// NOTE(review): sprintf is unbounded. unique_id is listed as
// char[10+MPI_MAX_PROCESSOR_NAME] (MpiComm.h:55), while the suffix alone needs
// 11 characters plus the pid digits plus a terminator, so a `length` close to
// MPI_MAX_PROCESSOR_NAME would overflow the buffer - consider snprintf.
100 sprintf(unique_id + length, "____pid____%d", getpid());
// NOTE(review): inside this body the unqualified `name` is the constructor
// parameter, which was already moved from in the initializer list, so this
// log probably prints an empty string - presumably this->name was intended.
101 yCDebug(MPI_CARRIER, "[MpiComm @ %s] Unique id: %s", name.c_str(), unique_id);
102}
103
104//TODO: replace by static variable check??!?
105bool MpiComm::notLocal(std::string other) {
106 if (other == std::string(unique_id)) {
107 yCError(MPI_CARRIER, "MPI does not support process local communication");
108 return false;
109 }
110 return true;
111}
112
// Sets up a connection using the port string `port`.
// Returns false, after logging an error, when `err` is not MPI_SUCCESS;
// returns true otherwise.
// NOTE(review): the statements that declare/assign `err` and the ones that
// follow `high` are absent from this listing (the embedded numbering jumps
// 120->125 and 133->136), so the calls themselves cannot be described here.
113bool MpiComm::connect(std::string port) {
114
// Heap-allocated, NUL-terminated copy of `port`; it is released by the
// delete[] just before the successful return.
// NOTE(review): this local appears to shadow a member listed as
// char port_name[MPI_MAX_PORT_NAME] (MpiComm.h:54) - confirm that is intended.
115 char* port_name = new char[port.length()+1];
116 memcpy(port_name, port.c_str(), port.length());
117 port_name[port.length()] = '\0';
118
119 yCDebug(MPI_CARRIER, "[MpiComm @ %s] Waiting for accept", name.c_str());
120
125
126 if (err != MPI_SUCCESS ) {
127 yCError(MPI_CARRIER, "MpiCarrier: Couldn't create connection");
// NOTE(review): this early return skips the delete[] port_name further down,
// so the buffer allocated above is leaked on the failure path.
128 return false;
129 }
130
131 yCDebug(MPI_CARRIER, "[MpiComm @ %s] Connection established", name.c_str());
132
// `high` is not used by any visible line; presumably it is consumed by the
// statements missing between here and the "Comms merged" log - TODO confirm.
133 bool high = true;
136
137 yCDebug(MPI_CARRIER, "[MpiComm @ %s] Comms merged", name.c_str());
138
139 delete[] port_name;
140
141 return true;
142}
144 yCDebug(MPI_CARRIER, "[MpiComm @ %s] Waiting for connect", name.c_str());
145
148
149 yCDebug(MPI_CARRIER, "[MpiComm @ %s] Connection accepted", name.c_str());
150
151 bool high = false;
152 // Complicated way of doing comm = Merge(intercomm)
153 // but necessary
157 comm = newintra;
158
159 yCDebug(MPI_CARRIER, "[MpiComm @ %s] Comms merged", name.c_str());
160
161 return true;
162}
163
164
166 yCDebug(MPI_CARRIER, "[MpiComm @ %s] split from group : %d", name.c_str(), disconn);
170 comm = new_comm;
171 yCDebug(MPI_CARRIER, "[MpiComm @ %s] new rank : %d", name.c_str(), rank());
172}
MpiControlThread * MpiControl
Definition MpiComm.cpp:22
void finalizeMPI()
Definition MpiComm.cpp:24
MpiControlThread * MpiControl
Definition MpiComm.cpp:22
const yarp::os::LogComponent & MPI_CARRIER()
bool accept()
Definition MpiComm.cpp:143
bool connect(std::string port)
Definition MpiComm.cpp:113
int rank()
Definition MpiComm.h:76
MPI_Comm comm
Definition MpiComm.h:56
bool notLocal(std::string other)
Definition MpiComm.cpp:105
char unique_id[10+MPI_MAX_PROCESSOR_NAME]
Definition MpiComm.h:55
void disconnect(bool disconn)
Definition MpiComm.cpp:165
MpiComm(std::string name)
Definition MpiComm.cpp:80
char port_name[MPI_MAX_PORT_NAME]
Definition MpiComm.h:54
void finalize()
Definition MpiComm.h:28
bool threadInit() override
Initialization method.
Definition MpiComm.cpp:50
void threadRelease() override
Release method.
Definition MpiComm.cpp:44
A mini-server for performing network communication in the background.
bool isRunning()
Returns true if the thread is running (Thread::start has been called successfully and the thread has ...
Definition Thread.cpp:105
bool start()
Start the new thread running.
Definition Thread.cpp:93
#define yCInfo(component,...)
#define yCError(component,...)
#define yCDebug(component,...)
STL namespace.
std::string to_string(IntegerType x)
Definition numeric.h:115
An interface to the operating system, including Port based communication.
int getpid()
Portable wrapper for the getpid() function.
Definition Os.cpp:84