YARP
Yet Another Robot Platform
 
Loading...
Searching...
No Matches
DgramTwoWayStream.cpp
Go to the documentation of this file.
1/*
2 * SPDX-FileCopyrightText: 2006-2021 Istituto Italiano di Tecnologia (IIT)
3 * SPDX-FileCopyrightText: 2006-2010 RobotCub Consortium
4 * SPDX-License-Identifier: BSD-3-Clause
5 */
6
8
9#include <yarp/conf/system.h>
11
12#include <yarp/os/NetType.h>
13#include <yarp/os/Time.h>
15
16#if defined(YARP_HAS_ACE)
17# include <ace/ACE.h>
18# include <ace/Handle_Set.h>
19# include <ace/INET_Addr.h>
20# include <ace/Log_Msg.h>
21# include <ace/OS_Memory.h>
22# include <ace/OS_NS_sys_select.h>
23# include <ace/SOCK_Dgram.h>
24# include <ace/SOCK_Dgram_Mcast.h>
25# include <ace/os_include/net/os_if.h>
26// In one the ACE headers there is a definition of "main" for WIN32
27# ifdef main
28# undef main
29# endif
30#else
31# include <arpa/inet.h>
32# include <netinet/in.h>
33# include <sys/socket.h>
34# include <sys/types.h>
35# include <unistd.h>
36#endif
37
38#include <cerrno>
39#include <cstring>
40
41using namespace yarp::os::impl;
42using namespace yarp::os;
43
44#define CRC_SIZE 8
45#define UDP_MAX_DATAGRAM_SIZE (65507 - CRC_SIZE)
46
47
48namespace {
49YARP_OS_LOG_COMPONENT(DGRAMTWOWAYSTREAM, "yarp.os.impl.DgramTwoWayStream")
50} // namespace
51
52
53static bool checkCrc(char* buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct, int* store_altPct = nullptr)
54{
56 (length > crcLength) ? (length - crcLength) : 0);
57 Bytes b(buf, 4);
58 Bytes b2(buf + 4, 4);
61 bool ok = (alt == curr && pct == altPct);
62 if (!ok) {
63 if (alt != curr) {
64 yCDebug(DGRAMTWOWAYSTREAM, "crc mismatch");
65 }
66 if (pct != altPct) {
67 yCDebug(DGRAMTWOWAYSTREAM, "packet code broken");
68 }
69 }
70 if (store_altPct != nullptr) {
72 }
73
74 return ok;
75}
76
77
78static void addCrc(char* buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct)
79{
81 (length > crcLength) ? (length - crcLength) : 0);
82 Bytes b(buf, 4);
83 Bytes b2(buf + 4, 4);
86}
87
88
90{
91#if defined(YARP_HAS_ACE)
93 Contact local(anywhere.get_host_addr(),
94 anywhere.get_port_number());
95#else
96 Contact local("localhost", -1);
97#endif
98 return open(local, remote);
99}
100
101bool DgramTwoWayStream::open(const Contact& local, const Contact& remote)
102{
103 localAddress = local;
104 remoteAddress = remote;
105
106#if defined(YARP_HAS_ACE)
107 localHandle = ACE_INET_Addr((u_short)(localAddress.getPort()), (ACE_UINT32)INADDR_ANY);
108 if (remote.isValid()) {
109 remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
110 }
111 dgram = new ACE_SOCK_Dgram;
112 yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
113
114 int result = dgram->open(localHandle,
116 0,
117 1);
118#else
119 dgram = nullptr;
120 dgram_sockfd = -1;
121
122 int s = -1;
123 if ((s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
124 std::exit(1);
125 }
126 struct sockaddr_in dgram_sin;
127 memset((char*)&dgram_sin, 0, sizeof(dgram_sin));
128 dgram_sin.sin_family = AF_INET;
129 dgram_sin.sin_addr.s_addr = htonl(INADDR_ANY);
130 dgram_sin.sin_port = htons(remote.getPort());
131 if (local.isValid()) {
132 if (inet_pton(AF_INET, remote.getHost().c_str(), &dgram_sin.sin_addr) == 0) {
133 yCError(DGRAMTWOWAYSTREAM, "could not set up udp client");
134 std::exit(1);
135 }
136 if (connect(s, (struct sockaddr*)&dgram_sin, sizeof(dgram_sin)) == -1) {
137 yCError(DGRAMTWOWAYSTREAM, "could not connect udp client");
138 std::exit(1);
139 }
140 } else {
141 if (bind(s, (struct sockaddr*)&dgram_sin, sizeof(dgram_sin)) == -1) {
142 yCError(DGRAMTWOWAYSTREAM, "could not create udp server");
143 std::exit(1);
144 }
145 }
146 dgram_sockfd = s;
147 dgram = this;
148 int result = -1;
149 int local_port = -1;
150
151 struct sockaddr_in sin;
152 socklen_t len = sizeof(sin);
153 if (getsockname(dgram_sockfd, (struct sockaddr*)&sin, &len) == 0 && sin.sin_family == AF_INET) {
154 result = 0;
155 local_port = ntohs(sin.sin_port);
156 }
157#endif
158
159 if (result != 0) {
160 yCError(DGRAMTWOWAYSTREAM, "could not open datagram socket");
161 return false;
162 }
163
164 configureSystemBuffers();
165
166#if defined(YARP_HAS_ACE)
167 dgram->get_local_addr(localHandle);
168 yCDebug(DGRAMTWOWAYSTREAM, "starting DGRAM entity on port number %u",localHandle.get_port_number());
169 localAddress = Contact("127.0.0.1", localHandle.get_port_number());
170#else
171 localAddress = Contact("127.0.0.1", local_port);
172#endif
173
174 yCDebug(DGRAMTWOWAYSTREAM, "Update: DGRAM from %s to %s", localAddress.toURI().c_str(), remoteAddress.toURI().c_str());
175
176 allocate();
177
178 return true;
179}
180
181void DgramTwoWayStream::allocate(int readSize, int writeSize)
182{
183 //These are only as another default. We should modify the method to return bool
184 //and fail if we cannot read the socket size.
185
186 int _read_size = -1;
187 int _write_size = -1;
188
189 std::string _env_dgram = yarp::conf::environment::get_string("YARP_DGRAM_SIZE");
190 std::string _env_mode;
191 if (multiMode) {
193 } else {
195 }
196 if (!_env_mode.empty()) {
198 }
199 if (!_env_dgram.empty()) {
200 int sz = yarp::conf::numeric::from_string<int>(_env_dgram);
201 if (sz != 0) {
203 }
204 yCInfo(DGRAMTWOWAYSTREAM, "Datagram packet size set to %d", _read_size);
205 }
206 if (readSize != 0) {
208 yCInfo(DGRAMTWOWAYSTREAM, "Datagram read size reset to %d", _read_size);
209 }
210 if (writeSize != 0) {
212 yCInfo(DGRAMTWOWAYSTREAM, "Datagram write size reset to %d", _write_size);
213 }
214
215 // force the size of the write buffer to be under the max size of a udp datagram.
218 }
219
220 if (_read_size < 0) {
221#if defined(YARP_HAS_ACE)
222 //Defaults to socket size
223 if (dgram != nullptr) {
224 int len = sizeof(_read_size);
225 int result = dgram->get_option(SOL_SOCKET, SO_RCVBUF, &_read_size, &len);
226 if (result < 0) {
227 yCError(DGRAMTWOWAYSTREAM, "Failed to read buffer size from RCVBUF socket with error: %s. Setting read buffer size to UDP_MAX_DATAGRAM_SIZE.",
228 strerror(errno));
230 }
231 }
232#else
233 socklen_t len = sizeof(_read_size);
234
235 int result = getsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, &_read_size, &len);
236 if (result < 0) {
237 yCError(DGRAMTWOWAYSTREAM, "Failed to read buffer size from RCVBUF socket with error: %s. Setting read buffer size to UDP_MAX_DATAGRAM_SIZE.",
238 strerror(errno));
240 }
241#endif
242 }
243
244 readBuffer.allocate(_read_size);
245 writeBuffer.allocate(_write_size);
246 readAt = 0;
247 readAvail = 0;
248 writeAvail = CRC_SIZE;
249 //happy = true;
250 pct = 0;
251}
252
253
254void DgramTwoWayStream::configureSystemBuffers()
255{
256 //By default the buffers are forced to the datagram size limit.
257 //These can be overwritten by environment variables
258 //Generic variable
259 std::string socketBufferSize = yarp::conf::environment::get_string("YARP_DGRAM_BUFFER_SIZE");
260 //Specific read
261 std::string socketReadBufferSize = yarp::conf::environment::get_string("YARP_DGRAM_RECV_BUFFER_SIZE");
262 //Specific write
263 std::string socketSendBufferSize = yarp::conf::environment::get_string("YARP_DGRAM_SND_BUFFER_SIZE");
264
265 int readBufferSize = -1;
266 if (!socketReadBufferSize.empty()) {
267 readBufferSize = yarp::conf::numeric::from_string<int>(socketReadBufferSize);
268 } else if (!socketBufferSize.empty()) {
269 readBufferSize = yarp::conf::numeric::from_string<int>(socketBufferSize);
270 }
271
272 int writeBufferSize = -1;
273 if (!socketSendBufferSize.empty()) {
274 writeBufferSize = yarp::conf::numeric::from_string<int>(socketSendBufferSize);
275 } else if (!socketBufferSize.empty()) {
276 writeBufferSize = yarp::conf::numeric::from_string<int>(socketBufferSize);
277 }
278 // The writeBufferSize can't be set greater than udp datagram
279 // maximum size
282 yCWarning(DGRAMTWOWAYSTREAM, "The desired SND buffer size is too big. It is set to the max datagram size : %d",
284 }
286 }
287
288 if (readBufferSize > 0) {
289 int actualReadSize = -1;
290
291#if defined(YARP_HAS_ACE)
292 int intSize = sizeof(readBufferSize);
293 int setResult = dgram->set_option(SOL_SOCKET, SO_RCVBUF, (void*)&readBufferSize, intSize);
294
295 int getResult = dgram->get_option(SOL_SOCKET, SO_RCVBUF, (void*)&actualReadSize, &intSize);
296#else
298 int setResult = setsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, (void*)&readBufferSize, intSize);
299 int getResult = getsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, (void*)&actualReadSize, &intSize);
300#endif
301 // in linux the value returned by getsockopt is "doubled"
302 // for some unknown reasons (see https://linux.die.net/man/7/socket)
303#if defined(__linux__)
304 actualReadSize /= 2;
305#endif
306 if (setResult < 0 || getResult < 0 || readBufferSize != actualReadSize) {
307 bufferAlertNeeded = true;
308 bufferAlerted = false;
309 yCWarning(DGRAMTWOWAYSTREAM, "Failed to set RECV socket buffer to desired size. Actual: %d, Desired %d",
312 }
313 }
314 if (writeBufferSize > 0) {
315 int actualWriteSize = -1;
316#if defined(YARP_HAS_ACE)
317 int intSize = sizeof(writeBufferSize);
318 int setResult = dgram->set_option(SOL_SOCKET, SO_SNDBUF, (void*)&writeBufferSize, intSize);
319 int getResult = dgram->get_option(SOL_SOCKET, SO_SNDBUF, (void*)&actualWriteSize, &intSize);
320#else
322 int setResult = setsockopt(dgram_sockfd, SOL_SOCKET, SO_SNDBUF, (void*)&writeBufferSize, intSize);
323 int getResult = getsockopt(dgram_sockfd, SOL_SOCKET, SO_SNDBUF, (void*)&actualWriteSize, &intSize);
324#endif
325 // in linux the value returned by getsockopt is "doubled"
326 // for some unknown reasons (see https://linux.die.net/man/7/socket)
327#if defined(__linux__)
328 actualWriteSize /= 2;
329#endif
330 if (setResult < 0 || getResult < 0 || writeBufferSize != actualWriteSize) {
331 bufferAlertNeeded = true;
332 bufferAlerted = false;
333 yCWarning(DGRAMTWOWAYSTREAM, "Failed to set SND socket buffer to desired size. Actual: %d, Desired: %d",
336 }
337 }
338}
339
340
341#if defined(YARP_HAS_ACE)
342int DgramTwoWayStream::restrictMcast(ACE_SOCK_Dgram_Mcast* dmcast,
343 const Contact& group,
344 const Contact& ipLocal,
345 bool add)
346{
347 restrictInterfaceIp = ipLocal;
348
349 yCInfo(DGRAMTWOWAYSTREAM, "multicast connection %s on network interface for %s", group.getHost().c_str(), ipLocal.getHost().c_str());
350 int result = -1;
351 // There's some major damage in ACE mcast interfaces.
352 // Most require interface names, yet provide no way to query
353 // these names - and in the end, convert to IP addresses.
354 // Here we try to do an end run around ACE.
355
356 // based on: ACE_SOCK_Dgram::set_nic
357
360 group.getHost().c_str());
362 ipLocal.getHost().c_str());
363 multicast_address.imr_interface.s_addr = htonl(interface_addr.get_ip_address());
364 multicast_address.imr_multiaddr.s_addr = htonl(group_addr.get_ip_address());
365
366 if (add) {
367 yCDebug(DGRAMTWOWAYSTREAM, "Trying to correct mcast membership...");
368 result = ((ACE_SOCK*)dmcast)->set_option(IPPROTO_IP, IP_ADD_MEMBERSHIP, &multicast_address, sizeof(struct ip_mreq));
369 } else {
370 yCDebug(DGRAMTWOWAYSTREAM, "Trying to correct mcast output...");
371 result = ((ACE_SOCK*)dmcast)->set_option(IPPROTO_IP, IP_MULTICAST_IF, &multicast_address.imr_interface.s_addr, sizeof(struct in_addr));
372 }
373 if (result != 0) {
374 int num = errno;
375 yCDebug(DGRAMTWOWAYSTREAM, "mcast result: %s", strerror(num));
376 if (num == 98) {
377 // our membership is already correct / Address already in use
378 result = 0;
379 }
380 result = 0; // in fact, best to proceed for Windows.
381 }
382
383 return result;
384}
385#endif
386
387
389 const Contact& ipLocal)
390{
391
392 multiMode = true;
393
394 localAddress = ipLocal;
395
396#if defined(YARP_HAS_ACE)
397 localHandle = ACE_INET_Addr((u_short)(localAddress.getPort()),
399
400 ACE_SOCK_Dgram_Mcast::options mcastOptions = ACE_SOCK_Dgram_Mcast::DEFOPTS;
401# if defined(__APPLE__)
402 mcastOptions = static_cast<ACE_SOCK_Dgram_Mcast::options>(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
403# endif
404
406 dgram = dmcast;
407 mgram = dmcast;
408 yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
409
410 int result = -1;
411 ACE_INET_Addr addr(group.getPort(), group.getHost().c_str());
412 result = dmcast->open(addr, nullptr, 1);
413 if (result == 0) {
414 result = restrictMcast(dmcast, group, ipLocal, false);
415 }
416
417 if (result != 0) {
418 yCError(DGRAMTWOWAYSTREAM, "could not open multicast datagram socket");
419 return false;
420 }
421
422#else
423 dgram = nullptr;
424 dgram_sockfd = -1;
425
426 int s = -1;
427 struct sockaddr_in dgram_sin;
428 // create what looks like an ordinary UDP socket
429 if ((s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
430 yCError(DGRAMTWOWAYSTREAM, "could not create sender socket");
431 std::exit(1);
432 }
433 // set up destination address
434 memset((char*)&dgram_sin, 0, sizeof(dgram_sin));
435 dgram_sin.sin_family = AF_INET;
436 dgram_sin.sin_port = htons(group.getPort());
437
438
439 if (inet_pton(AF_INET, group.getHost().c_str(), &dgram_sin.sin_addr) == 0) {
440 yCError(DGRAMTWOWAYSTREAM, "could not set up mcast client");
441 std::exit(1);
442 }
443 if (connect(s, (struct sockaddr*)&dgram_sin, sizeof(dgram_sin)) == -1) {
444 yCError(DGRAMTWOWAYSTREAM, "could not connect mcast client");
445 std::exit(1);
446 }
447
448
449 dgram_sockfd = s;
450 dgram = this;
451 int local_port = -1;
452
453 struct sockaddr_in sin;
454 socklen_t len = sizeof(sin);
455 if (getsockname(dgram_sockfd, (struct sockaddr*)&sin, &len) == 0 && sin.sin_family == AF_INET) {
456 local_port = ntohs(sin.sin_port);
457 }
458
459
460#endif
461 configureSystemBuffers();
462 remoteAddress = group;
463#ifdef YARP_HAS_ACE
464
465 localHandle.set(localAddress.getPort(), localAddress.getHost().c_str());
466 remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
467#else
468
469 remoteAddress = group;
470 localAddress = Contact("127.0.0.1", local_port);
471 localHandle = local_port;
472 remoteHandle = remoteAddress.getPort();
473
474
475#endif
476 yCDebug(DGRAMTWOWAYSTREAM, "Update: DGRAM from %s to %s", localAddress.toURI().c_str(), remoteAddress.toURI().c_str());
477 allocate();
478
479 return true;
480}
481
482
483bool DgramTwoWayStream::join(const Contact& group, bool sender, const Contact& ipLocal)
484{
485 yCDebug(DGRAMTWOWAYSTREAM, "subscribing to mcast address %s for %s", group.toURI().c_str(), (sender ? "writing" : "reading"));
486
487 multiMode = true;
488
489 if (sender) {
490 if (ipLocal.isValid()) {
491 return openMcast(group, ipLocal);
492 }
493 // just use udp as normal
494 return open(group);
495 }
496
497#if defined(YARP_HAS_ACE)
498 ACE_SOCK_Dgram_Mcast::options mcastOptions = ACE_SOCK_Dgram_Mcast::DEFOPTS;
499# if defined(__APPLE__)
500 mcastOptions = static_cast<ACE_SOCK_Dgram_Mcast::options>(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
501# endif
502
504
505 dgram = dmcast;
506 mgram = dmcast;
507 yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
508
509 ACE_INET_Addr addr(group.getPort(), group.getHost().c_str());
510
511 int result = -1;
512 if (ipLocal.isValid()) {
513 result = dmcast->join(addr, 1);
514
515 if (result == 0) {
516 result = restrictMcast(dmcast, group, ipLocal, true);
517 }
518 } else {
519 result = dmcast->join(addr, 1);
520 }
521
522 if (result != 0) {
523 yCError(DGRAMTWOWAYSTREAM, "cannot connect to multi-cast address");
524 happy = false;
525 return false;
526 }
527#else
528 struct ip_mreq mreq;
529 int s = -1;
530 if ((s = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
531 yCError(DGRAMTWOWAYSTREAM, "could not create receiver socket");
532 happy = false;
533 return false;
534 }
535 struct sockaddr_in addr;
536 u_int yes = 1;
537
538 /* set up destination address */
539 memset(&addr, 0, sizeof(addr));
540 addr.sin_family = AF_INET;
541 addr.sin_addr.s_addr = htonl(INADDR_ANY);
542 addr.sin_port = htons(group.getPort());
543
544 // allow multiple sockets to use the same PORT number
545 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(u_int)) < 0) {
546 yCError(DGRAMTWOWAYSTREAM, "could not allow sockets use the same ADDRESS");
547 happy = false;
548 return false;
549 }
550
551# if defined(__APPLE__)
552 if (setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(u_int)) < 0) {
553 yCError(DGRAMTWOWAYSTREAM, "could not allow sockets use the same PORT number");
554 happy = false;
555 return false;
556 }
557# endif
558
559 // bind to receive address
560 if (bind(s, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
561 yCError(DGRAMTWOWAYSTREAM, "could not create mcast server");
562 happy = false;
563 return false;
564 }
565
566 // use setsockopt() to request that the kernel join a multicast group
567 if (inet_pton(AF_INET, group.getHost().c_str(), &mreq.imr_multiaddr) == 0) {
568 yCError(DGRAMTWOWAYSTREAM, "Could not set up the mcast server");
569 std::exit(1);
570 }
571 mreq.imr_interface.s_addr = htonl(INADDR_ANY);
572 if (setsockopt(s, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq)) < 0) {
573 yCError(DGRAMTWOWAYSTREAM, "could not join the multicast group");
574 yCError(DGRAMTWOWAYSTREAM, "sendto: %d, %s", errno, strerror(errno));
575 happy = false;
576 return false;
577 }
578
579 dgram_sockfd = s;
580 dgram = this;
581#endif
582 configureSystemBuffers();
583
584 localAddress = group;
585 remoteAddress = group;
586#ifdef YARP_HAS_ACE
587 localHandle.set(localAddress.getPort(), localAddress.getHost().c_str());
588 remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
589#else
590 localHandle = localAddress.getPort();
591 remoteHandle = remoteAddress.getPort();
592#endif
593 allocate();
594 return true;
595}
596
601
603{
604 bool act = false;
605 mutex.lock();
606 if ((!closed) && (!interrupting) && happy) {
607 act = true;
608 interrupting = true;
609 closed = true;
610 }
611 mutex.unlock();
612
613 if (act) {
614 if (reader) {
615 int ct = 3;
616 while (happy && ct > 0) {
617 ct--;
619 if (mgram != nullptr) {
620 yCDebug(DGRAMTWOWAYSTREAM, "* mcast interrupt, interface %s", restrictInterfaceIp.toString().c_str());
621 tmp.join(localAddress, true, restrictInterfaceIp);
622 } else {
623 yCDebug(DGRAMTWOWAYSTREAM, "* dgram interrupt");
624 tmp.open(Contact(localAddress.getHost(), 0),
625 localAddress);
626 }
627 yCDebug(DGRAMTWOWAYSTREAM, "* interrupt state %s %s %s",
628 (interrupting ? "true" : "false"),
629 (closed ? "true" : "false"),
630 (happy ? "true" : "false"));
632 for (size_t i = 0; i < empty.length(); i++) {
633 empty.get()[i] = 0;
634 }
635
636 // don't want this message getting into a valid packet
637 tmp.pct = -1;
638
639 tmp.write(empty.bytes());
640 tmp.flush();
641 tmp.close();
642 if (happy) {
644 }
645 }
646 yCDebug(DGRAMTWOWAYSTREAM, "dgram interrupt done");
647 }
648 mutex.lock();
649 interrupting = false;
650 mutex.unlock();
651 } else {
652 // wait for interruption to be done
653 if (interrupting) {
654 while (interrupting) {
655 yCDebug(DGRAMTWOWAYSTREAM, "waiting for dgram interrupt to be finished...");
657 }
658 }
659 }
660}
661
663{
664 if (dgram != nullptr) {
665 //printf("Dgram closing, interrupt state %d\n", interrupting);
666 interrupt();
667 mutex.lock();
668 closed = true;
669 happy = false;
670 //printf("Dgram closinger, interrupt state %d\n", interrupting);
671 mutex.unlock();
672 while (interrupting) {
673 happy = false;
675 }
676 mutex.lock();
677 if (dgram != nullptr) {
678#if defined(YARP_HAS_ACE)
679 dgram->close();
680 delete dgram;
681#else
682 if (dgram_sockfd >= 0) {
683 ::close(dgram_sockfd);
684 }
685 dgram_sockfd = -1;
686#endif
687 dgram = nullptr;
688 mgram = nullptr;
689 }
690 happy = false;
691 mutex.unlock();
692 }
693 happy = false;
694}
695
697{
698 reader = true;
699 bool done = false;
700
701 while (!done) {
702
703 if (closed) {
704 happy = false;
705 return -1;
706 }
707
708 // if nothing is available, try to grab stuff
709 if (readAvail == 0) {
710 readAt = 0;
711
712
713 //yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
714 yCTrace(DGRAMTWOWAYSTREAM, "DGRAM Waiting for something!");
715 yarp::conf::ssize_t result = -1;
716#if defined(YARP_HAS_ACE)
717 if ((dgram != nullptr) && restrictInterfaceIp.isValid()) {
718 yCTrace(DGRAMTWOWAYSTREAM, "Consider remote mcast");
719 yCTrace(DGRAMTWOWAYSTREAM, "What we know:");
720 yCTrace(DGRAMTWOWAYSTREAM, " %s", restrictInterfaceIp.toString().c_str());
721 yCTrace(DGRAMTWOWAYSTREAM, " %s", localAddress.toString().c_str());
722 yCTrace(DGRAMTWOWAYSTREAM, " %s", remoteAddress.toString().c_str());
723
724 ACE_INET_Addr iface(restrictInterfaceIp.getPort(),
725 restrictInterfaceIp.getHost().c_str());
727 result = dgram->recv(readBuffer.get(), readBuffer.length(), dummy);
728 yCDebug(DGRAMTWOWAYSTREAM, "MCAST Got %zd bytes", result);
729
730 } else
731#endif
732 if (dgram != nullptr) {
733 yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
734#if defined(YARP_HAS_ACE)
736 yCTrace(DGRAMTWOWAYSTREAM, "DGRAM Waiting for something!");
737 result = dgram->recv(readBuffer.get(), readBuffer.length(), dummy);
738#else
739 result = recv(dgram_sockfd, readBuffer.get(), readBuffer.length(), 0);
740#endif
741 yCDebug(DGRAMTWOWAYSTREAM, "DGRAM Got %zd bytes", result);
742 } else {
744 //printf("Monitored input of %d bytes\n", monitor.length());
745 if (monitor.length() > readBuffer.length()) {
746 printf("Too big!\n");
747 std::exit(1);
748 }
749 memcpy(readBuffer.get(), monitor.get(), monitor.length());
750 result = monitor.length();
751 }
752
753 if (closed || (result < 0)) {
754 happy = false;
755 return -1;
756 }
757 readAvail = result;
758
759 // deal with CRC
760 int altPct = 0;
761 bool crcOk = checkCrc(readBuffer.get(), readAvail, CRC_SIZE, pct, &altPct);
762 if (altPct != -1) {
763 pct++;
764 if (!crcOk) {
765 if (bufferAlertNeeded && !bufferAlerted) {
766 yCError(DGRAMTWOWAYSTREAM, "*** Multicast/UDP packet dropped - checksum error ***");
767 yCInfo(DGRAMTWOWAYSTREAM, "The UDP/MCAST system buffer limit on your system is low.");
768 yCInfo(DGRAMTWOWAYSTREAM, "You may get packet loss under heavy conditions.");
769#ifdef __linux__
770 yCInfo(DGRAMTWOWAYSTREAM, "To change the buffer limit on linux: sysctl -w net.core.rmem_max=8388608");
771 yCInfo(DGRAMTWOWAYSTREAM, "(Might be something like: sudo /sbin/sysctl -w net.core.rmem_max=8388608)");
772#else
773 yCInfo(DGRAMTWOWAYSTREAM, "To change the limit use: sysctl for Linux/FreeBSD, ndd for Solaris, no for AIX");
774#endif
775 bufferAlerted = true;
776 } else {
777 errCount++;
778 double now = SystemClock::nowSystem();
779 if (now - lastReportTime > 1) {
780 yCError(DGRAMTWOWAYSTREAM, "*** %d datagram packet(s) dropped - checksum error ***", errCount);
781 lastReportTime = now;
782 errCount = 0;
783 }
784 }
785 reset();
786 return -1;
787 }
788 readAt += CRC_SIZE;
789 readAvail -= CRC_SIZE;
790 done = true;
791 } else {
792 readAvail = 0;
793 }
794 }
795
796 // if stuff is available, take it
797 if (readAvail > 0) {
798 size_t take = readAvail;
799 if (take > b.length()) {
800 take = b.length();
801 }
802 memcpy(b.get(), readBuffer.get() + readAt, take);
803 readAt += take;
804 readAvail -= take;
805 return take;
806 }
807 }
808
809 return 0;
810}
811
813{
814 yCTrace(DGRAMTWOWAYSTREAM, "DGRAM prep writing");
815 yCTrace(DGRAMTWOWAYSTREAM, "DGRAM write %zu bytes", b.length());
816
817 if (reader) {
818 return;
819 }
820 if (writeBuffer.get() == nullptr) {
821 return;
822 }
823
824 Bytes local = b;
825 while (local.length() > 0) {
826 yCTrace(DGRAMTWOWAYSTREAM, "DGRAM prep writing");
827 yarp::conf::ssize_t rem = local.length();
828 yarp::conf::ssize_t space = writeBuffer.length() - writeAvail;
829 bool shouldFlush = false;
830 if (rem >= space) {
831 rem = space;
832 shouldFlush = true;
833 }
834 memcpy(writeBuffer.get() + writeAvail, local.get(), rem);
835 writeAvail += rem;
836 local = Bytes(local.get() + rem, local.length() - rem);
837 if (shouldFlush) {
838 flush();
839 }
840 }
841}
842
843
845{
846 if (writeBuffer.get() == nullptr) {
847 return;
848 }
849
850 // should set CRC
851 if (writeAvail <= CRC_SIZE) {
852 return;
853 }
854 addCrc(writeBuffer.get(), writeAvail, CRC_SIZE, pct);
855 pct++;
856
857 if (writeAvail > 0) {
858 //yCAssert(DGRAMTWOWAYSTREAM, dgram != nullptr);
859 yarp::conf::ssize_t len = 0;
860
861#if defined(YARP_HAS_ACE)
862 if (mgram != nullptr) {
863 len = mgram->send(writeBuffer.get(), writeAvail);
864 yCDebug(DGRAMTWOWAYSTREAM, "MCAST - wrote %zd bytes", len);
865 } else
866#endif
867 if (dgram != nullptr) {
868#if defined(YARP_HAS_ACE)
869 len = dgram->send(writeBuffer.get(), writeAvail, remoteHandle);
870#else
871 len = send(dgram_sockfd, writeBuffer.get(), writeAvail, 0);
872#endif
873 yCDebug(DGRAMTWOWAYSTREAM, "DGRAM - wrote %zd bytes to %s", len, remoteAddress.toString().c_str());
874 } else {
875 Bytes b(writeBuffer.get(), writeAvail);
876 monitor = ManagedBytes(b, false);
877 monitor.copy();
878 //printf("Monitored output of %d bytes\n", monitor.length());
879 len = monitor.length();
881 }
882 if (len > writeBuffer.length() * 0.75) {
883 yCDebug(DGRAMTWOWAYSTREAM, "long dgrams might need a little time");
884
885 // Under heavy loads, packets could get dropped
886 // 640x480x3 images correspond to about 15 datagrams
887 // so there's not much time possible between them
888 // looked at iperf, it just does a busy-waiting delay
889 // there's an implementation below, but commented out -
890 // better solution was to increase recv buffer size
891
892 double first = yarp::os::SystemClock::nowSystem();
893 double now;
894 int ct = 0;
895 do {
896 //printf("Busy wait... %d\n", ct);
899 ct++;
900 } while (now - first < 0.001);
901 }
902
903 if (len < 0) {
904 happy = false;
905 yCDebug(DGRAMTWOWAYSTREAM, "DGRAM failed to send message with error: %s", strerror(errno));
906 return;
907 }
908 writeAvail -= len;
909
910 if (writeAvail != 0) {
911 // well, we have a problem
912 // checksums will cause dumping
913 yCDebug(DGRAMTWOWAYSTREAM, "dgram/mcast send behaving badly");
914 }
915 }
916 // finally: writeAvail should be 0
917
918 // make space for CRC
919 writeAvail = CRC_SIZE;
920}
921
922
924{
925 return happy;
926}
927
928
930{
931 readAt = 0;
932 readAvail = 0;
933 writeAvail = CRC_SIZE;
934 pct = 0;
935}
936
937
939{
940// yCError(DGRAMTWOWAYSTREAM, "Packet begins: %s", (reader ? "reader" : "writer"));
941 pct = 0;
942}
943
945{
946// yCError(DGRAMTWOWAYSTREAM, "Packet ends: %s", (reader ? "reader" : "writer"));
947 if (!reader) {
948 pct = 0;
949 }
950}
951
953{
954 return monitor.bytes();
955}
956
957
959{
960 monitor.clear();
961}
962
963
965{
966 if (dgram == nullptr) {
967 return false;
968 }
969#if defined(YARP_HAS_ACE)
970 return (dgram->set_option(IPPROTO_IP, IP_TOS, (int*)&tos, (int)sizeof(tos)) == 0);
971#else
972 return (setsockopt(dgram_sockfd, IPPROTO_IP, IP_TOS, (int*)&tos, (int)sizeof(tos)) == 0);
973#endif
974}
975
977{
978 int tos = -1;
979 if (dgram == nullptr) {
980 return tos;
981 }
982#if defined(YARP_HAS_ACE)
983 int optlen;
984 dgram->get_option(IPPROTO_IP, IP_TOS, (int*)&tos, &optlen);
985#else
987 getsockopt(dgram_sockfd, IPPROTO_IP, IP_TOS, (int*)&tos, &optlen);
988#endif
989 return tos;
990}
static bool checkCrc(char *buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct, int *store_altPct=nullptr)
#define CRC_SIZE
#define UDP_MAX_DATAGRAM_SIZE
static void addCrc(char *buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct)
A mini-server for performing network communication in the background.
void close() override
Stop port activity.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
A simple abstraction for a block of bytes.
Definition Bytes.h:24
size_t length() const
Definition Bytes.cpp:22
const char * get() const
Definition Bytes.cpp:27
Represents how to reach a part of a YARP network.
Definition Contact.h:33
bool isValid() const
Checks if a Contact is tagged as valid.
Definition Contact.cpp:298
std::string toString() const
Get a textual representation of the Contact.
Definition Contact.cpp:303
std::string toURI(bool includeCarrier=true) const
Get a representation of the Contact as a URI.
Definition Contact.cpp:313
int getPort() const
Get the port number associated with this Contact for socket communication.
Definition Contact.cpp:239
std::string getHost() const
Get the host name associated with this Contact for socket communication.
Definition Contact.cpp:228
virtual int read()
Read and return a single byte.
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that bloc...
void clear()
Disassociate object with any data block (deleting block if appropriate).
const Bytes & bytes() const
void allocate(size_t len)
Makes a data block of the specified length that will be deleted if this object is destroyed.
void copy()
Makes sure data block is owned, making a copy if necessary.
const char * get() const
static int netInt(const yarp::os::Bytes &code)
Definition NetType.cpp:27
static unsigned long int getCrc(char *buf, size_t len)
Definition NetType.cpp:100
static double nowSystem()
static void delaySystem(double seconds)
A stream abstraction for datagram communication.
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
void interrupt() override
Interrupt the stream.
virtual bool open(const Contact &remote)
void close() override
Terminate the stream.
void endPacket() override
Mark the end of a logical packet (see beginPacket).
virtual bool openMcast(const Contact &group, const Contact &ipLocal)
void reset() override
Reset the stream.
virtual bool join(const Contact &group, bool sender, const Contact &ipLocal)
void beginPacket() override
Mark the beginning of a logical packet.
void flush() override
Make sure all pending write operations are finished.
bool isOk() const override
Check if the stream is ok or in an error state.
#define yCInfo(component,...)
#define yCError(component,...)
#define yCAssert(component, x)
#define yCTrace(component,...)
#define yCWarning(component,...)
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
std::string get_string(const std::string &key, bool *found=nullptr)
Read a string from an environment variable.
Definition environment.h:66
::ssize_t ssize_t
Definition numeric.h:86
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
std::int32_t NetInt32
Definition of the NetInt32 type.
Definition NetInt32.h:29