Skip to content

Commit

Permalink
remove set arrays and just use a linked list
Browse files Browse the repository at this point in the history
  • Loading branch information
mafintosh committed Dec 9, 2024
1 parent c922b1c commit 5081f96
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 85 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ target_sources(
src/endian.c
src/queue.c
src/queue.h
src/link.h
src/io.h
src/udx.c
)
Expand Down
24 changes: 13 additions & 11 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,12 @@ typedef void (*udx_interface_event_close_cb)(udx_interface_event_t *handle);
struct udx_s {
uv_loop_t *loop;

uint32_t refs;
int refs;
udx_idle_cb on_idle;

uint32_t sockets_len;
uint32_t sockets_max_len;
udx_socket_t **sockets;

uint32_t streams_len;
uint32_t streams_max_len;
udx_stream_t **streams;
udx_socket_t *sockets;
udx_stream_t *streams;
udx_interface_event_t *listeners;

udx_cirbuf_t streams_by_id;

Expand Down Expand Up @@ -145,11 +141,12 @@ struct udx_socket_s {

udx_queue_t send_queue;

udx_socket_t *prev;
udx_socket_t *next;

udx_t *udx;
udx_cirbuf_t *streams_by_id; // for convenience

int set_id;

bool cmsg_wanted; // include a control buffer for recvmsg
int family;
int status;
Expand Down Expand Up @@ -193,7 +190,9 @@ struct udx_stream_s {
uint32_t local_id; // must be first entry, so its compat with the cirbuf
uint32_t remote_id;

int set_id;
udx_stream_t *prev;
udx_stream_t *next;

int status;
int write_wanted;
int out_of_order;
Expand Down Expand Up @@ -392,6 +391,9 @@ struct udx_interface_event_s {
uv_loop_t *loop;
udx_t *udx;

udx_interface_event_t *prev;
udx_interface_event_t *next;

uv_interface_address_t *addrs;
int addrs_len;
bool sorted;
Expand Down
25 changes: 25 additions & 0 deletions src/link.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#ifndef UDX_LINK_H
#define UDX_LINK_H

#define udx__link_add(l, v) \
if ((l) == NULL) { \
(v)->next = (v)->prev = NULL; \
} else { \
(v)->prev = NULL; \
(v)->next = (l); \
(l)->prev = (v); \
} \
(l) = (v);

#define udx__link_remove(l, v) \
if ((v) == (l)) { \
(l) = NULL; \
} else { \
if ((v)->next != NULL) (v)->next->prev = (v)->prev; \
if ((v)->prev != NULL) (v)->prev->next = (v)->next; \
}

#define udx__link_foreach(l, el) \
for ((el) = (l); (el) != NULL; (el) = (el)->next)

#endif
107 changes: 33 additions & 74 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "endian.h"
#include "io.h"
#include "queue.h"
#include "link.h"

#define UDX_STREAM_ALL_ENDED (UDX_STREAM_ENDED | UDX_STREAM_ENDED_REMOTE)
#define UDX_STREAM_DEAD (UDX_STREAM_DESTROYING | UDX_STREAM_CLOSED)
Expand All @@ -33,8 +34,6 @@
#define UDX_DEFAULT_TTL 64
#define UDX_DEFAULT_BUFFER_SIZE 212992

#define UDX_DEFAULT_SET_SIZE 16

#define UDX_MAX_RTO_TIMEOUTS 6

#define UDX_CONG_C 400 // C=0.4 (inverse) in scaled 1000
Expand All @@ -56,6 +55,16 @@

#define UDX_BANDWIDTH_INTERVAL_SECS 10

#define udx__set_add(s, v) \
v->next = s->next; \
s->next = v; \
v->prev = s; \
if (s->prev == s) s->prev = v;

#define udx__set_remove(v) \
v->next->prev = v->prev; \
v->prev->next = v->next;

typedef struct {
uint32_t seq; // must be the first entry, so its compat with the cirbuf

Expand Down Expand Up @@ -159,17 +168,7 @@ ref_dec (udx_t *udx) {

if (udx->refs) return;

if (udx->sockets != NULL) {
free(udx->sockets);
udx->sockets = NULL;
udx->sockets_max_len = 0;
}

if (udx->streams != NULL) {
free(udx->streams);
udx->streams = NULL;
udx->streams_max_len = 0;

if (udx->streams == NULL) {
udx__cirbuf_destroy(&(udx->streams_by_id));
}

Expand Down Expand Up @@ -223,8 +222,8 @@ socket_write_wanted (udx_socket_t *socket) {
return true;
}

for (uint32_t i = 0; i < socket->udx->streams_len; i++) {
udx_stream_t *stream = socket->udx->streams[i];
udx_stream_t *stream;
udx__link_foreach(socket->udx->streams, stream) {
if (stream->socket == socket && stream_write_wanted(stream)) {
return true;
}
Expand Down Expand Up @@ -594,11 +593,7 @@ close_stream (udx_stream_t *stream, int err) {

udx_t *udx = stream->udx;

// Remove from the set, by array[i] = array.pop()
udx_stream_t *other = udx->streams[--(udx->streams_len)];
udx->streams[stream->set_id] = other;
other->set_id = stream->set_id;

udx__link_remove(udx->streams, stream);
udx__cirbuf_remove(&(udx->streams_by_id), stream->local_id);

// stream on_close called before acks are cancelled!
Expand Down Expand Up @@ -1449,8 +1444,8 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd

static bool
check_for_streams (udx_socket_t *socket) {
for (uint32_t i = 0; i < socket->udx->streams_len; i++) {
udx_stream_t *stream = socket->udx->streams[i];
udx_stream_t *stream;
udx__link_foreach(socket->udx->streams, stream) {
if (stream->socket == socket) return true;
}
return false;
Expand Down Expand Up @@ -1889,21 +1884,12 @@ arm_stream_timers (udx_stream_t *stream, bool sent_tlp) {

static void
send_packets (udx_socket_t *socket) {
bool _continue = send_datagrams(socket);
if (!send_datagrams(socket)) return;

if (!_continue) return;

for (uint32_t i = 0; i < socket->udx->streams_len;) {
udx_stream_t *stream = socket->udx->streams[i];
if (stream->socket == socket) {
_continue = send_stream_packets(socket, stream);

if (!_continue) return;
}
// if stream was closed, a new stream shuffled into this slot
if (socket->udx->streams[i] == stream) {
i++;
}
udx_stream_t *stream;
udx__link_foreach(socket->udx->streams, stream) {
if (stream->socket != socket) continue;
if (!send_stream_packets(socket, stream)) return;
}
}

Expand Down Expand Up @@ -1956,13 +1942,9 @@ udx_init (uv_loop_t *loop, udx_t *udx) {
udx->refs = 0;
udx->on_idle = NULL;

udx->sockets_len = 0;
udx->sockets_max_len = 0;
udx->sockets = NULL;

udx->streams_len = 0;
udx->streams_max_len = 0;
udx->streams = NULL;
udx->listeners = NULL;

udx->bytes_rx = 0;
udx->bytes_tx = 0;
Expand All @@ -1984,11 +1966,7 @@ int
udx_socket_init (udx_t *udx, udx_socket_t *socket) {
udx->refs++;

if (udx->sockets == NULL) {
udx->sockets_len = 0;
udx->sockets_max_len = UDX_DEFAULT_SET_SIZE;
udx->sockets = malloc(udx->sockets_max_len * sizeof(udx_socket_t *));
}
udx__link_add(udx->sockets, socket);

socket->family = 0;
socket->status = 0;
Expand All @@ -1999,15 +1977,6 @@ udx_socket_init (udx_t *udx, udx_socket_t *socket) {
socket->udx = udx;
socket->streams_by_id = &(udx->streams_by_id);

socket->set_id = udx->sockets_len++;

if (udx->sockets_len == udx->sockets_max_len) {
udx->sockets_max_len *= 2;
udx->sockets = realloc(udx->sockets, udx->sockets_max_len * sizeof(udx_socket_t *));
}

udx->sockets[socket->set_id] = socket;

socket->on_recv = NULL;
socket->on_close = NULL;

Expand Down Expand Up @@ -2249,10 +2218,7 @@ udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb) {
uv_close((uv_handle_t *) &(socket->handle), on_uv_close);

udx_t *udx = socket->udx;

udx_socket_t *other = udx->sockets[--(udx->sockets_len)];
udx->sockets[socket->set_id] = other;
other->set_id = socket->set_id;
udx__link_remove(udx->sockets, socket);

return 0;
}
Expand All @@ -2262,12 +2228,11 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream
udx->refs++;

if (udx->streams == NULL) {
udx->streams_len = 0;
udx->streams_max_len = UDX_DEFAULT_SET_SIZE;
udx->streams = malloc(udx->streams_max_len * sizeof(udx_stream_t *));
udx__cirbuf_init(&(udx->streams_by_id), 16);
}

udx__link_add(udx->streams, stream);

stream->local_id = local_id;
stream->remote_id = 0;
stream->status = 0;
Expand Down Expand Up @@ -2376,17 +2341,7 @@ udx_stream_init (udx_t *udx, udx_stream_t *stream, uint32_t local_id, udx_stream
udx__queue_init(&stream->inflight_queue);
udx__queue_init(&stream->retransmit_queue);

stream->set_id = udx->streams_len++;

if (udx->streams_len == udx->streams_max_len) {
udx->streams_max_len *= 2;
udx->streams = realloc(udx->streams, udx->streams_max_len * sizeof(udx_stream_t *));
}

udx->streams[stream->set_id] = stream;

// Add the socket to the active set

udx__cirbuf_set(&(udx->streams_by_id), (udx_cirbuf_val_t *) stream);

return 0;
Expand Down Expand Up @@ -2837,6 +2792,9 @@ static void
on_interface_event_close (uv_handle_t *handle) {
udx_interface_event_t *event = (udx_interface_event_t *) handle->data;

udx_t *udx = event->udx;
udx__link_remove(udx->listeners, event);

if (event->on_close != NULL) {
event->on_close(event);
}
Expand All @@ -2850,8 +2808,6 @@ udx_interface_event_init (udx_t *udx, udx_interface_event_t *handle) {
handle->loop = udx->loop;
handle->sorted = false;

udx->refs++;

int err = uv_interface_addresses(&(handle->addrs), &(handle->addrs_len));
if (err < 0) return err;

Expand All @@ -2860,6 +2816,9 @@ udx_interface_event_init (udx_t *udx, udx_interface_event_t *handle) {

handle->timer.data = handle;

udx->refs++;
udx__set_add(udx->listeners, handle);

return 0;
}

Expand Down

0 comments on commit 5081f96

Please sign in to comment.