YARP
Yet Another Robot Platform
MpiCarrier.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
7 #include "MpiCarrier.h"
8 
9 #include <yarp/os/Route.h>
10 #include <sys/types.h>
11 
12 using namespace yarp::os;
13 
14 MpiCarrier::MpiCarrier() : stream(nullptr), comm(nullptr) {
15 }
16 
18  yCTrace(MPI_CARRIER, "[MpiCarrier @ %s] Destructor called", route.c_str() );
19 }
20 
21 void MpiCarrier::getHeader(Bytes& header) const {
22  for (size_t i=0; i<8 && i<header.length(); i++) {
23  header.get()[i] = target.c_str()[i];
24  }
25 }
26 
27 bool MpiCarrier::checkHeader(const Bytes& header) {
28  if (header.length()!=8) {
29  return false;
30  }
31  for (int i=0; i<8; i++) {
32  if (header.get()[i] != target.c_str()[i]) {
33  return false;
34  }
35  }
36  return true;
37 }
38 
40  // Send the "magic number" for this carrier
41  ManagedBytes header(8);
42  getHeader(header.bytes());
43  proto.os().write(header.bytes());
44  if (!proto.os().isOk()) {
45  return false;
46  }
47 
48  // Now we can do whatever we want, as long as somehow
49  // we also send the name of the originating port
50 
51  name = proto.getRoute().getFromName();
52  other = proto.getRoute().getToName();
53  Bytes b2((char*)name.c_str(),name.length());
54  proto.os().write(b2);
55  proto.os().write('\r');
56  proto.os().write('\n');
57 
58  // Sender
59  route = name + "->" + other;
60 
61  createStream(true);
62 
63  if (!MpiControl) {
64  return false;
65  }
66  if (!MpiControl->isRunning()) {
67  return false;
68  }
69  comm->openPort();
70  char* port = comm->port_name;
71  char* uid = comm->unique_id;
72 
73  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] setting up MpiPort '%s'", route.c_str(), port);
74 
75  Bytes b4(uid,strlen(uid));
76  proto.os().write(b4);
77  proto.os().write('\r');
78  proto.os().write('\n');
79 
80  Bytes b3(port,strlen(port));
81  proto.os().write(b3);
82  proto.os().write('\r');
83  proto.os().write('\n');
84  proto.os().flush();
85 
86  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] Header sent", route.c_str());
87 
88  return proto.os().isOk();
89 }
90 
91 
92 
94  // interpret everything that sendHeader wrote
95  name = proto.getRoute().getToName();
96 
97  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] Waiting for header", route.c_str());
98 
99  other = proto.is().readLine();
100  Route r = proto.getRoute();
101  r.setFromName(other);
102  proto.setRoute(r);
103 
104  // Receiver
105  route = name + "<-" + other;
106 
107  createStream(false);
108  if (!MpiControl) {
109  return false;
110  }
111  if (!MpiControl->isRunning()) {
112  return false;
113  }
114 
115  std::string other_id = proto.is().readLine();
116  bool notLocal = comm->notLocal(other_id);
117 
118  port = proto.is().readLine();
119 
120  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] Header received", route.c_str());
121 
122  return notLocal && proto.is().isOk();
123 }
124 
126  // SWITCH TO NEW STREAM TYPE
127  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] trying to connect to MpiPort '%s'", route.c_str(), port.c_str());
128 
129  if (!comm->connect(port)) {
130  delete stream;
131  return false;
132  }
133  proto.takeStreams(stream);
134 
135  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] MpiStream successfully setup", route.c_str() );
136 
137  return proto.is().isOk();
138 }
139 
141  // SWITCH TO NEW STREAM TYPE
142  if (!comm->accept()) {
143  delete stream;
144  return false;
145  }
146  proto.takeStreams(stream);
147 
148  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] MpiStream successfully setup", route.c_str() );
149 
150  return proto.os().isOk();
151 }
MpiControlThread * MpiControl
Definition: MpiComm.cpp:22
const yarp::os::LogComponent & MPI_CARRIER()
void getHeader(yarp::os::Bytes &header) const override
Provide 8 bytes describing this connection sufficiently to allow the other side of a connection to se...
Definition: MpiCarrier.cpp:21
bool sendHeader(yarp::os::ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
Definition: MpiCarrier.cpp:39
std::string target
Definition: MpiCarrier.h:32
std::string port
Definition: MpiCarrier.h:30
bool expectSenderSpecifier(yarp::os::ConnectionState &proto) override
Expect the name of the sending port.
Definition: MpiCarrier.cpp:93
MpiStream * stream
Definition: MpiCarrier.h:28
virtual void createStream(bool sender)=0
bool expectReplyToHeader(yarp::os::ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
Definition: MpiCarrier.cpp:140
virtual ~MpiCarrier()
Definition: MpiCarrier.cpp:17
MpiComm * comm
Definition: MpiCarrier.h:29
bool checkHeader(const yarp::os::Bytes &header) override
Given the first 8 bytes received on a connection, decide if this is the right carrier type to use for...
Definition: MpiCarrier.cpp:27
bool respondToHeader(yarp::os::ConnectionState &proto) override
Respond to the header.
Definition: MpiCarrier.cpp:125
std::string name
Definition: MpiCarrier.h:31
std::string route
Definition: MpiCarrier.h:31
std::string other
Definition: MpiCarrier.h:31
bool accept()
Definition: MpiComm.cpp:143
bool connect(std::string port)
Definition: MpiComm.cpp:113
bool notLocal(std::string other)
Definition: MpiComm.cpp:105
char unique_id[10+MPI_MAX_PROCESSOR_NAME]
Definition: MpiComm.h:55
void openPort()
Definition: MpiComm.h:70
char port_name[MPI_MAX_PORT_NAME]
Definition: MpiComm.h:54
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
The basic state of a connection - route, streams in use, etc.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
InputStream & is()
Shorthand for getInputStream()
OutputStream & os()
Shorthand for getOutputStream()
virtual void setRoute(const Route &route)=0
Set the route associated with this connection.
virtual bool isOk() const =0
Check if the stream is ok or in an error state.
std::string readLine(const char terminal='\n', bool *success=nullptr)
Read a block of text terminated with a specific marker (or EOF).
Definition: InputStream.cpp:54
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
Definition: ManagedBytes.h:22
const Bytes & bytes() const
virtual void flush()
Make sure all pending write operations are finished.
virtual bool isOk() const =0
Check if the stream is ok or in an error state.
virtual void write(char ch)
Write a single byte to the stream.
Information about a connection between two ports.
Definition: Route.h:29
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:103
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:93
void setFromName(const std::string &fromName)
Set the source of the route.
Definition: Route.cpp:98
bool isRunning()
Returns true if the thread is running (Thread::start has been called successfully and the thread has ...
Definition: Thread.cpp:105
#define yCTrace(component,...)
Definition: LogComponent.h:85
#define yCDebug(component,...)
Definition: LogComponent.h:109
An interface to the operating system, including Port based communication.