19 #if defined(YARP_HAS_ACE)
21 # include <ace/Handle_Set.h>
22 # include <ace/INET_Addr.h>
23 # include <ace/Log_Msg.h>
24 # include <ace/OS_Memory.h>
25 # include <ace/OS_NS_sys_select.h>
26 # include <ace/SOCK_Dgram.h>
27 # include <ace/SOCK_Dgram_Mcast.h>
28 # include <ace/os_include/net/os_if.h>
34 # include <arpa/inet.h>
35 # include <netinet/in.h>
36 # include <sys/socket.h>
37 # include <sys/types.h>
// Size limit used for datagram buffers: 65507 bytes less CRC_SIZE
// (CRC_SIZE is defined outside this excerpt).
// NOTE(review): 65507 is presumably the largest UDP payload over IPv4
// (65535 - 20-byte IP header - 8-byte UDP header) — confirm.
// The log messages in this file name this macro as the fallback read-buffer
// size when SO_RCVBUF cannot be queried.
48 #define UDP_MAX_DATAGRAM_SIZE (65507 - CRC_SIZE)
59 (length > crcLength) ? (length - crcLength) : 0);
64 bool ok = (alt == curr && pct == altPct);
67 yCDebug(DGRAMTWOWAYSTREAM,
"crc mismatch");
70 yCDebug(DGRAMTWOWAYSTREAM,
"packet code broken");
73 if (store_altPct !=
nullptr) {
74 *store_altPct = altPct;
84 (length > crcLength) ? (length - crcLength) : 0);
94 #if defined(YARP_HAS_ACE)
95 ACE_INET_Addr anywhere((u_short)0, (ACE_UINT32)INADDR_ANY);
96 Contact local(anywhere.get_host_addr(),
97 anywhere.get_port_number());
101 return open(local, remote);
106 localAddress = local;
107 remoteAddress = remote;
109 #if defined(YARP_HAS_ACE)
110 localHandle = ACE_INET_Addr((u_short)(localAddress.getPort()), (ACE_UINT32)INADDR_ANY);
112 remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
114 dgram =
new ACE_SOCK_Dgram;
115 yCAssert(DGRAMTWOWAYSTREAM, dgram !=
nullptr);
117 int result = dgram->open(localHandle,
118 ACE_PROTOCOL_FAMILY_INET,
126 if ((s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
129 struct sockaddr_in dgram_sin;
130 memset((
char*)&dgram_sin, 0,
sizeof(dgram_sin));
131 dgram_sin.sin_family = AF_INET;
132 dgram_sin.sin_addr.s_addr = htonl(INADDR_ANY);
133 dgram_sin.sin_port = htons(remote.
getPort());
134 if (local.isValid()) {
135 if (inet_pton(AF_INET, remote.
getHost().c_str(), &dgram_sin.sin_addr) == 0) {
136 yCError(DGRAMTWOWAYSTREAM,
"could not set up udp client");
139 if (connect(s, (
struct sockaddr*)&dgram_sin,
sizeof(dgram_sin)) == -1) {
140 yCError(DGRAMTWOWAYSTREAM,
"could not connect udp client");
144 if (bind(s, (
struct sockaddr*)&dgram_sin,
sizeof(dgram_sin)) == -1) {
145 yCError(DGRAMTWOWAYSTREAM,
"could not create udp server");
154 struct sockaddr_in sin;
155 socklen_t len =
sizeof(sin);
156 if (getsockname(dgram_sockfd, (
struct sockaddr*)&sin, &len) == 0 && sin.sin_family == AF_INET) {
158 local_port = ntohs(sin.sin_port);
163 yCError(DGRAMTWOWAYSTREAM,
"could not open datagram socket");
167 configureSystemBuffers();
169 #if defined(YARP_HAS_ACE)
170 dgram->get_local_addr(localHandle);
171 yCDebug(DGRAMTWOWAYSTREAM,
"starting DGRAM entity on port number %u",localHandle.get_port_number());
172 localAddress =
Contact(
"127.0.0.1", localHandle.get_port_number());
174 localAddress =
Contact(
"127.0.0.1", local_port);
177 yCDebug(DGRAMTWOWAYSTREAM,
"Update: DGRAM from %s to %s", localAddress.toURI().c_str(), remoteAddress.toURI().c_str());
184 void DgramTwoWayStream::allocate(
int readSize,
int writeSize)
190 int _write_size = -1;
193 std::string _env_mode;
199 if (!_env_mode.empty()) {
200 _env_dgram = _env_mode;
202 if (!_env_dgram.empty()) {
205 _read_size = _write_size = sz;
207 yCInfo(DGRAMTWOWAYSTREAM,
"Datagram packet size set to %d", _read_size);
210 _read_size = readSize;
211 yCInfo(DGRAMTWOWAYSTREAM,
"Datagram read size reset to %d", _read_size);
213 if (writeSize != 0) {
214 _write_size = writeSize;
215 yCInfo(DGRAMTWOWAYSTREAM,
"Datagram write size reset to %d", _write_size);
223 if (_read_size < 0) {
224 #if defined(YARP_HAS_ACE)
226 if (dgram !=
nullptr) {
227 int len =
sizeof(_read_size);
228 int result = dgram->get_option(SOL_SOCKET, SO_RCVBUF, &_read_size, &len);
230 yCError(DGRAMTWOWAYSTREAM,
"Failed to read buffer size from RCVBUF socket with error: %s. Setting read buffer size to UDP_MAX_DATAGRAM_SIZE.",
236 socklen_t len =
sizeof(_read_size);
238 int result = getsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, &_read_size, &len);
240 yCError(DGRAMTWOWAYSTREAM,
"Failed to read buffer size from RCVBUF socket with error: %s. Setting read buffer size to UDP_MAX_DATAGRAM_SIZE.",
247 readBuffer.allocate(_read_size);
248 writeBuffer.allocate(_write_size);
257 void DgramTwoWayStream::configureSystemBuffers()
268 int readBufferSize = -1;
269 if (!socketReadBufferSize.empty()) {
271 }
else if (!socketBufferSize.empty()) {
275 int writeBufferSize = -1;
276 if (!socketSendBufferSize.empty()) {
278 }
else if (!socketBufferSize.empty()) {
285 yCWarning(DGRAMTWOWAYSTREAM,
"The desired SND buffer size is too big. It is set to the max datagram size : %d",
291 if (readBufferSize > 0) {
292 int actualReadSize = -1;
294 #if defined(YARP_HAS_ACE)
295 int intSize =
sizeof(readBufferSize);
296 int setResult = dgram->set_option(SOL_SOCKET, SO_RCVBUF, (
void*)&readBufferSize, intSize);
298 int getResult = dgram->get_option(SOL_SOCKET, SO_RCVBUF, (
void*)&actualReadSize, &intSize);
300 socklen_t intSize =
sizeof(readBufferSize);
301 int setResult = setsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, (
void*)&readBufferSize, intSize);
302 int getResult = getsockopt(dgram_sockfd, SOL_SOCKET, SO_RCVBUF, (
void*)&actualReadSize, &intSize);
306 #if defined(__linux__)
309 if (setResult < 0 || getResult < 0 || readBufferSize != actualReadSize) {
310 bufferAlertNeeded =
true;
311 bufferAlerted =
false;
312 yCWarning(DGRAMTWOWAYSTREAM,
"Failed to set RECV socket buffer to desired size. Actual: %d, Desired %d",
317 if (writeBufferSize > 0) {
318 int actualWriteSize = -1;
319 #if defined(YARP_HAS_ACE)
320 int intSize =
sizeof(writeBufferSize);
321 int setResult = dgram->set_option(SOL_SOCKET, SO_SNDBUF, (
void*)&writeBufferSize, intSize);
322 int getResult = dgram->get_option(SOL_SOCKET, SO_SNDBUF, (
void*)&actualWriteSize, &intSize);
324 socklen_t intSize =
sizeof(writeBufferSize);
325 int setResult = setsockopt(dgram_sockfd, SOL_SOCKET, SO_SNDBUF, (
void*)&writeBufferSize, intSize);
326 int getResult = getsockopt(dgram_sockfd, SOL_SOCKET, SO_SNDBUF, (
void*)&actualWriteSize, &intSize);
330 #if defined(__linux__)
331 actualWriteSize /= 2;
333 if (setResult < 0 || getResult < 0 || writeBufferSize != actualWriteSize) {
334 bufferAlertNeeded =
true;
335 bufferAlerted =
false;
336 yCWarning(DGRAMTWOWAYSTREAM,
"Failed to set SND socket buffer to desired size. Actual: %d, Desired: %d",
344 #if defined(YARP_HAS_ACE)
345 int DgramTwoWayStream::restrictMcast(ACE_SOCK_Dgram_Mcast* dmcast,
350 restrictInterfaceIp = ipLocal;
352 yCInfo(DGRAMTWOWAYSTREAM,
"multicast connection %s on network interface for %s", group.
getHost().c_str(), ipLocal.
getHost().c_str());
361 ip_mreq multicast_address;
362 ACE_INET_Addr group_addr(group.
getPort(),
364 ACE_INET_Addr interface_addr(ipLocal.
getPort(),
366 multicast_address.imr_interface.s_addr = htonl(interface_addr.get_ip_address());
367 multicast_address.imr_multiaddr.s_addr = htonl(group_addr.get_ip_address());
370 yCDebug(DGRAMTWOWAYSTREAM,
"Trying to correct mcast membership...");
371 result = ((ACE_SOCK*)dmcast)->set_option(IPPROTO_IP, IP_ADD_MEMBERSHIP, &multicast_address,
sizeof(
struct ip_mreq));
373 yCDebug(DGRAMTWOWAYSTREAM,
"Trying to correct mcast output...");
374 result = ((ACE_SOCK*)dmcast)->set_option(IPPROTO_IP, IP_MULTICAST_IF, &multicast_address.imr_interface.s_addr,
sizeof(
struct in_addr));
378 yCDebug(DGRAMTWOWAYSTREAM,
"mcast result: %s", strerror(num));
397 localAddress = ipLocal;
399 #if defined(YARP_HAS_ACE)
400 localHandle = ACE_INET_Addr((u_short)(localAddress.getPort()),
401 (ACE_UINT32)INADDR_ANY);
403 ACE_SOCK_Dgram_Mcast::options mcastOptions = ACE_SOCK_Dgram_Mcast::DEFOPTS;
404 # if defined(__APPLE__)
405 mcastOptions =
static_cast<ACE_SOCK_Dgram_Mcast::options
>(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
408 auto* dmcast =
new ACE_SOCK_Dgram_Mcast(mcastOptions);
411 yCAssert(DGRAMTWOWAYSTREAM, dgram !=
nullptr);
415 result = dmcast->open(addr,
nullptr, 1);
417 result = restrictMcast(dmcast, group, ipLocal,
false);
421 yCError(DGRAMTWOWAYSTREAM,
"could not open multicast datagram socket");
430 struct sockaddr_in dgram_sin;
432 if ((s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) == -1) {
433 yCError(DGRAMTWOWAYSTREAM,
"could not create sender socket");
437 memset((
char*)&dgram_sin, 0,
sizeof(dgram_sin));
438 dgram_sin.sin_family = AF_INET;
439 dgram_sin.sin_port = htons(group.
getPort());
442 if (inet_pton(AF_INET, group.
getHost().c_str(), &dgram_sin.sin_addr) == 0) {
443 yCError(DGRAMTWOWAYSTREAM,
"could not set up mcast client");
446 if (connect(s, (
struct sockaddr*)&dgram_sin,
sizeof(dgram_sin)) == -1) {
447 yCError(DGRAMTWOWAYSTREAM,
"could not connect mcast client");
456 struct sockaddr_in sin;
457 socklen_t len =
sizeof(sin);
458 if (getsockname(dgram_sockfd, (
struct sockaddr*)&sin, &len) == 0 && sin.sin_family == AF_INET) {
459 local_port = ntohs(sin.sin_port);
464 configureSystemBuffers();
465 remoteAddress = group;
468 localHandle.set(localAddress.getPort(), localAddress.getHost().c_str());
469 remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
472 remoteAddress = group;
473 localAddress =
Contact(
"127.0.0.1", local_port);
474 localHandle = local_port;
475 remoteHandle = remoteAddress.getPort();
479 yCDebug(DGRAMTWOWAYSTREAM,
"Update: DGRAM from %s to %s", localAddress.toURI().c_str(), remoteAddress.toURI().c_str());
488 yCDebug(DGRAMTWOWAYSTREAM,
"subscribing to mcast address %s for %s", group.
toURI().c_str(), (sender ?
"writing" :
"reading"));
494 return openMcast(group, ipLocal);
500 #if defined(YARP_HAS_ACE)
501 ACE_SOCK_Dgram_Mcast::options mcastOptions = ACE_SOCK_Dgram_Mcast::DEFOPTS;
502 # if defined(__APPLE__)
503 mcastOptions =
static_cast<ACE_SOCK_Dgram_Mcast::options
>(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
506 auto* dmcast =
new ACE_SOCK_Dgram_Mcast(mcastOptions);
510 yCAssert(DGRAMTWOWAYSTREAM, dgram !=
nullptr);
516 result = dmcast->join(addr, 1);
519 result = restrictMcast(dmcast, group, ipLocal,
true);
522 result = dmcast->join(addr, 1);
526 yCError(DGRAMTWOWAYSTREAM,
"cannot connect to multi-cast address");
533 if ((s = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
534 yCError(DGRAMTWOWAYSTREAM,
"could not create receiver socket");
538 struct sockaddr_in addr;
542 memset(&addr, 0,
sizeof(addr));
543 addr.sin_family = AF_INET;
544 addr.sin_addr.s_addr = htonl(INADDR_ANY);
545 addr.sin_port = htons(group.
getPort());
548 if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes,
sizeof(u_int)) < 0) {
549 yCError(DGRAMTWOWAYSTREAM,
"could not allow sockets use the same ADDRESS");
554 # if defined(__APPLE__)
555 if (setsockopt(s, SOL_SOCKET, SO_REUSEPORT, &yes,
sizeof(u_int)) < 0) {
556 yCError(DGRAMTWOWAYSTREAM,
"could not allow sockets use the same PORT number");
563 if (bind(s, (
struct sockaddr*)&addr,
sizeof(addr)) == -1) {
564 yCError(DGRAMTWOWAYSTREAM,
"could not create mcast server");
570 if (inet_pton(AF_INET, group.
getHost().c_str(), &mreq.imr_multiaddr) == 0) {
571 yCError(DGRAMTWOWAYSTREAM,
"Could not set up the mcast server");
574 mreq.imr_interface.s_addr = htonl(INADDR_ANY);
575 if (setsockopt(s, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq,
sizeof(mreq)) < 0) {
576 yCError(DGRAMTWOWAYSTREAM,
"could not join the multicast group");
577 yCError(DGRAMTWOWAYSTREAM,
"sendto: %d, %s", errno, strerror(errno));
585 configureSystemBuffers();
587 localAddress = group;
588 remoteAddress = group;
590 localHandle.set(localAddress.getPort(), localAddress.getHost().c_str());
591 remoteHandle.set(remoteAddress.getPort(), remoteAddress.getHost().c_str());
593 localHandle = localAddress.
getPort();
594 remoteHandle = remoteAddress.getPort();
609 if ((!closed) && (!interrupting) && happy) {
619 while (happy && ct > 0) {
622 if (mgram !=
nullptr) {
623 yCDebug(DGRAMTWOWAYSTREAM,
"* mcast interrupt, interface %s", restrictInterfaceIp.toString().c_str());
624 tmp.
join(localAddress,
true, restrictInterfaceIp);
626 yCDebug(DGRAMTWOWAYSTREAM,
"* dgram interrupt");
630 yCDebug(DGRAMTWOWAYSTREAM,
"* interrupt state %s %s %s",
631 (interrupting ?
"true" :
"false"),
632 (closed ?
"true" :
"false"),
633 (happy ?
"true" :
"false"));
635 for (
size_t i = 0; i < empty.
length(); i++) {
649 yCDebug(DGRAMTWOWAYSTREAM,
"dgram interrupt done");
652 interrupting =
false;
657 while (interrupting) {
658 yCDebug(DGRAMTWOWAYSTREAM,
"waiting for dgram interrupt to be finished...");
667 if (dgram !=
nullptr) {
675 while (interrupting) {
680 if (dgram !=
nullptr) {
681 #if defined(YARP_HAS_ACE)
685 if (dgram_sockfd >= 0) {
686 ::close(dgram_sockfd);
712 if (readAvail == 0) {
717 yCTrace(DGRAMTWOWAYSTREAM,
"DGRAM Waiting for something!");
719 #if defined(YARP_HAS_ACE)
720 if ((dgram !=
nullptr) && restrictInterfaceIp.isValid()) {
721 yCTrace(DGRAMTWOWAYSTREAM,
"Consider remote mcast");
722 yCTrace(DGRAMTWOWAYSTREAM,
"What we know:");
723 yCTrace(DGRAMTWOWAYSTREAM,
" %s", restrictInterfaceIp.toString().c_str());
724 yCTrace(DGRAMTWOWAYSTREAM,
" %s", localAddress.toString().c_str());
725 yCTrace(DGRAMTWOWAYSTREAM,
" %s", remoteAddress.toString().c_str());
727 ACE_INET_Addr iface(restrictInterfaceIp.getPort(),
728 restrictInterfaceIp.getHost().c_str());
729 ACE_INET_Addr dummy((u_short)0, (ACE_UINT32)INADDR_ANY);
730 result = dgram->recv(readBuffer.get(), readBuffer.length(), dummy);
731 yCDebug(DGRAMTWOWAYSTREAM,
"MCAST Got %zd bytes", result);
735 if (dgram !=
nullptr) {
736 yCAssert(DGRAMTWOWAYSTREAM, dgram !=
nullptr);
737 #if defined(YARP_HAS_ACE)
738 ACE_INET_Addr dummy((u_short)0, (ACE_UINT32)INADDR_ANY);
739 yCTrace(DGRAMTWOWAYSTREAM,
"DGRAM Waiting for something!");
740 result = dgram->recv(readBuffer.get(), readBuffer.length(), dummy);
742 result = recv(dgram_sockfd, readBuffer.get(), readBuffer.length(), 0);
744 yCDebug(DGRAMTWOWAYSTREAM,
"DGRAM Got %zd bytes", result);
748 if (monitor.length() > readBuffer.length()) {
749 printf(
"Too big!\n");
752 memcpy(readBuffer.get(), monitor.get(), monitor.length());
753 result = monitor.length();
756 if (closed || (result < 0)) {
768 if (bufferAlertNeeded && !bufferAlerted) {
769 yCError(DGRAMTWOWAYSTREAM,
"*** Multicast/UDP packet dropped - checksum error ***");
770 yCInfo(DGRAMTWOWAYSTREAM,
"The UDP/MCAST system buffer limit on your system is low.");
771 yCInfo(DGRAMTWOWAYSTREAM,
"You may get packet loss under heavy conditions.");
773 yCInfo(DGRAMTWOWAYSTREAM,
"To change the buffer limit on linux: sysctl -w net.core.rmem_max=8388608");
774 yCInfo(DGRAMTWOWAYSTREAM,
"(Might be something like: sudo /sbin/sysctl -w net.core.rmem_max=8388608)");
776 yCInfo(DGRAMTWOWAYSTREAM,
"To change the limit use: sysctl for Linux/FreeBSD, ndd for Solaris, no for AIX");
778 bufferAlerted =
true;
782 if (
now - lastReportTime > 1) {
783 yCError(DGRAMTWOWAYSTREAM,
"*** %d datagram packet(s) dropped - checksum error ***", errCount);
784 lastReportTime =
now;
801 size_t take = readAvail;
805 memcpy(b.
get(), readBuffer.get() + readAt, take);
817 yCTrace(DGRAMTWOWAYSTREAM,
"DGRAM prep writing");
818 yCTrace(DGRAMTWOWAYSTREAM,
"DGRAM write %zu bytes", b.
length());
823 if (writeBuffer.get() ==
nullptr) {
828 while (local.length() > 0) {
829 yCTrace(DGRAMTWOWAYSTREAM,
"DGRAM prep writing");
832 bool shouldFlush =
false;
837 memcpy(writeBuffer.get() + writeAvail, local.get(), rem);
839 local =
Bytes(local.get() + rem, local.length() - rem);
849 if (writeBuffer.get() ==
nullptr) {
860 if (writeAvail > 0) {
864 #if defined(YARP_HAS_ACE)
865 if (mgram !=
nullptr) {
866 len = mgram->send(writeBuffer.get(), writeAvail);
867 yCDebug(DGRAMTWOWAYSTREAM,
"MCAST - wrote %zd bytes", len);
870 if (dgram !=
nullptr) {
871 #if defined(YARP_HAS_ACE)
872 len = dgram->send(writeBuffer.get(), writeAvail, remoteHandle);
874 len = send(dgram_sockfd, writeBuffer.get(), writeAvail, 0);
876 yCDebug(DGRAMTWOWAYSTREAM,
"DGRAM - wrote %zd bytes to %s", len, remoteAddress.toString().c_str());
878 Bytes b(writeBuffer.get(), writeAvail);
882 len = monitor.length();
885 if (len > writeBuffer.length() * 0.75) {
886 yCDebug(DGRAMTWOWAYSTREAM,
"long dgrams might need a little time");
903 }
while (
now - first < 0.001);
908 yCDebug(DGRAMTWOWAYSTREAM,
"DGRAM failed to send message with error: %s", strerror(errno));
913 if (writeAvail != 0) {
916 yCDebug(DGRAMTWOWAYSTREAM,
"dgram/mcast send behaving badly");
957 return monitor.bytes();
969 if (dgram ==
nullptr) {
972 #if defined(YARP_HAS_ACE)
973 return (dgram->set_option(IPPROTO_IP, IP_TOS, (
int*)&tos, (
int)
sizeof(tos)) == 0);
975 return (setsockopt(dgram_sockfd, IPPROTO_IP, IP_TOS, (
int*)&tos, (
int)
sizeof(tos)) == 0);
982 if (dgram ==
nullptr) {
985 #if defined(YARP_HAS_ACE)
987 dgram->get_option(IPPROTO_IP, IP_TOS, (
int*)&tos, &optlen);
990 getsockopt(dgram_sockfd, IPPROTO_IP, IP_TOS, (
int*)&tos, &optlen);
static bool checkCrc(char *buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct, int *store_altPct=nullptr)
#define UDP_MAX_DATAGRAM_SIZE
static void addCrc(char *buf, yarp::conf::ssize_t length, yarp::conf::ssize_t crcLength, int pct)
A simple abstraction for a block of bytes.
An abstraction for a block of bytes, with optional responsibility for allocating/destroying that block.
const Bytes & bytes() const
static int netInt(const yarp::os::Bytes &code)
static unsigned long int getCrc(char *buf, size_t len)
static int toInt(const std::string &x)
static double nowSystem()
static void delaySystem(double seconds)
A stream abstraction for datagram communication.
bool setTypeOfService(int tos) override
void write(const yarp::os::Bytes &b) override
Write a block of bytes to the stream.
void interrupt() override
Interrupt the stream.
virtual bool open(const Contact &remote)
void close() override
Terminate the stream.
void endPacket() override
Mark the end of a logical packet (see beginPacket).
int getTypeOfService() override
yarp::os::Bytes getMonitor()
virtual bool openMcast(const Contact &group, const Contact &ipLocal)
void reset() override
Reset the stream.
virtual bool join(const Contact &group, bool sender, const Contact &ipLocal)
virtual ~DgramTwoWayStream()
void beginPacket() override
Mark the beginning of a logical packet.
void flush() override
Make sure all pending write operations are finished.
bool isOk() const override
Check if the stream is ok or in an error state.
#define yCInfo(component,...)
#define yCError(component,...)
#define yCAssert(component, x)
#define yCTrace(component,...)
#define yCWarning(component,...)
#define yCDebug(component,...)
#define YARP_OS_LOG_COMPONENT(name, name_string)
std::string getEnvironment(const char *key, bool *found=nullptr)
Read a variable from the environment.
double now()
Return the current time in seconds, relative to an arbitrary starting point.
The components from which ports and connections are built.
An interface to the operating system, including Port based communication.
std::int32_t NetInt32
Definition of the NetInt32 type.