YARP
Yet Another Robot Platform
WebSocketStream.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-License-Identifier: BSD-3-Clause
4  */
5 
6 #include "WebSocketStream.h"
7 
8 #include <yarp/os/LogComponent.h>
9 #include <yarp/os/LogStream.h>
10 #include <yarp/os/NetType.h>
11 #include <yarp/os/NetInt64.h>
12 
13 using namespace yarp::os;
14 
16  "yarp.stream.websocket",
20  nullptr)
21 
22 
24  delegate(delegate)
25 {
26 }
27 
28 
30 {
32 }
33 
34 
36 {
38  return *this;
39 }
40 
41 
43 {
45  return *this;
46 }
47 
48 
50 {
52  return local;
53 }
54 
55 
57 {
59  return remote;
60 }
61 
62 
64 {
66  close();
67 }
68 
69 
71 {
75  makeFrame(CLOSING_OPCODE, b, frame);
76  yarp::os::Bytes toWrite(frame.get(), frame.length());
77  return delegate->getOutputStream().write(toWrite);
78 }
79 
80 
82 {
84  size_t bytesRead = 0;
85  while (bytesRead < b.length()) {
86  // the buffer is empty
87  if (buffer.length() == 0 || buffer.length() == currentHead) {
88  WebSocketFrameType frameType;
89  do {
90  frameType = getFrame(buffer);
91  } while (frameType != BINARY_FRAME && frameType != TEXT_FRAME);
92  currentHead = 0;
93  }
94  // get the remaining bytes to read from the buffer
95  size_t remainedFromBuffer = buffer.length() - currentHead;
96  // if the buffer is enough then the size of the bytes is given,
97  // otherwise the remaining bytes from the buffer is copied and is read again from the buffer
98  size_t toAdd = (remainedFromBuffer >= b.length()) ? b.length() : remainedFromBuffer;
99  memcpy(b.get(), buffer.get() + currentHead, toAdd);
100  currentHead += toAdd;
101  bytesRead += toAdd;
102  }
103  return static_cast<yarp::conf::ssize_t>(b.length());
104 }
105 
106 
107 // TODO FIXME STE: can the type of frame be selected?
109 {
112  makeFrame(BINARY_FRAME, b, frame);
113  yarp::os::Bytes toWrite(frame.get(), frame.length());
114  return delegate->getOutputStream().write(toWrite);
115 }
116 
117 
119 {
120  return true;
121 }
122 
123 
125 {
126 }
127 
128 
130 {
131 }
132 
133 
135 {
136 }
137 
138 
139 WebSocketFrameType WebSocketStream::getFrame(yarp::os::ManagedBytes& payload)
140 {
142  yarp::os::ManagedBytes header;
143  yarp::os::ManagedBytes mask_bytes;
144  header.allocate(2);
145  delegate->getInputStream().read(header.bytes());
146  unsigned char msg_opcode = header.get()[0] & 0x0F;
147  unsigned char msg_masked = (header.get()[1] >> 7) & 0x01;
148  if(msg_opcode == 0x9)
149  {
150  return PING_FRAME;
151  }
152  if(msg_opcode == 0xA) {
153  return PONG_FRAME;
154  }
155  if (msg_opcode == CLOSING_OPCODE)
156  {
157  // this returns a quit command to the caller through the yarp protocol,
158  // so the caller can quit the connection in a reasonable manner
159  unsigned char toreturn[] = "\0\0\0\0~\0\0\1q";
160  payload.allocate(10);
161  memcpy(payload.get(), toreturn, 10);
162  return CLOSING_OPCODE;
163  }
164  yarp::os::NetInt64 payload_length = 0;
165  yarp::os::NetInt32 length_field = header.get()[1] & (0x7F);
166 
167  if (length_field <= 125) {
168  payload_length = length_field;
169  } else {
170  yarp::os::ManagedBytes additionalLength;
171  int length_to_add = 0;
172  if (length_field == 126) { //msglen is 16bit!
173  length_to_add = 2;
174  } else if (length_field == 127) { //msglen is 64bit!
175  length_to_add = 8;
176  }
177  additionalLength.allocate(length_to_add);
178  delegate->getInputStream().read(additionalLength.bytes());
179  for (int i =0; i < length_to_add; i++) {
180  memcpy(reinterpret_cast<unsigned char *>(&payload_length) + i,
181  reinterpret_cast<unsigned char *>(&additionalLength.get()[(length_to_add-1) - i]),
182  1);
183  }
184  }
185 
186  if (msg_masked) {
187  // get the mask
188  mask_bytes.allocate(4);
189  delegate->getInputStream().read(mask_bytes.bytes());
190  }
191 
192  payload.allocate(payload_length);
193  delegate->getInputStream().read(payload.bytes());
194  if (msg_masked) {
195  // unmask data:
196  for (int i = 0; i < payload_length; i++) {
197  payload.get()[i] = payload.get()[i] ^ mask_bytes.get()[i % 4];
198  }
199  }
200 
201  if(msg_opcode == 0x0 || msg_opcode == 0x1)
202  {
203  return TEXT_FRAME;
204  }
205  if(msg_opcode == 0x2)
206  {
207  return BINARY_FRAME;
208  }
209  return ERROR_FRAME;
210 }
211 
212 
213 // TODO FIXME STE need to manage if frame is not passed
214 void WebSocketStream::makeFrame(WebSocketFrameType frame_type,
215  const yarp::os::Bytes& payload,
216  yarp::os::ManagedBytes& frame)
217 {
219  int pos = 0;
220  size_t size = payload.length();
221 
222  if (size <= 125) {
223  frame.allocate(2 + size);
224  } else if (size <= 65535) {
225  frame.allocate(4 + size);
226  } else { // >2^16-1 (65535)
227  frame.allocate(10 + size);
228  }
229 
230  frame.get()[pos] = static_cast<char>(frame_type); // text frame
231  pos++;
232 
233 
234  if (size <= 125) {
235  // this is a 7 bit size (the first bit is the mask
236  // that must be set to 0)
237  frame.get()[pos++] = size;
238  } else if (size <= 65535) {
239  frame.get()[pos++] = 126; //16 bit length follows
240  frame.get()[pos++] = (size >> 8) & 0xFF; // leftmost first
241  frame.get()[pos++] = size & 0xFF;
242  } else { // >2^16-1 (65535)
243  frame.get()[pos++] = 127; //64 bit length follows
244  // write the actual 64bit msg_length in the next 4 bytes
245  for (int i = 7; i >= 0; i--) {
246  frame.get()[pos++] = ((size >> 8 * i) & 0xFF);
247  }
248  }
249  memcpy(reinterpret_cast<void*>(frame.get() + pos), payload.get(), size);
250 }
const yarp::os::LogComponent & WEBSOCK_STREAM()
void reset() override
Reset the stream.
const yarp::os::Contact & getLocalAddress() const override
Get the address of the local side of the stream.
void beginPacket() override
Mark the beginning of a logical packet.
~WebSocketStream() override
void endPacket() override
Mark the end of a logical packet (see beginPacket).
void write(const yarp::os::Bytes &bytesToWrite) override
Write a block of bytes to the stream.
void interrupt() override
Interrupt the stream.
void close() override
Terminate the stream.
OutputStream & getOutputStream() override
Get an OutputStream to write to.
const yarp::os::Contact & getRemoteAddress() const override
Get the address of the remote side of the stream.
InputStream & getInputStream() override
Get an InputStream to read from.
bool isOk() const override
Check if the stream is ok or in an error state.
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
Represents how to reach a part of a YARP network.
Definition: Contact.h:36
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:26
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:20
static LogCallback printCallback()
Get current print callback.
Definition: Log.cpp:821
@ LogTypeReserved
Definition: Log.h:80
@ TraceType
Definition: Log.h:74
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that block.
Definition: ManagedBytes.h:22
const Bytes & bytes() const
void allocate(size_t len)
Makes a data block of the specified length that will be deleted if this object is destroyed.
const char * get() const
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:22
A stream which can be asked to perform bidirectional communication.
Definition: TwoWayStream.h:26
#define yCTrace(component,...)
Definition: LogComponent.h:85
#define YARP_LOG_COMPONENT(name,...)
Definition: LogComponent.h:77
::ssize_t ssize_t
Definition: numeric.h:86
An interface to the operating system, including Port based communication.
std::int64_t NetInt64
Definition of the NetInt64 type.
Definition: NetInt64.h:30
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition: NetInt32.h:30
The main, catch-all namespace for YARP.
Definition: dirs.h:16
size_t length
Definition: V4L_camera.h:65