YARP
Yet Another Robot Platform
H264Stream.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-License-Identifier: BSD-3-Clause
4  */
5 
6 #include "H264Stream.h"
7 #include "H264LogComponent.h"
8 
9 #include <yarp/os/LogStream.h>
10 #include <yarp/sig/Image.h>
12 
13 #include <cstdio>
14 #include <cstring>
15 
16 
17 //#define debug_time 1
18 
19 #ifdef debug_time
20  #include <yarp/os/Time.h>
21  #define DBG_TIME_PERIOD_PRINTS 10 //10 sec
22 #endif
23 
24 
25 using namespace yarp::os;
26 using namespace yarp::sig;
27 
29  delegate(nullptr),
30  blobHeader{0,0,0},
31  phase(0),
32  cursor(nullptr),
33  remaining(0),
34  decoder(nullptr),
35  cfg(config)
36 {}
37 
39 {
40  delete decoder;
41  delete delegate;
42 }
43 
44 
45 
47 {
48  delegate = stream;
49  if(nullptr == delegate)
50  {
51  return false;
52  }
53  return true;
54 }
55 
57 {
58  decoder = new H264Decoder(this->cfg);
59  decoder->init();
60  decoder->start();
61 }
62 
64 {
65  return *this;
66 }
67 
69 {
70  return *this;
71 }
72 
73 //using yarp::os::OutputStream::write;
74 
75 
76 //using yarp::os::InputStream::read;
77 
78 bool H264Stream::setReadEnvelopeCallback(InputStream::readEnvelopeCallbackType callback, void* data)
79 {
80  return true;
81 }
82 
84 {
85 
86 #ifdef debug_time
87  static bool isFirst = true;
88  double start_time = Time::now();
89  double start_timeCopy;
90  double end_time=0;
91  static double last_call;
92  static double sumOf_timeBetweenCalls=0;
93 
94  static double sumOf_timeOnMutex = 0;
95  static double sumOf_timeOfCopyPerPahse[5] ={0};
96  static uint32_t count=0;
97  static uint32_t countPerPhase[5]={0};
98  #define MAX_COUNT 100
99 
100 
101  if(isFirst)
102  {
103  last_call = start_time;
104  isFirst = false;
105  }
106  else
107  {
108  sumOf_timeBetweenCalls+=(start_time -last_call);
109  last_call = start_time;
110  }
111 
112 
113 #endif
114 
115  if (remaining==0)
116  {
117  if (phase==1)
118  {
119  phase = 2;
120  cursor = (char*)(img.getRawImage());
121  remaining = img.getRawImageSize();
122  } else if (phase==3)
123  {
124  phase = 4;
125  cursor = nullptr;
126  remaining = blobHeader.blobLen;
127  } else
128  {
129  phase = 0;
130  }
131  }
132  while (phase==0)
133  {
134  decoder->mutex.lock();
135  int len = 0;
136  if(decoder->newFrameIsAvailable())
137  {
138  ImageOf<PixelRgb> & img_dec = decoder->getLastFrame();
139  img.copy(img_dec);
140  len = decoder->getLastFrameSize();
141  decoder->mutex.unlock();
142  #ifdef debug_time
143  end_time = Time::now();
144  sumOf_timeOnMutex +=(end_time - start_time);
145  count++;
146  if(count>=MAX_COUNT)
147  {
149  "STREAM On %d times: timeOnMutex is long %.6f sec",
150  MAX_COUNT, (sumOf_timeOnMutex/MAX_COUNT) );
151  for(int x=0; x<5; x++)
152  {
154  "STREAM: phase:%d, count=%u, time=%.6f sec",
155  x,
156  countPerPhase[x],
157  ((countPerPhase[x]==0) ? 0: sumOf_timeOfCopyPerPahse[x]/countPerPhase[x]) );
158  countPerPhase[x] = 0;
159  sumOf_timeOfCopyPerPahse[x] = 0;
160  }
161  yCDebug(H264CARRIER, "sleep=%.6f", sumOf_timeBetweenCalls/count);
163  count = 0;
164  isFirst = true;
165  sumOf_timeOnMutex = 0;
166  sumOf_timeBetweenCalls = 0;
167  }
168  #endif
169 
170  }
171  else
172  {
173  yCTrace(H264CARRIER, "h264Stream::read has been called but no frame is available!!");
174  phase = 0;
175  remaining = 0;
176  cursor = nullptr;
177  decoder->setReq();
178  decoder->mutex.unlock();
179  decoder->semaphore.waitWithTimeout(1);
180  return 0;
181  }
182 
183  yCTrace(H264CARRIER, "Length is \"%d\"", len);
184 
185  imgHeader.setFromImage(img);
186  phase = 1;
187  cursor = (char*)(&imgHeader);
188  remaining = sizeof(imgHeader);
189  }
190 
191  if (remaining>0)
192  {
193  size_t allow = remaining;
194  if (b.length()<allow)
195  {
196  allow = b.length();
197  }
198  if (cursor!=nullptr)
199  {
200  #ifdef debug_time
201  start_timeCopy = Time::now();
202  #endif
203  memcpy(b.get(),cursor,allow);
204  cursor+=allow;
205  remaining-=allow;
206  yCDebug(H264CARRIER, "returning %zd bytes", allow);
207  #ifdef debug_time
208  end_time = Time::now();
209  sumOf_timeOfCopyPerPahse[phase] +=(end_time - start_timeCopy);
210  countPerPhase[phase]++;
211  #endif
212  return allow;
213  } else
214  {
215  yarp::conf::ssize_t result = delegate->getInputStream().read(b);
216  yCTrace(H264CARRIER, "Read %zu bytes", result);
217  if (result>0)
218  {
219  remaining-=result;
220  yCTrace(H264CARRIER, "%zu bytes of meat", result);
221  return result;
222  }
223  }
224  }
225  return -1;
226 }
227 
228 
229 void H264Stream::write(const Bytes& b)
230 {
231  delegate->getOutputStream().write(b);
232 }
const yarp::os::LogComponent & H264CARRIER()
bool newFrameIsAvailable()
int getLastFrameSize()
yarp::os::Semaphore semaphore
Definition: H264Decoder.h:42
std::mutex mutex
Definition: H264Decoder.h:41
yarp::sig::ImageOf< yarp::sig::PixelRgb > & getLastFrame()
OutputStream & getOutputStream() override
Get an OutputStream to write to.
Definition: H264Stream.cpp:68
H264Stream(h264Decoder_cfgParamters &config)
Definition: H264Stream.cpp:28
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
Definition: H264Stream.cpp:229
virtual ~H264Stream()
Definition: H264Stream.cpp:38
void start()
Definition: H264Stream.cpp:56
bool setReadEnvelopeCallback(InputStream::readEnvelopeCallbackType callback, void *data) override
Definition: H264Stream.cpp:78
InputStream & getInputStream() override
Get an InputStream to read from.
Definition: H264Stream.cpp:63
bool setStream(yarp::os::impl::DgramTwoWayStream *stream)
Definition: H264Stream.cpp:46
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:26
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:20
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:22
bool waitWithTimeout(double timeoutInSeconds)
Try to decrement the counter, even if we must wait - but don't wait forever.
Definition: Semaphore.cpp:101
A stream abstraction for datagram communication.
void setFromImage(const Image &image)
unsigned char * getRawImage() const
Access to the internal image buffer.
Definition: Image.cpp:541
bool copy(const Image &alt)
Copy operator.
Definition: Image.cpp:836
size_t getRawImageSize() const
Access to the internal buffer size information (this is how much memory has been allocated for the image).
Definition: Image.cpp:550
#define yCTrace(component,...)
Definition: LogComponent.h:85
#define yCDebug(component,...)
Definition: LogComponent.h:109
::ssize_t ssize_t
Definition: numeric.h:86
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition: Time.cpp:121
An interface to the operating system, including Port based communication.
Signal processing.
Definition: Image.h:22