16#if defined(YARP_HAS_ACE)
18# include <ace/Handle_Set.h>
19# include <ace/INET_Addr.h>
20# include <ace/Log_Msg.h>
21# include <ace/OS_Memory.h>
22# include <ace/OS_NS_sys_select.h>
23# include <ace/SOCK_Dgram.h>
24# include <ace/SOCK_Dgram_Mcast.h>
25# include <ace/os_include/net/os_if.h>
31# include <arpa/inet.h>
32# include <netinet/in.h>
33# include <sys/socket.h>
34# include <sys/types.h>
45#define UDP_MAX_DATAGRAM_SIZE (65507 - CRC_SIZE)
91#if defined(YARP_HAS_ACE)
98 return open(local, remote);
103 localAddress = local;
104 remoteAddress = remote;
106#if defined(YARP_HAS_ACE)
109 remoteHandle.set(remoteAddress.
getPort(), remoteAddress.
getHost().c_str());
114 int result = dgram->
open(localHandle,
131 if (local.isValid()) {
164 configureSystemBuffers();
166#if defined(YARP_HAS_ACE)
167 dgram->get_local_addr(localHandle);
169 localAddress =
Contact(
"127.0.0.1", localHandle.get_port_number());
200 int sz = yarp::conf::numeric::from_string<int>(
_env_dgram);
221#if defined(YARP_HAS_ACE)
223 if (dgram !=
nullptr) {
227 yCError(
DGRAMTWOWAYSTREAM,
"Failed to read buffer size from RCVBUF socket with error: %s. Setting read buffer size to UDP_MAX_DATAGRAM_SIZE.",
237 yCError(
DGRAMTWOWAYSTREAM,
"Failed to read buffer size from RCVBUF socket with error: %s. Setting read buffer size to UDP_MAX_DATAGRAM_SIZE.",
254void DgramTwoWayStream::configureSystemBuffers()
291#if defined(YARP_HAS_ACE)
303#if defined(__linux__)
307 bufferAlertNeeded =
true;
308 bufferAlerted =
false;
316#if defined(YARP_HAS_ACE)
327#if defined(__linux__)
331 bufferAlertNeeded =
true;
332 bufferAlerted =
false;
341#if defined(YARP_HAS_ACE)
396#if defined(YARP_HAS_ACE)
400 ACE_SOCK_Dgram_Mcast::options
mcastOptions = ACE_SOCK_Dgram_Mcast::DEFOPTS;
401# if defined(__APPLE__)
402 mcastOptions =
static_cast<ACE_SOCK_Dgram_Mcast::options
>(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
461 configureSystemBuffers();
462 remoteAddress = group;
465 localHandle.set(localAddress.
getPort(), localAddress.
getHost().c_str());
466 remoteHandle.set(remoteAddress.
getPort(), remoteAddress.
getHost().c_str());
469 remoteAddress = group;
472 remoteHandle = remoteAddress.
getPort();
497#if defined(YARP_HAS_ACE)
498 ACE_SOCK_Dgram_Mcast::options
mcastOptions = ACE_SOCK_Dgram_Mcast::DEFOPTS;
499# if defined(__APPLE__)
500 mcastOptions =
static_cast<ACE_SOCK_Dgram_Mcast::options
>(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
551# if defined(__APPLE__)
582 configureSystemBuffers();
584 localAddress = group;
585 remoteAddress = group;
587 localHandle.set(localAddress.
getPort(), localAddress.
getHost().c_str());
588 remoteHandle.set(remoteAddress.
getPort(), remoteAddress.
getHost().c_str());
590 localHandle = localAddress.
getPort();
591 remoteHandle = remoteAddress.
getPort();
606 if ((!closed) && (!interrupting) && happy) {
616 while (happy && ct > 0) {
619 if (mgram !=
nullptr) {
621 tmp.join(localAddress,
true, restrictInterfaceIp);
628 (interrupting ?
"true" :
"false"),
629 (closed ?
"true" :
"false"),
630 (happy ?
"true" :
"false"));
632 for (
size_t i = 0;
i <
empty.length();
i++) {
649 interrupting =
false;
654 while (interrupting) {
664 if (dgram !=
nullptr) {
672 while (interrupting) {
677 if (dgram !=
nullptr) {
678#if defined(YARP_HAS_ACE)
682 if (dgram_sockfd >= 0) {
709 if (readAvail == 0) {
716#if defined(YARP_HAS_ACE)
717 if ((dgram !=
nullptr) && restrictInterfaceIp.
isValid()) {
725 restrictInterfaceIp.
getHost().c_str());
727 result = dgram->recv(readBuffer.
get(), readBuffer.
length(), dummy);
732 if (dgram !=
nullptr) {
734#if defined(YARP_HAS_ACE)
737 result = dgram->recv(readBuffer.
get(), readBuffer.
length(), dummy);
739 result = recv(dgram_sockfd, readBuffer.
get(), readBuffer.
length(), 0);
750 result = monitor.
length();
753 if (closed || (result < 0)) {
765 if (bufferAlertNeeded && !bufferAlerted) {
775 bufferAlerted =
true;
779 if (now - lastReportTime > 1) {
781 lastReportTime = now;
798 size_t take = readAvail;
820 if (writeBuffer.
get() ==
nullptr) {
825 while (local.length() > 0) {
834 memcpy(writeBuffer.
get() + writeAvail, local.get(),
rem);
836 local =
Bytes(local.get() +
rem, local.length() -
rem);
846 if (writeBuffer.
get() ==
nullptr) {
857 if (writeAvail > 0) {
861#if defined(YARP_HAS_ACE)
862 if (mgram !=
nullptr) {
863 len = mgram->send(writeBuffer.
get(), writeAvail);
867 if (dgram !=
nullptr) {
868#if defined(YARP_HAS_ACE)
869 len = dgram->send(writeBuffer.
get(), writeAvail, remoteHandle);
871 len = send(dgram_sockfd, writeBuffer.
get(), writeAvail, 0);
875 Bytes b(writeBuffer.
get(), writeAvail);
882 if (len > writeBuffer.
length() * 0.75) {
900 }
while (now - first < 0.001);
910 if (writeAvail != 0) {
954 return monitor.
bytes();
966 if (dgram ==
nullptr) {
969#if defined(YARP_HAS_ACE)
979 if (dgram ==
nullptr) {
982#if defined(YARP_HAS_ACE)
static bool checkCrc(char *buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct, int *store_altPct=nullptr)
#define UDP_MAX_DATAGRAM_SIZE
static void addCrc(char *buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct)
A mini-server for performing network communication in the background.
void close() override
Stop port activity.
bool open(const std::string &name) override
Start port operation, with a specific name, with automatically-chosen network parameters.
void write(bool forceStrict=false)
Write the current object being returned by BufferedPort::prepare.
A simple abstraction for a block of bytes.
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that block.
void clear()
Disassociate object with any data block (deleting block if appropriate).
const Bytes & bytes() const
void allocate(size_t len)
Makes a data block of the specified length that will be deleted if this object is destroyed.
void copy()
Makes sure data block is owned, making a copy if necessary.
static int netInt(const yarp::os::Bytes &code)
static unsigned long int getCrc(char *buf, size_t len)
static double nowSystem()
static void delaySystem(double seconds)
A stream abstraction for datagram communication.
bool setTypeOfService(int tos) override
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
virtual void onMonitorInput()
void interrupt() override
Interrupt the stream.
virtual bool open(const Contact &remote)
void close() override
Terminate the stream.
void endPacket() override
Mark the end of a logical packet (see beginPacket).
int getTypeOfService() override
virtual void onMonitorOutput()
yarp::os::Bytes getMonitor()
virtual bool openMcast(const Contact &group, const Contact &ipLocal)
void reset() override
Reset the stream.
virtual bool join(const Contact &group, bool sender, const Contact &ipLocal)
virtual ~DgramTwoWayStream()
void beginPacket() override
Mark the beginning of a logical packet.
void flush() override
Make sure all pending write operations are finished.
bool isOk() const override
Check if the stream is ok or in an error state.
#define yCInfo(component,...)
#define yCError(component,...)
#define yCAssert(component, x)
#define yCTrace(component,...)
#define yCWarning(component,...)
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
std::string get_string(const std::string &key, bool *found=nullptr)
Read a string from an environment variable.
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
std::int32_t NetInt32
Definition of the NetInt32 type.