YARP
Yet Another Robot Platform
MjpegCarrier.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
7 #include "MjpegCarrier.h"
8 #include "MjpegLogComponent.h"
9 
10 #include <cstdio>
11 
12 /*
13  On Windows, libjpeg does some slightly odd-ball stuff, including
14  unconditionally defining INT32 to be "long". This needs to
15  be worked around. Work around begins...
16  */
17 #if defined(_WIN32)
18 #define INT32 long // jpeg's definition
19 #define QGLOBAL_H 1
20 #endif
21 
22 #ifdef _MSC_VER
23 #pragma warning (push)
24 #pragma warning (disable : 4091)
25 #endif
26 
27 extern "C" {
28 #include <jpeglib.h>
29 }
30 
31 #ifdef _MSC_VER
32 #pragma warning (pop)
33 #endif
34 
35 #if defined(_WIN32)
36 #undef INT32
37 #undef QGLOBAL_H
38 #endif
39 /*
40  work around ends.
41  */
42 
43 #include <yarp/sig/Image.h>
45 #include <yarp/os/Name.h>
46 #include <yarp/os/Bytes.h>
47 #include <yarp/os/Route.h>
48 
50 
51 #include <map>
52 
53 using namespace yarp::os;
54 using namespace yarp::sig;
55 using namespace yarp::wire_rep_utils;
56 
57 static const std::map<int, J_COLOR_SPACE> yarpCode2Mjpeg { {VOCAB_PIXEL_MONO, JCS_GRAYSCALE},
58  {VOCAB_PIXEL_MONO16, JCS_GRAYSCALE},
59  {VOCAB_PIXEL_RGB , JCS_RGB},
60  {VOCAB_PIXEL_RGBA , JCS_EXT_RGBA},
61  {VOCAB_PIXEL_BGRA , JCS_EXT_BGRA},
62  {VOCAB_PIXEL_BGR , JCS_EXT_BGR} };
63 
64 static const std::map<int, int> yarpCode2Channels { {VOCAB_PIXEL_MONO, 1},
65  {VOCAB_PIXEL_MONO16, 2},
66  {VOCAB_PIXEL_RGB , 3},
67  {VOCAB_PIXEL_RGBA , 4},
68  {VOCAB_PIXEL_BGRA , 4},
69  {VOCAB_PIXEL_BGR , 3} };
70 
71 
73 {
74  struct jpeg_destination_mgr pub;
75 
76  JOCTET *buffer;
77  int bufsize;
78  JOCTET cache[1000000]; // need to make this variable...
79 };
80 
82 
83 void send_net_data(JOCTET *data, int len, void *client) {
84  yCTrace(MJPEGCARRIER, "Send %d bytes", len);
85  auto* p = (ConnectionState *)client;
86  constexpr size_t hdr_size = 1000;
87  char hdr[hdr_size];
88  const char *brk = "\r\n";
89  std::snprintf(hdr, hdr_size, "Content-Type: image/jpeg%s\
90 Content-Length: %d%s%s", brk, len, brk, brk);
91  Bytes hbuf(hdr,strlen(hdr));
92  p->os().write(hbuf);
93  Bytes buf((char *)data,len);
94  /*
95  // add corruption now and then, for testing.
96  static int ct = 0;
97  ct++;
98  if (ct==50) {
99  yCTrace(MJPEGCARRIER, "Adding corruption");
100  buf.get()[0] = 'z';
101  ct = 0;
102  }
103  */
104  p->os().write(buf);
105  std::snprintf(hdr, hdr_size, "%s--boundarydonotcross%s", brk, brk);
106  Bytes hbuf2(hdr,strlen(hdr));
107  p->os().write(hbuf2);
108 
109 }
110 
111 static void init_net_destination(j_compress_ptr cinfo) {
112  yCTrace(MJPEGCARRIER, "Initializing destination");
113  auto dest = (net_destination_ptr)cinfo->dest;
114  dest->buffer = &(dest->cache[0]);
115  dest->bufsize = sizeof(dest->cache);
116  dest->pub.next_output_byte = dest->buffer;
117  dest->pub.free_in_buffer = dest->bufsize;
118 }
119 
120 static boolean empty_net_output_buffer(j_compress_ptr cinfo) {
121  auto dest = (net_destination_ptr)cinfo->dest;
122  yCWarning(MJPEGCARRIER, "Empty buffer - PROBLEM");
123  send_net_data(dest->buffer,dest->bufsize-dest->pub.free_in_buffer,
124  cinfo->client_data);
125  dest->pub.next_output_byte = dest->buffer;
126  dest->pub.free_in_buffer = dest->bufsize;
127  return TRUE;
128 }
129 
130 static void term_net_destination(j_compress_ptr cinfo) {
131  auto dest = (net_destination_ptr)cinfo->dest;
132  yCTrace(MJPEGCARRIER, "Terminating net %d %zd", dest->bufsize,dest->pub.free_in_buffer);
133  send_net_data(dest->buffer,dest->bufsize-dest->pub.free_in_buffer,
134  cinfo->client_data);
135 }
136 
137 void jpeg_net_dest(j_compress_ptr cinfo) {
138  net_destination_ptr dest;
139 
140  //ERREXIT(cinfo, JERR_BUFFER_SIZE);
141 
142  /* The destination object is made permanent so that multiple JPEG images
143  * can be written to the same buffer without re-executing jpeg_net_dest.
144  */
145  if (cinfo->dest == nullptr) { /* first time for this JPEG object? */
146  cinfo->dest = (struct jpeg_destination_mgr *)
147  (*cinfo->mem->alloc_large) ((j_common_ptr) cinfo, JPOOL_PERMANENT,
148  sizeof(net_destination_mgr));
149  }
150 
151  dest = (net_destination_ptr) cinfo->dest;
152  dest->pub.init_destination = init_net_destination;
153  dest->pub.empty_output_buffer = empty_net_output_buffer;
154  dest->pub.term_destination = term_net_destination;
155 }
156 
158  WireImage rep;
159  FlexImage *img = rep.checkForImage(writer);
160 
161  if (img == nullptr) {
162  return false;
163  }
164  int w = img->width();
165  int h = img->height();
166  int row_stride = img->getRowSize();
167  auto* data = (JOCTET*)img->getRawImage();
168 
169  JSAMPROW row_pointer[1];
170 
171  struct jpeg_compress_struct cinfo;
172  struct jpeg_error_mgr jerr;
173  cinfo.err = jpeg_std_error(&jerr);
174  cinfo.client_data = &proto;
175  jpeg_create_compress(&cinfo);
176  jpeg_net_dest(&cinfo);
177  cinfo.image_width = w;
178  cinfo.image_height = h;
179  cinfo.in_color_space = yarpCode2Mjpeg.at(img->getPixelCode());
180  cinfo.input_components = yarpCode2Channels.at(img->getPixelCode());
181  jpeg_set_defaults(&cinfo);
182  //jpeg_set_quality(&cinfo, 85, TRUE);
183  yCTrace(MJPEGCARRIER, "Starting to compress...");
184  jpeg_start_compress(&cinfo, TRUE);
185  if(!envelope.empty()) {
186  jpeg_write_marker(&cinfo, JPEG_COM, reinterpret_cast<const JOCTET*>(envelope.c_str()), envelope.length() + 1);
187  envelope.clear();
188  }
189  yCTrace(MJPEGCARRIER, "Done compressing (height %d)", cinfo.image_height);
190  while (cinfo.next_scanline < cinfo.image_height) {
191  yCTrace(MJPEGCARRIER, "Writing row %d...", cinfo.next_scanline);
192  row_pointer[0] = data + cinfo.next_scanline * row_stride;
193  jpeg_write_scanlines(&cinfo, row_pointer, 1);
194  }
195  jpeg_finish_compress(&cinfo);
196  jpeg_destroy_compress(&cinfo);
197 
198  return true;
199 }
200 
202  return false;
203 }
204 
205 
207  Name n(proto.getRoute().getCarrierName() + "://test");
208  std::string pathValue = n.getCarrierModifier("path");
209  std::string target = "GET /?action=stream\n\n";
210  if (pathValue!="") {
211  target = "GET /";
212  target += pathValue;
213  }
214  target += " HTTP/1.1\n";
215  Contact host = proto.getRoute().getToContact();
216  if (host.getHost()!="") {
217  target += "Host: ";
218  target += host.getHost();
219  target += "\r\n";
220  }
221  target += "\n";
222  Bytes b((char*)target.c_str(),target.length());
223  proto.os().write(b);
224  return true;
225 }
226 
228 #ifdef MJPEG_AUTOCOMPRESS
229  return true;
230 #else
231  return false;
232 #endif
233 }
static const std::map< int, int > yarpCode2Channels
static boolean empty_net_output_buffer(j_compress_ptr cinfo)
static const std::map< int, J_COLOR_SPACE > yarpCode2Mjpeg
void jpeg_net_dest(j_compress_ptr cinfo)
net_destination_mgr * net_destination_ptr
static void init_net_destination(j_compress_ptr cinfo)
void send_net_data(JOCTET *data, int len, void *client)
static void term_net_destination(j_compress_ptr cinfo)
const yarp::os::LogComponent & MJPEGCARRIER()
virtual bool autoCompression() const
bool reply(yarp::os::ConnectionState &proto, yarp::os::SizedWriter &writer) override
bool write(yarp::os::ConnectionState &proto, yarp::os::SizedWriter &writer) override
Write a message.
bool sendHeader(yarp::os::ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
The basic state of a connection - route, streams in use, etc.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
OutputStream & os()
Shorthand for getOutputStream()
Represents how to reach a part of a YARP network.
Definition: Contact.h:36
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:228
Simple abstraction for a YARP port name.
Definition: Name.h:19
std::string getCarrierModifier(const char *mod, bool *hasModifier=nullptr)
Definition: Name.cpp:44
virtual void write(char ch)
Write a single byte to the stream.
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition: Route.cpp:123
const Contact & getToContact() const
Get the destination contact of the route, if available.
Definition: Route.cpp:113
Minimal requirements for an efficient Writer.
Definition: SizedWriter.h:33
Image class with user control of representation details.
Definition: Image.h:414
size_t width() const
Gets width of image in pixels.
Definition: Image.h:166
size_t getRowSize() const
Size of the underlying image buffer rows.
Definition: Image.h:192
unsigned char * getRawImage() const
Access to the internal image buffer.
Definition: Image.cpp:541
size_t height() const
Gets height of image in pixels.
Definition: Image.h:172
virtual int getPixelCode() const
Gets pixel type identifier.
Definition: Image.cpp:440
yarp::sig::FlexImage * checkForImage(yarp::os::SizedWriter &writer)
Definition: WireImage.cpp:15
#define yCTrace(component,...)
Definition: LogComponent.h:85
#define yCWarning(component,...)
Definition: LogComponent.h:143
@ VOCAB_PIXEL_RGBA
Definition: Image.h:48
@ VOCAB_PIXEL_MONO16
Definition: Image.h:46
@ VOCAB_PIXEL_BGRA
Definition: Image.h:49
@ VOCAB_PIXEL_BGR
Definition: Image.h:52
@ VOCAB_PIXEL_MONO
Definition: Image.h:45
@ VOCAB_PIXEL_RGB
Definition: Image.h:47
An interface to the operating system, including Port based communication.
Signal processing.
Definition: Image.h:22
struct jpeg_destination_mgr pub