diff --git a/CMakeLists.txt b/CMakeLists.txt index bc1a3d6..ae9e52d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,6 +34,7 @@ target_sources( src/endian.c src/queue.c src/queue.h + src/link.h src/io.h src/udx.c ) diff --git a/include/udx.h b/include/udx.h index c5c0afb..0948fec 100644 --- a/include/udx.h +++ b/include/udx.h @@ -107,16 +107,12 @@ typedef void (*udx_interface_event_close_cb)(udx_interface_event_t *handle); struct udx_s { uv_loop_t *loop; - uint32_t refs; + int refs; udx_idle_cb on_idle; - uint32_t sockets_len; - uint32_t sockets_max_len; - udx_socket_t **sockets; - - uint32_t streams_len; - uint32_t streams_max_len; - udx_stream_t **streams; + udx_socket_t *sockets; + udx_stream_t *streams; + udx_interface_event_t *listeners; udx_cirbuf_t streams_by_id; @@ -145,11 +141,12 @@ struct udx_socket_s { udx_queue_t send_queue; + udx_socket_t *prev; + udx_socket_t *next; + udx_t *udx; udx_cirbuf_t *streams_by_id; // for convenience - int set_id; - bool cmsg_wanted; // include a control buffer for recvmsg int family; int status; @@ -193,7 +190,9 @@ struct udx_stream_s { uint32_t local_id; // must be first entry, so its compat with the cirbuf uint32_t remote_id; - int set_id; + udx_stream_t *prev; + udx_stream_t *next; + int status; int write_wanted; int out_of_order; @@ -392,6 +391,9 @@ struct udx_interface_event_s { uv_loop_t *loop; udx_t *udx; + udx_interface_event_t *prev; + udx_interface_event_t *next; + uv_interface_address_t *addrs; int addrs_len; bool sorted; diff --git a/src/link.h b/src/link.h new file mode 100644 index 0000000..48743a3 --- /dev/null +++ b/src/link.h @@ -0,0 +1,25 @@ +#ifndef UDX_LINK_H +#define UDX_LINK_H + +#define udx__link_add(l, v) \ + if ((l) == NULL) { \ + (v)->next = (v)->prev = NULL; \ + } else { \ + (v)->prev = NULL; \ + (v)->next = (l); \ + (l)->prev = (v); \ + } \ + (l) = (v); + +#define udx__link_remove(l, v) \ + if ((v) == (l)) { \ + (l) = NULL; \ + } else { \ + if ((v)->next != NULL) (v)->next->prev = (v)->prev; \ + if ((v)->prev != NULL) (v)->prev->next = (v)->next; \ + } + +#define udx__link_foreach(l, el) \ + for ((el) = (l); (el) != NULL; (el) = (el)->next) + +#endif diff --git a/src/udx.c b/src/udx.c index a96759d..c1ed140 100644 --- a/src/udx.c +++ b/src/udx.c @@ -15,6 +15,7 @@ #include "endian.h" #include "io.h" #include "queue.h" +#include "link.h" #define UDX_STREAM_ALL_ENDED (UDX_STREAM_ENDED | UDX_STREAM_ENDED_REMOTE) #define UDX_STREAM_DEAD (UDX_STREAM_DESTROYING | UDX_STREAM_CLOSED) @@ -33,8 +34,6 @@ #define UDX_DEFAULT_TTL 64 #define UDX_DEFAULT_BUFFER_SIZE 212992 -#define UDX_DEFAULT_SET_SIZE 16 - #define UDX_MAX_RTO_TIMEOUTS 6 #define UDX_CONG_C 400 // C=0.4 (inverse) in scaled 1000 @@ -56,6 +55,16 @@ #define UDX_BANDWIDTH_INTERVAL_SECS 10 +#define udx__set_add(s, v) \ + v->next = s->next; \ + s->next = v; \ + v->prev = s; \ + if (s->prev == s) s->prev = v; + +#define udx__set_remove(v) \ + v->next->prev = v->prev; \ + v->prev->next = v->next; + typedef struct { uint32_t seq; // must be the first entry, so its compat with the cirbuf @@ -159,17 +168,7 @@ ref_dec (udx_t *udx) { if (udx->refs) return; - if (udx->sockets != NULL) { - free(udx->sockets); - udx->sockets = NULL; - udx->sockets_max_len = 0; - } - - if (udx->streams != NULL) { - free(udx->streams); - udx->streams = NULL; - udx->streams_max_len = 0; - + if (udx->streams == NULL) { udx__cirbuf_destroy(&(udx->streams_by_id)); } @@ -223,8 +222,8 @@ socket_write_wanted (udx_socket_t *socket) { return true; } - for (uint32_t i = 0; i < socket->udx->streams_len; i++) { - udx_stream_t *stream = socket->udx->streams[i]; + udx_stream_t *stream; + udx__link_foreach(socket->udx->streams, stream) { if (stream->socket == socket && stream_write_wanted(stream)) { return true; } @@ -594,11 +593,7 @@ close_stream (udx_stream_t *stream, int err) { udx_t *udx = stream->udx; - // Remove from the set, by array[i] = array.pop() - udx_stream_t *other = udx->streams[--(udx->streams_len)]; - udx->streams[stream->set_id] = other; - other->set_id = stream->set_id; - + udx__link_remove(udx->streams, stream); udx__cirbuf_remove(&(udx->streams_by_id), stream->local_id); // stream on_close called before acks are cancelled! @@ -1449,8 +1444,8 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd static bool check_for_streams (udx_socket_t *socket) { - for (uint32_t i = 0; i < socket->udx->streams_len; i++) { - udx_stream_t *stream = socket->udx->streams[i]; + udx_stream_t *stream; + udx__link_foreach(socket->udx->streams, stream) { if (stream->socket == socket) return true; } return false; @@ -1889,21 +1884,12 @@ arm_stream_timers (udx_stream_t *stream, bool sent_tlp) { static void send_packets (udx_socket_t *socket) { - bool _continue = send_datagrams(socket); + if (!send_datagrams(socket)) return; - if (!_continue) return; - - for (uint32_t i = 0; i < socket->udx->streams_len;) { - udx_stream_t *stream = socket->udx->streams[i]; - if (stream->socket == socket) { - _continue = send_stream_packets(socket, stream); - - if (!_continue) return; - } - // if stream was closed, a new stream shuffled into this slot - if (socket->udx->streams[i] == stream) { - i++; - } + udx_stream_t *stream; + udx__link_foreach(socket->udx->streams, stream) { + if (stream->socket != socket) continue; + if (!send_stream_packets(socket, stream)) return; } } @@ -1956,13 +1942,9 @@ udx_init (uv_loop_t *loop, udx_t *udx) { udx->refs = 0; udx->on_idle = NULL; - udx->sockets_len = 0; - udx->sockets_max_len = 0; udx->sockets = NULL; - - udx->streams_len = 0; - udx->streams_max_len = 0; udx->streams = NULL; + udx->listeners = NULL; udx->bytes_rx = 0; udx->bytes_tx = 0; @@ -1984,11 +1966,7 @@ int udx_socket_init (udx_t *udx, udx_socket_t *socket) { udx->refs++; - if (udx->sockets == NULL) { - udx->sockets_len = 0; - udx->sockets_max_len = UDX_DEFAULT_SET_SIZE; - udx->sockets = malloc(udx->sockets_max_len * sizeof(udx_socket_t *)); - } + udx__link_add(udx->sockets, socket); socket->family = 0; socket->status = 0; @@ -1999,15 +1977,6 @@ udx_socket_init (udx_t *udx, udx_socket_t *socket) { socket->udx = udx; socket->streams_by_id = &(udx->streams_by_id); - socket->set_id = udx->sockets_len++; - - if (udx->sockets_len == udx->sockets_max_len) { - udx->sockets_max_len *= 2; - udx->sockets = realloc(udx->sockets, udx->sockets_max_len * sizeof(udx_socket_t *)); - } - - udx->sockets[socket->set_id] = socket; - socket->on_recv = NULL; socket->on_close = NULL; @@ -2249,10 +2218,7 @@ udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb) { uv_close((uv_handle_t *) &(socket->handle), on_uv_close); udx_t *udx = socket->udx; - - udx_socket_t *other = udx->sockets[--(udx->sockets_len)]; - udx->sockets[socket->set_id] = other; - other->set_id = socket->set_id; + udx__link_remove(udx->sockets, socket); return 0; } @@ -2262,12 +2228,11 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream udx->refs++; if (udx->streams == NULL) { - udx->streams_len = 0; - udx->streams_max_len = UDX_DEFAULT_SET_SIZE; - udx->streams = malloc(udx->streams_max_len * sizeof(udx_stream_t *)); udx__cirbuf_init(&(udx->streams_by_id), 16); } + udx__link_add(udx->streams, stream); + stream->local_id = local_id; stream->remote_id = 0; stream->status = 0; @@ -2376,17 +2341,7 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream udx__queue_init(&stream->inflight_queue); udx__queue_init(&stream->retransmit_queue); - stream->set_id = udx->streams_len++; - - if (udx->streams_len == udx->streams_max_len) { - udx->streams_max_len *= 2; - udx->streams = realloc(udx->streams, udx->streams_max_len * sizeof(udx_stream_t *)); - } - - udx->streams[stream->set_id] = stream; - // Add the socket to the active set - udx__cirbuf_set(&(udx->streams_by_id), (udx_cirbuf_val_t *) stream); return 0; @@ -2837,6 +2792,9 @@ static void on_interface_event_close (uv_handle_t *handle) { udx_interface_event_t *event = (udx_interface_event_t *) handle->data; + udx_t *udx = event->udx; + udx__link_remove(udx->listeners, event); + if (event->on_close != NULL) { event->on_close(event); } @@ -2850,8 +2808,6 @@ udx_interface_event_init (udx_t *udx, udx_interface_event_t *handle) { handle->loop = udx->loop; handle->sorted = false; - udx->refs++; - int err = uv_interface_addresses(&(handle->addrs), &(handle->addrs_len)); if (err < 0) return err; @@ -2860,6 +2816,9 @@ udx_interface_event_init (udx_t *udx, udx_interface_event_t *handle) { handle->timer.data = handle; + udx->refs++; + udx__set_add(udx->listeners, handle); + return 0; }