YARP
Yet Another Robot Platform
MpiBcastStream.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
#include "MpiBcastStream.h"

#include <cstddef>
#include <cstring>
#include <limits>
#include <vector>
8 
9 using namespace yarp::os;
10 
11 
13  comm->sema.wait();
14  int cmd = CMD_JOIN;
15  MPI_Bcast(&cmd, 1, MPI_INT, 0,comm->comm);
16 }
17 
18 
19 
20 // Connection commands
21 
22 void MpiBcastStream::execCmd(int cmd) {
23  switch (cmd) {
24  case CMD_JOIN:
25  // Connect:
26  // Let a new port join the broadcast group
27  comm->accept();
28  break;
29  case CMD_DISCONNECT:
30  // Disconnect:
31  // Let a port leave the broadcast group
32  int length;
33  MPI_Bcast(&length, 1, MPI_INT, 0,comm->comm);
34  char* remote = new char[length];
35  MPI_Bcast(remote, length, MPI_CHAR, 0,comm->comm);
36  terminate = !strcmp(remote, name.c_str());
37  yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] Got disconnect : %s => %d", name.c_str(), remote, terminate);
38  delete [] remote;
39  comm->disconnect(terminate);
40  break;
41  }
42 }
43 
44 
45 
47 // InputStream
48 
50  if (terminate) {
51  return -1;
52  }
53  if (readAvail == 0) {
54  // get new data
55  reset();
56  int size;
57  yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] Trying to read", name.c_str());
58 
59  MPI_Bcast(&size, 1, MPI_INT, 0,comm->comm);
60  yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] got size %d", name.c_str(), size);
61  if (size < 0) {
62  execCmd(size);
63  return 0;
64  }
65  if ((size_t)size == b.length()) {
66  // size of received data matches expected data
67  // do not use buffer, but write directly
68  MPI_Bcast(b.get(), size, MPI_BYTE, 0, comm->comm);
69  return size;
70  }
71  else {
72  // allocate new buffer
73  readBuffer = new char[size];
74  MPI_Bcast(readBuffer, size, MPI_BYTE, 0, comm->comm);
75  yCDebug(MPI_CARRIER, "got new msg of size %d", size);
76  readAvail = size;
77  readAt = 0;
78  }
79  }
80  if (readAvail>0) {
81  // copy data from buffer to destination object
82  int take = readAvail;
83  if (take>(int)b.length()) {
84  take = b.length();
85  }
86  memcpy(b.get(),readBuffer+readAt,take);
87  yCDebug(MPI_CARRIER, "read %d of %d", take, readAvail);
88  readAt += take;
89  readAvail -= take;
90  return take;
91  }
92  return 0;
93 }
94 
96 // OutputStream
97 
98 void MpiBcastStream::write(const Bytes& b) {
99  yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] getting sema for write", name.c_str());
100  comm->sema.wait();
101 
102  yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] trying to write", name.c_str());
103  int size = b.length();
104  MPI_Bcast(&size, 1, MPI_INT, 0, comm->comm );
105  MPI_Bcast((void*)b.get(), size, MPI_BYTE, 0, comm->comm );
106  comm->sema.post();
107 
108  yCDebug(MPI_CARRIER, "[MpiBcastStream @ %s] done writing", name.c_str());
109 }
#define CMD_DISCONNECT
#define CMD_JOIN
const yarp::os::LogComponent & MPI_CARRIER()
ssize_t read(yarp::os::Bytes &b) override=0
Read a block of bytes into b.
void write(const yarp::os::Bytes &b) override=0
void execCmd(int cmd)
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
#define yCDebug(component,...)
Definition: LogComponent.h:109
::ssize_t ssize_t
Definition: numeric.h:86
An interface to the operating system, including Port based communication.