YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
WebSocketStream.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-License-Identifier: BSD-3-Clause
4 */
5
6#include "WebSocketStream.h"
7
9#include <yarp/os/LogStream.h>
10#include <yarp/os/NetType.h>
11#include <yarp/os/NetInt64.h>
12
13using namespace yarp::os;
14
16 "yarp.stream.websocket",
20 nullptr)
21
22
24 delegate(delegate)
25{
26}
27
28
33
34
40
41
47
48
50{
52 return local;
53}
54
55
57{
59 return remote;
60}
61
62
68
69
79
80
82{
84 size_t bytesRead = 0;
85 while (bytesRead < b.length()) {
86 // the buffer is empty
87 if (buffer.length() == 0 || buffer.length() == currentHead) {
89 do {
90 frameType = getFrame(buffer);
91 } while (frameType != BINARY_FRAME && frameType != TEXT_FRAME);
92 currentHead = 0;
93 }
94 // get the remaining bytes to read from the buffer
95 size_t remainedFromBuffer = buffer.length() - currentHead;
96 // if the buffer holds enough data, the requested number of bytes is copied;
97 // otherwise what remains in the buffer is copied and the loop fetches a new frame to continue
99 memcpy(b.get(), buffer.get() + currentHead, toAdd);
100 currentHead += toAdd;
101 bytesRead += toAdd;
102 }
103 return static_cast<yarp::conf::ssize_t>(b.length());
104}
105
106
107//TODO FIXME STE: can the type of frame be selected?
109{
112 makeFrame(BINARY_FRAME, b, frame);
113 yarp::os::Bytes toWrite(frame.get(), frame.length());
114 return delegate->getOutputStream().write(toWrite);
115}
116
117
119{
120 return true;
121}
122
123
125{
126}
127
128
132
133
137
138
/**
 * Read one WebSocket frame from the delegate's input stream and classify it.
 *
 * A 2-byte header is read first: the opcode is the low 4 bits of byte 0, the
 * mask flag is the top bit of byte 1 and the length field is the low 7 bits
 * of byte 1 (see RFC 6455, section 5.2).
 *
 * @param payload buffer that is (re)allocated and filled with the frame
 *                payload; for the closing branch it receives a fixed 10-byte
 *                block instead.
 * @return PING_FRAME for opcode 0x9, PONG_FRAME for opcode 0xA,
 *         CLOSING_OPCODE from the closing branch, TEXT_FRAME for opcodes
 *         0x0 and 0x1, BINARY_FRAME for opcode 0x2, ERROR_FRAME otherwise.
 *
 * NOTE(review): several lines of this function (the declarations of header,
 * mask_bytes, payload_length and additionalLength, and the condition guarding
 * the closing branch) are not visible in this listing.
 * NOTE(review): none of the read() calls below has its return value checked.
 */
139WebSocketFrameType WebSocketStream::getFrame(yarp::os::ManagedBytes& payload)
140{
    // fetch the fixed 2-byte frame header
144 header.allocate(2);
145 delegate->getInputStream().read(header.bytes());
146 unsigned char msg_opcode = header.get()[0] & 0x0F;
147 unsigned char msg_masked = (header.get()[1] >> 7) & 0x01;
    // ping/pong: returned right after the header, before any mask or payload
    // bytes are read from the stream
    // NOTE(review): confirm the caller copes with bytes left unread here
148 if(msg_opcode == 0x9)
149 {
150 return PING_FRAME;
151 }
152 if(msg_opcode == 0xA) {
153 return PONG_FRAME;
154 }
    // closing branch; its condition is on a line missing from this listing
    // (presumably the close opcode 0x8 -- TODO confirm)
156 {
157 // this returns a quit command to the caller through the yarp protocol,
158 // so the caller can quit the connection in a reasonable manner
    // the literal below is 9 characters plus the implicit terminating NUL,
    // which matches the 10 bytes allocated and copied
159 unsigned char toreturn[] = "\0\0\0\0~\0\0\1q";
160 payload.allocate(10);
161 memcpy(payload.get(), toreturn, 10);
162 return CLOSING_OPCODE;
163 }
    // 7-bit length field: values up to 125 take the first branch (its body is
    // not visible here), 126 and 127 announce an extended length
165 yarp::os::NetInt32 length_field = header.get()[1] & (0x7F);
166

167 if (length_field <= 125) {
169 } else {
171 int length_to_add = 0;
172 if (length_field == 126) { // extended length is 16 bit (2 more bytes)
173 length_to_add = 2;
174 } else if (length_field == 127) { // extended length is 64 bit (8 more bytes)
175 length_to_add = 8;
176 }
178 delegate->getInputStream().read(additionalLength.bytes());
    // copy the extended length byte by byte in reverse order, so the last
    // byte received lands at the lowest address of payload_length
    // NOTE(review): this byte swap looks like it assumes a little-endian host
179 for (int i =0; i < length_to_add; i++) {
180 memcpy(reinterpret_cast<unsigned char *>(&payload_length) + i,
181 reinterpret_cast<unsigned char *>(&additionalLength.get()[(length_to_add-1) - i]),
182 1);
183 }
184 }
185

186 if (msg_masked) {
187 // get the mask
188 mask_bytes.allocate(4);
189 delegate->getInputStream().read(mask_bytes.bytes());
190 }
191

    // read the payload itself, sized by the decoded length
192 payload.allocate(payload_length);
193 delegate->getInputStream().read(payload.bytes());
194 if (msg_masked) {
195 // unmask data: XOR every payload byte with the mask byte at index i % 4
196 for (int i = 0; i < payload_length; i++) {
197 payload.get()[i] = payload.get()[i] ^ mask_bytes.get()[i % 4];
198 }
199 }
200

    // classify by opcode; note that opcode 0x0 is reported as TEXT_FRAME too
201 if(msg_opcode == 0x0 || msg_opcode == 0x1)
202 {
203 return TEXT_FRAME;
204 }
205 if(msg_opcode == 0x2)
206 {
207 return BINARY_FRAME;
208 }
209 return ERROR_FRAME;
210}
211
212
213// TODO FIXME STE: need to handle the case where the frame is not passed
/**
 * Build an unmasked WebSocket frame around a payload.
 *
 * The frame is laid out as: one byte holding frame_type, then the length
 * (1 byte for sizes up to 125; the marker 126 plus 2 bytes for sizes up to
 * 65535; the marker 127 plus 8 bytes otherwise, most significant byte first),
 * then the payload bytes copied verbatim.
 *
 * @param frame_type value written as the first byte of the frame.
 *
 * NOTE(review): the remaining signature lines are not visible in this
 * listing; the body reads the bytes to send from `payload` and allocates and
 * fills `frame` with the result.
 */
