From 563c8f8eac12ad6fa623db4a735f3e175df22545 Mon Sep 17 00:00:00 2001 From: Garrett D'Amore Date: Sat, 20 Jul 2024 15:59:04 -0700 Subject: [PATCH] fixes #168 UDP transport This is the initial implementation of UDP transport. It does in order guarantees (and consequently filters duplicates), but it does not guarantee delivery. The protocol limits payloads to 65000 bytes (minus headers for SP), but you really want to keep it to much less -- probably best for short messages that within a single MTU to avoid IP fragmentation and reassembly. This is unicast only for now (although there are plans for some support for multicast and broadcast as well as being able to perform automatic mesh building, but that will be in following work. Additional tunables are coming. This is only lightly tested at this point, and should be considered experimental. Its also undocumented. --- cmake/NNGOptions.cmake | 3 + src/core/aio.c | 13 +- src/core/idhash.c | 6 + src/core/idhash.h | 19 +- src/core/message.c | 23 + src/core/message.h | 14 +- src/core/platform.h | 2 +- src/core/url.c | 29 +- src/core/url.h | 1 + src/platform/posix/posix_resolv_gai.c | 2 +- src/platform/posix/posix_udp.c | 2 +- src/sp/transport.c | 6 + src/sp/transport/CMakeLists.txt | 4 +- src/sp/transport/tcp/tcp.c | 15 +- src/sp/transport/tls/tls.c | 13 +- src/sp/transport/udp/CMakeLists.txt | 15 + src/sp/transport/udp/udp.c | 1835 +++++++++++++++++++++++++ src/sp/transport/udp/udp_tran_test.c | 171 +++ src/testing/marry.c | 6 +- 19 files changed, 2134 insertions(+), 45 deletions(-) create mode 100644 src/sp/transport/udp/CMakeLists.txt create mode 100644 src/sp/transport/udp/udp.c create mode 100644 src/sp/transport/udp/udp_tran_test.c diff --git a/cmake/NNGOptions.cmake b/cmake/NNGOptions.cmake index 3a764b8b1..d117ee36d 100644 --- a/cmake/NNGOptions.cmake +++ b/cmake/NNGOptions.cmake @@ -135,6 +135,9 @@ mark_as_advanced(NNG_TRANSPORT_WSS) option (NNG_TRANSPORT_FDC "Enable File Descriptor transport (EXPERIMENTAL)" ON) mark_as_advanced(NNG_TRANSPORT_FDC) +option (NNG_TRANSPORT_UDP "Enable UDP transport (EXPERIMENTAL)" ON) +mark_as_advanced(NNG_TRANSPORT_UDP) + # ZeroTier option (NNG_TRANSPORT_ZEROTIER "Enable ZeroTier transport (requires libzerotiercore)." OFF) mark_as_advanced(NNG_TRANSPORT_ZEROTIER) diff --git a/src/core/aio.c b/src/core/aio.c index 2f548d745..30a963f9e 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -8,6 +8,8 @@ // found online at https://opensource.org/licenses/MIT. // +#include "core/defs.h" +#include "nng/nng.h" #include "nng_impl.h" #include @@ -764,7 +766,16 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) ms = aio->a_timeout; } } - aio->a_expire = nni_clock() + ms; + switch (ms) { + case NNG_DURATION_INFINITE: + case NNG_DURATION_DEFAULT: + // infinite sleep + aio->a_expire = NNI_TIME_NEVER; + break; + default: + aio->a_expire = nni_clock() + ms; + break; + } if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) { nni_aio_finish_error(aio, rv); diff --git a/src/core/idhash.c b/src/core/idhash.c index 0ee86f0dc..4ee77f6c1 100644 --- a/src/core/idhash.c +++ b/src/core/idhash.c @@ -389,3 +389,9 @@ nni_id_visit(nni_id_map *m, uint64_t *keyp, void **valp, uint32_t *cursor) *cursor = index; return (false); } + +uint32_t +nni_id_count(const nni_id_map *m) +{ + return (m->id_count); +} diff --git a/src/core/idhash.h b/src/core/idhash.h index ea91b6be8..826016bda 100644 --- a/src/core/idhash.h +++ b/src/core/idhash.h @@ -46,15 +46,16 @@ struct nni_id_map { #define NNI_ID_FLAG_RANDOM 2 // start at a random value #define NNI_ID_FLAG_REGISTER 4 // map is registered for finalization -extern void nni_id_map_init(nni_id_map *, uint64_t, uint64_t, bool); -extern void nni_id_map_fini(nni_id_map *); -extern void *nni_id_get(nni_id_map *, uint64_t); -extern int nni_id_set(nni_id_map *, uint64_t, void *); -extern int nni_id_alloc(nni_id_map *, uint64_t *, void *); -extern int nni_id_alloc32(nni_id_map *, uint32_t *, void *); -extern int nni_id_remove(nni_id_map *, uint64_t); -extern void nni_id_map_sys_fini(void); -extern bool nni_id_visit(nni_id_map *, uint64_t *, void **, uint32_t *); +extern void nni_id_map_init(nni_id_map *, uint64_t, uint64_t, bool); +extern void nni_id_map_fini(nni_id_map *); +extern void *nni_id_get(nni_id_map *, uint64_t); +extern int nni_id_set(nni_id_map *, uint64_t, void *); +extern int nni_id_alloc(nni_id_map *, uint64_t *, void *); +extern int nni_id_alloc32(nni_id_map *, uint32_t *, void *); +extern int nni_id_remove(nni_id_map *, uint64_t); +extern void nni_id_map_sys_fini(void); +extern bool nni_id_visit(nni_id_map *, uint64_t *, void **, uint32_t *); +extern uint32_t nni_id_count(const nni_id_map *); #define NNI_ID_MAP_INITIALIZER(min, max, flags) \ { \ diff --git a/src/core/message.c b/src/core/message.c index 3da93ac67..7a644d79c 100644 --- a/src/core/message.c +++ b/src/core/message.c @@ -29,6 +29,7 @@ struct nng_msg { nni_chunk m_body; uint32_t m_pipe; // set on receive nni_atomic_int m_refcnt; + nng_sockaddr m_addr; // set on receive, transport use }; #if 0 @@ -544,6 +545,16 @@ nni_msg_chop(nni_msg *m, size_t len) return (nni_chunk_chop(&m->m_body, len)); } +// Grow the message header, but don't put anything there. +// This is useful for setting up to receive directly into it +// for zero copy purposes. +void +nni_msg_header_extend(nni_msg *m, size_t len) +{ + NNI_ASSERT((len + m->m_header_len) <= sizeof(m->m_header_buf)); + m->m_header_len += len; +} + int nni_msg_header_append(nni_msg *m, const void *data, size_t len) { @@ -656,3 +667,15 @@ nni_msg_get_pipe(const nni_msg *m) { return (m->m_pipe); } + +const nng_sockaddr * +nni_msg_address(const nni_msg *msg) +{ + return (&msg->m_addr); +} + +void +nni_msg_set_address(nng_msg *msg, const nng_sockaddr *addr) +{ + msg->m_addr = *addr; +} diff --git a/src/core/message.h b/src/core/message.h index 7e35ba752..cc47457a9 100644 --- a/src/core/message.h +++ b/src/core/message.h @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2017 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -20,9 +20,9 @@ extern int nni_msg_realloc(nni_msg *, size_t); extern int nni_msg_reserve(nni_msg *, size_t); extern size_t nni_msg_capacity(nni_msg *); extern int nni_msg_dup(nni_msg **, const nni_msg *); -extern void * nni_msg_header(nni_msg *); +extern void *nni_msg_header(nni_msg *); extern size_t nni_msg_header_len(const nni_msg *); -extern void * nni_msg_body(nni_msg *); +extern void *nni_msg_body(nni_msg *); extern size_t nni_msg_len(const nni_msg *); extern int nni_msg_append(nni_msg *, const void *, size_t); extern int nni_msg_insert(nni_msg *, const void *, size_t); @@ -55,6 +55,14 @@ extern void nni_msg_clone(nni_msg *); extern nni_msg *nni_msg_unique(nni_msg *); extern bool nni_msg_shared(nni_msg *); +// Socket address access. Principally useful for transports like UDP, +// which may need to remember or add the socket address later. +// SP transports will generally not support upper layers setting the +// address on send, but will take the information from the pipe. +// It may be set on receive, depending upon the transport. +extern const nng_sockaddr *nni_msg_address(const nni_msg *); +extern void nni_msg_set_address(nng_msg *, const nng_sockaddr *); + // nni_msg_pull_up ensures that the message is unique, and that any // header present is "pulled up" into the message body. If the function // cannot do this for any reason (out of space in the body), then NULL diff --git a/src/core/platform.h b/src/core/platform.h index 5cd931e6d..45a47a592 100644 --- a/src/core/platform.h +++ b/src/core/platform.h @@ -356,7 +356,7 @@ extern int nni_tcp_listener_get( // Symbolic service names will be looked up assuming SOCK_STREAM, so // they may not work with UDP. extern void nni_resolv_ip( - const char *, const char *, int, bool, nng_sockaddr *sa, nni_aio *); + const char *, const char *, uint16_t, bool, nng_sockaddr *sa, nni_aio *); // nni_parse_ip parses an IP address, without a port. extern int nni_parse_ip(const char *, nng_sockaddr *); diff --git a/src/core/url.c b/src/core/url.c index 65cf478bd..a42ece1f4 100644 --- a/src/core/url.c +++ b/src/core/url.c @@ -8,6 +8,7 @@ // found online at https://opensource.org/licenses/MIT. // +#include "nng/nng.h" #include "nng_impl.h" #include @@ -277,6 +278,32 @@ nni_url_default_port(const char *scheme) return (""); } +// Return the address family for an address scheme. +// Returns NNG_AF_UNSPEC for unknown cases or where +// we do not want to choose between AF_INET and AF_INET6. +uint16_t +nni_url_family(const char *scheme) +{ + if (strcmp(scheme, "ipc") == 0) { + return (NNG_AF_IPC); + } + if (strcmp(scheme, "inproc") == 0) { + return (NNG_AF_INPROC); + } + if (strcmp(scheme, "abstract") == 0) { + return (NNG_AF_ABSTRACT); + } +#ifdef NNG_HAVE_INET6 + if (strchr(scheme, '6') != NULL) { + return (NNG_AF_INET6); + } +#endif + if (strchr(scheme, '4') != NULL) { + return (NNG_AF_INET); + } + return (NNG_AF_UNSPEC); +} + // URLs usually follow the following format: // // scheme:[//[userinfo@]host][/]path[?query][#fragment] @@ -600,7 +627,7 @@ nni_url_clone(nni_url **dstp, const nni_url *src) // nni_url_to_address resolves a URL into a sockaddr, assuming the URL is for // an IP address. int -nni_url_to_address(nng_sockaddr *sa, const nng_url *url) +nni_url_to_address(nng_sockaddr *sa, const nni_url *url) { int af; nni_aio aio; diff --git a/src/core/url.h b/src/core/url.h index 877a07917..10e569fb2 100644 --- a/src/core/url.h +++ b/src/core/url.h @@ -17,6 +17,7 @@ extern int nni_url_parse(nni_url **, const char *path); extern void nni_url_free(nni_url *); extern int nni_url_clone(nni_url **, const nni_url *); extern const char *nni_url_default_port(const char *); +extern uint16_t nni_url_family(const char *); extern int nni_url_asprintf(char **, const nni_url *); extern int nni_url_asprintf_port(char **, const nni_url *, int); extern size_t nni_url_decode(uint8_t *, const char *, size_t); diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c index f522499ee..7bb725ec1 100644 --- a/src/platform/posix/posix_resolv_gai.c +++ b/src/platform/posix/posix_resolv_gai.c @@ -236,7 +236,7 @@ resolv_task(resolv_item *item) } void -nni_resolv_ip(const char *host, const char *serv, int af, bool passive, +nni_resolv_ip(const char *host, const char *serv, uint16_t af, bool passive, nng_sockaddr *sa, nni_aio *aio) { resolv_item *item; diff --git a/src/platform/posix/posix_udp.c b/src/platform/posix/posix_udp.c index 4ef4c68ce..911787cae 100644 --- a/src/platform/posix/posix_udp.c +++ b/src/platform/posix/posix_udp.c @@ -9,8 +9,8 @@ // #include "core/nng_impl.h" -#include "nng/nng.h" #include "platform/posix/posix_impl.h" +#include #include #ifdef NNG_PLATFORM_POSIX diff --git a/src/sp/transport.c b/src/sp/transport.c index 1d895c828..ab2a68da9 100644 --- a/src/sp/transport.c +++ b/src/sp/transport.c @@ -75,6 +75,9 @@ extern void nni_sp_zt_register(void); #ifdef NNG_TRANSPORT_FDC extern void nni_sp_sfd_register(void); #endif +#ifdef NNG_TRANSPORT_UDP +extern void nni_sp_udp_register(void); +#endif void nni_sp_tran_sys_init(void) @@ -103,6 +106,9 @@ nni_sp_tran_sys_init(void) #ifdef NNG_TRANSPORT_FDC nni_sp_sfd_register(); #endif +#ifdef NNG_TRANSPORT_UDP + nni_sp_udp_register(); +#endif } // nni_sp_tran_sys_fini finalizes the entire transport system, including all diff --git a/src/sp/transport/CMakeLists.txt b/src/sp/transport/CMakeLists.txt index 0de80015d..d59a19d40 100644 --- a/src/sp/transport/CMakeLists.txt +++ b/src/sp/transport/CMakeLists.txt @@ -1,5 +1,5 @@ # -# Copyright 2023 Staysail Systems, Inc. +# Copyright 2024 Staysail Systems, Inc. # # This software is supplied under the terms of the MIT License, a # copy of which should be located in the distribution where this @@ -17,4 +17,4 @@ add_subdirectory(tcp) add_subdirectory(tls) add_subdirectory(ws) add_subdirectory(zerotier) - +add_subdirectory(udp) diff --git a/src/sp/transport/tcp/tcp.c b/src/sp/transport/tcp/tcp.c index be2e03458..1760551cc 100644 --- a/src/sp/transport/tcp/tcp.c +++ b/src/sp/transport/tcp/tcp.c @@ -730,7 +730,7 @@ tcptran_ep_close(void *arg) static int tcptran_url_parse_source(nng_url *url, nng_sockaddr *sa, const nng_url *surl) { - int af; + uint16_t af; char *semi; char *src; size_t len; @@ -751,18 +751,7 @@ tcptran_url_parse_source(nng_url *url, nng_sockaddr *sa, const nng_url *surl) len = (size_t) (semi - url->u_hostname); url->u_hostname = semi + 1; - - if (strcmp(surl->u_scheme, "tcp") == 0) { - af = NNG_AF_UNSPEC; - } else if (strcmp(surl->u_scheme, "tcp4") == 0) { - af = NNG_AF_INET; -#ifdef NNG_ENABLE_IPV6 - } else if (strcmp(surl->u_scheme, "tcp6") == 0) { - af = NNG_AF_INET6; -#endif - } else { - return (NNG_EADDRINVAL); - } + af = nni_url_family(url->u_scheme); if ((src = nni_alloc(len + 1)) == NULL) { return (NNG_ENOMEM); diff --git a/src/sp/transport/tls/tls.c b/src/sp/transport/tls/tls.c index f488771b5..504d885ed 100644 --- a/src/sp/transport/tls/tls.c +++ b/src/sp/transport/tls/tls.c @@ -712,18 +712,9 @@ tlstran_url_parse_source(nni_url *url, nng_sockaddr *sa, const nni_url *surl) len = (size_t) (semi - url->u_hostname); url->u_hostname = semi + 1; + af = nni_url_family(url->u_scheme); - if (strcmp(surl->u_scheme, "tls+tcp") == 0) { - af = NNG_AF_UNSPEC; - } else if (strcmp(surl->u_scheme, "tls+tcp4") == 0) { - af = NNG_AF_INET; -#ifdef NNG_ENABLE_IPV6 - } else if (strcmp(surl->u_scheme, "tls+tcp6") == 0) { - af = NNG_AF_INET6; -#endif - } else { - return (NNG_EADDRINVAL); - } + rv = nni_url_to_address(sa, url); if ((src = nni_alloc(len + 1)) == NULL) { return (NNG_ENOMEM); diff --git a/src/sp/transport/udp/CMakeLists.txt b/src/sp/transport/udp/CMakeLists.txt new file mode 100644 index 000000000..b08cd8613 --- /dev/null +++ b/src/sp/transport/udp/CMakeLists.txt @@ -0,0 +1,15 @@ +# +# Copyright 2024 Staysail Systems, Inc. +# +# This software is supplied under the terms of the MIT License, a +# copy of which should be located in the distribution where this +# file was obtained (LICENSE.txt). A copy of the license may also be +# found online at https://opensource.org/licenses/MIT. +# + +# UDP transport +nng_directory(udp) + +nng_sources_if(NNG_TRANSPORT_UDP udp.c) +nng_defines_if(NNG_TRANSPORT_UDP NNG_TRANSPORT_UDP) +nng_test_if(NNG_TRANSPORT_UDP udp_tran_test) diff --git a/src/sp/transport/udp/udp.c b/src/sp/transport/udp/udp.c new file mode 100644 index 000000000..8230692cf --- /dev/null +++ b/src/sp/transport/udp/udp.c @@ -0,0 +1,1835 @@ +// Copyright 2024 Staysail Systems, Inc. +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "core/aio.h" +#include "core/defs.h" +#include "core/idhash.h" +#include "core/nng_impl.h" +#include "core/options.h" +#include "core/platform.h" +#include "nng/nng.h" + +#include +#include +#include + +// Experimental UDP transport. Unicast only. +typedef struct udp_pipe udp_pipe; +typedef struct udp_ep udp_ep; + +// These should reallyh be renamed for the project. +#define nni_udp_open nni_plat_udp_open +#define nni_udp_close nni_plat_udp_close +#define nni_udp_send nni_plat_udp_send +#define nni_udp_recv nni_plat_udp_recv +#define nni_udp nni_plat_udp +#define nni_udp_sockname nni_plat_udp_sockname + +// OP code, 8 bits +enum udp_opcode { + OPCODE_DATA = 0, + OPCODE_CREQ = 1, + OPCODE_CACK = 2, + OPCODE_DISC = 3, + OPCODE_MESH = 4, +}; + +// Disconnect reason, must be 16 bits +typedef enum udp_disc_reason { + DISC_CLOSED = 0, // normal close + DISC_TYPE = 1, // bad SP type + DISC_NOTCONN = 2, // no such connection + DISC_REFUSED = 3, // refused by policy + DISC_MSGSIZE = 4, // message too large + DISC_NEGO = 5, // neogtiation failed + DISC_INACTIVE = 6, // closed due to inactivity + DISC_PROTO = 7, // other protocol error + DISC_NOBUF = 8, // resources exhausted +} udp_disc_reason; + +#ifndef NNG_UDP_TXQUEUE_LEN +#define NNG_UDP_TXQUEUE_LEN 32 +#endif + +#ifndef NNG_UDP_RXQUEUE_LEN +#define NNG_UDP_RXQUEUE_LEN 16 +#endif + +#ifndef NNG_UDP_RECVMAX +#define NNG_UDP_RECVMAX 65000 // largest permitted by spec +#endif + +#ifndef NNG_UDP_REFRESH +#define NNG_UDP_REFRESH 5 +#endif + +#ifndef NNG_UDP_CONNRETRY +#define NNG_UDP_CONNRETRY 1 +#endif + +#define UDP_EP_ROLE(ep) ((ep)->dialer ? "dialer " : "listener") + +// NB: Each of the following messages is exactly 20 bytes in size + +typedef struct udp_sp_data { + uint8_t us_ver; + uint8_t us_op_code; + uint16_t us_type; + uint32_t us_sender_id; + uint32_t us_peer_id; + uint32_t us_sequence; + uint16_t us_length; + uint16_t us_reserved; // depends on message type +} udp_sp_data; + +typedef struct udp_sp_creq { + uint8_t us_ver; + uint8_t us_op_code; + uint16_t us_type; + uint32_t us_sender_id; + uint32_t us_peer_id; + uint32_t us_sequence; + uint16_t us_recv_max; // actually max payload size + uint8_t us_reserved; + uint8_t us_refresh; +} udp_sp_creq; + +typedef struct udp_sp_disc { + uint8_t us_ver; + uint8_t us_op_code; + uint16_t us_type; + uint32_t us_sender_id; + uint32_t us_peer_id; + uint32_t us_sequence; + uint16_t us_reason; // depends on message type + uint16_t us_reserved; +} udp_sp_disc; + +typedef struct udp_sp_mesh { + uint8_t us_ver; + uint8_t us_op_code; + uint16_t us_type; + uint32_t us_sender_id; + uint32_t us_reserved1; + uint32_t us_sequence; + uint32_t us_reserved2; +} udp_sp_mesh; + +// ack is the same format as request +typedef struct udp_sp_creq udp_sp_cack; + +typedef union udp_sp_msg { + udp_sp_data data; + udp_sp_creq creq; + udp_sp_cack cack; + udp_sp_disc disc; + udp_sp_mesh mesh; +} udp_sp_msg; + +// Like a NIC driver, this is a "descriptor" for UDP TX packets. +// This allows us to create a circular ring of these to support +// queueing for TX gracefully. +typedef struct udp_txdesc { + udp_sp_msg header; // UDP transport message headers + nni_msg *payload; // may be null, only for data messages + nng_sockaddr sa; + bool submitted; // true if submitted +} udp_txdesc; + +typedef struct udp_txring { + udp_txdesc *descs; + uint16_t head; + uint16_t tail; + uint16_t count; + uint16_t size; +} udp_txring; + +#define UDP_TXRING_SZ 128 + +// UDP pipe resend (CREQ) in msec (nng_duration) +#define UDP_PIPE_REFRESH(p) ((p)->refresh * NNI_SECOND) + +// UDP pipe timeout in msec (nng_duration) +#define UDP_PIPE_TIMEOUT(p) (((p)->refresh * NNI_SECOND * 5) / 2) + +struct udp_pipe { + udp_ep *ep; + nni_pipe *npipe; + nng_sockaddr peer_addr; + uint16_t peer; + uint16_t proto; + uint32_t self_id; + uint32_t peer_id; + uint32_t self_seq; + uint32_t peer_seq; + uint16_t sndmax; // peer's max recv size + uint16_t rcvmax; // max recv size + uint16_t refresh; // seconds, for the protocol + bool closed; + bool dialer; + nng_duration stale_dur; + nng_duration refresh_dur; + nng_time next_wake; + nng_time next_creq; + nng_time expire; + nni_list_node node; + nni_lmq rx_mq; + nni_list rx_aios; +}; + +struct udp_ep { + nni_udp *udp; + nni_mtx mtx; + uint16_t proto; + uint16_t af; // address family + bool fini; + bool started; + bool closed; + bool cooldown; + nng_url *url; + const char *host; // for dialers + int refcnt; // active pipes + nni_aio *useraio; + nni_aio *connaio; + nni_aio timeaio; + nni_aio resaio; + + nni_time linger; // when lingering send expires + + bool dialer; + bool tx_busy; // true if tx pending + nni_msg *rx_payload; // current receive message + nng_sockaddr rx_sa; // addr for last message + + nni_aio tx_aio; // aio for TX handling + nni_aio rx_aio; // aio for RX handling + nni_id_map pipes; // pipes (indexed by id) + nni_sockaddr self_sa; // our address + nni_sockaddr peer_sa; // peer address, only for dialer; + nni_sockaddr mesh_sa; // mesh source address (ours) + nni_list connaios; // aios from accept waiting for a client peer + nni_list connpipes; // pipes waiting to be connected + uint16_t refresh; // refresh interval for connections in seconds + udp_sp_msg rx_msg; // contains the received message header + uint16_t rcvmax; // max payload, trimmed to uint16_t + uint16_t short_msg; + udp_txring tx_ring; + nni_time next_wake; + nni_aio_completions complq; + +#ifdef NNG_ENABLE_STATS + nni_stat_item st_rcv_max; +#endif +}; + +static void udp_ep_hold(udp_ep *ep); +static void udp_ep_rele(udp_ep *ep); +static void udp_ep_fini(void *); +static void udp_ep_start(udp_ep *); +static void udp_pipe_fini(void *); +static void udp_resolv_cb(void *); +static void udp_rx_cb(void *); + +static int +udp_pipe_alloc(udp_pipe **pp, udp_ep *ep, uint32_t peer_id, nng_sockaddr *sa) +{ + udp_pipe *p; + int rv; + nni_time now; + if ((p = NNI_ALLOC_STRUCT(p)) == NULL) { + return (NNG_ENOMEM); + } + if ((rv = nni_id_alloc32(&ep->pipes, &p->self_id, p)) != 0) { + NNI_FREE_STRUCT(p); + return (rv); + } + now = nni_clock(); + nni_aio_list_init(&p->rx_aios); + p->ep = ep; + p->dialer = ep->dialer; + p->self_seq = nni_random(); + p->peer_id = peer_id; + p->proto = ep->proto; + p->peer_addr = *sa; + p->refresh = p->dialer ? NNG_UDP_CONNRETRY : ep->refresh; + p->next_wake = now + UDP_PIPE_REFRESH(p); + p->expire = now + UDP_PIPE_TIMEOUT(p); + p->rcvmax = ep->rcvmax; + *pp = p; + nni_lmq_init(&p->rx_mq, NNG_UDP_RXQUEUE_LEN); + udp_ep_hold(ep); + return (0); +} + +static void udp_recv_data( + udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa); +static void udp_send_disc_full(udp_ep *ep, nng_sockaddr *sa, uint32_t local_id, + uint32_t remote_id, uint32_t seq, udp_disc_reason reason); +static void udp_send_disc(udp_ep *ep, udp_pipe *p, udp_disc_reason reason); + +static void udp_ep_match(udp_ep *ep); + +static void +udp_tran_init(void) +{ +} + +static void +udp_tran_fini(void) +{ +} + +static void +udp_pipe_close(void *arg) +{ + udp_pipe *p = arg; + udp_ep *ep = p->ep; + nni_aio *aio; + + nni_mtx_lock(&ep->mtx); + udp_send_disc(ep, p, DISC_CLOSED); + while ((aio = nni_list_first(&p->rx_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&ep->mtx); +} + +static void +udp_pipe_stop(void *arg) +{ + udp_pipe_close(arg); +} + +static int +udp_pipe_init(void *arg, nni_pipe *npipe) +{ + udp_pipe *p = arg; + p->npipe = npipe; + + return (0); +} + +static void +udp_pipe_destroy(udp_pipe *p) +{ + nng_msg *m; + + // call with ep->mtx lock held + while (!nni_lmq_empty(&p->rx_mq)) { + nni_lmq_get(&p->rx_mq, &m); + nni_msg_free(m); + } + NNI_ASSERT(nni_list_empty(&p->rx_aios)); + + NNI_FREE_STRUCT(p); +} + +static void +udp_pipe_fini(void *arg) +{ + udp_pipe *p = arg; + udp_ep *ep = p->ep; + + nni_mtx_lock(&ep->mtx); + nni_id_remove(&ep->pipes, p->self_id); + + udp_pipe_destroy(p); + udp_ep_rele(ep); // releases lock +} + +// Find the pipe matching the given id (our pipe id, taken from the peer_id +// of the header) and peer's sockaddr. Returns NULL if not found. The +// ep lock must be held. If a pending pipe (not yet connected) is found, then +// it is returned instead. +static udp_pipe * +udp_find_pipe(udp_ep *ep, uint32_t self_id, uint32_t peer_id) +{ + udp_pipe *p; + if (((p = nni_id_get(&ep->pipes, self_id)) != NULL) && (!p->closed)) { + if (p->peer_id == 0 || p->peer_id == peer_id) { + return (p); + } + } + return (NULL); +} + +static bool +udp_check_pipe_sequence(udp_pipe *p, uint32_t seq) +{ + int32_t delta; + // signed math so we can see how far apart they are + delta = (int32_t) (seq - p->peer_seq); + if (delta < 0) { + // out of order delivery + return (false); + } + // TODO: bump a stat for misses if delta > 0. + p->peer_seq = seq + 1; // expected next sequence number + return (true); +} + +static void +udp_start_rx(udp_ep *ep) +{ + nni_iov iov[2]; + + iov[0].iov_buf = &ep->rx_msg; + iov[0].iov_len = sizeof(ep->rx_msg); + iov[1].iov_buf = nni_msg_body(ep->rx_payload); + iov[1].iov_len = nni_msg_len(ep->rx_payload); + nni_aio_set_input(&ep->rx_aio, 0, &ep->rx_sa); + nni_aio_set_iov(&ep->rx_aio, 2, iov); + nni_udp_recv(ep->udp, &ep->rx_aio); +} + +static void +udp_start_tx(udp_ep *ep) +{ + udp_txring *ring = &ep->tx_ring; + udp_txdesc *desc; + nng_msg *msg; + + if ((!ring->count) || (!ep->started) || ep->tx_busy) { + return; + } + ep->tx_busy = true; + + // NB: This does not advance the tail yet. + // The tail will be advanced when the operation is complete. + desc = &ring->descs[ring->tail]; + nni_iov iov[3]; + int niov = 0; + + NNI_ASSERT(desc->submitted); + iov[0].iov_buf = &desc->header; + iov[0].iov_len = sizeof(desc->header); + niov++; + + if ((msg = desc->payload) != NULL) { + if (nni_msg_header_len(msg) > 0) { + iov[niov].iov_buf = nni_msg_header(msg); + iov[niov].iov_len = nni_msg_header_len(msg); + niov++; + } + if (nni_msg_len(msg) > 0) { + iov[niov].iov_buf = nni_msg_body(msg); + iov[niov].iov_len = nni_msg_len(msg); + niov++; + } + } + nni_aio_set_input(&ep->tx_aio, 0, &desc->sa); + nni_aio_set_iov(&ep->tx_aio, niov, iov); + // it should *never* take this long, but allow for ARP resolution + nni_aio_set_timeout(&ep->tx_aio, NNI_SECOND * 10); + nni_udp_send(ep->udp, &ep->tx_aio); +} + +static void +udp_queue_tx(udp_ep *ep, nng_sockaddr *sa, udp_sp_msg *msg, nni_msg *payload) +{ + udp_txring *ring = &ep->tx_ring; + udp_txdesc *desc = &ring->descs[ring->head]; + + if (ring->count == ring->size || !ep->started) { + // ring is full + // TODO: bump a stat + if (payload != NULL) { + nni_msg_free(payload); + } + return; + } +#ifdef NNG_LITTLE_ENDIAN + // This covers modern GCC, clang, Visual Studio. + desc->header = *msg; +#else + // Fix the endianness, so other routines don't have to. + // It turns out that the endianness of the fields of CREQ + // is compatible with the fields of every other message type. + // We only have to do this for systems that are not known + // (at compile time) to be little endian. + desc->header.creq.us_ver = 0x1; + desc->header.creq.us_op_code = msg->creq.us_op_code; + NNI_PUT16LE(&desc->header.creq.us_type, msg->creq.us_type); + NNI_PUT32LE(&desc->header.creq.us_sended_id, msg->creq.us_sender_id); + NNI_PUT32LE(&desc->header.creq.us_peer_id, msg->creq.us_peer_id); + NNI_PUT32LE(&desc->header.creq.us_sequence, msg->creq.us_sequence); + NNI_PUT16LE(&desc->header.creq.us_recv_max, msg->creq.us_recv_max); + desc->header.creq.us_reserved = 0; + desc->header.creq.us_refresh = msg->creq.us_refresh; +#endif + + desc->payload = payload; + desc->sa = *sa; + desc->submitted = true; + ring->count++; + ring->head++; + if (ring->head == ring->size) { + ring->head = 0; + } + udp_start_tx(ep); +} + +static void +udp_finish_tx(udp_ep *ep) +{ + udp_txring *ring = &ep->tx_ring; + udp_txdesc *desc; + + NNI_ASSERT(ring->count > 0); + desc = &ring->descs[ring->tail]; + NNI_ASSERT(desc->submitted); + if (desc->payload != NULL) { + nni_msg_free(desc->payload); + desc->payload = NULL; + } + desc->submitted = false; + ring->tail++; + ring->count--; + if (ring->tail == ring->size) { + ring->tail = 0; + } + ep->tx_busy = false; + + // possibly start another tx going + udp_start_tx(ep); +} + +static void +udp_send_disc(udp_ep *ep, udp_pipe *p, udp_disc_reason reason) +{ + nni_aio *aio; + if (p->closed) { + return; + } + p->closed = true; + while ((aio = nni_list_first(&p->rx_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + udp_send_disc_full( + ep, &p->peer_addr, p->self_id, p->peer_id, p->self_seq++, reason); +} + +static void +udp_send_disc_full(udp_ep *ep, nng_sockaddr *sa, uint32_t local_id, + uint32_t remote_id, uint32_t seq, udp_disc_reason reason) +{ + udp_sp_disc disc; + + disc.us_ver = 0x1; + disc.us_op_code = OPCODE_DISC; + disc.us_type = ep->proto; + disc.us_sender_id = local_id; + disc.us_peer_id = remote_id; + disc.us_sequence = seq; + disc.us_reason = (uint16_t) reason; + udp_queue_tx(ep, sa, (void *) &disc, NULL); +} + +static void +udp_send_creq(udp_ep *ep, udp_pipe *p) +{ + udp_sp_creq creq; + creq.us_ver = 0x1; + creq.us_op_code = OPCODE_CREQ; + creq.us_type = p->proto; + creq.us_sender_id = p->self_id; + creq.us_peer_id = p->peer_id; + creq.us_sequence = p->self_seq++; + creq.us_recv_max = p->rcvmax; + creq.us_refresh = p->refresh; + p->next_creq = nni_clock() + UDP_PIPE_REFRESH(p); + p->next_wake = p->next_creq; + if (p->next_wake < ep->next_wake) { + ep->next_wake = p->next_wake; + nni_aio_abort(&ep->timeaio, NNG_EINTR); + } + udp_queue_tx(ep, &p->peer_addr, (void *) &creq, NULL); +} + +static void +udp_send_cack(udp_ep *ep, udp_pipe *p) +{ + udp_sp_cack cack; + cack.us_ver = 0x01; + cack.us_op_code = OPCODE_CACK; + cack.us_type = p->proto; + cack.us_sender_id = p->self_id; + cack.us_peer_id = p->peer_id; + cack.us_sequence = p->self_seq++; + cack.us_recv_max = p->rcvmax; + cack.us_refresh = p->refresh; + udp_queue_tx(ep, &p->peer_addr, (void *) &cack, NULL); +} + +static void +udp_recv_disc(udp_ep *ep, udp_sp_disc *disc, nng_sockaddr *sa) +{ + udp_pipe *p; + nni_aio *aio; + NNI_ARG_UNUSED(sa); + + p = udp_find_pipe(ep, disc->us_peer_id, disc->us_sender_id); + if (p != NULL) { + // For now we aren't validating the sequence numbers. + // This allows for an out of order DISC to cause the + // connection to be dropped, but it should self heal. + p->closed = true; + p->self_id = 0; // prevent it from being identified later + while ((aio = nni_list_first(&p->rx_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + } +} + +// Receive data for the pipe. Returns true if we used +// the message, false otherwise. +static void +udp_recv_data(udp_ep *ep, udp_sp_data *dreq, size_t len, nng_sockaddr *sa) +{ + // NB: ep mtx is locked + udp_pipe *p; + nni_aio *aio; + nni_msg *msg; + nni_time now; + + // send_id is the remote peer's ID + // peer_id is our ID (receiver ID) + // sequence number is our sender's sequence + uint32_t send_id = dreq->us_sender_id; + uint32_t peer_id = dreq->us_peer_id; + + // NB: Peer ID endianness does not matter, as long we use it + // consistently. + if ((p = udp_find_pipe(ep, peer_id, send_id)) == NULL) { + // TODO: Bump a stat... + udp_send_disc_full(ep, sa, send_id, peer_id, 0, DISC_NOTCONN); + // Question: how do we store the sockaddr for that? + return; + } + if (p->peer_id == 0) { + // connection isn't formed yet ... send another CREQ + udp_send_creq(ep, p); + return; + } + + now = nni_clock(); + + // Make sure the message wasn't truncated, and that it fits within + // our maximum agreed upon payload. + if ((dreq->us_length > len) || (dreq->us_length > p->rcvmax)) { + udp_send_disc(ep, p, DISC_MSGSIZE); + return; + } + + p->expire = now + UDP_PIPE_TIMEOUT(p); + p->next_wake = now + UDP_PIPE_REFRESH(p); + + if (p->next_wake < ep->next_wake) { + ep->next_wake = p->next_wake; + nni_aio_abort(&ep->timeaio, NNG_EINTR); + } + + // trim the message down to its + nni_msg_chop( + ep->rx_payload, nni_msg_len(ep->rx_payload) - dreq->us_length); + + if (!udp_check_pipe_sequence(p, dreq->us_sequence)) { + // out of order delivery, drop it + // TODO: bump a stat + return; + } + + if (nni_lmq_full(&p->rx_mq)) { + // bump a NOBUF stat + return; + } + + // Short message, just alloc and copy + if (len <= ep->short_msg) { + if (nng_msg_alloc(&msg, len) != 0) { + // TODO: bump a stat + return; + } + nni_msg_set_address(msg, sa); + nni_msg_clear(msg); + nni_msg_append(msg, nni_msg_body(ep->rx_payload), len); + nni_lmq_put(&p->rx_mq, msg); + } else { + // Message size larger than copy break, do zero copy + msg = ep->rx_payload; + if (nng_msg_alloc(&ep->rx_payload, + ep->rcvmax + sizeof(ep->rx_msg)) != 0) { + // TODO: bump a stat + ep->rx_payload = msg; // make sure we put it back + return; + } + + if (len > nng_msg_len(msg)) { + // chop off any unfilled tail + nng_msg_chop(msg, nng_msg_len(msg) - len); + } + nni_msg_set_address(msg, sa); + nni_lmq_put(&p->rx_mq, msg); + } + + while (((aio = nni_list_first(&p->rx_aios)) != NULL) && + (!nni_lmq_empty(&p->rx_mq))) { + nni_aio_list_remove(aio); + nni_lmq_get(&p->rx_mq, &msg); + nni_aio_set_msg(aio, msg); + nni_aio_completions_add( + &ep->complq, aio, 0, nni_aio_count(aio)); + } +} + +static void +udp_recv_creq(udp_ep *ep, udp_sp_creq *creq, nng_sockaddr *sa) +{ + udp_pipe *p; + nni_time now; + + now = nni_clock(); + if (ep->dialer) { + // dialers do not accept CREQ requests + udp_send_disc_full(ep, sa, creq->us_peer_id, + creq->us_sender_id, 0, DISC_REFUSED); + return; + } + if ((p = udp_find_pipe(ep, creq->us_peer_id, creq->us_sender_id))) { + if ((p->peer_id == 0) || (p->peer != creq->us_type)) { + // we don't expect this -- a connection request from a + // peer while we have an oustanding request of our own. + // We *could* compare the sockaddrs to see if they + // match and if so then treat this as just a dueling + // connection. but for now we just discard it -- we'll + // wait for the CACK. + return; + } + + // so we know who it is from.. this is a refresh. + if (creq->us_refresh == 0) { + udp_send_disc(ep, p, DISC_NEGO); + return; + } + if (creq->us_refresh < p->refresh) { + p->refresh = creq->us_refresh; + } + p->next_wake = now + UDP_PIPE_REFRESH(p); + p->expire = now + UDP_PIPE_TIMEOUT(p); + if (p->next_wake < ep->next_wake) { + // earlier wake up, kick it with EINTR + ep->next_wake = p->next_wake; + nni_aio_abort(&ep->timeaio, NNG_EINTR); + } + udp_send_cack(ep, p); + return; + } + + // new pipe + if (ep->fini || ep->closed) { + // endpoint is closing down, reject it. + udp_send_disc_full( + ep, sa, 0, creq->us_sender_id, 0, DISC_REFUSED); + return; + } + if (creq->us_refresh == 0) { + udp_send_disc_full( + ep, sa, 0, creq->us_sender_id, 0, DISC_NEGO); + return; + } + + if (udp_pipe_alloc(&p, ep, creq->us_peer_id, sa) != 0) { + udp_send_disc_full( + ep, sa, 0, creq->us_sender_id, 0, DISC_NOBUF); + return; + } + p->refresh = ep->refresh; + if (creq->us_refresh < p->refresh) { + p->refresh = creq->us_refresh; + } + p->peer = creq->us_type; + p->peer_id = creq->us_sender_id; + p->peer_seq = creq->us_sequence + 1; + p->sndmax = creq->us_recv_max; + p->refresh = ep->refresh; + p->next_wake = now + p->refresh; + p->expire = now + (p->refresh * (5 * NNI_SECOND) / 2); + if (p->next_wake < ep->next_wake) { + // wake up the timer since our refresh interval is shorter + ep->next_wake = p->next_wake; + nni_aio_abort(&ep->timeaio, NNG_EINTR); + } + nni_list_append(&ep->connpipes, p); + udp_send_cack(ep, p); + udp_ep_match(ep); +} + +static void +udp_recv_cack(udp_ep *ep, udp_sp_creq *cack, nng_sockaddr *sa) +{ + udp_pipe *p; + bool first; + nni_time now; + + if ((p = udp_find_pipe(ep, cack->us_peer_id, cack->us_sender_id)) && + (!p->closed)) { + if ((p->peer_id != 0) && (p->peer != cack->us_type)) { + udp_send_disc(ep, p, DISC_TYPE); + return; + } + + first = (p->peer_id == 0); + + // so we know who it is from.. this is a refresh. + p->sndmax = cack->us_recv_max; + p->peer = cack->us_type; + p->peer_id = cack->us_sender_id; + + if (cack->us_refresh == 0) { + udp_send_disc(ep, p, DISC_NEGO); + return; + } + if (first) { + p->refresh = ep->refresh; + p->peer_seq = cack->us_sequence + 1; + } + if (cack->us_refresh < p->refresh) { + p->refresh = cack->us_refresh; + } + now = nni_clock(); + p->next_wake = now + UDP_PIPE_REFRESH(p); + p->expire = now + UDP_PIPE_TIMEOUT(p); + if (p->next_wake < ep->next_wake) { + // earlier wake up, kick it with EINTR + nni_aio_abort(&ep->timeaio, NNG_EINTR); + } + + if (first) { + nni_list_append(&ep->connpipes, p); + udp_ep_match(ep); + } + return; + } + + // a CACK without a corresponding CREQ (or timed out pipe already) + udp_send_disc_full( + ep, sa, cack->us_peer_id, cack->us_sender_id, 0, DISC_NOTCONN); +} + +static void +udp_tx_cb(void *arg) +{ + udp_ep *ep = arg; + + nni_mtx_lock(&ep->mtx); + udp_finish_tx(ep); + nni_mtx_unlock(&ep->mtx); +} + +// In the case of unicast UDP, we don't know +// whether the message arrived from a connected peer as part of a +// logical connection, or is a message related to connection management. +static void +udp_rx_cb(void *arg) +{ + udp_ep *ep = arg; + nni_aio *aio = &ep->rx_aio; + int rv; + size_t n; + udp_sp_msg *hdr; + nng_sockaddr *sa; + nni_aio_completions complq; + + // for a received packet we are either receiving it for a + // connection we already have established, or for a new connection. + // Dialers cannot receive connection requests (as a safety + // precaution). + + nni_mtx_lock(&ep->mtx); + if ((rv = nni_aio_result(aio)) != 0) { + // something bad happened on RX... which is unexpected. + // sleep a little bit and hope for recovery. + switch (nni_aio_result(aio)) { + case NNG_ECLOSED: + case NNG_ECANCELED: + return; + case NNG_ETIMEDOUT: + case NNG_EAGAIN: + case NNG_EINTR: + ep->cooldown = false; + goto finish; + break; + default: + ep->cooldown = true; + nni_sleep_aio(5, aio); + return; + } + } + if (ep->cooldown) { + ep->cooldown = false; + goto finish; + } + + // Received message will be in the ep rx header. + hdr = &ep->rx_msg; + sa = &ep->rx_sa; + n = nng_aio_count(aio); + + if ((n >= sizeof(*hdr)) && (hdr->data.us_ver == 1)) { + n -= sizeof(*hdr); + +#ifndef NNG_LITTLE_ENDIAN + // Fix the endianness, so other routines don't have to. + // It turns out that the endianness of the fields of CREQ + // is compatible with the fields of every other message type. + // We only have to do this for systems that are not known + // (at compile time) to be little endian. + hdr->data.us_type = NNI_GET16LE(&hdr->data.us_type); + hdr->data.us_sender_id = NNI_GET32LE(&hdr->data.us_sender_id); + hdr->data.us_peeer_id = NNI_GET32LE(&hdr->data.us_peer_id); + hdr->data.us_sequence = NNI_GET32LE(&hdr->data.us_sequence); + hdr->data.us_length = NNI_GET16LE(&hdr->data.us_length); +#endif + + // TODO: verify that incoming type matches us! + + switch (hdr->data.us_op_code) { + case OPCODE_DATA: + udp_recv_data(ep, &hdr->data, n, sa); + break; + case OPCODE_CREQ: + udp_recv_creq(ep, &hdr->creq, sa); + break; + case OPCODE_CACK: + udp_recv_cack(ep, &hdr->cack, sa); + break; + case OPCODE_DISC: + udp_recv_disc(ep, &hdr->disc, sa); + break; + case OPCODE_MESH: // TODO: + // udp_recv_mesh(ep, &hdr->mesh, sa); + // break; + default: + udp_send_disc_full( + ep, sa, 0, hdr->data.us_sender_id, 0, DISC_PROTO); + break; + } + } + +finish: + // start another receive + udp_start_rx(ep); + + // grab the list of completions so we can finish them. + complq = ep->complq; + nni_aio_completions_init(&ep->complq); + nni_mtx_unlock(&ep->mtx); + + // now run the completions -- synchronously + nni_aio_completions_run(&complq); +} + +static void +udp_pipe_send(void *arg, nni_aio *aio) +{ + udp_pipe *p = arg; + udp_ep *ep; + udp_sp_data dreq; + nng_msg *msg; + + if (nni_aio_begin(aio) != 0) { + // No way to give the message back to the protocol, + // so we just discard it silently to prevent it from leaking. + nni_msg_free(nni_aio_get_msg(aio)); + nni_aio_set_msg(aio, NULL); + return; + } + + msg = nni_aio_get_msg(aio); + ep = p->ep; + + nni_mtx_lock(&ep->mtx); + if ((nni_msg_len(msg) + nni_msg_header_len(msg)) > p->sndmax) { + nni_mtx_unlock(&ep->mtx); + // rather failing this with an error, we just drop it on the + // floor. this is on the sender, so there isn't a compelling + // need to disconnect the pipe, since it we're not being + // "ill-behaved" to our peer. + // TODO: bump a stat + nni_msg_free(msg); + return; + } + + dreq.us_ver = 1; + dreq.us_type = ep->proto; + dreq.us_op_code = OPCODE_DATA; + dreq.us_sender_id = p->self_id; + dreq.us_peer_id = p->peer_id; + dreq.us_sequence = p->self_seq++; + dreq.us_length = + msg != NULL ? nni_msg_len(msg) + nni_msg_header_len(msg) : 0; + + // Just queue it, or fail it. + udp_queue_tx(ep, &p->peer_addr, (void *) &dreq, msg); + nni_mtx_unlock(&ep->mtx); + nni_aio_finish(aio, 0, dreq.us_length); +} + +static void +udp_pipe_recv_cancel(nni_aio *aio, void *arg, int rv) +{ + udp_pipe *p = arg; + udp_ep *ep = p->ep; + + nni_mtx_lock(&ep->mtx); + if (!nni_aio_list_active(aio)) { + nni_mtx_unlock(&ep->mtx); + return; + } + nni_aio_list_remove(aio); + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); +} + +static void +udp_pipe_recv(void *arg, nni_aio *aio) +{ + udp_pipe *p = arg; + udp_ep *ep = p->ep; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&ep->mtx); + if (p->closed) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, udp_pipe_recv_cancel, p)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + nni_list_append(&p->rx_aios, aio); + nni_mtx_unlock(&ep->mtx); +} + +static uint16_t +udp_pipe_peer(void *arg) +{ + udp_pipe *p = arg; + + return (p->peer); +} + +static int +udp_pipe_get_recvmax(void *arg, void *v, size_t *szp, nni_type t) +{ + udp_pipe *p = arg; + udp_ep *ep = p->ep; + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(p->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static int +udp_pipe_get_remaddr(void *arg, void *v, size_t *szp, nni_type t) +{ + udp_pipe *p = arg; + udp_ep *ep = p->ep; + int rv; + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_sockaddr(&p->peer_addr, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static nni_option udp_pipe_options[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_get = udp_pipe_get_recvmax, + }, + { + .o_name = NNG_OPT_REMADDR, + .o_get = udp_pipe_get_remaddr, + }, + { + .o_name = NULL, + }, +}; + +static int +udp_pipe_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + udp_pipe *p = arg; + int rv; + + rv = nni_getopt(udp_pipe_options, name, p, buf, szp, t); + return (rv); +} + +// udp_ep_hold simply bumps the reference count. +// This needs to be done with the lock for the EP held. +static void +udp_ep_hold(udp_ep *ep) +{ + ep->refcnt++; +} + +// udp_ep_rele drops the reference count on the endpoint. +// If the endpoint drops to zero, the EP is freed. It also +// unlocks the mutex, which must be held when calling this. +static void +udp_ep_rele(udp_ep *ep) +{ + nni_aio *aio; + NNI_ASSERT(ep->refcnt > 0); + ep->refcnt--; + if (!ep->fini || ep->refcnt > 0) { + nni_mtx_unlock(&ep->mtx); + return; + } + while ((aio = nni_list_first(&ep->connaios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + nni_mtx_unlock(&ep->mtx); + nni_aio_close(&ep->timeaio); + nni_aio_close(&ep->resaio); + nni_aio_close(&ep->tx_aio); + nni_aio_close(&ep->rx_aio); + if (ep->udp != NULL) { + nni_udp_close(ep->udp); + } + nni_aio_fini(&ep->timeaio); + nni_aio_fini(&ep->resaio); + nni_aio_fini(&ep->tx_aio); + nni_aio_fini(&ep->rx_aio); + nni_id_map_fini(&ep->pipes); + NNI_FREE_STRUCTS(ep->tx_ring.descs, ep->tx_ring.size); + NNI_FREE_STRUCT(ep); +} + +static void +udp_ep_fini(void *arg) +{ + udp_ep *ep = arg; + + // We optionally linger a little bit (up to a half second) + // so that the disconnect messages can get pushed out. On + // most systems this should only take a single millisecond. + nni_time linger = + nni_clock() + NNI_SECOND / 2; // half second to drain, max + nni_mtx_lock(&ep->mtx); + ep->fini = true; + while ((ep->tx_ring.count > 0) && (nni_clock() < linger)) { + nni_mtx_unlock(&ep->mtx); + nng_msleep(1); + nni_mtx_lock(&ep->mtx); + } + if (ep->tx_ring.count > 0) { + nng_log_warn("NNG-UDP-LINGER", + "Lingering timed out on endpoint close, peer " + "notifications dropped"); + } + udp_ep_rele(ep); // drops the lock +} + +static void +udp_ep_close(void *arg) +{ + udp_ep *ep = arg; + udp_pipe *p; + nni_aio *aio; + + nni_mtx_lock(&ep->mtx); + while ((aio = nni_list_first(&ep->connaios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECONNABORTED); + } + while ((p = nni_list_first(&ep->connpipes)) != NULL) { + NNI_ASSERT(p->npipe == NULL); + nni_list_remove(&ep->connpipes, p); + nni_id_remove(&ep->pipes, p->self_id); + udp_pipe_destroy(p); + ep->refcnt--; + } + + // close all pipes + uint32_t cursor = 0; + while (nni_id_visit(&ep->pipes, NULL, (void **) &p, &cursor)) { + p->closed = true; + if (p->peer_id != 0) { + udp_send_disc(ep, p, DISC_CLOSED); + } + while ((aio = nni_list_first(&p->rx_aios)) != NULL) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ECLOSED); + } + } + ep->closed = true; + nni_aio_close(&ep->resaio); + nni_mtx_unlock(&ep->mtx); +} + +// timer handler - sends out additional creqs as needed, +// reaps stale connections, and handles linger. +static void +udp_timer_cb(void *arg) +{ + udp_ep *ep = arg; + udp_pipe *p; + int rv; + + nni_mtx_lock(&ep->mtx); + rv = nni_aio_result(&ep->timeaio); + if ((rv == NNG_ECLOSED) || (rv == NNG_ECANCELED) || ep->closed) { + nni_mtx_unlock(&ep->mtx); + return; + } + + uint32_t cursor = 0; + nni_time now = nni_clock(); + uint16_t refresh = (nng_duration) ep->refresh * NNI_SECOND; + + ep->next_wake = NNI_TIME_NEVER; + while (nni_id_visit(&ep->pipes, NULL, (void **) &p, &cursor)) { + + if (p->closed) { + continue; + } + + if (now > p->expire) { + char buf[128]; + nni_aio *aio; + nng_log_info("NNG-UDP-INACTIVE", + "Pipe peer %s timed out due to inactivity", + nng_str_sockaddr(&p->peer_addr, buf, sizeof(buf))); + if ((ep->dialer) && (p->peer_id == 0) && + (aio = nni_list_first(&ep->connaios))) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, NNG_ETIMEDOUT); + } + // This will probably not be received by the peer, + // since we aren't getting anything from them. But + // having it on the wire may help debugging later. + udp_send_disc(ep, p, DISC_INACTIVE); + continue; + } + + if (p->dialer && now > p->next_creq) { + udp_send_creq(ep, p); + } + if (p->next_wake < ep->next_wake) { + ep->next_wake = p->next_wake; + } + } + refresh = ep->next_wake == NNI_TIME_NEVER ? NNG_DURATION_INFINITE + : ep->next_wake - now; + nni_sleep_aio(refresh, &ep->timeaio); + nni_mtx_unlock(&ep->mtx); +} + +static int +udp_ep_init(udp_ep **epp, nng_url *url, nni_sock *sock) +{ + udp_ep *ep; + int rv; + + if ((ep = NNI_ALLOC_STRUCT(ep)) == NULL) { + return (NNG_ENOMEM); + } + + ep->tx_ring.descs = + NNI_ALLOC_STRUCTS(ep->tx_ring.descs, NNG_UDP_TXQUEUE_LEN); + if (ep->tx_ring.descs == NULL) { + NNI_FREE_STRUCT(ep); + return (NNG_ENOMEM); + } + ep->tx_ring.size = NNG_UDP_TXQUEUE_LEN; + + ep->af = nni_url_family(url->u_scheme); + ep->self_sa.s_family = ep->af; + ep->proto = nni_sock_proto_id(sock); + ep->url = url; + ep->refresh = NNG_UDP_REFRESH; // one minute by default + ep->rcvmax = NNG_UDP_RECVMAX; + ep->refcnt = 1; + if ((rv = nni_msg_alloc(&ep->rx_payload, + ep->rcvmax + sizeof(ep->rx_msg)) != 0)) { + NNI_FREE_STRUCTS(ep->tx_ring.descs, NNG_UDP_TXQUEUE_LEN); + NNI_FREE_STRUCT(ep); + return (rv); + } + + nni_mtx_init(&ep->mtx); + nni_id_map_init(&ep->pipes, 1, 0xFFFFFFFF, true); + NNI_LIST_INIT(&ep->connpipes, udp_pipe, node); + nni_aio_list_init(&ep->connaios); + + nni_aio_init(&ep->rx_aio, udp_rx_cb, ep); + nni_aio_init(&ep->tx_aio, udp_tx_cb, ep); + nni_aio_init(&ep->timeaio, udp_timer_cb, ep); + nni_aio_init(&ep->resaio, udp_resolv_cb, ep); + nni_aio_completions_init(&ep->complq); + +#ifdef NNG_ENABLE_STATS + static const nni_stat_info rcv_max_info = { + .si_name = "rcv_max", + .si_desc = "maximum receive size", + .si_type = NNG_STAT_LEVEL, + .si_unit = NNG_UNIT_BYTES, + .si_atomic = true, + }; + nni_stat_init(&ep->st_rcv_max, &rcv_max_info); +#endif + + // schedule our timer callback - forever for now + // adjusted automatically as we add pipes or other + // actions which require earlier wakeup. + nni_sleep_aio(NNG_DURATION_INFINITE, &ep->timeaio); + + *epp = ep; + return (0); +} + +static int +udp_check_url(nng_url *url, bool listen) +{ + // Check for invalid URL components. + if ((strlen(url->u_path) != 0) && (strcmp(url->u_path, "/") != 0)) { + return (NNG_EADDRINVAL); + } + if ((url->u_fragment != NULL) || (url->u_userinfo != NULL) || + (url->u_query != NULL)) { + return (NNG_EADDRINVAL); + } + if (!listen) { + if ((strlen(url->u_hostname) == 0) || + (strlen(url->u_port) == 0) || (atoi(url->u_port) == 0)) { + return (NNG_EADDRINVAL); + } + } + return (0); +} + +static int +udp_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer) +{ + udp_ep *ep; + int rv; + nni_sock *sock = nni_dialer_sock(ndialer); + + if ((rv = udp_check_url(url, false)) != 0) { + return (rv); + } + + if ((rv = udp_ep_init(&ep, url, sock)) != 0) { + return (rv); + } + +#ifdef NNG_ENABLE_STATS + nni_dialer_add_stat(ndialer, &ep->st_rcv_max); +#endif + *dp = ep; + return (0); +} + +static int +udp_listener_init(void **lp, nng_url *url, nni_listener *nlistener) +{ + udp_ep *ep; + int rv; + nni_sock *sock = nni_listener_sock(nlistener); + + // Check for invalid URL components. + if ((rv = udp_check_url(url, true)) != 0) { + return (rv); + } + + if ((rv = udp_ep_init(&ep, url, sock)) != 0) { + return (rv); + } + + if ((rv = nni_url_to_address(&ep->self_sa, url)) != 0) { + return (rv); + } + +#ifdef NNG_ENABLE_STATS + nni_listener_add_stat(nlistener, &ep->st_rcv_max); +#endif + + *lp = ep; + return (0); +} + +static void +udp_ep_cancel(nni_aio *aio, void *arg, int rv) +{ + udp_ep *ep = arg; + nni_mtx_lock(&ep->mtx); + if (nni_aio_list_active(aio)) { + nni_aio_list_remove(aio); + nni_aio_finish_error(aio, rv); + nni_aio_abort(&ep->resaio, rv); + } + nni_mtx_unlock(&ep->mtx); +} + +static void +udp_resolv_cb(void *arg) +{ + udp_ep *ep = arg; + udp_pipe *p; + nni_aio *aio; + int rv; + nni_mtx_lock(&ep->mtx); + if ((aio = nni_list_first(&ep->connaios)) == NULL) { + nni_mtx_unlock(&ep->mtx); + return; + } + if (ep->closed) { + nni_aio_list_remove(aio); + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_result(&ep->resaio)) != 0) { + nni_aio_list_remove(aio); + nni_mtx_unlock(&ep->mtx); + nng_log_warn("NNG-UDP-RESOLV", + "Failed resolving IP address: %s", nng_strerror(rv)); + nni_aio_finish_error(aio, rv); + return; + } + + // Choose the right port to bind to. The family must match. + if (ep->self_sa.s_family == NNG_AF_UNSPEC) { + ep->self_sa.s_family = ep->peer_sa.s_family; + } + + nni_aio_abort(&ep->timeaio, NNG_EINTR); + if (ep->udp == NULL) { + if ((rv = nni_udp_open(&ep->udp, &ep->self_sa)) != 0) { + nni_aio_list_remove(aio); + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } + } + + // places a "hold" on the ep + if ((rv = udp_pipe_alloc(&p, ep, 0, &ep->peer_sa)) != 0) { + nni_aio_list_remove(aio); + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + udp_ep_start(ep); + + // Send out the connection request. We don't complete + // the user aio until we confirm a connection, so that + // we can supply details like maximum receive message size + // and the protocol the peer is using. + udp_send_creq(ep, p); + nni_mtx_unlock(&ep->mtx); +} + +static void +udp_ep_connect(void *arg, nni_aio *aio) +{ + udp_ep *ep = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&ep->mtx); + if (ep->closed) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if (ep->started) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_EBUSY); + return; + } + NNI_ASSERT(nni_list_empty(&ep->connaios)); + ep->dialer = true; + + if ((rv = nni_aio_schedule(aio, udp_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } + + nni_list_append(&ep->connaios, aio); + + // lookup the IP address + + nni_aio_set_timeout(&ep->resaio, NNI_SECOND * 5); + nni_resolv_ip(ep->url->u_hostname, ep->url->u_port, ep->af, false, + &ep->peer_sa, &ep->resaio); + + // wake up for retries + nni_aio_abort(&ep->timeaio, NNG_EINTR); + + nni_mtx_unlock(&ep->mtx); +} + +static int +udp_ep_get_port(void *arg, void *buf, size_t *szp, nni_type t) +{ + udp_ep *ep = arg; + nng_sockaddr sa; + int port; + uint8_t *paddr; + + nni_mtx_lock(&ep->mtx); + if (ep->udp != NULL) { + (void) nni_udp_sockname(ep->udp, &sa); + } else { + sa = ep->self_sa; + } + switch (sa.s_family) { + case NNG_AF_INET: + paddr = (void *) &sa.s_in.sa_port; + break; + + case NNG_AF_INET6: + paddr = (void *) &sa.s_in6.sa_port; + break; + + default: + paddr = NULL; + break; + } + nni_mtx_unlock(&ep->mtx); + + if (paddr == NULL) { + return (NNG_ESTATE); + } + + NNI_GET16(paddr, port); + return (nni_copyout_int(port, buf, szp, t)); +} + +static int +udp_ep_get_url(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + udp_ep *ep = arg; + char *s; + int rv; + int port = 0; + nng_sockaddr sa; + + nni_mtx_lock(&ep->mtx); + if (ep->udp != NULL) { + (void) nni_udp_sockname(ep->udp, &sa); + } else { + sa = ep->self_sa; + } + switch (sa.s_family) { + case NNG_AF_INET: + NNI_GET16((uint8_t *) &sa.s_in.sa_port, port); + break; + case NNG_AF_INET6: + NNI_GET16((uint8_t *) &sa.s_in6.sa_port, port); + break; + } + if ((rv = nni_url_asprintf_port(&s, ep->url, port)) == 0) { + rv = nni_copyout_str(s, v, szp, t); + nni_strfree(s); + } + nni_mtx_unlock(&ep->mtx); + + return (rv); +} + +static int +udp_ep_get_locaddr(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + udp_ep *ep = arg; + int rv; + nng_sockaddr sa; + + if (ep->udp != NULL) { + (void) nni_udp_sockname(ep->udp, &sa); + } else { + sa = ep->self_sa; + } + + rv = nni_copyout_sockaddr(&sa, v, szp, t); + return (rv); +} + +static int +udp_ep_get_remaddr(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + udp_ep *ep = arg; + int rv; + nng_sockaddr sa; + + if (!ep->dialer) { + return (NNG_ENOTSUP); + } + sa = ep->peer_sa; + + rv = nni_copyout_sockaddr(&sa, v, szp, t); + return (rv); +} + +static int +udp_ep_get_recvmaxsz(void *arg, void *v, size_t *szp, nni_opt_type t) +{ + udp_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + rv = nni_copyout_size(ep->rcvmax, v, szp, t); + nni_mtx_unlock(&ep->mtx); + return (rv); +} + +static int +udp_ep_set_recvmaxsz(void *arg, const void *v, size_t sz, nni_opt_type t) +{ + udp_ep *ep = arg; + size_t val; + int rv; + if ((rv = nni_copyin_size(&val, v, sz, 0, 65000, t)) == 0) { + nni_mtx_lock(&ep->mtx); + if (ep->started) { + nni_mtx_unlock(&ep->mtx); + return (NNG_EBUSY); + } + ep->rcvmax = (uint16_t) val; + nni_mtx_unlock(&ep->mtx); +#ifdef NNG_ENABLE_STATS + nni_stat_set_value(&ep->st_rcv_max, val); +#endif + } + return (rv); +} + +// this just looks for pipes waiting for an aio, and aios waiting for +// a connection, and matches them together. +static void +udp_ep_match(udp_ep *ep) +{ + nng_aio *aio = nni_list_first(&ep->connaios); + udp_pipe *p = nni_list_first(&ep->connpipes); + + if ((aio == NULL) || (p == NULL)) { + return; + } + + nni_aio_list_remove(aio); + nni_list_remove(&ep->connpipes, p); + nni_aio_set_output(aio, 0, p); + nni_aio_finish(aio, 0, 0); +} + +static void +udp_ep_start(udp_ep *ep) +{ + ep->started = true; + udp_start_rx(ep); +} + +static int +udp_ep_bind(void *arg) +{ + udp_ep *ep = arg; + int rv; + + nni_mtx_lock(&ep->mtx); + if (ep->started) { + nni_mtx_unlock(&ep->mtx); + return (NNG_EBUSY); + } + + rv = nni_udp_open(&ep->udp, &ep->self_sa); + if (rv != 0) { + nni_mtx_unlock(&ep->mtx); + return (rv); + } + udp_ep_start(ep); + nni_mtx_unlock(&ep->mtx); + + return (rv); +} + +static void +udp_ep_accept(void *arg, nni_aio *aio) +{ + udp_ep *ep = arg; + int rv; + + if (nni_aio_begin(aio) != 0) { + return; + } + nni_mtx_lock(&ep->mtx); + if (ep->closed) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, NNG_ECLOSED); + return; + } + if ((rv = nni_aio_schedule(aio, udp_ep_cancel, ep)) != 0) { + nni_mtx_unlock(&ep->mtx); + nni_aio_finish_error(aio, rv); + return; + } + nni_aio_list_append(&ep->connaios, aio); + udp_ep_match(ep); + nni_mtx_unlock(&ep->mtx); +} + +static nni_sp_pipe_ops udp_pipe_ops = { + .p_init = udp_pipe_init, + .p_fini = udp_pipe_fini, + .p_stop = udp_pipe_stop, + .p_send = udp_pipe_send, + .p_recv = udp_pipe_recv, + .p_close = udp_pipe_close, + .p_peer = udp_pipe_peer, + .p_getopt = udp_pipe_getopt, +}; + +static const nni_option udp_ep_opts[] = { + { + .o_name = NNG_OPT_RECVMAXSZ, + .o_get = udp_ep_get_recvmaxsz, + .o_set = udp_ep_set_recvmaxsz, + }, + { + .o_name = NNG_OPT_URL, + .o_get = udp_ep_get_url, + }, + { + .o_name = NNG_OPT_LOCADDR, + .o_get = udp_ep_get_locaddr, + }, + { + .o_name = NNG_OPT_REMADDR, + .o_get = udp_ep_get_remaddr, + }, + { + .o_name = NNG_OPT_TCP_BOUND_PORT, + .o_get = udp_ep_get_port, + }, + // terminate list + { + .o_name = NULL, + }, +}; + +static int +udp_dialer_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + udp_ep *ep = arg; + + return (nni_getopt(udp_ep_opts, name, ep, buf, szp, t)); +} + +static int +udp_dialer_setopt( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + udp_ep *ep = arg; + + return (nni_setopt(udp_ep_opts, name, ep, buf, sz, t)); +} + +static int +udp_listener_getopt( + void *arg, const char *name, void *buf, size_t *szp, nni_type t) +{ + udp_ep *ep = arg; + + return (nni_getopt(udp_ep_opts, name, ep, buf, szp, t)); +} + +static int +udp_listener_setopt( + void *arg, const char *name, const void *buf, size_t sz, nni_type t) +{ + udp_ep *ep = arg; + + return (nni_setopt(udp_ep_opts, name, ep, buf, sz, t)); +} + +static nni_sp_dialer_ops udp_dialer_ops = { + .d_init = udp_dialer_init, + .d_fini = udp_ep_fini, + .d_connect = udp_ep_connect, + .d_close = udp_ep_close, + .d_getopt = udp_dialer_getopt, + .d_setopt = udp_dialer_setopt, +}; + +static nni_sp_listener_ops udp_listener_ops = { + .l_init = udp_listener_init, + .l_fini = udp_ep_fini, + .l_bind = udp_ep_bind, + .l_accept = udp_ep_accept, + .l_close = udp_ep_close, + .l_getopt = udp_listener_getopt, + .l_setopt = udp_listener_setopt, +}; + +static nni_sp_tran udp_tran = { + .tran_scheme = "udp", + .tran_dialer = &udp_dialer_ops, + .tran_listener = &udp_listener_ops, + .tran_pipe = &udp_pipe_ops, + .tran_init = udp_tran_init, + .tran_fini = udp_tran_fini, +}; + +static nni_sp_tran udp4_tran = { + .tran_scheme = "udp4", + .tran_dialer = &udp_dialer_ops, + .tran_listener = &udp_listener_ops, + .tran_pipe = &udp_pipe_ops, + .tran_init = udp_tran_init, + .tran_fini = udp_tran_fini, +}; + +#ifdef NNG_ENABLE_IPV6 +static nni_sp_tran udp6_tran = { + .tran_scheme = "udp6", + .tran_dialer = &udp_dialer_ops, + .tran_listener = &udp_listener_ops, + .tran_pipe = &udp_pipe_ops, + .tran_init = udp_tran_init, + .tran_fini = udp_tran_fini, +}; +#endif + +void +nni_sp_udp_register(void) +{ + nni_sp_tran_register(&udp_tran); + nni_sp_tran_register(&udp4_tran); +#ifdef NNG_ENABLE_IPV6 + nni_sp_tran_register(&udp6_tran); +#endif +} diff --git a/src/sp/transport/udp/udp_tran_test.c b/src/sp/transport/udp/udp_tran_test.c new file mode 100644 index 000000000..b99c5af19 --- /dev/null +++ b/src/sp/transport/udp/udp_tran_test.c @@ -0,0 +1,171 @@ +// +// Copyright 2024 Staysail Systems, Inc. +// Copyright 2018 Capitar IT Group BV +// Copyright 2018 Devolutions +// Copyright 2018 Cody Piersall +// +// This software is supplied under the terms of the MIT License, a +// copy of which should be located in the distribution where this +// file was obtained (LICENSE.txt). A copy of the license may also be +// found online at https://opensource.org/licenses/MIT. +// + +#include "nng/nng.h" +#include + +// TCP tests. + +static void +test_udp_wild_card_connect_fail(void) +{ + nng_socket s; + char addr[NNG_MAXADDRLEN]; + + NUTS_OPEN(s); + (void) snprintf(addr, sizeof(addr), "udp://*:%u", nuts_next_port()); + NUTS_FAIL(nng_dial(s, addr, NULL, 0), NNG_EADDRINVAL); + NUTS_CLOSE(s); +} + +void +test_udp_wild_card_bind(void) +{ + nng_socket s1; + nng_socket s2; + char addr[NNG_MAXADDRLEN]; + uint16_t port; + + port = nuts_next_port(); + + NUTS_OPEN(s1); + NUTS_OPEN(s2); + (void) snprintf(addr, sizeof(addr), "udp4://*:%u", port); + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + nng_msleep(500); + (void) snprintf(addr, sizeof(addr), "udp://127.0.0.1:%u", port); + NUTS_PASS(nng_dial(s2, addr, NULL, 0)); + NUTS_CLOSE(s2); + NUTS_CLOSE(s1); +} + +void +test_udp_local_address_connect(void) +{ + + nng_socket s1; + nng_socket s2; + char addr[NNG_MAXADDRLEN]; + uint16_t port; + + NUTS_OPEN(s1); + NUTS_OPEN(s2); + port = nuts_next_port(); + (void) snprintf(addr, sizeof(addr), "udp://127.0.0.1:%u", port); + NUTS_PASS(nng_listen(s1, addr, NULL, 0)); + (void) snprintf(addr, sizeof(addr), "udp://127.0.0.1:%u", port); + NUTS_PASS(nng_dial(s2, addr, NULL, 0)); + NUTS_CLOSE(s2); + NUTS_CLOSE(s1); +} + +void +test_udp_port_zero_bind(void) +{ + nng_socket s1; + nng_socket s2; + nng_sockaddr sa; + nng_listener l; + char *addr; + int port; + + NUTS_OPEN(s1); + NUTS_OPEN(s2); + NUTS_PASS(nng_listen(s1, "udp://127.0.0.1:0", &l, 0)); + nng_msleep(100); + NUTS_PASS(nng_listener_get_string(l, NNG_OPT_URL, &addr)); + NUTS_TRUE(memcmp(addr, "udp://", 6) == 0); + NUTS_PASS(nng_listener_get_addr(l, NNG_OPT_LOCADDR, &sa)); + NUTS_TRUE(sa.s_in.sa_family == NNG_AF_INET); + NUTS_TRUE(sa.s_in.sa_port != 0); + NUTS_TRUE(sa.s_in.sa_addr == nuts_be32(0x7f000001)); + NUTS_PASS(nng_dial(s2, addr, NULL, 0)); + NUTS_PASS(nng_listener_get_int(l, NNG_OPT_TCP_BOUND_PORT, &port)); + NUTS_TRUE(port == nuts_be16(sa.s_in.sa_port)); + nng_strfree(addr); + + NUTS_CLOSE(s2); + NUTS_CLOSE(s1); +} + +void +test_udp_non_local_address(void) +{ + nng_socket s1; + + NUTS_OPEN(s1); + NUTS_FAIL(nng_listen(s1, "udp://8.8.8.8", NULL, 0), NNG_EADDRINVAL); + NUTS_CLOSE(s1); +} + +void +test_udp_malformed_address(void) +{ + nng_socket s1; + + NUTS_OPEN(s1); + NUTS_FAIL(nng_dial(s1, "udp://127.0.0.1", NULL, 0), NNG_EADDRINVAL); + NUTS_FAIL(nng_dial(s1, "udp://127.0.0.1.32", NULL, 0), NNG_EADDRINVAL); + NUTS_FAIL(nng_dial(s1, "udp://127.0.x.1.32", NULL, 0), NNG_EADDRINVAL); + NUTS_FAIL( + nng_listen(s1, "udp://127.0.0.1.32", NULL, 0), NNG_EADDRINVAL); + NUTS_FAIL( + nng_listen(s1, "udp://127.0.x.1.32", NULL, 0), NNG_EADDRINVAL); + NUTS_CLOSE(s1); +} + +void +test_udp_recv_max(void) +{ + char msg[256]; + char buf[256]; + nng_socket s0; + nng_socket s1; + nng_listener l; + size_t sz; + char *addr; + + NUTS_ADDR(addr, "udp"); + + NUTS_OPEN(s0); + NUTS_PASS(nng_socket_set_ms(s0, NNG_OPT_RECVTIMEO, 100)); + NUTS_PASS(nng_socket_set_size(s0, NNG_OPT_RECVMAXSZ, 200)); + NUTS_PASS(nng_listener_create(&l, s0, addr)); + NUTS_PASS(nng_socket_get_size(s0, NNG_OPT_RECVMAXSZ, &sz)); + NUTS_TRUE(sz == 200); + NUTS_PASS(nng_listener_set_size(l, NNG_OPT_RECVMAXSZ, 100)); + NUTS_PASS(nng_listener_start(l, 0)); + + NUTS_OPEN(s1); + NUTS_PASS(nng_dial(s1, addr, NULL, 0)); + nng_msleep(1000); + NUTS_PASS(nng_send(s1, msg, 95, 0)); + NUTS_PASS(nng_socket_set_ms(s1, NNG_OPT_SENDTIMEO, 100)); + NUTS_PASS(nng_recv(s0, buf, &sz, 0)); + NUTS_TRUE(sz == 95); + NUTS_PASS(nng_send(s1, msg, 150, 0)); + NUTS_FAIL(nng_recv(s0, buf, &sz, 0), NNG_ETIMEDOUT); + NUTS_PASS(nng_close(s0)); + NUTS_CLOSE(s1); +} + +NUTS_TESTS = { + + { "udp wild card connect fail", test_udp_wild_card_connect_fail }, + { "udp wild card bind", test_udp_wild_card_bind }, + { "udp port zero bind", test_udp_port_zero_bind }, + { "udp local address connect", test_udp_local_address_connect }, + { "udp non-local address", test_udp_non_local_address }, + { "udp malformed address", test_udp_malformed_address }, + { "udp recv max", test_udp_recv_max }, + { NULL, NULL }, +}; diff --git a/src/testing/marry.c b/src/testing/marry.c index d39cf583a..7441468ff 100644 --- a/src/testing/marry.c +++ b/src/testing/marry.c @@ -1,5 +1,5 @@ // -// Copyright 2021 Staysail Systems, Inc. +// Copyright 2024 Staysail Systems, Inc. // Copyright 2018 Capitar IT Group BV // // This software is supplied under the terms of the MIT License, a @@ -46,7 +46,8 @@ nuts_scratch_addr(const char *scheme, size_t sz, char *addr) } if ((strncmp(scheme, "tcp", 3) == 0) || - (strncmp(scheme, "tls", 3) == 0)) { + (strncmp(scheme, "tls", 3) == 0) || + (strncmp(scheme, "udp", 3) == 0)) { (void) snprintf( addr, sz, "%s://127.0.0.1:%u", scheme, nuts_next_port()); return; @@ -84,6 +85,7 @@ nuts_scratch_addr(const char *scheme, size_t sz, char *addr) } // We should not be here. + nng_log_err("NUTS", "Unknown scheme"); abort(); }