YARP
Yet Another Robot Platform
MpiP2PStream.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
7 #include "MpiP2PStream.h"
8 
9 using namespace yarp::os;
10 
11 
13 // InputStream
14 
// Body of the stream read: delivers received bytes into `b`.
// NOTE(review): the signature line is missing from this listing; presumably
// `ssize_t MpiP2PStream::read(Bytes& b)` -- confirm against the header.
//
// Returns:
//   -1    if `terminate` is set while waiting for a message,
//   size  when a freshly received message exactly fills `b`,
//   take  (<= b.length()) when serving bytes from the internal buffer,
//   0     when nothing is buffered after the receive step.
16  if (readAvail == 0) {
17  // get new data
// NOTE(review): reset() is not visible here; presumably it releases any
// previous readBuffer before a new one is allocated below -- verify,
// otherwise the `new char[size]` further down leaks.
18  reset();
19  int size;
20  int available = 0;
21  int tag = 0;
22  int rank = comm->rank();
23  MPI_Status status;
// Poll for an incoming message instead of blocking inside MPI, so that the
// `terminate` flag is re-checked on every iteration.
24  while (true) {
25  if (terminate) {
26  return -1;
27  }
28  // Check for a message
// `!rank` selects the peer: 1 when our rank is 0, and 0 for any other rank.
29  MPI_Iprobe(!rank, tag, comm->comm, &available, &status);
30  if (available) {
31  break;
32  }
33  // Prevent the busy polling which hurts
34  // performance in the oversubscription scenario
35  Time::yield();
36  }
// Size in bytes of the probed (not yet received) message.
// NOTE(review): MPI return codes are not checked anywhere in this block.
37  MPI_Get_count(&status, MPI_BYTE, &size);
38  if (size == (int)b.length()) {
39  // size of received data matches expected data
40  // do not use buffer, but write directly
41  MPI_Recv(b.get(), size, MPI_BYTE, !rank, tag, comm->comm, &status);
42  return size;
43  }
44  else {
45  // allocate new buffer
// The message size differs from the caller's block, so stage the whole
// message in readBuffer and hand it out in pieces (readAt / readAvail).
46  readBuffer = new char[size];
47  MPI_Recv(readBuffer, size, MPI_BYTE, !rank, tag, comm->comm, &status);
48  yCDebug(MPI_CARRIER, "got new msg of size %d", size);
49  readAvail = size;
50  readAt = 0;
51  }
52  }
53  if (readAvail>0) {
54  // copy data from buffer to destination object
// Hand out at most b.length() bytes; the remainder stays buffered for the
// next call.
55  int take = readAvail;
56  if (take>(int)b.length()) {
57  take = (int)b.length();
58  }
59  memcpy(b.get(),readBuffer+readAt,take);
60  yCDebug(MPI_CARRIER, "read %d of %d", take, readAvail);
61  readAt += take;
62  readAvail -= take;
63  return take;
64  }
// Reached when readAvail is not positive after the receive step, e.g. a
// zero-length message whose size did not match b.length().
65  return 0;
66 }
67 
69 // OutputStream
70 
71 void MpiP2PStream::write(const Bytes& b) {
72  int size = b.length();
73  //MPI_Bcast(&size, 1, MPI_INT, MPI_ROOT, intercomm );
74  MPI_Request request;
75  MPI_Status status;
76  int flag = 0;
77  int rank = comm->rank();
78  //MPI_Send(b.get(), size, MPI_BYTE, 0, 0, intercomm);
79 
80  MPI_Isend(b.get(), size, MPI_BYTE, !rank , 0, comm->comm, &request );
81  while(true) {
82  /*
83  // TODO: Need to implement a mechanism for breaking!!
84  if (terminate)
85  break;
86  */
87  // Check if message has been received
88  MPI_Test(&request, &flag, &status);
89  if (flag) {
90  break;
91  }
92  // Prevent the busy polling which hurts
93  // performance in the oversubscription scenario
94  Time::yield();
95  }
96 }
const yarp::os::LogComponent & MPI_CARRIER()
ssize_t read(yarp::os::Bytes &b) override=0
Read a block of data from the stream.
void write(const yarp::os::Bytes &b) override=0
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
#define yCDebug(component,...)
Definition: LogComponent.h:109
::ssize_t ssize_t
Definition: numeric.h:86
void yield()
The calling thread releases its remaining quantum upon calling this function.
Definition: Time.cpp:138
An interface to the operating system, including Port based communication.