YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
MpiP2PStream.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#include "MpiP2PStream.h"
8
9using namespace yarp::os;
10
11
13// InputStream
14
16 if (readAvail == 0) {
17 // get new data
18 reset();
19 int size;
20 int available = 0;
21 int tag = 0;
22 int rank = comm->rank();
23 MPI_Status status;
24 while (true) {
25 if (terminate) {
26 return -1;
27 }
28 // Check for a message
29 MPI_Iprobe(!rank, tag, comm->comm, &available, &status);
30 if (available) {
31 break;
32 }
33 // Prevent the busy polling which hurts
34 // performance in the oversubscription scenario
36 }
37 MPI_Get_count(&status, MPI_BYTE, &size);
38 if (size == (int)b.length()) {
39 // size of received data matches expected data
40 // do not use buffer, but write directly
41 MPI_Recv(b.get(), size, MPI_BYTE, !rank, tag, comm->comm, &status);
42 return size;
43 }
44 else {
45 // allocate new buffer
46 readBuffer = new char[size];
47 MPI_Recv(readBuffer, size, MPI_BYTE, !rank, tag, comm->comm, &status);
48 yCDebug(MPI_CARRIER, "got new msg of size %d", size);
49 readAvail = size;
50 readAt = 0;
51 }
52 }
53 if (readAvail>0) {
54 // copy data from buffer to destination object
55 int take = readAvail;
56 if (take>(int)b.length()) {
57 take = (int)b.length();
58 }
59 memcpy(b.get(),readBuffer+readAt,take);
60 yCDebug(MPI_CARRIER, "read %d of %d", take, readAvail);
61 readAt += take;
62 readAvail -= take;
63 return take;
64 }
65 return 0;
66}
67
69// OutputStream
70
71void MpiP2PStream::write(const Bytes& b) {
72 int size = b.length();
73 //MPI_Bcast(&size, 1, MPI_INT, MPI_ROOT, intercomm );
74 MPI_Request request;
75 MPI_Status status;
76 int flag = 0;
77 int rank = comm->rank();
78 //MPI_Send(b.get(), size, MPI_BYTE, 0, 0, intercomm);
79
80 MPI_Isend(b.get(), size, MPI_BYTE, !rank , 0, comm->comm, &request );
81 while(true) {
82 /*
83 // TODO: Need to implement a mechanism for breaking!!
84 if (terminate)
85 break;
86 */
87 // Check if message has been received
88 MPI_Test(&request, &flag, &status);
89 if (flag) {
90 break;
91 }
92 // Prevent the busy polling which hurts
93 // performance in the oversubscription scenario
95 }
96}
const yarp::os::LogComponent & MPI_CARRIER()
int rank()
Definition MpiComm.h:76
MPI_Comm comm
Definition MpiComm.h:56
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
MpiComm * comm
Definition MpiStream.h:35
int readAvail
Definition MpiStream.h:31
char * readBuffer
Definition MpiStream.h:32
bool terminate
Definition MpiStream.h:33
void reset() override
Reset the stream.
Definition MpiStream.h:56
int readAt
Definition MpiStream.h:31
A mini-server for performing network communication in the background.
A simple abstraction for a block of bytes.
Definition Bytes.h:24
size_t length() const
Definition Bytes.cpp:22
const char * get() const
Definition Bytes.cpp:27
virtual int read()
Read and return a single byte.
#define yCDebug(component,...)
void yield()
The calling thread releases its remaining quantum upon calling this function.
Definition Time.cpp:131
An interface to the operating system, including Port based communication.