214void WebSocketStream::makeFrame(WebSocketFrameType frame_type,
217{
219 int pos = 0;
220 size_t size = payload.length();
221

    // header size depends on the payload size: 2, 4 or 10 bytes
222 if (size <= 125) {
223 frame.allocate(2 + size);
224 } else if (size <= 65535) {
225 frame.allocate(4 + size);
226 } else { // >2^16-1 (65535)
227 frame.allocate(10 + size);
228 }
229

230 frame.get()[pos] = static_cast<char>(frame_type); // first header byte: the frame type passed by the caller
231 pos++;
232

233

234 if (size <= 125) {
235 // this is a 7 bit size (the first bit is the mask
236 // that must be set to 0)
237 frame.get()[pos++] = size;
238 } else if (size <= 65535) {
239 frame.get()[pos++] = 126; //16 bit length follows
240 frame.get()[pos++] = (size >> 8) & 0xFF; // leftmost first
241 frame.get()[pos++] = size & 0xFF;
242 } else { // >2^16-1 (65535)
243 frame.get()[pos++] = 127; //64 bit length follows
244 // write the actual 64bit msg_length in the next 8 bytes, most significant byte first
    // NOTE(review): the shift reaches 56 bits; confirm size_t is 64 bit on all targets
245 for (int i = 7; i >= 0; i--) {
246 frame.get()[pos++] = ((size >> 8 * i) & 0xFF);
247 }
248 }
    // append the payload right after the header
249 memcpy(reinterpret_cast<void*>(frame.get() + pos), payload.get(), size);
250}
const yarp::os::LogComponent & WEBSOCK_STREAM()
void reset() override
Reset the stream.
const yarp::os::Contact & getLocalAddress() const override
Get the address of the local side of the stream.
void beginPacket() override
Mark the beginning of a logical packet.
~WebSocketStream() override
void endPacket() override
Mark the end of a logical packet (see beginPacket).
void write(const yarp::os::Bytes &bytesToWrite) override
Write a block of bytes to the stream.
void interrupt() override
Interrupt the stream.
void close() override
Terminate the stream.
OutputStream & getOutputStream() override
Get an OutputStream to write to.
const yarp::os::Contact & getRemoteAddress() const override
Get the address of the remote side of the stream.
InputStream & getInputStream() override
Get an InputStream to read from.
bool isOk() const override
Check if the stream is ok or in an error state.
A mini-server for performing network communication in the background.
T * read(bool shouldWait=true) override
Read an available object from the port.
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
A simple abstraction for a block of bytes.
Definition Bytes.h:24
size_t length() const
Definition Bytes.cpp:22
const char * get() const
Definition Bytes.cpp:27
Represents how to reach a part of a YARP network.
Definition Contact.h:33
Simple specification of the minimum functions needed from input streams.
Definition InputStream.h:25
virtual int read()
Read and return a single byte.
static LogCallback printCallback()
Get current print callback.
Definition Log.cpp:873
@ LogTypeReserved
Definition Log.h:98
@ TraceType
Definition Log.h:92
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that block.
const Bytes & bytes() const
void allocate(size_t len)
Makes a data block of the specified length that will be deleted if this object is destroyed.
const char * get() const
Simple specification of the minimum functions needed from output streams.
A stream which can be asked to perform bidirectional communication.
virtual InputStream & getInputStream()=0
Get an InputStream to read from.
virtual OutputStream & getOutputStream()=0
Get an OutputStream to write to.
#define yCTrace(component,...)
#define YARP_LOG_COMPONENT(name,...)
::ssize_t ssize_t
Definition numeric.h:86
An interface to the operating system, including Port based communication.
std::int64_t NetInt64
Definition of the NetInt64 type.
Definition NetInt64.h:29
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition NetInt32.h:29
The main, catch-all namespace for YARP.
Definition dirs.h:16
size_t length
Definition V4L_camera.h:65