YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
MpiCarrier.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
7#include "MpiCarrier.h"
8
9#include <yarp/os/Route.h>
10#include <sys/types.h>
11
12using namespace yarp::os;
13
14MpiCarrier::MpiCarrier() : stream(nullptr), comm(nullptr) {
15}
16
18 yCTrace(MPI_CARRIER, "[MpiCarrier @ %s] Destructor called", route.c_str() );
19}
20
21void MpiCarrier::getHeader(Bytes& header) const {
22 for (size_t i=0; i<8 && i<header.length(); i++) {
23 header.get()[i] = target.c_str()[i];
24 }
25}
26
27bool MpiCarrier::checkHeader(const Bytes& header) {
28 if (header.length()!=8) {
29 return false;
30 }
31 for (int i=0; i<8; i++) {
32 if (header.get()[i] != target.c_str()[i]) {
33 return false;
34 }
35 }
36 return true;
37}
38
40 // Send the "magic number" for this carrier
41 ManagedBytes header(8);
42 getHeader(header.bytes());
43 proto.os().write(header.bytes());
44 if (!proto.os().isOk()) {
45 return false;
46 }
47
48 // Now we can do whatever we want, as long as somehow
49 // we also send the name of the originating port
50
51 name = proto.getRoute().getFromName();
52 other = proto.getRoute().getToName();
53 Bytes b2((char*)name.c_str(),name.length());
54 proto.os().write(b2);
55 proto.os().write('\r');
56 proto.os().write('\n');
57
58 // Sender
59 route = name + "->" + other;
60
61 createStream(true);
62
63 if (!MpiControl) {
64 return false;
65 }
66 if (!MpiControl->isRunning()) {
67 return false;
68 }
69 comm->openPort();
70 char* port = comm->port_name;
71 char* uid = comm->unique_id;
72
73 yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] setting up MpiPort '%s'", route.c_str(), port);
74
76 proto.os().write(b4);
77 proto.os().write('\r');
78 proto.os().write('\n');
79
81 proto.os().write(b3);
82 proto.os().write('\r');
83 proto.os().write('\n');
84 proto.os().flush();
85
86 yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] Header sent", route.c_str());
87
88 return proto.os().isOk();
89}
90
91
92
94 // interpret everything that sendHeader wrote
95 name = proto.getRoute().getToName();
96
97 yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] Waiting for header", route.c_str());
98
99 other = proto.is().readLine();
100 Route r = proto.getRoute();
102 proto.setRoute(r);
103
104 // Receiver
105 route = name + "<-" + other;
106
107 createStream(false);
108 if (!MpiControl) {
109 return false;
110 }
111 if (!MpiControl->isRunning()) {
112 return false;
113 }
114
115 std::string other_id = proto.is().readLine();
116 bool notLocal = comm->notLocal(other_id);
117
118 port = proto.is().readLine();
119
120 yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] Header received", route.c_str());
121
122 return notLocal && proto.is().isOk();
123}
124
126 // SWITCH TO NEW STREAM TYPE
127 yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] trying to connect to MpiPort '%s'", route.c_str(), port.c_str());
128
129 if (!comm->connect(port)) {
130 delete stream;
131 return false;
132 }
133 proto.takeStreams(stream);
134
135 yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] MpiStream successfully setup", route.c_str() );
136
137 return proto.is().isOk();
138}
139
141 // SWITCH TO NEW STREAM TYPE
142 if (!comm->accept()) {
143 delete stream;
144 return false;
145 }
146 proto.takeStreams(stream);
147
148 yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] MpiStream successfully setup", route.c_str() );
149
150 return proto.os().isOk();
151}
MpiControlThread * MpiControl
Definition MpiComm.cpp:22
const yarp::os::LogComponent & MPI_CARRIER()
void getHeader(yarp::os::Bytes &header) const override
Provide 8 bytes describing this connection sufficiently to allow the other side of a connection to select it.
bool sendHeader(yarp::os::ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
std::string target
Definition MpiCarrier.h:32
std::string port
Definition MpiCarrier.h:30
bool expectSenderSpecifier(yarp::os::ConnectionState &proto) override
Expect the name of the sending port.
MpiStream * stream
Definition MpiCarrier.h:28
virtual void createStream(bool sender)=0
bool expectReplyToHeader(yarp::os::ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
virtual ~MpiCarrier()
MpiComm * comm
Definition MpiCarrier.h:29
bool checkHeader(const yarp::os::Bytes &header) override
Given the first 8 bytes received on a connection, decide if this is the right carrier type to use for the rest of the connection.
bool respondToHeader(yarp::os::ConnectionState &proto) override
Respond to the header.
std::string name
Definition MpiCarrier.h:31
std::string route
Definition MpiCarrier.h:31
std::string other
Definition MpiCarrier.h:31
bool accept()
Definition MpiComm.cpp:143
bool connect(std::string port)
Definition MpiComm.cpp:113
bool notLocal(std::string other)
Definition MpiComm.cpp:105
char unique_id[10+MPI_MAX_PROCESSOR_NAME]
Definition MpiComm.h:55
void openPort()
Definition MpiComm.h:70
char port_name[MPI_MAX_PORT_NAME]
Definition MpiComm.h:54
A mini-server for performing network communication in the background.
A simple abstraction for a block of bytes.
Definition Bytes.h:24
size_t length() const
Definition Bytes.cpp:22
const char * get() const
Definition Bytes.cpp:27
The basic state of a connection - route, streams in use, etc.
OutputStream & os()
Shorthand for getOutputStream()
virtual const Route & getRoute() const =0
Get the route associated with this connection.
InputStream & is()
Shorthand for getInputStream()
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
virtual void setRoute(const Route &route)=0
Set the route associated with this connection.
virtual bool isOk() const =0
Check if the stream is ok or in an error state.
std::string readLine(const char terminal='\n', bool *success=nullptr)
Read a block of text terminated with a specific marker (or EOF).
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that block.
const Bytes & bytes() const
virtual void flush()
Make sure all pending write operations are finished.
virtual bool isOk() const =0
Check if the stream is ok or in an error state.
virtual void write(char ch)
Write a single byte to the stream.
Information about a connection between two ports.
Definition Route.h:28
void setFromName(const std::string &fromName)
Set the source of the route.
Definition Route.cpp:98
bool isRunning()
Returns true if the thread is running (Thread::start has been called successfully and the thread has not exited).
Definition Thread.cpp:105
#define yCTrace(component,...)
#define yCDebug(component,...)
An interface to the operating system, including Port based communication.