YARP
Yet Another Robot Platform
DgramTwoWayStream.cpp
Go to the documentation of this file.
1 /*
2  * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3  * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4  * SPDX-License-Identifier: BSD-3-Clause
5  */
6 
8 
9 #include <yarp/conf/system.h>
10 #include <yarp/conf/environment.h>
11 
12 #include <yarp/os/NetType.h>
13 #include <yarp/os/Time.h>
15 
16 #if defined(YARP_HAS_ACE)
17 # include <ace/ACE.h>
18 # include <ace/Handle_Set.h>
19 # include <ace/INET_Addr.h>
20 # include <ace/Log_Msg.h>
21 # include <ace/OS_Memory.h>
22 # include <ace/OS_NS_sys_select.h>
23 # include <ace/SOCK_Dgram.h>
24 # include <ace/SOCK_Dgram_Mcast.h>
25 # include <ace/os_include/net/os_if.h>
26 // In one of the ACE headers there is a definition of "main" for WIN32
27 # ifdef main
28 # undef main
29 # endif
30 #else
31 # include <arpa/inet.h>
32 # include <netinet/in.h>
33 # include <sys/socket.h>
34 # include <sys/types.h>
35 # include <unistd.h>
36 #endif
37 
38 #include <cerrno>
39 #include <cstring>
40 
41 using namespace yarp::os::impl;
42 using namespace yarp::os;
43 
44 #define CRC_SIZE 8
45 #define UDP_MAX_DATAGRAM_SIZE (65507 - CRC_SIZE)
46 
47 
48 namespace {
49 YARP_OS_LOG_COMPONENT(DGRAMTWOWAYSTREAM, "yarp.os.impl.DgramTwoWayStream")
50 } // namespace
51 
52 
53 static bool checkCrc(char* buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct, int* store_altPct = nullptr)
54 {
55  auto alt = (NetInt32)NetType::getCrc(buf + crcLength,
56  (length > crcLength) ? (length - crcLength) : 0);
57  Bytes b(buf, 4);
58  Bytes b2(buf + 4, 4);
59  NetInt32 curr = NetType::netInt(b);
60  int altPct = NetType::netInt(b2);
61  bool ok = (alt == curr && pct == altPct);
62  if (!ok) {
63  if (alt != curr) {
64  yCDebug(DGRAMTWOWAYSTREAM, "crc mismatch");
65  }
66  if (pct != altPct) {
67  yCDebug(DGRAMTWOWAYSTREAM, "packet code broken");
68  }
69  }
70  if (store_altPct != nullptr) {
71  *store_altPct = altPct;
72  }
73 
74  return ok;
75 }
76 
77 
78 static void addCrc(char* buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct)
79 {
80  auto alt = (NetInt32)NetType::getCrc(buf + crcLength,
81  (length > crcLength) ? (length - crcLength) : 0);
82  Bytes b(buf, 4);
83  Bytes b2(buf + 4, 4);
84  NetType::netInt((NetInt32)alt, b);
85  NetType::netInt((NetInt32)pct, b2);
86 }
87 
88 
89 bool DgramTwoWayStream::open(const Contact& remote)
90 {
91 #if defined(YARP_HAS_ACE)
92  ACE_INET_Addr anywhere((u_short)0, (ACE_UINT32)INADDR_ANY);
93  Contact local(anywhere.get_host_addr(),
94  anywhere.get_port_number());
95 #else
96  Contact local("localhost", -1);
97 #endif
98  return open(local, remote);
99 }
100 
101 bool DgramTwoWayStream::open(const Contact& local, const Contact& remote)
102 {
103  localAddress = local;
104  remoteAddress = remote;
105 
106 #if defined(YARP_HAS_ACE)
107  localHandle = ACE_INET_Addr((u_short)(localAddress.getPort()), (ACE_UINT32)INADDR_ANY);
108  if (remote.isValid()) {
109  remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
110  }
111  dgram = new ACE_SOCK_Dgram;
112  yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
113 
114  int result = dgram->open(localHandle,
115  ACE_PROTOCOL_FAMILY_INET,
116  0,
117  1);
118 #else
119  dgram = nullptr;
120  dgram_sockfd = -1;
121 
122  int s = -1;
123  if ((s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
124  std::exit(1);
125  }
126  struct sockaddr_in dgram_sin;
127  memset((char*)&dgram_sin, 0, sizeof(dgram_sin));
128  dgram_sin.sin_family = AF_INET;
129  dgram_sin.sin_addr.s_addr = htonl(INADDR_ANY);
130  dgram_sin.sin_port = htons(remote.getPort());
131  if (local.isValid()) {
132  if (inet_pton(AF_INET, remote.getHost().c_str(), &dgram_sin.sin_addr) == 0) {
133  yCError(DGRAMTWOWAYSTREAM, "could not set up udp client");
134  std::exit(1);
135  }
136  if (connect(s, (struct sockaddr*)&dgram_sin, sizeof(dgram_sin)) == -1) {
137  yCError(DGRAMTWOWAYSTREAM, "could not connect udp client");
138  std::exit(1);
139  }
140  } else {
141  if (bind(s, (struct sockaddr*)&dgram_sin, sizeof(dgram_sin)) == -1) {
142  yCError(DGRAMTWOWAYSTREAM, "could not create udp server");
143  std::exit(1);
144  }
145  }
146  dgram_sockfd = s;
147  dgram = this;
148  int result = -1;
149  int local_port = -1;
150 
151  struct sockaddr_in sin;
152  socklen_t len = sizeof(sin);
153  if (getsockname(dgram_sockfd, (struct sockaddr*)&sin, &len) == 0 && sin.sin_family == AF_INET) {
154  result = 0;
155  local_port = ntohs(sin.sin_port);
156  }
157 #endif
158 
159  if (result != 0) {
160  yCError(DGRAMTWOWAYSTREAM, "could not open datagram socket");
161  return false;
162  }
163 
164  configureSystemBuffers();
165 
166 #if defined(YARP_HAS_ACE)
167  dgram->get_local_addr(localHandle);
168  yCDebug(DGRAMTWOWAYSTREAM, "starting DGRAM entity on port number %u",localHandle.get_port_number());
169  localAddress = Contact("127.0.0.1", localHandle.get_port_number());
170 #else
171  localAddress = Contact("127.0.0.1", local_port);
172 #endif
173 
174  yCDebug(DGRAMTWOWAYSTREAM, "Update: DGRAM from %s to %s", localAddress.toURI().c_str(), remoteAddress.toURI().c_str());
175 
176  allocate();
177 
178  return true;
179 }
180 
181 void DgramTwoWayStream::allocate(int readSize, int writeSize)
182 {
183  //These are only as another default. We should modify the method to return bool
184  //and fail if we cannot read the socket size.
185 
186  int _read_size = -1;
187  int _write_size = -1;
188 
189  std::string _env_dgram = yarp::conf::environment::get_string("YARP_DGRAM_SIZE");
190  std::string _env_mode;
191  if (multiMode) {
192  _env_mode = yarp::conf::environment::get_string("YARP_MCAST_SIZE");
193  } else {
194  _env_mode = yarp::conf::environment::get_string("YARP_UDP_SIZE");
195  }
196  if (!_env_mode.empty()) {
197  _env_dgram = _env_mode;
198  }
199  if (!_env_dgram.empty()) {
200  int sz = yarp::conf::numeric::from_string<int>(_env_dgram);
201  if (sz != 0) {
202  _read_size = _write_size = sz;
203  }
204  yCInfo(DGRAMTWOWAYSTREAM, "Datagram packet size set to %d", _read_size);
205  }
206  if (readSize != 0) {
207  _read_size = readSize;
208  yCInfo(DGRAMTWOWAYSTREAM, "Datagram read size reset to %d", _read_size);
209  }
210  if (writeSize != 0) {
211  _write_size = writeSize;
212  yCInfo(DGRAMTWOWAYSTREAM, "Datagram write size reset to %d", _write_size);
213  }
214 
215  // force the size of the write buffer to be under the max size of a udp datagram.
216  if (_write_size > UDP_MAX_DATAGRAM_SIZE || _write_size < 0) {
217  _write_size = UDP_MAX_DATAGRAM_SIZE;
218  }
219 
220  if (_read_size < 0) {
221 #if defined(YARP_HAS_ACE)
222  //Defaults to socket size
223  if (dgram != nullptr) {
224  int len = sizeof(_read_size);
225  int result = dgram->get_option(SOL_SOCKET, SO_RCVBUF, &_read_size, &len);
226  if (result < 0) {
227  yCError(DGRAMTWOWAYSTREAM, "Failed to read buffer size from RCVBUF socket with error: %s. Setting read buffer size to UDP_MAX_DATAGRAM_SIZE.",
228  strerror(errno));
229  _read_size = UDP_MAX_DATAGRAM_SIZE;
230  }
231  }
232 #else
233  socklen_t len = sizeof(_read_size);
234 
235  int result = getsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, &_read_size, &len);
236  if (result < 0) {
237  yCError(DGRAMTWOWAYSTREAM, "Failed to read buffer size from RCVBUF socket with error: %s. Setting read buffer size to UDP_MAX_DATAGRAM_SIZE.",
238  strerror(errno));
239  _read_size = UDP_MAX_DATAGRAM_SIZE;
240  }
241 #endif
242  }
243 
244  readBuffer.allocate(_read_size);
245  writeBuffer.allocate(_write_size);
246  readAt = 0;
247  readAvail = 0;
248  writeAvail = CRC_SIZE;
249  //happy = true;
250  pct = 0;
251 }
252 
253 
254 void DgramTwoWayStream::configureSystemBuffers()
255 {
256  //By default the buffers are forced to the datagram size limit.
257  //These can be overwritten by environment variables
258  //Generic variable
259  std::string socketBufferSize = yarp::conf::environment::get_string("YARP_DGRAM_BUFFER_SIZE");
260  //Specific read
261  std::string socketReadBufferSize = yarp::conf::environment::get_string("YARP_DGRAM_RECV_BUFFER_SIZE");
262  //Specific write
263  std::string socketSendBufferSize = yarp::conf::environment::get_string("YARP_DGRAM_SND_BUFFER_SIZE");
264 
265  int readBufferSize = -1;
266  if (!socketReadBufferSize.empty()) {
267  readBufferSize = yarp::conf::numeric::from_string<int>(socketReadBufferSize);
268  } else if (!socketBufferSize.empty()) {
269  readBufferSize = yarp::conf::numeric::from_string<int>(socketBufferSize);
270  }
271 
272  int writeBufferSize = -1;
273  if (!socketSendBufferSize.empty()) {
274  writeBufferSize = yarp::conf::numeric::from_string<int>(socketSendBufferSize);
275  } else if (!socketBufferSize.empty()) {
276  writeBufferSize = yarp::conf::numeric::from_string<int>(socketBufferSize);
277  }
278  // The writeBufferSize can't be set greater than udp datagram
279  // maximum size
280  if (writeBufferSize < 0 || writeBufferSize > UDP_MAX_DATAGRAM_SIZE) {
281  if (writeBufferSize > UDP_MAX_DATAGRAM_SIZE) {
282  yCWarning(DGRAMTWOWAYSTREAM, "The desired SND buffer size is too big. It is set to the max datagram size : %d",
284  }
285  writeBufferSize = UDP_MAX_DATAGRAM_SIZE;
286  }
287 
288  if (readBufferSize > 0) {
289  int actualReadSize = -1;
290 
291 #if defined(YARP_HAS_ACE)
292  int intSize = sizeof(readBufferSize);
293  int setResult = dgram->set_option(SOL_SOCKET, SO_RCVBUF, (void*)&readBufferSize, intSize);
294 
295  int getResult = dgram->get_option(SOL_SOCKET, SO_RCVBUF, (void*)&actualReadSize, &intSize);
296 #else
297  socklen_t intSize = sizeof(readBufferSize);
298  int setResult = setsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, (void*)&readBufferSize, intSize);
299  int getResult = getsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, (void*)&actualReadSize, &intSize);
300 #endif
301  // in linux the value returned by getsockopt is "doubled"
302  // for some unknown reasons (see https://linux.die.net/man/7/socket)
303 #if defined(__linux__)
304  actualReadSize /= 2;
305 #endif
306  if (setResult < 0 || getResult < 0 || readBufferSize != actualReadSize) {
307  bufferAlertNeeded = true;
308  bufferAlerted = false;
309  yCWarning(DGRAMTWOWAYSTREAM, "Failed to set RECV socket buffer to desired size. Actual: %d, Desired %d",
310  actualReadSize,
311  readBufferSize);
312  }
313  }
314  if (writeBufferSize > 0) {
315  int actualWriteSize = -1;
316 #if defined(YARP_HAS_ACE)
317  int intSize = sizeof(writeBufferSize);
318  int setResult = dgram->set_option(SOL_SOCKET, SO_SNDBUF, (void*)&writeBufferSize, intSize);
319  int getResult = dgram->get_option(SOL_SOCKET, SO_SNDBUF, (void*)&actualWriteSize, &intSize);
320 #else
321  socklen_t intSize = sizeof(writeBufferSize);
322  int setResult = setsockopt(dgram_sockfd, SOL_SOCKET, SO_SNDBUF, (void*)&writeBufferSize, intSize);
323  int getResult = getsockopt(dgram_sockfd, SOL_SOCKET, SO_SNDBUF, (void*)&actualWriteSize, &intSize);
324 #endif
325  // in linux the value returned by getsockopt is "doubled"
326  // for some unknown reasons (see https://linux.die.net/man/7/socket)
327 #if defined(__linux__)
328  actualWriteSize /= 2;
329 #endif
330  if (setResult < 0 || getResult < 0 || writeBufferSize != actualWriteSize) {
331  bufferAlertNeeded = true;
332  bufferAlerted = false;
333  yCWarning(DGRAMTWOWAYSTREAM, "Failed to set SND socket buffer to desired size. Actual: %d, Desired: %d",
334  actualWriteSize,
335  writeBufferSize);
336  }
337  }
338 }
339 
340 
341 #if defined(YARP_HAS_ACE)
342 int DgramTwoWayStream::restrictMcast(ACE_SOCK_Dgram_Mcast* dmcast,
343  const Contact& group,
344  const Contact& ipLocal,
345  bool add)
346 {
347  restrictInterfaceIp = ipLocal;
348 
349  yCInfo(DGRAMTWOWAYSTREAM, "multicast connection %s on network interface for %s", group.getHost().c_str(), ipLocal.getHost().c_str());
350  int result = -1;
351  // There's some major damage in ACE mcast interfaces.
352  // Most require interface names, yet provide no way to query
353  // these names - and in the end, convert to IP addresses.
354  // Here we try to do an end run around ACE.
355 
356  // based on: ACE_SOCK_Dgram::set_nic
357 
358  ip_mreq multicast_address;
359  ACE_INET_Addr group_addr(group.getPort(),
360  group.getHost().c_str());
361  ACE_INET_Addr interface_addr(ipLocal.getPort(),
362  ipLocal.getHost().c_str());
363  multicast_address.imr_interface.s_addr = htonl(interface_addr.get_ip_address());
364  multicast_address.imr_multiaddr.s_addr = htonl(group_addr.get_ip_address());
365 
366  if (add) {
367  yCDebug(DGRAMTWOWAYSTREAM, "Trying to correct mcast membership...");
368  result = ((ACE_SOCK*)dmcast)->set_option(IPPROTO_IP, IP_ADD_MEMBERSHIP, &multicast_address, sizeof(struct ip_mreq));
369  } else {
370  yCDebug(DGRAMTWOWAYSTREAM, "Trying to correct mcast output...");
371  result = ((ACE_SOCK*)dmcast)->set_option(IPPROTO_IP, IP_MULTICAST_IF, &multicast_address.imr_interface.s_addr, sizeof(struct in_addr));
372  }
373  if (result != 0) {
374  int num = errno;
375  yCDebug(DGRAMTWOWAYSTREAM, "mcast result: %s", strerror(num));
376  if (num == 98) {
377  // our membership is already correct / Address already in use
378  result = 0;
379  }
380  result = 0; // in fact, best to proceed for Windows.
381  }
382 
383  return result;
384 }
385 #endif
386 
387 
389  const Contact& ipLocal)
390 {
391 
392  multiMode = true;
393 
394  localAddress = ipLocal;
395 
396 #if defined(YARP_HAS_ACE)
397  localHandle = ACE_INET_Addr((u_short)(localAddress.getPort()),
398  (ACE_UINT32)INADDR_ANY);
399 
400  ACE_SOCK_Dgram_Mcast::options mcastOptions = ACE_SOCK_Dgram_Mcast::DEFOPTS;
401 # if defined(__APPLE__)
402  mcastOptions = static_cast<ACE_SOCK_Dgram_Mcast::options>(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
403 # endif
404 
405  auto* dmcast = new ACE_SOCK_Dgram_Mcast(mcastOptions);
406  dgram = dmcast;
407  mgram = dmcast;
408  yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
409 
410  int result = -1;
411  ACE_INET_Addr addr(group.getPort(), group.getHost().c_str());
412  result = dmcast->open(addr, nullptr, 1);
413  if (result == 0) {
414  result = restrictMcast(dmcast, group, ipLocal, false);
415  }
416 
417  if (result != 0) {
418  yCError(DGRAMTWOWAYSTREAM, "could not open multicast datagram socket");
419  return false;
420  }
421 
422 #else
423  dgram = nullptr;
424  dgram_sockfd = -1;
425 
426  int s = -1;
427  struct sockaddr_in dgram_sin;
428  // create what looks like an ordinary UDP socket
429  if ((s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
430  yCError(DGRAMTWOWAYSTREAM, "could not create sender socket");
431  std::exit(1);
432  }
433  // set up destination address
434  memset((char*)&dgram_sin, 0, sizeof(dgram_sin));
435  dgram_sin.sin_family = AF_INET;
436  dgram_sin.sin_port = htons(group.getPort());
437 
438 
439  if (inet_pton(AF_INET, group.getHost().c_str(), &dgram_sin.sin_addr) == 0) {
440  yCError(DGRAMTWOWAYSTREAM, "could not set up mcast client");
441  std::exit(1);
442  }
443  if (connect(s, (struct sockaddr*)&dgram_sin, sizeof(dgram_sin)) == -1) {
444  yCError(DGRAMTWOWAYSTREAM, "could not connect mcast client");
445  std::exit(1);
446  }
447 
448 
449  dgram_sockfd = s;
450  dgram = this;
451  int local_port = -1;
452 
453  struct sockaddr_in sin;
454  socklen_t len = sizeof(sin);
455  if (getsockname(dgram_sockfd, (struct sockaddr*)&sin, &len) == 0 && sin.sin_family == AF_INET) {
456  local_port = ntohs(sin.sin_port);
457  }
458 
459 
460 #endif
461  configureSystemBuffers();
462  remoteAddress = group;
463 #ifdef YARP_HAS_ACE
464 
465  localHandle.set(localAddress.getPort(), localAddress.getHost().c_str());
466  remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
467 #else
468 
469  remoteAddress = group;
470  localAddress = Contact("127.0.0.1", local_port);
471  localHandle = local_port;
472  remoteHandle = remoteAddress.getPort();
473 
474 
475 #endif
476  yCDebug(DGRAMTWOWAYSTREAM, "Update: DGRAM from %s to %s", localAddress.toURI().c_str(), remoteAddress.toURI().c_str());
477  allocate();
478 
479  return true;
480 }
481 
482 
483 bool DgramTwoWayStream::join(const Contact& group, bool sender, const Contact& ipLocal)
484 {
485  yCDebug(DGRAMTWOWAYSTREAM, "subscribing to mcast address %s for %s", group.toURI().c_str(), (sender ? "writing" : "reading"));
486 
487  multiMode = true;
488 
489  if (sender) {
490  if (ipLocal.isValid()) {
491  return openMcast(group, ipLocal);
492  }
493  // just use udp as normal
494  return open(group);
495  }
496 
497 #if defined(YARP_HAS_ACE)
498  ACE_SOCK_Dgram_Mcast::options mcastOptions = ACE_SOCK_Dgram_Mcast::DEFOPTS;
499 # if defined(__APPLE__)
500  mcastOptions = static_cast<ACE_SOCK_Dgram_Mcast::options>(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
501 # endif
502 
503  auto* dmcast = new ACE_SOCK_Dgram_Mcast(mcastOptions);
504 
505  dgram = dmcast;
506  mgram = dmcast;
507  yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
508 
509  ACE_INET_Addr addr(group.getPort(), group.getHost().c_str());
510 
511  int result = -1;
512  if (ipLocal.isValid()) {
513  result = dmcast->join(addr, 1);
514 
515  if (result == 0) {
516  result = restrictMcast(dmcast, group, ipLocal, true);
517  }
518  } else {
519  result = dmcast->join(addr, 1);
520  }
521 
522  if (result != 0) {
523  yCError(DGRAMTWOWAYSTREAM, "cannot connect to multi-cast address");
524  happy = false;
525  return false;
526  }
527 #else
528  struct ip_mreq mreq;
529  int s = -1;
530  if ((s = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
531  yCError(DGRAMTWOWAYSTREAM, "could not create receiver socket");
532  happy = false;
533  return false;
534  }
535  struct sockaddr_in addr;
536  u_int yes = 1;
537 
538  /* set up destination address */
539  memset(&addr, 0, sizeof(addr));
540  addr.sin_family = AF_INET;
541  addr.sin_addr.s_addr = htonl(INADDR_ANY);
542  addr.sin_port = htons(group.getPort());
543 
544  // allow multiple sockets to use the same PORT number
545  if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(u_int)) < 0) {
546  yCError(DGRAMTWOWAYSTREAM, "could not allow sockets use the same ADDRESS");
547  happy = false;
548  return false;
549  }
550 
551 # if defined(__APPLE__)
552  if (setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(u_int)) < 0) {
553  yCError(DGRAMTWOWAYSTREAM, "could not allow sockets use the same PORT number");
554  happy = false;
555  return false;
556  }
557 # endif
558 
559  // bind to receive address
560  if (bind(s, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
561  yCError(DGRAMTWOWAYSTREAM, "could not create mcast server");
562  happy = false;
563  return false;
564  }
565 
566  // use setsockopt() to request that the kernel join a multicast group
567  if (inet_pton(AF_INET, group.getHost().c_str(), &mreq.imr_multiaddr) == 0) {
568  yCError(DGRAMTWOWAYSTREAM, "Could not set up the mcast server");
569  std::exit(1);
570  }
571  mreq.imr_interface.s_addr = htonl(INADDR_ANY);
572  if (setsockopt(s, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
573  yCError(DGRAMTWOWAYSTREAM, "could not join the multicast group");
574  yCError(DGRAMTWOWAYSTREAM, "sendto: %d, %s", errno, strerror(errno));
575  happy = false;
576  return false;
577  }
578 
579  dgram_sockfd = s;
580  dgram = this;
581 #endif
582  configureSystemBuffers();
583 
584  localAddress = group;
585  remoteAddress = group;
586 #ifdef YARP_HAS_ACE
587  localHandle.set(localAddress.getPort(), localAddress.getHost().c_str());
588  remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
589 #else
590  localHandle = localAddress.getPort();
591  remoteHandle = remoteAddress.getPort();
592 #endif
593  allocate();
594  return true;
595 }
596 
598 {
599  closeMain();
600 }
601 
603 {
604  bool act = false;
605  mutex.lock();
606  if ((!closed) && (!interrupting) && happy) {
607  act = true;
608  interrupting = true;
609  closed = true;
610  }
611  mutex.unlock();
612 
613  if (act) {
614  if (reader) {
615  int ct = 3;
616  while (happy && ct > 0) {
617  ct--;
618  DgramTwoWayStream tmp;
619  if (mgram != nullptr) {
620  yCDebug(DGRAMTWOWAYSTREAM, "* mcast interrupt, interface %s", restrictInterfaceIp.toString().c_str());
621  tmp.join(localAddress, true, restrictInterfaceIp);
622  } else {
623  yCDebug(DGRAMTWOWAYSTREAM, "* dgram interrupt");
624  tmp.open(Contact(localAddress.getHost(), 0),
625  localAddress);
626  }
627  yCDebug(DGRAMTWOWAYSTREAM, "* interrupt state %s %s %s",
628  (interrupting ? "true" : "false"),
629  (closed ? "true" : "false"),
630  (happy ? "true" : "false"));
631  ManagedBytes empty(10);
632  for (size_t i = 0; i < empty.length(); i++) {
633  empty.get()[i] = 0;
634  }
635 
636  // don't want this message getting into a valid packet
637  tmp.pct = -1;
638 
639  tmp.write(empty.bytes());
640  tmp.flush();
641  tmp.close();
642  if (happy) {
644  }
645  }
646  yCDebug(DGRAMTWOWAYSTREAM, "dgram interrupt done");
647  }
648  mutex.lock();
649  interrupting = false;
650  mutex.unlock();
651  } else {
652  // wait for interruption to be done
653  if (interrupting) {
654  while (interrupting) {
655  yCDebug(DGRAMTWOWAYSTREAM, "waiting for dgram interrupt to be finished...");
657  }
658  }
659  }
660 }
661 
663 {
664  if (dgram != nullptr) {
665  //printf("Dgram closing, interrupt state %d\n", interrupting);
666  interrupt();
667  mutex.lock();
668  closed = true;
669  happy = false;
670  //printf("Dgram closinger, interrupt state %d\n", interrupting);
671  mutex.unlock();
672  while (interrupting) {
673  happy = false;
675  }
676  mutex.lock();
677  if (dgram != nullptr) {
678 #if defined(YARP_HAS_ACE)
679  dgram->close();
680  delete dgram;
681 #else
682  if (dgram_sockfd >= 0) {
683  ::close(dgram_sockfd);
684  }
685  dgram_sockfd = -1;
686 #endif
687  dgram = nullptr;
688  mgram = nullptr;
689  }
690  happy = false;
691  mutex.unlock();
692  }
693  happy = false;
694 }
695 
697 {
698  reader = true;
699  bool done = false;
700 
701  while (!done) {
702 
703  if (closed) {
704  happy = false;
705  return -1;
706  }
707 
708  // if nothing is available, try to grab stuff
709  if (readAvail == 0) {
710  readAt = 0;
711 
712 
713  //yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
714  yCTrace(DGRAMTWOWAYSTREAM, "DGRAM Waiting for something!");
715  yarp::conf::ssize_t result = -1;
716 #if defined(YARP_HAS_ACE)
717  if ((dgram != nullptr) && restrictInterfaceIp.isValid()) {
718  yCTrace(DGRAMTWOWAYSTREAM, "Consider remote mcast");
719  yCTrace(DGRAMTWOWAYSTREAM, "What we know:");
720  yCTrace(DGRAMTWOWAYSTREAM, " %s", restrictInterfaceIp.toString().c_str());
721  yCTrace(DGRAMTWOWAYSTREAM, " %s", localAddress.toString().c_str());
722  yCTrace(DGRAMTWOWAYSTREAM, " %s", remoteAddress.toString().c_str());
723 
724  ACE_INET_Addr iface(restrictInterfaceIp.getPort(),
725  restrictInterfaceIp.getHost().c_str());
726  ACE_INET_Addr dummy((u_short)0, (ACE_UINT32)INADDR_ANY);
727  result = dgram->recv(readBuffer.get(), readBuffer.length(), dummy);
728  yCDebug(DGRAMTWOWAYSTREAM, "MCAST Got %zd bytes", result);
729 
730  } else
731 #endif
732  if (dgram != nullptr) {
733  yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
734 #if defined(YARP_HAS_ACE)
735  ACE_INET_Addr dummy((u_short)0, (ACE_UINT32)INADDR_ANY);
736  yCTrace(DGRAMTWOWAYSTREAM, "DGRAM Waiting for something!");
737  result = dgram->recv(readBuffer.get(), readBuffer.length(), dummy);
738 #else
739  result = recv(dgram_sockfd, readBuffer.get(), readBuffer.length(), 0);
740 #endif
741  yCDebug(DGRAMTWOWAYSTREAM, "DGRAM Got %zd bytes", result);
742  } else {
743  onMonitorInput();
744  //printf("Monitored input of %d bytes\n", monitor.length());
745  if (monitor.length() > readBuffer.length()) {
746  printf("Too big!\n");
747  std::exit(1);
748  }
749  memcpy(readBuffer.get(), monitor.get(), monitor.length());
750  result = monitor.length();
751  }
752 
753  if (closed || (result < 0)) {
754  happy = false;
755  return -1;
756  }
757  readAvail = result;
758 
759  // deal with CRC
760  int altPct = 0;
761  bool crcOk = checkCrc(readBuffer.get(), readAvail, CRC_SIZE, pct, &altPct);
762  if (altPct != -1) {
763  pct++;
764  if (!crcOk) {
765  if (bufferAlertNeeded && !bufferAlerted) {
766  yCError(DGRAMTWOWAYSTREAM, "*** Multicast/UDP packet dropped - checksum error ***");
767  yCInfo(DGRAMTWOWAYSTREAM, "The UDP/MCAST system buffer limit on your system is low.");
768  yCInfo(DGRAMTWOWAYSTREAM, "You may get packet loss under heavy conditions.");
769 #ifdef __linux__
770  yCInfo(DGRAMTWOWAYSTREAM, "To change the buffer limit on linux: sysctl -w net.core.rmem_max=8388608");
771  yCInfo(DGRAMTWOWAYSTREAM, "(Might be something like: sudo /sbin/sysctl -w net.core.rmem_max=8388608)");
772 #else
773  yCInfo(DGRAMTWOWAYSTREAM, "To change the limit use: sysctl for Linux/FreeBSD, ndd for Solaris, no for AIX");
774 #endif
775  bufferAlerted = true;
776  } else {
777  errCount++;
778  double now = SystemClock::nowSystem();
779  if (now - lastReportTime > 1) {
780  yCError(DGRAMTWOWAYSTREAM, "*** %d datagram packet(s) dropped - checksum error ***", errCount);
781  lastReportTime = now;
782  errCount = 0;
783  }
784  }
785  reset();
786  return -1;
787  }
788  readAt += CRC_SIZE;
789  readAvail -= CRC_SIZE;
790  done = true;
791  } else {
792  readAvail = 0;
793  }
794  }
795 
796  // if stuff is available, take it
797  if (readAvail > 0) {
798  size_t take = readAvail;
799  if (take > b.length()) {
800  take = b.length();
801  }
802  memcpy(b.get(), readBuffer.get() + readAt, take);
803  readAt += take;
804  readAvail -= take;
805  return take;
806  }
807  }
808 
809  return 0;
810 }
811 
813 {
814  yCTrace(DGRAMTWOWAYSTREAM, "DGRAM prep writing");
815  yCTrace(DGRAMTWOWAYSTREAM, "DGRAM write %zu bytes", b.length());
816 
817  if (reader) {
818  return;
819  }
820  if (writeBuffer.get() == nullptr) {
821  return;
822  }
823 
824  Bytes local = b;
825  while (local.length() > 0) {
826  yCTrace(DGRAMTWOWAYSTREAM, "DGRAM prep writing");
827  yarp::conf::ssize_t rem = local.length();
828  yarp::conf::ssize_t space = writeBuffer.length() - writeAvail;
829  bool shouldFlush = false;
830  if (rem >= space) {
831  rem = space;
832  shouldFlush = true;
833  }
834  memcpy(writeBuffer.get() + writeAvail, local.get(), rem);
835  writeAvail += rem;
836  local = Bytes(local.get() + rem, local.length() - rem);
837  if (shouldFlush) {
838  flush();
839  }
840  }
841 }
842 
843 
845 {
846  if (writeBuffer.get() == nullptr) {
847  return;
848  }
849 
850  // should set CRC
851  if (writeAvail <= CRC_SIZE) {
852  return;
853  }
854  addCrc(writeBuffer.get(), writeAvail, CRC_SIZE, pct);
855  pct++;
856 
857  if (writeAvail > 0) {
858  //yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
859  yarp::conf::ssize_t len = 0;
860 
861 #if defined(YARP_HAS_ACE)
862  if (mgram != nullptr) {
863  len = mgram->send(writeBuffer.get(), writeAvail);
864  yCDebug(DGRAMTWOWAYSTREAM, "MCAST - wrote %zd bytes", len);
865  } else
866 #endif
867  if (dgram != nullptr) {
868 #if defined(YARP_HAS_ACE)
869  len = dgram->send(writeBuffer.get(), writeAvail, remoteHandle);
870 #else
871  len = send(dgram_sockfd, writeBuffer.get(), writeAvail, 0);
872 #endif
873  yCDebug(DGRAMTWOWAYSTREAM, "DGRAM - wrote %zd bytes to %s", len, remoteAddress.toString().c_str());
874  } else {
875  Bytes b(writeBuffer.get(), writeAvail);
876  monitor = ManagedBytes(b, false);
877  monitor.copy();
878  //printf("Monitored output of %d bytes\n", monitor.length());
879  len = monitor.length();
880  onMonitorOutput();
881  }
882  if (len > writeBuffer.length() * 0.75) {
883  yCDebug(DGRAMTWOWAYSTREAM, "long dgrams might need a little time");
884 
885  // Under heavy loads, packets could get dropped
886  // 640x480x3 images correspond to about 15 datagrams
887  // so there's not much time possible between them
888  // looked at iperf, it just does a busy-waiting delay
889  // there's an implementation below, but commented out -
890  // better solution was to increase recv buffer size
891 
892  double first = yarp::os::SystemClock::nowSystem();
893  double now;
894  int ct = 0;
895  do {
896  //printf("Busy wait... %d\n", ct);
899  ct++;
900  } while (now - first < 0.001);
901  }
902 
903  if (len < 0) {
904  happy = false;
905  yCDebug(DGRAMTWOWAYSTREAM, "DGRAM failed to send message with error: %s", strerror(errno));
906  return;
907  }
908  writeAvail -= len;
909 
910  if (writeAvail != 0) {
911  // well, we have a problem
912  // checksums will cause dumping
913  yCDebug(DGRAMTWOWAYSTREAM, "dgram/mcast send behaving badly");
914  }
915  }
916  // finally: writeAvail should be 0
917 
918  // make space for CRC
919  writeAvail = CRC_SIZE;
920 }
921 
922 
924 {
925  return happy;
926 }
927 
928 
930 {
931  readAt = 0;
932  readAvail = 0;
933  writeAvail = CRC_SIZE;
934  pct = 0;
935 }
936 
937 
939 {
940 // yCError(DGRAMTWOWAYSTREAM, "Packet begins: %s", (reader ? "reader" : "writer"));
941  pct = 0;
942 }
943 
945 {
946 // yCError(DGRAMTWOWAYSTREAM, "Packet ends: %s", (reader ? "reader" : "writer"));
947  if (!reader) {
948  pct = 0;
949  }
950 }
951 
953 {
954  return monitor.bytes();
955 }
956 
957 
959 {
960  monitor.clear();
961 }
962 
963 
965 {
966  if (dgram == nullptr) {
967  return false;
968  }
969 #if defined(YARP_HAS_ACE)
970  return (dgram->set_option(IPPROTO_IP, IP_TOS, (int*)&tos, (int)sizeof(tos)) == 0);
971 #else
972  return (setsockopt(dgram_sockfd, IPPROTO_IP, IP_TOS, (int*)&tos, (int)sizeof(tos)) == 0);
973 #endif
974 }
975 
977 {
978  int tos = -1;
979  if (dgram == nullptr) {
980  return tos;
981  }
982 #if defined(YARP_HAS_ACE)
983  int optlen;
984  dgram->get_option(IPPROTO_IP, IP_TOS, (int*)&tos, &optlen);
985 #else
986  socklen_t optlen;
987  getsockopt(dgram_sockfd, IPPROTO_IP, IP_TOS, (int*)&tos, &optlen);
988 #endif
989  return tos;
990 }
static bool checkCrc(char *buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct, int *store_altPct=nullptr)
#define CRC_SIZE
#define UDP_MAX_DATAGRAM_SIZE
static void addCrc(char *buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct)
A simple abstraction for a block of bytes.
Definition: Bytes.h:25
size_t length() const
Definition: Bytes.cpp:22
const char * get() const
Definition: Bytes.cpp:27
Represents how to reach a part of a YARP network.
Definition: Contact.h:36
bool isValid() const
Checks if a Contact is tagged as valid.
Definition: Contact.cpp:298
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition: Contact.cpp:313
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition: Contact.cpp:239
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition: Contact.cpp:228
virtual int read()
Read and return a single byte.
Definition: InputStream.cpp:20
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
Definition: ManagedBytes.h:22
const Bytes & bytes() const
const char * get() const
static int netInt(const yarp::os::Bytes &code)
Definition: NetType.cpp:27
static unsigned long int getCrc(char *buf, size_t len)
Definition: NetType.cpp:169
static double nowSystem()
Definition: SystemClock.cpp:34
static void delaySystem(double seconds)
Definition: SystemClock.cpp:29
A stream abstraction for datagram communication.
bool setTypeOfService(int tos) override
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
void interrupt() override
Interrupt the stream.
virtual bool open(const Contact &remote)
void close() override
Terminate the stream.
void endPacket() override
Mark the end of a logical packet (see beginPacket).
virtual bool openMcast(const Contact &group, const Contact &ipLocal)
void reset() override
Reset the stream.
virtual bool join(const Contact &group, bool sender, const Contact &ipLocal)
void beginPacket() override
Mark the beginning of a logical packet.
void flush() override
Make sure all pending write operations are finished.
bool isOk() const override
Check if the stream is ok or in an error state.
#define yCInfo(component,...)
Definition: LogComponent.h:132
#define yCError(component,...)
Definition: LogComponent.h:154
#define yCAssert(component, x)
Definition: LogComponent.h:169
#define yCTrace(component,...)
Definition: LogComponent.h:85
#define yCWarning(component,...)
Definition: LogComponent.h:143
#define yCDebug(component,...)
Definition: LogComponent.h:109
#define YARP_OS_LOG_COMPONENT(name, name_string)
Definition: LogComponent.h:35
std::string get_string(const std::string &key, bool *found=nullptr)
Read a string from an environment variable.
Definition: environment.h:68
::ssize_t ssize_t
Definition: numeric.h:86
double now()
Return the current time in seconds, relative to an arbitrary starting point.
Definition: Time.cpp:121
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition: NetInt32.h:30