YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
GstreamerStream.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2024-2024 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-License-Identifier: BSD-3-Clause
4 */
5
6#include "GstreamerStream.h"
7
8#include <yarp/conf/system.h>
10
11#include <yarp/os/NetType.h>
12#include <yarp/os/Time.h>
13
15#include <yarp/os/Log.h>
16#include <yarp/os/LogStream.h>
17#include <iostream>
18
19#include <cerrno>
20#include <cstring>
21
22using namespace yarp::os::impl;
23using namespace yarp::os;
24
25
27 "yarp.carrier.gstreamer.gstreamerStream",
31 nullptr)
32
33bool GstreamerStream::open(const std::string& pipeline_string, const Contact& remote)
34{
35 Contact local = Contact("127.0.0.1", -1);
36 return open(pipeline_string, local, remote);
37}
38
39bool GstreamerStream::open(const std::string& pipeline_string, const Contact& local, const Contact& remote)
40{
41 m_localAddress = local;
42 m_remoteAddress = remote;
43
44 //yCDebug(GSTREAMER_STREAM, "Update: GstreamerStream::open() from %s to %s", m_localAddress.toURI().c_str(), m_remoteAddress.toURI().c_str());
45
47 m_decoder = new GstYarpDecoder(&m_mutex, &m_sema, params);
48
49 bool ret = false;
50 ret = m_decoder->init(pipeline_string);
51 if (!ret)
52 {
53 return false;
54 }
55 ret = m_decoder->start();
56 if (!ret)
57 {
58 return false;
59 }
60
61 return true;
62}
63
68
// Body of the stream-interruption routine. The signature line is not visible here;
// presumably this is GstreamerStream::interrupt() -- TODO confirm.
70{
71 bool act = false;
72 m_mutex.lock();
    // Under the mutex: if the stream is neither closed nor already being interrupted,
    // mark it as both interrupting and closed.
73 if ((!m_closed) && (!m_interrupting))
74 {
75 act = true;
76 m_interrupting = true;
77 m_closed = true;
78 }
79 m_mutex.unlock();
80
    // NOTE(review): `act` is assigned above but never read in the visible code.
81 // wait for interruption to be done
    // NOTE(review): nothing in this body clears m_interrupting, so the loop below can only
    // end if some other code resets the flag. The flag is also read here without holding
    // m_mutex -- TODO confirm this is intended.
82 if (m_interrupting)
83 {
84 while (m_interrupting)
85 {
86 yCDebug(GSTREAMER_STREAM, "waiting for interrupt to be finished...");
88 }
89 }
90}
91
// Stops and destroys the decoder if one exists. The signature line is not visible here;
// presumably this is GstreamerStream::closeMain() -- TODO confirm.
93{
94 if (m_decoder)
95 {
96 m_decoder->stop();
97
98 delete m_decoder;
    // Clear the pointer so a later call finds nothing to stop or delete.
99 m_decoder = nullptr;
100 }
101}
102
// Unconditionally returns true. The signature line is not visible here;
// presumably this is GstreamerStream::isOk() -- TODO confirm.
104{
105 return true;
106}
107
108//Once you enter here, it is guaranteed that the frame will not be modified until prepareNextFrame() is called.
// Returns a pointer to the frame to be transmitted. The signature line is not visible here;
// presumably this is GstreamerStream::getFrame() -- TODO confirm.
110{
111 //create a fake frame, just for test
    // Debug path: the decoder is bypassed and a 640x480 image is returned whose raw buffer
    // is filled with byte value 200 or 90, alternating on successive calls. The toggle is
    // kept in the function-local static `val`.
112 if (m_debug_test_image_generation)
113 {
    // NOTE(review): `frame` is not declared in the visible lines. Its address is returned
    // below, so it presumably has static storage -- TODO confirm.
115 frame.resize(640, 480);
116 static int val = 0;
117 if (val) {
118 memset(frame.getRawImage(), 90, frame.getRawImageSize());
119 val = 0;
120 } else {
121 memset(frame.getRawImage(), 200, frame.getRawImageSize());
122 val = 1;
123 }
124 return &frame;
125 }
126
127 //get the frame from the decoder.
128 //this call is blocking until the frame is ready.
129 return m_decoder->getLastFrame();
130}
131
// Does nothing in debug test-image mode; otherwise forwards to the decoder's prepareNextFrame().
// The signature line is not visible here; presumably this is
// GstreamerStream::prepareNextFrame() -- TODO confirm.
133{
134 if (m_debug_test_image_generation)
135 {
136 return;
137 }
138 return m_decoder->prepareNextFrame();
139}
140
// Body of a chunked copy routine driven by the m_enum_phases state machine. Each call copies
// bytes of the current frame's header or raw image data into `b` and returns the number of
// bytes copied. The signature line is not visible here; presumably this is the stream's
// read(Bytes& b) -- TODO confirm.
//
// Phases, in order:
//   PHASE_0_GET_IMG        fetch the frame pointer via getFrame()
//   PHASE_1_PREPARE_HEADER fill m_imgHeader from that frame
//   PHASE_2_SEND_HEADER    copy out the bytes of m_imgHeader
//   PHASE_3_SEND_IMAGE     copy out the raw image bytes, then go back to phase 0
// m_cursor / m_remaining track the position within the section currently being copied.
142{
143 size_t bl = b.length();
144
145 if (m_enum_phases == enum_phases::PHASE_0_GET_IMG)
146 {
147 m_pointer_last_frame = getFrame();
148 m_enum_phases = enum_phases::PHASE_1_PREPARE_HEADER;
149 }
150 if (m_enum_phases == enum_phases::PHASE_1_PREPARE_HEADER)
151 {
152 m_imgHeader.setFromImage(*m_pointer_last_frame);
153 m_enum_phases = enum_phases::PHASE_2_SEND_HEADER;
154 }
155 if (m_enum_phases == enum_phases::PHASE_2_SEND_HEADER)
156 {
158 // starting to send the header
    // m_remaining == 0 marks the start of a new section: point the cursor at its first byte.
159 if (m_remaining == 0)
160 {
161 // cursor is set to the header, which has length = remaining
162 m_cursor = (char*)(&m_imgHeader);
163 m_remaining = sizeof(m_imgHeader);
164 }
165
166 size_t bytestobecopied = m_remaining;
    // Branch taken when the caller's buffer is smaller than what is left of the header.
    // NOTE(review): as shown here, memcpy is still given bytestobecopied (> bl) bytes, which
    // would write past the end of `b`. A clamp such as `bytestobecopied = bl;` is expected
    // before the copy -- TODO confirm it is present in the actual file.
167 if (bytestobecopied > bl)
168 {
170 memcpy(b.get(), m_cursor, bytestobecopied);
171 m_remaining -= bytestobecopied;
172 m_cursor += bytestobecopied;
173 return bytestobecopied;
174 }
175 else
176 {
    // The rest of the header fits in `b`: copy it and move on to the image phase.
177 memcpy(b.get(), m_cursor, bytestobecopied);
178 m_remaining = 0;
179 m_enum_phases = enum_phases::PHASE_3_SEND_IMAGE;
180 return bytestobecopied;
181 }
182 }
183 if (m_enum_phases == enum_phases::PHASE_3_SEND_IMAGE)
184 {
185 // starting to send the image
186 if (m_remaining == 0) {
187 // cursor is set to the image, which has length = remaining
188 m_cursor = (char*)(m_pointer_last_frame->getRawImage());
189 m_remaining = m_pointer_last_frame->getRawImageSize();
190 }
191
192 size_t bytestobecopied = m_remaining;
    // NOTE(review): same concern as in the header phase -- no clamp to `bl` is visible before
    // the memcpy in this branch. TODO confirm against the actual file.
193 if (bytestobecopied > bl)
194 {
196 memcpy(b.get(), m_cursor, bytestobecopied);
197 m_remaining -= bytestobecopied;
198 m_cursor += bytestobecopied;
199 return bytestobecopied;
200 }
201 else
202 {
    // Last part of the image: copy it, rewind the state machine to phase 0 and call
    // prepareNextFrame() before returning.
203 memcpy(b.get(), m_cursor, bytestobecopied);
204 m_remaining = 0;
205 m_enum_phases = enum_phases::PHASE_0_GET_IMG;
206 this->prepareNextFrame();
207 return bytestobecopied;
208 }
209 }
210
    // Reached only if m_enum_phases holds none of the four phases handled above.
211 // unreachable code
212 yError("Logic bug 2");
213 yAssert(false);
214 return -1;
215}
216
// Empty body: no action is performed (the signature line is not visible here).
218{
219}
220
// Empty body: no action is performed (the signature line is not visible here).
222{
223}
224
228
const yarp::os::LogComponent & GSTREAMER_STREAM()
bool ret
#define yError(...)
Definition Log.h:361
#define yAssert(x)
Definition Log.h:388
yarp::sig::ImageOf< yarp::sig::PixelRgb > * getLastFrame()
bool init(std::string pipeline_string)
void reset() override
Reset the stream.
void interrupt() override
Interrupt the stream.
void beginPacket() override
Mark the beginning of a logical packet.
yarp::sig::ImageOf< yarp::sig::PixelRgb > * getFrame()
bool isOk() const override
Check if the stream is ok or in an error state.
virtual ~GstreamerStream()
virtual bool open(const std::string &pipeline_string, const yarp::os::Contact &remote)
virtual void closeMain()
void endPacket() override
Mark the end of a logical packet (see beginPacket).
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
A mini-server for performing network communication in the background.
A simple abstraction for a block of bytes.
Definition Bytes.h:24
size_t length() const
Definition Bytes.cpp:22
const char * get() const
Definition Bytes.cpp:27
Represents how to reach a part of a YARP network.
Definition Contact.h:33
virtual int read()
Read and return a single byte.
static LogCallback printCallback()
Get current print callback.
Definition Log.cpp:873
static LogType minimumPrintLevel()
Get current minimum print level.
Definition Log.cpp:833
@ LogTypeReserved
Definition Log.h:98
static void delaySystem(double seconds)
void setFromImage(const Image &image)
Typed image class.
Definition Image.h:605
unsigned char * getRawImage() const
Access to the internal image buffer.
Definition Image.cpp:479
size_t getRawImageSize() const
Access to the internal buffer size information (this is how much memory has been allocated for the im...
Definition Image.cpp:488
void resize(size_t imgWidth, size_t imgHeight)
Reallocate an image to be of a desired size, throwing away its current contents.
Definition Image.cpp:402
#define yCDebug(component,...)
#define YARP_LOG_COMPONENT(name,...)
STL namespace.
::ssize_t ssize_t
Definition numeric.h:86
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.