YARP
Yet Another Robot Platform
MjpegCarrier.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * Copyright (C) 2006-2010 RobotCub Consortium
4  * All rights reserved.
5  *
6  * This software may be modified and distributed under the terms of the
7  * BSD-3-Clause license. See the accompanying LICENSE file for details.
8  */
9 
10 #include "MjpegCarrier.h"
11 #include "MjpegLogComponent.h"
12 
13 #include <cstdio>
14 
15 /*
16  On Windows, libjpeg does some slightly odd-ball stuff, including
17  unconditionally defining INT32 to be "long". This needs to
18  be worked around. Work around begins...
19  */
20 #if defined(_WIN32)
21 #define INT32 long // jpeg's definition
22 #define QGLOBAL_H 1
23 #endif
24 
25 #ifdef _MSC_VER
26 #pragma warning (push)
27 #pragma warning (disable : 4091)
28 #endif
29 
30 extern "C" {
31 #include <jpeglib.h>
32 }
33 
34 #ifdef _MSC_VER
35 #pragma warning (pop)
36 #endif
37 
38 #if defined(_WIN32)
39 #undef INT32
40 #undef QGLOBAL_H
41 #endif
42 /*
43  work around ends.
44  */
45 
46 #include <yarp/sig/Image.h>
48 #include <yarp/os/Name.h>
49 #include <yarp/os/Bytes.h>
50 #include <yarp/os/Route.h>
51 
53 
54 #include <map>
55 
56 using namespace yarp::os;
57 using namespace yarp::sig;
58 using namespace yarp::wire_rep_utils;
59 
60 static const std::map<int, J_COLOR_SPACE> yarpCode2Mjpeg { {VOCAB_PIXEL_MONO, JCS_GRAYSCALE},
61  {VOCAB_PIXEL_MONO16, JCS_GRAYSCALE},
62  {VOCAB_PIXEL_RGB , JCS_RGB},
63  {VOCAB_PIXEL_RGBA , JCS_EXT_RGBA},
64  {VOCAB_PIXEL_BGRA , JCS_EXT_BGRA},
65  {VOCAB_PIXEL_BGR , JCS_EXT_BGR} };
66 
67 static const std::map<int, int> yarpCode2Channels { {VOCAB_PIXEL_MONO, 1},
68  {VOCAB_PIXEL_MONO16, 2},
69  {VOCAB_PIXEL_RGB , 3},
70  {VOCAB_PIXEL_RGBA , 4},
71  {VOCAB_PIXEL_BGRA , 4},
72  {VOCAB_PIXEL_BGR , 3} };
73 
74 
76 {
// libjpeg's destination-manager record. The callbacks in this file cast
// cinfo->dest to a pointer to this enclosing struct, which relies on pub
// being the first member.
77  struct jpeg_destination_mgr pub;
78 
// Start of the output area; init_net_destination() points it at cache.
79  JOCTET *buffer;
// Size of the output area in bytes; init_net_destination() sets it to
// sizeof(cache).
80  int bufsize;
// Fixed-size backing storage for the compressed output (1,000,000 bytes).
81  JOCTET cache[1000000]; // need to make this variable...
82 };
83 
85 
86 void send_net_data(JOCTET *data, int len, void *client) {
87  yCTrace(MJPEGCARRIER, "Send %d bytes", len);
88  auto* p = (ConnectionState *)client;
89  constexpr size_t hdr_size = 1000;
90  char hdr[hdr_size];
91  const char *brk = "\r\n";
92  std::snprintf(hdr, hdr_size, "Content-Type: image/jpeg%s\
93 Content-Length: %d%s%s", brk, len, brk, brk);
94  Bytes hbuf(hdr,strlen(hdr));
95  p->os().write(hbuf);
96  Bytes buf((char *)data,len);
97  /*
98  // add corruption now and then, for testing.
99  static int ct = 0;
100  ct++;
101  if (ct==50) {
102  yCTrace(MJPEGCARRIER, "Adding corruption");
103  buf.get()[0] = 'z';
104  ct = 0;
105  }
106  */
107  p->os().write(buf);
108  std::snprintf(hdr, hdr_size, "%s--boundarydonotcross%s", brk, brk);
109  Bytes hbuf2(hdr,strlen(hdr));
110  p->os().write(hbuf2);
111 
112 }
113 
114 static void init_net_destination(j_compress_ptr cinfo) {
115  yCTrace(MJPEGCARRIER, "Initializing destination");
116  auto dest = (net_destination_ptr)cinfo->dest;
117  dest->buffer = &(dest->cache[0]);
118  dest->bufsize = sizeof(dest->cache);
119  dest->pub.next_output_byte = dest->buffer;
120  dest->pub.free_in_buffer = dest->bufsize;
121 }
122 
123 static boolean empty_net_output_buffer(j_compress_ptr cinfo) {
124  auto dest = (net_destination_ptr)cinfo->dest;
125  yCWarning(MJPEGCARRIER, "Empty buffer - PROBLEM");
126  send_net_data(dest->buffer,dest->bufsize-dest->pub.free_in_buffer,
127  cinfo->client_data);
128  dest->pub.next_output_byte = dest->buffer;
129  dest->pub.free_in_buffer = dest->bufsize;
130  return TRUE;
131 }
132 
133 static void term_net_destination(j_compress_ptr cinfo) {
134  auto dest = (net_destination_ptr)cinfo->dest;
135  yCTrace(MJPEGCARRIER, "Terminating net %d %zd", dest->bufsize,dest->pub.free_in_buffer);
136  send_net_data(dest->buffer,dest->bufsize-dest->pub.free_in_buffer,
137  cinfo->client_data);
138 }
139 
140 void jpeg_net_dest(j_compress_ptr cinfo) {
141  net_destination_ptr dest;
142 
143  //ERREXIT(cinfo, JERR_BUFFER_SIZE);
144 
145  /* The destination object is made permanent so that multiple JPEG images
146  * can be written to the same buffer without re-executing jpeg_net_dest.
147  */
148  if (cinfo->dest == nullptr) { /* first time for this JPEG object? */
149  cinfo->dest = (struct jpeg_destination_mgr *)
150  (*cinfo->mem->alloc_large) ((j_common_ptr) cinfo, JPOOL_PERMANENT,
151  sizeof(net_destination_mgr));
152  }
153 
154  dest = (net_destination_ptr) cinfo->dest;
155  dest->pub.init_destination = init_net_destination;
156  dest->pub.empty_output_buffer = empty_net_output_buffer;
157  dest->pub.term_destination = term_net_destination;
158 }
159 
// Body of write(proto, writer): obtains an image from the writer,
// JPEG-compresses it scanline by scanline, and emits it through the
// destination manager installed by jpeg_net_dest(). Returns false when no
// image could be obtained from the writer, true otherwise.
161  WireImage rep;
162  FlexImage *img = rep.checkForImage(writer);
163 
// No image in this message: report failure without writing anything.
164  if (img==nullptr) return false;
165  int w = img->width();
166  int h = img->height();
167  int row_stride = img->getRowSize();
168  auto* data = (JOCTET*)img->getRawImage();
169 
170  JSAMPROW row_pointer[1];
171 
172  struct jpeg_compress_struct cinfo;
173  struct jpeg_error_mgr jerr;
// NOTE(review): libjpeg's standard error manager is used as-is; its default
// fatal-error behaviour presumably terminates the process - confirm.
174  cinfo.err = jpeg_std_error(&jerr);
// The connection is handed to the destination callbacks through client_data.
175  cinfo.client_data = &proto;
176  jpeg_create_compress(&cinfo);
177  jpeg_net_dest(&cinfo);
178  cinfo.image_width = w;
179  cinfo.image_height = h;
// NOTE(review): std::map::at() throws std::out_of_range for pixel codes that
// are missing from the tables; nothing here catches it.
// NOTE(review): MONO16 maps to JCS_GRAYSCALE with 2 components - verify that
// libjpeg accepts this combination.
180  cinfo.in_color_space = yarpCode2Mjpeg.at(img->getPixelCode());
181  cinfo.input_components = yarpCode2Channels.at(img->getPixelCode());
182  jpeg_set_defaults(&cinfo);
183  //jpeg_set_quality(&cinfo, 85, TRUE);
184  yCTrace(MJPEGCARRIER, "Starting to compress...");
185  jpeg_start_compress(&cinfo, TRUE);
// A pending envelope is embedded as a JPEG comment marker (the length
// includes the terminating NUL) and then cleared.
186  if(!envelope.empty()) {
187  jpeg_write_marker(&cinfo, JPEG_COM, reinterpret_cast<const JOCTET*>(envelope.c_str()), envelope.length() + 1);
188  envelope.clear();
189  }
// NOTE(review): this trace says "Done compressing" but is emitted before any
// scanline has been written.
190  yCTrace(MJPEGCARRIER, "Done compressing (height %d)", cinfo.image_height);
// Feed the image one row at a time; next_scanline is advanced by libjpeg.
191  while (cinfo.next_scanline < cinfo.image_height) {
192  yCTrace(MJPEGCARRIER, "Writing row %d...", cinfo.next_scanline);
193  row_pointer[0] = data + cinfo.next_scanline * row_stride;
194  jpeg_write_scanlines(&cinfo, row_pointer, 1);
195  }
196  jpeg_finish_compress(&cinfo);
197  jpeg_destroy_compress(&cinfo);
198 
199  return true;
200 }
201 
// Body of reply(proto, writer): unconditionally reports failure.
203  return false;
204 }
205 
206 
// Body of sendHeader(proto): builds a textual HTTP GET request and writes it
// to the connection's output stream. Always returns true.
// The carrier name is wrapped in a dummy "<carrier>://test" name so that its
// "path" modifier can be extracted with Name::getCarrierModifier().
208  Name n(proto.getRoute().getCarrierName() + "://test");
209  std::string pathValue = n.getCarrierModifier("path");
// Default request, used when no "path" modifier is given.
// NOTE(review): this default already ends in two newlines, yet " HTTP/1.1"
// is still appended below, so the version token lands after a blank line -
// confirm this is intended.
210  std::string target = "GET /?action=stream\n\n";
211  if (pathValue!="") {
212  target = "GET /";
213  target += pathValue;
214  }
215  target += " HTTP/1.1\n";
216  Contact host = proto.getRoute().getToContact();
// Add a Host header only when the destination contact has a host name.
// NOTE(review): this header ends with CR LF while the other lines end with a
// bare LF.
217  if (host.getHost()!="") {
218  target += "Host: ";
219  target += host.getHost();
220  target += "\r\n";
221  }
// Empty line ending the request.
222  target += "\n";
223  Bytes b((char*)target.c_str(),target.length());
224  proto.os().write(b);
225  return true;
226 }
227 
// Body of autoCompression(): the result is fixed at build time - true when
// MJPEG_AUTOCOMPRESS is defined, false otherwise.
229 #ifdef MJPEG_AUTOCOMPRESS
230  return true;
231 #else
232  return false;
233 #endif
234 }
static const std::map< int, int > yarpCode2Channels
static boolean empty_net_output_buffer(j_compress_ptr cinfo)
static const std::map< int, J_COLOR_SPACE > yarpCode2Mjpeg
void jpeg_net_dest(j_compress_ptr cinfo)
net_destination_mgr * net_destination_ptr
static void init_net_destination(j_compress_ptr cinfo)
void send_net_data(JOCTET *data, int len, void *client)
static void term_net_destination(j_compress_ptr cinfo)
const yarp::os::LogComponent & MJPEGCARRIER()
virtual bool autoCompression() const
bool reply(yarp::os::ConnectionState &proto, yarp::os::SizedWriter &writer) override
bool write(yarp::os::ConnectionState &proto, yarp::os::SizedWriter &writer) override
Write a message.
bool sendHeader(yarp::os::ConnectionState &proto) override
Write a header appropriate to the carrier to the connection, followed by any carrier-specific data.
A simple abstraction for a block of bytes.
Definition: Bytes.h:28
The basic state of a connection - route, streams in use, etc.
virtual const Route & getRoute() const =0
Get the route associated with this connection.
OutputStream & os()
Shorthand for getOutputStream()
Represents how to reach a part of a YARP network.
Definition: Contact.h:39
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:231
Simple abstraction for a YARP port name.
Definition: Name.h:22
std::string getCarrierModifier(const char *mod, bool *hasModifier=nullptr)
Definition: Name.cpp:47
virtual void write(char ch)
Write a single byte to the stream.
const std::string & getCarrierName() const
Get the carrier type of the route.
Definition: Route.cpp:126
const Contact & getToContact() const
Get the destination contact of the route, if available.
Definition: Route.cpp:116
Minimal requirements for an efficient Writer.
Definition: SizedWriter.h:36
Image class with user control of representation details.
Definition: Image.h:403
size_t width() const
Gets width of image in pixels.
Definition: Image.h:153
size_t getRowSize() const
Size of the underlying image buffer rows.
Definition: Image.h:179
unsigned char * getRawImage() const
Access to the internal image buffer.
Definition: Image.cpp:534
size_t height() const
Gets height of image in pixels.
Definition: Image.h:159
virtual int getPixelCode() const
Gets pixel type identifier.
Definition: Image.cpp:454
yarp::sig::FlexImage * checkForImage(yarp::os::SizedWriter &writer)
Definition: WireImage.cpp:18
#define yCTrace(component,...)
Definition: LogComponent.h:88
#define yCWarning(component,...)
Definition: LogComponent.h:146
@ VOCAB_PIXEL_RGBA
Definition: Image.h:51
@ VOCAB_PIXEL_MONO16
Definition: Image.h:49
@ VOCAB_PIXEL_BGRA
Definition: Image.h:52
@ VOCAB_PIXEL_BGR
Definition: Image.h:55
@ VOCAB_PIXEL_MONO
Definition: Image.h:48
@ VOCAB_PIXEL_RGB
Definition: Image.h:50
An interface to the operating system, including Port based communication.
Signal processing.
Definition: Image.h:25
struct jpeg_destination_mgr pub