diff --git a/CMakeLists.txt b/CMakeLists.txt index bc1a3d6..ae9e52d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,7 @@ target_sources( src/endian.c src/queue.c src/queue.h + src/link.h src/io.h src/udx.c ) diff --git a/examples/client.c b/examples/client.c index dd101f2..5c885e5 100644 --- a/examples/client.c +++ b/examples/client.c @@ -69,9 +69,9 @@ main (int argc, char **argv) { uv_loop_init(&loop); - udx_init(&loop, &udx); + udx_init(&loop, &udx, NULL); - udx_socket_init(&udx, &sock); + udx_socket_init(&udx, &sock, NULL); struct sockaddr_in addr; uv_ip4_addr("0.0.0.0", 18082, &addr); diff --git a/examples/server.c b/examples/server.c index 1b7aff4..7969166 100644 --- a/examples/server.c +++ b/examples/server.c @@ -115,9 +115,9 @@ main (int argc, char **argv) { chunk.len = 16384; chunk.base = calloc(1, chunk.len); - udx_init(&loop, &udx); + udx_init(&loop, &udx, NULL); - udx_socket_init(&udx, &sock); + udx_socket_init(&udx, &sock, NULL); struct sockaddr_in addr; uv_ip4_addr("0.0.0.0", 18081, &addr); diff --git a/examples/udxperf.c b/examples/udxperf.c index 5ff6811..2b5942b 100644 --- a/examples/udxperf.c +++ b/examples/udxperf.c @@ -599,8 +599,8 @@ main (int argc, char **argv) { } uv_loop_init(&loop); - udx_init(&loop, &udx); - udx_socket_init(&udx, &sock); + udx_init(&loop, &udx, NULL); + udx_socket_init(&udx, &sock, NULL); if (is_server) { server(); diff --git a/include/udx.h b/include/udx.h index c5c0afb..36fb186 100644 --- a/include/udx.h +++ b/include/udx.h @@ -107,16 +107,15 @@ typedef void (*udx_interface_event_close_cb)(udx_interface_event_t *handle); struct udx_s { uv_loop_t *loop; - uint32_t refs; - udx_idle_cb on_idle; + int refs; + bool teardown; + bool has_streams; - uint32_t sockets_len; - uint32_t sockets_max_len; - udx_socket_t **sockets; + udx_idle_cb on_idle; - uint32_t streams_len; - uint32_t streams_max_len; - udx_stream_t **streams; + udx_socket_t *sockets; + udx_stream_t *streams; + udx_interface_event_t *listeners; udx_cirbuf_t streams_by_id; @@ -145,11 +144,14 @@ struct udx_socket_s { udx_queue_t send_queue; + udx_socket_t *prev; + udx_socket_t *next; + + udx_stream_t *streams; + udx_t *udx; udx_cirbuf_t *streams_by_id; // for convenience - int set_id; - bool cmsg_wanted; // include a control buffer for recvmsg int family; int status; @@ -193,7 +195,9 @@ struct udx_stream_s { uint32_t local_id; // must be first entry, so its compat with the cirbuf uint32_t remote_id; - int set_id; + udx_stream_t *prev; + udx_stream_t *next; + int status; int write_wanted; int out_of_order; @@ -392,6 +396,9 @@ struct udx_interface_event_s { uv_loop_t *loop; udx_t *udx; + udx_interface_event_t *prev; + udx_interface_event_t *next; + uv_interface_address_t *addrs; int addrs_len; bool sorted; @@ -403,13 +410,16 @@ struct udx_interface_event_s { }; int -udx_init (uv_loop_t *loop, udx_t *udx); +udx_init (uv_loop_t *loop, udx_t *udx, udx_idle_cb on_idle); + +int +udx_is_idle (udx_t *udx); void -udx_idle (udx_t *udx, udx_idle_cb cb); +udx_teardown (udx_t *udx); int -udx_socket_init (udx_t *udx, udx_socket_t *socket); +udx_socket_init (udx_t *udx, udx_socket_t *socket, udx_socket_close_cb cb); int udx_socket_get_send_buffer_size (udx_socket_t *socket, int *value); @@ -460,7 +470,7 @@ int udx_socket_recv_stop (udx_socket_t *socket); int -udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb); +udx_socket_close (udx_socket_t *socket); // only exposed here as a convenience / debug tool - the udx instance uses this automatically int @@ -530,7 +540,7 @@ int udx_lookup (udx_t *udx, udx_lookup_t *req, const char *host, unsigned int flags, udx_lookup_cb cb); int -udx_interface_event_init (udx_t *udx, udx_interface_event_t *handle); +udx_interface_event_init (udx_t *udx, udx_interface_event_t *handle, udx_interface_event_close_cb cb); int udx_interface_event_start (udx_interface_event_t *handle, udx_interface_event_cb cb, uint64_t frequency); @@ -539,7 +549,7 @@ int udx_interface_event_stop (udx_interface_event_t *handle); int -udx_interface_event_close (udx_interface_event_t *handle, udx_interface_event_close_cb cb); +udx_interface_event_close (udx_interface_event_t *handle); #ifdef __cplusplus } diff --git a/src/link.h b/src/link.h new file mode 100644 index 0000000..4138a20 --- /dev/null +++ b/src/link.h @@ -0,0 +1,22 @@ +#ifndef UDX_LINK_H +#define UDX_LINK_H + +#define udx__link_add(l, v) \ + if ((l) == NULL) { \ + (v)->next = (v)->prev = NULL; \ + } else { \ + (v)->prev = NULL; \ + (v)->next = (l); \ + (l)->prev = (v); \ + } \ + (l) = (v); + +#define udx__link_remove(l, v) \ + if ((v)->next != NULL) (v)->next->prev = (v)->prev; \ + if ((v)->prev == NULL) (l) = (v)->next; \ + else (v)->prev->next = (v)->next; + +#define udx__link_foreach(l, el) \ + for ((el) = (l); (el) != NULL; (el) = (el)->next) + +#endif diff --git a/src/udx.c b/src/udx.c index a96759d..b0c9614 100644 --- a/src/udx.c +++ b/src/udx.c @@ -15,6 +15,7 @@ #include "endian.h" #include "io.h" #include "queue.h" +#include "link.h" #define UDX_STREAM_ALL_ENDED (UDX_STREAM_ENDED | UDX_STREAM_ENDED_REMOTE) #define UDX_STREAM_DEAD (UDX_STREAM_DESTROYING | UDX_STREAM_CLOSED) @@ -33,8 +34,6 @@ #define UDX_DEFAULT_TTL 64 #define UDX_DEFAULT_BUFFER_SIZE 212992 -#define UDX_DEFAULT_SET_SIZE 16 - #define UDX_MAX_RTO_TIMEOUTS 6 #define UDX_CONG_C 400 // C=0.4 (inverse) in scaled 1000 @@ -159,18 +158,9 @@ ref_dec (udx_t *udx) { if (udx->refs) return; - if (udx->sockets != NULL) { - free(udx->sockets); - udx->sockets = NULL; - udx->sockets_max_len = 0; - } - - if (udx->streams != NULL) { - free(udx->streams); - udx->streams = NULL; - udx->streams_max_len = 0; - + if (udx->has_streams) { udx__cirbuf_destroy(&(udx->streams_by_id)); + udx->has_streams = false; } if (udx->on_idle != NULL) { @@ -223,9 +213,9 @@ socket_write_wanted (udx_socket_t *socket) { return true; } - for (uint32_t i = 0; i < socket->udx->streams_len; i++) { - udx_stream_t *stream = socket->udx->streams[i]; - if (stream->socket == socket && stream_write_wanted(stream)) { + udx_stream_t *stream; + udx__link_foreach(socket->streams, stream) { + if (stream_write_wanted(stream)) { return true; } } @@ -593,11 +583,13 @@ close_stream (udx_stream_t *stream, int err) { debug_printf("closing stream local_id=%u \n", stream->local_id); udx_t *udx = stream->udx; + udx_socket_t *socket = stream->socket; - // Remove from the set, by array[i] = array.pop() - udx_stream_t *other = udx->streams[--(udx->streams_len)]; - udx->streams[stream->set_id] = other; - other->set_id = stream->set_id; + if (socket) { + udx__link_remove(socket->streams, stream); + } else { + udx__link_remove(udx->streams, stream); + } udx__cirbuf_remove(&(udx->streams_by_id), stream->local_id); @@ -649,6 +641,10 @@ close_stream (udx_stream_t *stream, int err) { uv_close((uv_handle_t *) &stream->tlp_timer, finalize_maybe); uv_close((uv_handle_t *) &stream->zwp_timer, finalize_maybe); + if (udx->teardown && socket->streams == NULL) { + udx_socket_close(socket); + } + return 1; } @@ -1204,13 +1200,15 @@ detect_loss_repaired_by_loss_probe (udx_stream_t *stream, uint32_t ack) { static int process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockaddr *addr) { + udx_t *udx = socket->udx; + socket->bytes_rx += buf_len; socket->packets_rx += 1; - socket->udx->bytes_rx += buf_len; - socket->udx->packets_rx += 1; + udx->bytes_rx += buf_len; + udx->packets_rx += 1; - if (buf_len < UDX_HEADER_SIZE) return 0; + if (!(udx->has_streams) || buf_len < UDX_HEADER_SIZE) return 0; uint8_t *b = (uint8_t *) buf; @@ -1447,15 +1445,6 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd return 1; } -static bool -check_for_streams (udx_socket_t *socket) { - for (uint32_t i = 0; i < socket->udx->streams_len; i++) { - udx_stream_t *stream = socket->udx->streams[i]; - if (stream->socket == socket) return true; - } - return false; -} - static void arm_stream_timers (udx_stream_t *stream, bool sent_tlp); @@ -1889,20 +1878,23 @@ arm_stream_timers (udx_stream_t *stream, bool sent_tlp) { static void send_packets (udx_socket_t *socket) { - bool _continue = send_datagrams(socket); + if (!send_datagrams(socket)) return; - if (!_continue) return; + udx_stream_t *stream; + udx__link_foreach(socket->streams, stream) { + // in case of re-entry, the stream might be closed, ie both us and next one in line was destroyed + // if so no just ignore + if (stream->status & UDX_STREAM_CLOSED) continue; - for (uint32_t i = 0; i < socket->udx->streams_len;) { - udx_stream_t *stream = socket->udx->streams[i]; - if (stream->socket == socket) { - _continue = send_stream_packets(socket, stream); + assert(stream->socket == socket); - if (!_continue) return; - } - // if stream was closed, a new stream shuffled into this slot - if (socket->udx->streams[i] == stream) { - i++; + if (!send_stream_packets(socket, stream)) return; + + // the above could have triggered a re-entry moving this stream to another socket (change_remote) + // if so just restart, unlikely + if (stream->socket != socket) { + stream = socket->streams; + if (!stream || !send_stream_packets(socket, stream)) return; } } } @@ -1952,17 +1944,15 @@ on_uv_poll (uv_poll_t *handle, int status, int events) { } int -udx_init (uv_loop_t *loop, udx_t *udx) { +udx_init (uv_loop_t *loop, udx_t *udx, udx_idle_cb on_idle) { udx->refs = 0; - udx->on_idle = NULL; + udx->teardown = false; + udx->has_streams = false; + udx->on_idle = on_idle; - udx->sockets_len = 0; - udx->sockets_max_len = 0; udx->sockets = NULL; - - udx->streams_len = 0; - udx->streams_max_len = 0; udx->streams = NULL; + udx->listeners = NULL; udx->bytes_rx = 0; udx->bytes_tx = 0; @@ -1981,15 +1971,47 @@ udx_idle (udx_t *udx, udx_idle_cb cb) { } int -udx_socket_init (udx_t *udx, udx_socket_t *socket) { - udx->refs++; +udx_is_idle (udx_t *udx) { + return udx->refs == 0; +} + +void +udx_teardown (udx_t *udx) { + udx->teardown = true; + + udx_socket_t *socket; + udx_stream_t *stream; + udx_interface_event_t *listener; + + udx__link_foreach(udx->sockets, socket) { + if (socket->streams == NULL) { + udx_socket_close(socket); + continue; + } + + udx__link_foreach(socket->streams, stream) { + udx_stream_destroy(stream); + } + } + + udx__link_foreach(udx->streams, stream) { + udx_stream_destroy(stream); + } - if (udx->sockets == NULL) { - udx->sockets_len = 0; - udx->sockets_max_len = UDX_DEFAULT_SET_SIZE; - udx->sockets = malloc(udx->sockets_max_len * sizeof(udx_socket_t *)); + udx__link_foreach(udx->listeners, listener) { + udx_interface_event_close(listener); } +} + +int +udx_socket_init (udx_t *udx, udx_socket_t *socket, udx_socket_close_cb cb) { + if (udx->teardown) return UV_EINVAL; + + udx->refs++; + + udx__link_add(udx->sockets, socket); + socket->streams = NULL; socket->family = 0; socket->status = 0; socket->events = 0; @@ -1999,17 +2021,8 @@ udx_socket_init (udx_t *udx, udx_socket_t *socket) { socket->udx = udx; socket->streams_by_id = &(udx->streams_by_id); - socket->set_id = udx->sockets_len++; - - if (udx->sockets_len == udx->sockets_max_len) { - udx->sockets_max_len *= 2; - udx->sockets = realloc(udx->sockets, udx->sockets_max_len * sizeof(udx_socket_t *)); - } - - udx->sockets[socket->set_id] = socket; - socket->on_recv = NULL; - socket->on_close = NULL; + socket->on_close = cb; socket->bytes_rx = 0; socket->bytes_tx = 0; @@ -2221,13 +2234,11 @@ udx_socket_recv_stop (udx_socket_t *socket) { } int -udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb) { - if (check_for_streams(socket)) return UV_EBUSY; +udx_socket_close (udx_socket_t *socket) { + if (socket->streams != NULL) return UV_EBUSY; socket->status |= UDX_SOCKET_CLOSED; - socket->on_close = cb; - while (socket->send_queue.len > 0) { udx_packet_t *pkt = udx__queue_data(udx__queue_shift(&socket->send_queue), udx_packet_t, queue); assert(pkt != NULL); @@ -2249,25 +2260,24 @@ udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb) { uv_close((uv_handle_t *) &(socket->handle), on_uv_close); udx_t *udx = socket->udx; - - udx_socket_t *other = udx->sockets[--(udx->sockets_len)]; - udx->sockets[socket->set_id] = other; - other->set_id = socket->set_id; + udx__link_remove(udx->sockets, socket); return 0; } int udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream_close_cb close_cb, udx_stream_finalize_cb finalize_cb) { + if (udx->teardown) return UV_EINVAL; + udx->refs++; - if (udx->streams == NULL) { - udx->streams_len = 0; - udx->streams_max_len = UDX_DEFAULT_SET_SIZE; - udx->streams = malloc(udx->streams_max_len * sizeof(udx_stream_t *)); + if (!(udx->has_streams)) { udx__cirbuf_init(&(udx->streams_by_id), 16); + udx->has_streams = true; } + udx__link_add(udx->streams, stream); + stream->local_id = local_id; stream->remote_id = 0; stream->status = 0; @@ -2376,17 +2386,7 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream udx__queue_init(&stream->inflight_queue); udx__queue_init(&stream->retransmit_queue); - stream->set_id = udx->streams_len++; - - if (udx->streams_len == udx->streams_max_len) { - udx->streams_max_len *= 2; - udx->streams = realloc(udx->streams, udx->streams_max_len * sizeof(udx_stream_t *)); - } - - udx->streams[stream->set_id] = stream; - // Add the socket to the active set - udx__cirbuf_set(&(udx->streams_by_id), (udx_cirbuf_val_t *) stream); return 0; @@ -2469,13 +2469,35 @@ udx_stream_read_stop (udx_stream_t *stream) { return stream->socket == NULL ? 0 : update_poll(stream->socket); } +static void +set_stream_socket (udx_stream_t *stream, udx_socket_t *socket) { + if (stream->socket == socket) return; // just in case + + udx_socket_t *prev = stream->socket; + + // technically its unsafe to remove and add it to another queue + // if iterating the queue we removed from. + if (prev == NULL) { + udx_t *udx = stream->udx; + udx__link_remove(udx->streams, stream); + } else { + udx__link_remove(prev->streams, stream); + } + + stream->socket = socket; + udx__link_add(socket->streams, stream); +} + int udx_stream_change_remote (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr, udx_stream_remote_changed_cb on_remote_changed) { - assert(stream->status & UDX_STREAM_CONNECTED); - // the since the udx_t object stores streams_by_id, we cannot migrate streams across udx objects // the local id's of different udx streams may collide. assert(socket->udx == stream->socket->udx); + + if (stream->status & UDX_STREAM_DEAD || stream->udx->teardown) { + return UV_EINVAL; + } + if (!(stream->status & UDX_STREAM_CONNECTED)) { return UV_EINVAL; } @@ -2502,8 +2524,7 @@ udx_stream_change_remote (udx_stream_t *stream, udx_socket_t *socket, uint32_t r } stream->remote_id = remote_id; - - stream->socket = socket; + set_stream_socket(stream, socket); if (stream->seq != stream->remote_acked) { debug_printf("change_remote: id=%u RA=%u Seq=%u\n", stream->local_id, stream->remote_acked, stream->seq); @@ -2526,6 +2547,10 @@ udx_stream_change_remote (udx_stream_t *stream, udx_socket_t *socket, uint32_t r int udx_stream_connect (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_id, const struct sockaddr *remote_addr) { + if (stream->status & UDX_STREAM_DEAD || stream->udx->teardown) { + return UV_EINVAL; + } + if (stream->status & UDX_STREAM_CONNECTED) { return UV_EISCONN; } @@ -2533,7 +2558,7 @@ udx_stream_connect (udx_stream_t *stream, udx_socket_t *socket, uint32_t remote_ stream->status |= UDX_STREAM_CONNECTED; stream->remote_id = remote_id; - stream->socket = socket; + set_stream_socket(stream, socket); if (remote_addr->sa_family == AF_INET) { stream->remote_addr_len = sizeof(struct sockaddr_in); @@ -2756,12 +2781,14 @@ on_uv_getaddrinfo (uv_getaddrinfo_t *req, int status, struct addrinfo *res) { int udx_lookup (udx_t *udx, udx_lookup_t *req, const char *host, unsigned int flags, udx_lookup_cb cb) { + if (udx->teardown) return UV_EINVAL; + + udx->refs++; + req->udx = udx; req->on_lookup = cb; req->req.data = req; - udx->refs++; - memset(&req->hints, 0, sizeof(struct addrinfo)); int family = AF_UNSPEC; @@ -2837,6 +2864,9 @@ static void on_interface_event_close (uv_handle_t *handle) { udx_interface_event_t *event = (udx_interface_event_t *) handle->data; + udx_t *udx = event->udx; + udx__link_remove(udx->listeners, event); + if (event->on_close != NULL) { event->on_close(event); } @@ -2845,12 +2875,13 @@ on_interface_event_close (uv_handle_t *handle) { } int -udx_interface_event_init (udx_t *udx, udx_interface_event_t *handle) { +udx_interface_event_init (udx_t *udx, udx_interface_event_t *handle, udx_interface_event_close_cb cb) { + if (udx->teardown) return UV_EINVAL; + handle->udx = udx; handle->loop = udx->loop; handle->sorted = false; - - udx->refs++; + handle->on_close = cb; int err = uv_interface_addresses(&(handle->addrs), &(handle->addrs_len)); if (err < 0) return err; @@ -2860,6 +2891,9 @@ udx_interface_event_init (udx_t *udx, udx_interface_event_t *handle) { handle->timer.data = handle; + udx->refs++; + udx__link_add(udx->listeners, handle); + return 0; } @@ -2880,9 +2914,8 @@ udx_interface_event_stop (udx_interface_event_t *handle) { } int -udx_interface_event_close (udx_interface_event_t *handle, udx_interface_event_close_cb cb) { +udx_interface_event_close (udx_interface_event_t *handle) { handle->on_event = NULL; - handle->on_close = cb; uv_free_interface_addresses(handle->addrs, handle->addrs_len); diff --git a/test/lookup-invalid.c b/test/lookup-invalid.c index 22d3ba4..2d8a80e 100644 --- a/test/lookup-invalid.c +++ b/test/lookup-invalid.c @@ -22,7 +22,7 @@ main () { int e; uv_loop_init(&loop); - udx_init(&loop, &udx); + udx_init(&loop, &udx, NULL); e = udx_lookup(&udx, &req, "example.invalid.", 0, on_lookup); assert(e == 0); diff --git a/test/lookup-ipv6.c b/test/lookup-ipv6.c index 5bf7549..405fe5b 100644 --- a/test/lookup-ipv6.c +++ b/test/lookup-ipv6.c @@ -27,7 +27,7 @@ main () { int e; uv_loop_init(&loop); - udx_init(&loop, &udx); + udx_init(&loop, &udx, NULL); e = udx_lookup(&udx, &req, "localhost", UDX_LOOKUP_FAMILY_IPV6, on_lookup); assert(e == 0); diff --git a/test/lookup.c b/test/lookup.c index 7fe4fe8..c41541f 100644 --- a/test/lookup.c +++ b/test/lookup.c @@ -27,7 +27,7 @@ main () { int e; uv_loop_init(&loop); - udx_init(&loop, &udx); + udx_init(&loop, &udx, NULL); e = udx_lookup(&udx, &req, "localhost", UDX_LOOKUP_FAMILY_IPV4, on_lookup); assert(e == 0); diff --git a/test/socket-send-recv-dualstack.c b/test/socket-send-recv-dualstack.c index a1caa2c..0915097 100644 --- a/test/socket-send-recv-dualstack.c +++ b/test/socket-send-recv-dualstack.c @@ -40,13 +40,13 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &asock); + e = udx_socket_init(&udx, &asock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &bsock); + e = udx_socket_init(&udx, &bsock, NULL); assert(e == 0); struct sockaddr_in baddr; diff --git a/test/socket-send-recv-ipv6.c b/test/socket-send-recv-ipv6.c index d2c9204..33294bf 100644 --- a/test/socket-send-recv-ipv6.c +++ b/test/socket-send-recv-ipv6.c @@ -40,13 +40,13 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &asock); + e = udx_socket_init(&udx, &asock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &bsock); + e = udx_socket_init(&udx, &bsock, NULL); assert(e == 0); struct sockaddr_in6 baddr; diff --git a/test/socket-send-recv-multicast.c b/test/socket-send-recv-multicast.c index 8945367..2c5b45e 100644 --- a/test/socket-send-recv-multicast.c +++ b/test/socket-send-recv-multicast.c @@ -46,13 +46,13 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &asock); + e = udx_socket_init(&udx, &asock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &bsock); + e = udx_socket_init(&udx, &bsock, NULL); assert(e == 0); struct sockaddr_in baddr; diff --git a/test/socket-send-recv.c b/test/socket-send-recv.c index d200d21..1b7a2fc 100644 --- a/test/socket-send-recv.c +++ b/test/socket-send-recv.c @@ -40,13 +40,13 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &asock); + e = udx_socket_init(&udx, &asock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &bsock); + e = udx_socket_init(&udx, &bsock, NULL); assert(e == 0); struct sockaddr_in baddr; diff --git a/test/stream-change-remote.c b/test/stream-change-remote.c index 8387d35..8c9f54f 100644 --- a/test/stream-change-remote.c +++ b/test/stream-change-remote.c @@ -91,19 +91,19 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &asock); + e = udx_socket_init(&udx, &asock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &bsock); + e = udx_socket_init(&udx, &bsock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &csock); + e = udx_socket_init(&udx, &csock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &dsock); + e = udx_socket_init(&udx, &dsock, NULL); assert(e == 0); uv_ip4_addr("127.0.0.1", 8081, &aaddr); diff --git a/test/stream-destroy-before-connect.c b/test/stream-destroy-before-connect.c index 6e19aca..00e2907 100644 --- a/test/stream-destroy-before-connect.c +++ b/test/stream-destroy-before-connect.c @@ -10,7 +10,7 @@ main () { uv_loop_init(&loop); udx_t udx; - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); udx_stream_t stream; diff --git a/test/stream-destroy.c b/test/stream-destroy.c index f6412a5..4f4ae46 100644 --- a/test/stream-destroy.c +++ b/test/stream-destroy.c @@ -13,7 +13,7 @@ void on_close (udx_stream_t *handle, int status) { assert(status == 0); - int e = udx_socket_close(&sock, NULL); + int e = udx_socket_close(&sock); assert(e == 0); @@ -26,10 +26,10 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &sock); + e = udx_socket_init(&udx, &sock, NULL); assert(e == 0); struct sockaddr_in addr; diff --git a/test/stream-multiple.c b/test/stream-multiple.c index effd401..651ab9e 100644 --- a/test/stream-multiple.c +++ b/test/stream-multiple.c @@ -67,7 +67,7 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); uv_buf_t buf = uv_buf_init(malloc(NBYTES_TO_SEND), NBYTES_TO_SEND); @@ -81,7 +81,7 @@ main () { int receiver_id = NSTREAMS + i; receiver[i].read_hash = HASH_INIT; - e = udx_socket_init(&udx, &sender[i].usock); + e = udx_socket_init(&udx, &sender[i].usock, NULL); assert(e == 0); uv_ip4_addr("127.0.0.1", 8000 + i, &sender[i].addr); e = udx_socket_bind(&sender[i].usock, (struct sockaddr *) &sender[i].addr, 0); @@ -89,7 +89,7 @@ main () { sender[i].write = malloc(udx_stream_write_sizeof(1)); e = udx_stream_init(&udx, &sender[i].stream, sender_id, NULL, NULL); - udx_socket_init(&udx, &receiver[i].usock); + udx_socket_init(&udx, &receiver[i].usock, NULL); uv_ip4_addr("127.0.0.1", 8100 + i, &receiver[i].addr); e = udx_socket_bind(&receiver[i].usock, (struct sockaddr *) &receiver[i].addr, 0); assert(e == 0); diff --git a/test/stream-preconnect-same-socket.c b/test/stream-preconnect-same-socket.c index 582bc85..2acb61f 100644 --- a/test/stream-preconnect-same-socket.c +++ b/test/stream-preconnect-same-socket.c @@ -48,10 +48,10 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &sock); + e = udx_socket_init(&udx, &sock, NULL); assert(e == 0); uv_ip4_addr("127.0.0.1", 8081, &addr); diff --git a/test/stream-preconnect.c b/test/stream-preconnect.c index b9e4272..5661a69 100644 --- a/test/stream-preconnect.c +++ b/test/stream-preconnect.c @@ -49,13 +49,13 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &asock); + e = udx_socket_init(&udx, &asock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &bsock); + e = udx_socket_init(&udx, &bsock, NULL); assert(e == 0); uv_ip4_addr("127.0.0.1", 8081, &aaddr); diff --git a/test/stream-relay.c b/test/stream-relay.c index 4d3fc50..f79346b 100644 --- a/test/stream-relay.c +++ b/test/stream-relay.c @@ -67,19 +67,19 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &asock); + e = udx_socket_init(&udx, &asock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &bsock); + e = udx_socket_init(&udx, &bsock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &csock); + e = udx_socket_init(&udx, &csock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &dsock); + e = udx_socket_init(&udx, &dsock, NULL); assert(e == 0); uv_ip4_addr("127.0.0.1", 8081, &aaddr); diff --git a/test/stream-send-recv-ipv6.c b/test/stream-send-recv-ipv6.c index 87d4cc2..938f9c0 100644 --- a/test/stream-send-recv-ipv6.c +++ b/test/stream-send-recv-ipv6.c @@ -43,13 +43,13 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &asock); + e = udx_socket_init(&udx, &asock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &bsock); + e = udx_socket_init(&udx, &bsock, NULL); assert(e == 0); struct sockaddr_in6 baddr; diff --git a/test/stream-send-recv.c b/test/stream-send-recv.c index 6887689..1555a8a 100644 --- a/test/stream-send-recv.c +++ b/test/stream-send-recv.c @@ -43,13 +43,13 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &asock); + e = udx_socket_init(&udx, &asock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &bsock); + e = udx_socket_init(&udx, &bsock, NULL); assert(e == 0); struct sockaddr_in baddr; diff --git a/test/stream-write-read-ipv6.c b/test/stream-write-read-ipv6.c index 86ed324..68cb3fa 100644 --- a/test/stream-write-read-ipv6.c +++ b/test/stream-write-read-ipv6.c @@ -44,13 +44,13 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &asock); + e = udx_socket_init(&udx, &asock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &bsock); + e = udx_socket_init(&udx, &bsock, NULL); assert(e == 0); struct sockaddr_in6 baddr; diff --git a/test/stream-write-read-multiple.c b/test/stream-write-read-multiple.c index ccbe510..4c95dca 100644 --- a/test/stream-write-read-multiple.c +++ b/test/stream-write-read-multiple.c @@ -43,13 +43,13 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &asock); + e = udx_socket_init(&udx, &asock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &bsock); + e = udx_socket_init(&udx, &bsock, NULL); assert(e == 0); struct sockaddr_in baddr; diff --git a/test/stream-write-read-perf.c b/test/stream-write-read-perf.c index 02e13b2..b921d74 100644 --- a/test/stream-write-read-perf.c +++ b/test/stream-write-read-perf.c @@ -63,7 +63,7 @@ on_b_sock_close () { static void on_b_stream_close () { printf("sending stream closing\n"); - int e = udx_socket_close(&bsock, on_b_sock_close); + int e = udx_socket_close(&bsock); assert(e == 0 && "udx_socket_close (sender, 'b')"); } @@ -75,7 +75,7 @@ on_a_sock_close () { static void on_a_stream_close () { printf("receiving stream closing\n"); - int e = udx_socket_close(&asock, on_a_sock_close); + int e = udx_socket_close(&asock); assert(e == 0 && "udx_socket_close (receiver, 'a')"); } @@ -87,13 +87,13 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &asock); + e = udx_socket_init(&udx, &asock, on_a_sock_close); assert(e == 0); - e = udx_socket_init(&udx, &bsock); + e = udx_socket_init(&udx, &bsock, on_b_sock_close); assert(e == 0); struct sockaddr_in baddr; diff --git a/test/stream-write-read-receive-window.c b/test/stream-write-read-receive-window.c index 5889139..6e2711f 100644 --- a/test/stream-write-read-receive-window.c +++ b/test/stream-write-read-receive-window.c @@ -49,8 +49,8 @@ void on_finalize (udx_stream_t *stream) { nfinalize++; if (nfinalize == 2) { - udx_socket_close(&send_sock, on_socket_close); - udx_socket_close(&recv_sock, on_socket_close); + udx_socket_close(&send_sock); + udx_socket_close(&recv_sock); } } @@ -90,13 +90,13 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &recv_sock); + e = udx_socket_init(&udx, &recv_sock, on_socket_close); assert(e == 0); - e = udx_socket_init(&udx, &send_sock); + e = udx_socket_init(&udx, &send_sock, on_socket_close); assert(e == 0); struct sockaddr_in send_addr; diff --git a/test/stream-write-read.c b/test/stream-write-read.c index 1bbdc5d..27ac2de 100644 --- a/test/stream-write-read.c +++ b/test/stream-write-read.c @@ -29,8 +29,8 @@ on_close (udx_stream_t *s, int status) { nclosed++; if (nclosed == 2) { - udx_socket_close(&asock, NULL); - udx_socket_close(&bsock, NULL); + udx_socket_close(&asock); + udx_socket_close(&bsock); } } @@ -65,13 +65,13 @@ main () { uv_loop_init(&loop); - e = udx_init(&loop, &udx); + e = udx_init(&loop, &udx, NULL); assert(e == 0); - e = udx_socket_init(&udx, &asock); + e = udx_socket_init(&udx, &asock, NULL); assert(e == 0); - e = udx_socket_init(&udx, &bsock); + e = udx_socket_init(&udx, &bsock, NULL); assert(e == 0); struct sockaddr_in baddr;