YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
MpiBcastStream.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#include "MpiBcastStream.h"
8
9using namespace yarp::os;
10
11
13 comm->sema.wait();
14 int cmd = CMD_JOIN;
15 MPI_Bcast(&cmd, 1, MPI_INT, 0,comm->comm);
16}
17
18
19
20// Connection commands
21
23 switch (cmd) {
24 case CMD_JOIN:
25 // Connect:
26 // Let a new port join the broadcast group
27 comm->accept();
28 break;
29 case CMD_DISCONNECT:
30 // Disconnect:
31 // Let a port leave the broadcast group
32 int length;
33 MPI_Bcast(&length, 1, MPI_INT, 0,comm->comm);
34 char* remote = new char[length];
35 MPI_Bcast(remote, length, MPI_CHAR, 0,comm->comm);
36 terminate = !strcmp(remote, name.c_str());
37 yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] Got disconnect : %s => %d", name.c_str(), remote, terminate);
38 delete [] remote;
40 break;
41 }
42}
43
44
45
47// InputStream
48
50 if (terminate) {
51 return -1;
52 }
53 if (readAvail == 0) {
54 // get new data
55 reset();
56 int size;
57 yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] Trying to read", name.c_str());
58
59 MPI_Bcast(&size, 1, MPI_INT, 0,comm->comm);
60 yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] got size %d", name.c_str(), size);
61 if (size < 0) {
62 execCmd(size);
63 return 0;
64 }
65 if ((size_t)size == b.length()) {
66 // size of received data matches expected data
67 // do not use buffer, but write directly
68 MPI_Bcast(b.get(), size, MPI_BYTE, 0, comm->comm);
69 return size;
70 }
71 else {
72 // allocate new buffer
73 readBuffer = new char[size];
75 yCDebug(MPI_CARRIER, "got new msg of size %d", size);
76 readAvail = size;
77 readAt = 0;
78 }
79 }
80 if (readAvail>0) {
81 // copy data from buffer to destination object
82 int take = readAvail;
83 if (take>(int)b.length()) {
84 take = b.length();
85 }
86 memcpy(b.get(),readBuffer+readAt,take);
87 yCDebug(MPI_CARRIER, "read %d of %d", take, readAvail);
88 readAt += take;
89 readAvail -= take;
90 return take;
91 }
92 return 0;
93}
94
96// OutputStream
97
99 yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] getting sema for write", name.c_str());
100 comm->sema.wait();
101
102 yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] trying to write", name.c_str());
103 int size = b.length();
104 MPI_Bcast(&size, 1, MPI_INT, 0, comm->comm );
105 MPI_Bcast((void*)b.get(), size, MPI_BYTE, 0, comm->comm );
106 comm->sema.post();
107
108 yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] done writing", name.c_str());
109}
#define CMD_DISCONNECT
#define CMD_JOIN
const yarp::os::LogComponent & MPI_CARRIER()
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
void execCmd(int cmd)
bool accept()
Definition MpiComm.cpp:143
yarp::os::Semaphore sema
Definition MpiComm.h:57
MPI_Comm comm
Definition MpiComm.h:56
void disconnect(bool disconn)
Definition MpiComm.cpp:165
MpiComm * comm
Definition MpiStream.h:35
int readAvail
Definition MpiStream.h:31
char * readBuffer
Definition MpiStream.h:32
std::string name
Definition MpiStream.h:34
bool terminate
Definition MpiStream.h:33
void reset() override
Reset the stream.
Definition MpiStream.h:56
yarp::os::Contact remote
Definition MpiStream.h:38
int readAt
Definition MpiStream.h:31
A mini-server for performing network communication in the background.
A simple abstraction for a block of bytes.
Definition Bytes.h:24
size_t length() const
Definition Bytes.cpp:22
const char * get() const
Definition Bytes.cpp:27
virtual int read()
Read and return a single byte.
void wait()
Decrement the counter, even if we must wait to do that.
Definition Semaphore.cpp:96
void post()
Increment the counter.
#define yCDebug(component,...)
An interface to the operating system, including Port-based communication.