YARP
Yet Another Robot Platform
MpiCarrier.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2020 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2010 Daniel Krieg <krieg@fias.uni-frankfurt.de>
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #include "MpiCarrier.h"
11 
12 #include <yarp/os/Route.h>
13 #include <sys/types.h>
14 
15 using namespace yarp::os;
16 
17 MpiCarrier::MpiCarrier() : stream(nullptr), comm(nullptr) {
18 }
19 
21  yCTrace(MPI_CARRIER, "[MpiCarrier @ %s] Destructor called", route.c_str() );
22 }
23 
24 void MpiCarrier::getHeader(Bytes& header) const {
25  for (size_t i=0; i<8 && i<header.length(); i++) {
26  header.get()[i] = target.c_str()[i];
27  }
28 }
29 
30 bool MpiCarrier::checkHeader(const Bytes& header) {
31  if (header.length()!=8) {
32  return false;
33  }
34  for (int i=0; i<8; i++) {
35  if (header.get()[i] != target.c_str()[i]) {
36  return false;
37  }
38  }
39  return true;
40 }
41 
43  // Send the "magic number" for this carrier
44  ManagedBytes header(8);
45  getHeader(header.bytes());
46  proto.os().write(header.bytes());
47  if (!proto.os().isOk()) return false;
48 
49  // Now we can do whatever we want, as long as somehow
50  // we also send the name of the originating port
51 
52  name = proto.getRoute().getFromName();
53  other = proto.getRoute().getToName();
54  Bytes b2((char*)name.c_str(),name.length());
55  proto.os().write(b2);
56  proto.os().write('\r');
57  proto.os().write('\n');
58 
59  // Sender
60  route = name + "->" + other;
61 
62  createStream(true);
63 
64  if (!MpiControl) return false;
65  if (! MpiControl->isRunning())
66  return false;
67  comm->openPort();
68  char* port = comm->port_name;
69  char* uid = comm->unique_id;
70 
71  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] setting up MpiPort '%s'", route.c_str(), port);
72 
73  Bytes b4(uid,strlen(uid));
74  proto.os().write(b4);
75  proto.os().write('\r');
76  proto.os().write('\n');
77 
78  Bytes b3(port,strlen(port));
79  proto.os().write(b3);
80  proto.os().write('\r');
81  proto.os().write('\n');
82  proto.os().flush();
83 
84  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] Header sent", route.c_str());
85 
86  return proto.os().isOk();
87 }
88 
89 
90 
92  // interpret everything that sendHeader wrote
93  name = proto.getRoute().getToName();
94 
95  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] Waiting for header", route.c_str());
96 
97  other = proto.is().readLine();
98  Route r = proto.getRoute();
99  r.setFromName(other);
100  proto.setRoute(r);
101 
102  // Receiver
103  route = name + "<-" + other;
104 
105  createStream(false);
106  if (!MpiControl) return false;
107  if (! MpiControl->isRunning())
108  return false;
109 
110  std::string other_id = proto.is().readLine();
111  bool notLocal = comm->notLocal(other_id);
112 
113  port = proto.is().readLine();
114 
115  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] Header received", route.c_str());
116 
117  return notLocal && proto.is().isOk();
118 }
119 
121  // SWITCH TO NEW STREAM TYPE
122  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] trying to connect to MpiPort '%s'", route.c_str(), port.c_str());
123 
124  if (!comm->connect(port)) {
125  delete stream;
126  return false;
127  }
128  proto.takeStreams(stream);
129 
130  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] MpiStream successfully setup", route.c_str() );
131 
132  return proto.is().isOk();
133 }
134 
136  // SWITCH TO NEW STREAM TYPE
137  if (!comm->accept()) {
138  delete stream;
139  return false;
140  }
141  proto.takeStreams(stream);
142 
143  yCDebug(MPI_CARRIER, "[MpiCarrier @ %s] MpiStream successfully setup", route.c_str() );
144 
145  return proto.os().isOk();
146 }
MpiControlThread * MpiControl
Definition: MpiComm.cpp:25
const yarp::os::LogComponent & MPI_CARRIER()
void getHeader(yarp::os::Bytes &header) const override
Provide 8 bytes describing this connection sufficiently to allow the other side of a connection to select it.
Definition: MpiCarrier.cpp:24
bool sendHeader(yarp::os::ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
Definition: MpiCarrier.cpp:42
std::string target
Definition: MpiCarrier.h:35
std::string port
Definition: MpiCarrier.h:33
bool expectSenderSpecifier(yarp::os::ConnectionState &proto) override
Expect the name of the sending port.
Definition: MpiCarrier.cpp:91
MpiStream * stream
Definition: MpiCarrier.h:31
virtual void createStream(bool sender)=0
bool expectReplyToHeader(yarp::os::ConnectionState &proto) override
Process reply to header, if one is expected for this carrier.
Definition: MpiCarrier.cpp:135
virtual ~MpiCarrier()
Definition: MpiCarrier.cpp:20
MpiComm * comm
Definition: MpiCarrier.h:32
bool checkHeader(const yarp::os::Bytes &header) override
Given the first 8 bytes received on a connection, decide if this is the right carrier type to use for the rest of the connection.
Definition: MpiCarrier.cpp:30
bool respondToHeader(yarp::os::ConnectionState &proto) override
Respond to the header.
Definition: MpiCarrier.cpp:120
std::string name
Definition: MpiCarrier.h:34
std::string route
Definition: MpiCarrier.h:34
std::string other
Definition: MpiCarrier.h:34
bool accept()
Definition: MpiComm.cpp:146
bool connect(std::string port)
Definition: MpiComm.cpp:116
bool notLocal(std::string other)
Definition: MpiComm.cpp:108
char unique_id[10+MPI_MAX_PROCESSOR_NAME]
Definition: MpiComm.h:58
void openPort()
Definition: MpiComm.h:73
char port_name[MPI_MAX_PORT_NAME]
Definition: MpiComm.h:57
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
size_t length() const
Definition: Bytes.cpp:25
const char * get() const
Definition: Bytes.cpp:30
The basic state of a connection - route, streams in use, etc.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
virtual void takeStreams(TwoWayStream *streams)=0
Provide streams to be used with the connection.
InputStream & is()
Shorthand for getInputStream()
OutputStream & os()
Shorthand for getOutputStream()
virtual void setRoute(const Route &route)=0
Set the route associated with this connection.
virtual bool isOk() const =0
Check if the stream is ok or in an error state.
std::string readLine(const char terminal='\n', bool *success=nullptr)
Read a block of text terminated with a specific marker (or EOF).
Definition: InputStream.cpp:57
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that block.
Definition: ManagedBytes.h:25
const Bytes & bytes() const
virtual void flush()
Make sure all pending write operations are finished.
virtual bool isOk() const =0
Check if the stream is ok or in an error state.
virtual void write(char ch)
Write a single byte to the stream.
Information about a connection between two ports.
Definition: Route.h:32
const std::string & getToName() const
Get the destination of the route.
Definition: Route.cpp:106
const std::string & getFromName() const
Get the source of the route.
Definition: Route.cpp:96
void setFromName(const std::string &fromName)
Set the source of the route.
Definition: Route.cpp:101
bool isRunning()
Returns true if the thread is running (Thread::start has been called successfully and the thread has ...
Definition: Thread.cpp:108
#define yCTrace(component,...)
Definition: LogComponent.h:88
#define yCDebug(component,...)
Definition: LogComponent.h:112
An interface to the operating system, including Port based communication.