16 "yarp.stream.websocket",
75 makeFrame(CLOSING_OPCODE, b, frame);
77 return delegate->getOutputStream().write(toWrite);
85 while (bytesRead < b.
length()) {
88 WebSocketFrameType frameType;
90 frameType = getFrame(
buffer);
91 }
while (frameType != BINARY_FRAME && frameType != TEXT_FRAME);
95 size_t remainedFromBuffer =
buffer.
length() - currentHead;
98 size_t toAdd = (remainedFromBuffer >= b.
length()) ? b.
length() : remainedFromBuffer;
99 memcpy(b.
get(),
buffer.get() + currentHead, toAdd);
100 currentHead += toAdd;
112 makeFrame(BINARY_FRAME, b, frame);
114 return delegate->getOutputStream().write(toWrite);
145 delegate->getInputStream().read(header.
bytes());
146 unsigned char msg_opcode = header.
get()[0] & 0x0F;
147 unsigned char msg_masked = (header.
get()[1] >> 7) & 0x01;
148 if(msg_opcode == 0x9)
152 if(msg_opcode == 0xA) {
155 if (msg_opcode == CLOSING_OPCODE)
159 unsigned char toreturn[] =
"\0\0\0\0~\0\0\1q";
161 memcpy(payload.
get(), toreturn, 10);
162 return CLOSING_OPCODE;
167 if (length_field <= 125) {
168 payload_length = length_field;
171 int length_to_add = 0;
172 if (length_field == 126) {
174 }
else if (length_field == 127) {
177 additionalLength.
allocate(length_to_add);
178 delegate->getInputStream().read(additionalLength.
bytes());
179 for (
int i =0; i < length_to_add; i++) {
180 memcpy(
reinterpret_cast<unsigned char *
>(&payload_length) + i,
181 reinterpret_cast<unsigned char *
>(&additionalLength.
get()[(length_to_add-1) - i]),
189 delegate->getInputStream().read(mask_bytes.
bytes());
193 delegate->getInputStream().read(payload.
bytes());
196 for (
int i = 0; i < payload_length; i++) {
197 payload.
get()[i] = payload.
get()[i] ^ mask_bytes.
get()[i % 4];
201 if(msg_opcode == 0x0 || msg_opcode == 0x1)
205 if(msg_opcode == 0x2)
214 void WebSocketStream::makeFrame(WebSocketFrameType frame_type,
220 size_t size = payload.
length();
224 }
else if (size <= 65535) {
230 frame.
get()[pos] =
static_cast<char>(frame_type);
237 frame.
get()[pos++] = size;
238 }
else if (size <= 65535) {
239 frame.
get()[pos++] = 126;
240 frame.
get()[pos++] = (size >> 8) & 0xFF;
241 frame.
get()[pos++] = size & 0xFF;
243 frame.
get()[pos++] = 127;
245 for (
int i = 7; i >= 0; i--) {
246 frame.
get()[pos++] = ((size >> 8 * i) & 0xFF);
249 memcpy(
reinterpret_cast<void*
>(frame.
get() + pos), payload.
get(), size);
const yarp::os::LogComponent & WEBSOCK_STREAM()
void reset() override
Reset the stream.
const yarp::os::Contact & getLocalAddress() const override
Get the address of the local side of the stream.
void beginPacket() override
Mark the beginning of a logical packet.
~WebSocketStream() override
void endPacket() override
Mark the end of a logical packet (see beginPacket).
void write(const yarp::os::Bytes &bytesToWrite) override
Write a block of bytes to the stream.
void interrupt() override
Interrupt the stream.
void close() override
Terminate the stream.
OutputStream & getOutputStream() override
Get an OutputStream to write to.
const yarp::os::Contact & getRemoteAddress() const override
Get the address of the remote side of the stream.
InputStream & getInputStream() override
Get an InputStream to read from.
bool isOk() const override
Check if the stream is ok or in an error state.
A simple abstraction for a block of bytes.
static LogCallback printCallback()
Get current print callback.
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
const Bytes & bytes() const
void allocate(size_t len)
Makes a data block of the specified length that will be deleted if this object is destroyed.
Simple specification of the minimum functions needed from output streams.
A stream which can be asked to perform bidirectional communication.
#define yCTrace(component,...)
#define YARP_LOG_COMPONENT(name,...)
An interface to the operating system, including Port based communication.
std::int64_t NetInt64
Definition of the NetInt64 type.
std::int32_t NetInt32
Definition of the NetInt32 type.
The main, catch-all namespace for YARP.