YARP
Yet Another Robot Platform
H264Stream.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-License-Identifier: BSD-3-Clause
4  */
5 
6 #include "H264Stream.h"
7 #include "H264LogComponent.h"
8 
9 #include <yarp/os/LogStream.h>
10 #include <yarp/sig/Image.h>
12 
13 #include <cstdio>
14 #include <cstring>
15 
16 
17 //#define debug_time 1
18 
19 #ifdef debug_time
20  #include <yarp/os/Time.h>
21  #define DBG_TIME_PERIOD_PRINTS 10 //10 sec
22 #endif
23 
24 
25 using namespace yarp::os;
26 using namespace yarp::sig;
27 using namespace std;
28 
30  delegate(nullptr),
31  blobHeader{0,0,0},
32  phase(0),
33  cursor(nullptr),
34  remaining(0),
35  decoder(nullptr),
36  cfg(config)
37 {}
38 
40 {
41  delete decoder;
42  delete delegate;
43 }
44 
45 
46 
48 {
49  delegate = stream;
50  if(nullptr == delegate)
51  {
52  return false;
53  }
54  return true;
55 }
56 
58 {
59  decoder = new H264Decoder(this->cfg);
60  decoder->init();
61  decoder->start();
62 }
63 
65 {
66  return *this;
67 }
68 
70 {
71  return *this;
72 }
73 
74 //using yarp::os::OutputStream::write;
75 
76 
77 //using yarp::os::InputStream::read;
78 
79 bool H264Stream::setReadEnvelopeCallback(InputStream::readEnvelopeCallbackType callback, void* data)
80 {
81  return true;
82 }
83 
85 {
86 
87 #ifdef debug_time
88  static bool isFirst = true;
89  double start_time = Time::now();
90  double start_timeCopy;
91  double end_time=0;
92  static double last_call;
93  static double sumOf_timeBetweenCalls=0;
94 
95  static double sumOf_timeOnMutex = 0;
96  static double sumOf_timeOfCopyPerPahse[5] ={0};
97  static uint32_t count=0;
98  static uint32_t countPerPhase[5]={0};
99  #define MAX_COUNT 100
100 
101 
102  if(isFirst)
103  {
104  last_call = start_time;
105  isFirst = false;
106  }
107  else
108  {
109  sumOf_timeBetweenCalls+=(start_time -last_call);
110  last_call = start_time;
111  }
112 
113 
114 #endif
115 
116  if (remaining==0)
117  {
118  if (phase==1)
119  {
120  phase = 2;
121  cursor = (char*)(img.getRawImage());
122  remaining = img.getRawImageSize();
123  } else if (phase==3)
124  {
125  phase = 4;
126  cursor = nullptr;
127  remaining = blobHeader.blobLen;
128  } else
129  {
130  phase = 0;
131  }
132  }
133  while (phase==0)
134  {
135  decoder->mutex.lock();
136  int len = 0;
137  if(decoder->newFrameIsAvailable())
138  {
139  ImageOf<PixelRgb> & img_dec = decoder->getLastFrame();
140  img.copy(img_dec);
141  len = decoder->getLastFrameSize();
142  decoder->mutex.unlock();
143  #ifdef debug_time
144  end_time = Time::now();
145  sumOf_timeOnMutex +=(end_time - start_time);
146  count++;
147  if(count>=MAX_COUNT)
148  {
150  "STREAM On %d times: timeOnMutex is long %.6f sec",
151  MAX_COUNT, (sumOf_timeOnMutex/MAX_COUNT) );
152  for(int x=0; x<5; x++)
153  {
155  "STREAM: phase:%d, count=%u, time=%.6f sec",
156  x,
157  countPerPhase[x],
158  ((countPerPhase[x]==0) ? 0: sumOf_timeOfCopyPerPahse[x]/countPerPhase[x]) );
159  countPerPhase[x] = 0;
160  sumOf_timeOfCopyPerPahse[x] = 0;
161  }
162  yCDebug(H264CARRIER, "sleep=%.6f", sumOf_timeBetweenCalls/count);
164  count = 0;
165  isFirst = true;
166  sumOf_timeOnMutex = 0;
167  sumOf_timeBetweenCalls = 0;
168  }
169  #endif
170 
171  }
172  else
173  {
174  yCTrace(H264CARRIER, "h264Stream::read has been called but no frame is available!!");
175  phase = 0;
176  remaining = 0;
177  cursor = nullptr;
178  decoder->setReq();
179  decoder->mutex.unlock();
180  decoder->semaphore.waitWithTimeout(1);
181  return 0;
182  }
183 
184  yCTrace(H264CARRIER, "Length is \"%d\"", len);
185 
186  imgHeader.setFromImage(img);
187  phase = 1;
188  cursor = (char*)(&imgHeader);
189  remaining = sizeof(imgHeader);
190  }
191 
192  if (remaining>0)
193  {
194  size_t allow = remaining;
195  if (b.length()<allow)
196  {
197  allow = b.length();
198  }
199  if (cursor!=nullptr)
200  {
201  #ifdef debug_time
202  start_timeCopy = Time::now();
203  #endif
204  memcpy(b.get(),cursor,allow);
205  cursor+=allow;
206  remaining-=allow;
207  yCDebug(H264CARRIER, "returning %zd bytes", allow);
208  #ifdef debug_time
209  end_time = Time::now();
210  sumOf_timeOfCopyPerPahse[phase] +=(end_time - start_timeCopy);
211  countPerPhase[phase]++;
212  #endif
213  return allow;
214  } else
215  {
216  yarp::conf::ssize_t result = delegate->getInputStream().read(b);
217  yCTrace(H264CARRIER, "Read %zu bytes", result);
218  if (result>0)
219  {
220  remaining-=result;
221  yCTrace(H264CARRIER, "%zu bytes of meat", result);
222  return result;
223  }
224  }
225  }
226  return -1;
227 }
228 
229 
230 void H264Stream::write(const Bytes& b)
231 {
232  delegate->getOutputStream().write(b);
233 }
const yarp::os::LogComponent & H264CARRIER()
bool newFrameIsAvailable()
int getLastFrameSize()
yarp::os::Semaphore semaphore
Definition: H264Decoder.h:42
std::mutex mutex
Definition: H264Decoder.h:41
yarp::sig::ImageOf< yarp::sig::PixelRgb > & getLastFrame()
OutputStream & getOutputStream() override
Get an OutputStream to write to.
Definition: H264Stream.cpp:69
H264Stream(h264Decoder_cfgParamters &config)
Definition: H264Stream.cpp:29
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
Definition: H264Stream.cpp:230
virtual ~H264Stream()
Definition: H264Stream.cpp:39
void start()
Definition: H264Stream.cpp:57
bool setReadEnvelopeCallback(InputStream::readEnvelopeCallbackType callback, void *data) override
Definition: H264Stream.cpp:79
InputStream & getInputStream() override
Get an InputStream to read from.
Definition: H264Stream.cpp:64
bool setStream(yarp::os::impl::DgramTwoWayStream *stream)
Definition: H264Stream.cpp:47
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
Simple specification of the minimum functions needed from input streams.
Definition: InputStream.h:26
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:20
Simple specification of the minimum functions needed from output streams.
Definition: OutputStream.h:22
bool waitWithTimeout(double timeoutInSeconds)
Try to decrement the counter, even if we must wait - but don't wait forever.
Definition: Semaphore.cpp:101
A stream abstraction for datagram communication.
void setFromImage(const Image &image)
unsigned char * getRawImage() const
Access to the internal image buffer.
Definition: Image.cpp:541
bool copy(const Image &alt)
Copy operator.
Definition: Image.cpp:836
size_t getRawImageSize() const
Access to the internal buffer size information (this is how much memory has been allocated for the im...
Definition: Image.cpp:550
#define yCTrace(component,...)
Definition: LogComponent.h:85
#define yCDebug(component,...)
Definition: LogComponent.h:109
::ssize_t ssize_t
Definition: numeric.h:86
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition: Time.cpp:121
An interface to the operating system, including Port based communication.
Signal processing.
Definition: Image.h:22