Skip to content

Commit

Permalink
Error on end unconnected (#217)
Browse files Browse the repository at this point in the history
* return UV_ENOTCONN / UV_EPIPE on stream write or write_end if not connected or already ended

* UV_EBUSY on socket close if streams are still open
  • Loading branch information
jthomas43 authored Oct 17, 2024
1 parent e927f3a commit b1a2583
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 55 deletions.
11 changes: 3 additions & 8 deletions include/udx.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ extern "C" {
#define UDX_MAGIC_BYTE 255
#define UDX_VERSION 1

#define UDX_SOCKET_RECEIVING 0b0001
#define UDX_SOCKET_BOUND 0b0010
#define UDX_SOCKET_CLOSING 0b0100
#define UDX_SOCKET_CLOSING_HANDLES 0b1000
#define UDX_SOCKET_RECEIVING 0b0001
#define UDX_SOCKET_BOUND 0b0010
#define UDX_SOCKET_CLOSED 0b0100

#define UDX_STREAM_CONNECTED 0b000000001
#define UDX_STREAM_RECEIVING 0b000000010
Expand Down Expand Up @@ -476,10 +475,6 @@ udx_stream_read_start (udx_stream_t *stream, udx_stream_read_cb cb);
int
udx_stream_read_stop (udx_stream_t *stream);

// only exposed here as a convenience / debug tool - the udx instance uses this automatically
int
udx_stream_check_timeouts (udx_stream_t *stream);

int
udx_stream_send (udx_stream_send_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_send_cb cb);

Expand Down
91 changes: 44 additions & 47 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,25 +176,6 @@ on_uv_close (uv_handle_t *handle) {
trigger_socket_close((udx_socket_t *) handle->data);
}

void
udx__close_handles (udx_socket_t *socket) {
if (socket->status & UDX_SOCKET_CLOSING_HANDLES) return;
socket->status |= UDX_SOCKET_CLOSING_HANDLES;

if (socket->status & UDX_SOCKET_BOUND) {
socket->pending_closes++;
uv_poll_stop(&(socket->io_poll));
uv_close((uv_handle_t *) &(socket->io_poll), on_uv_close);
}

socket->pending_closes++;
uv_close((uv_handle_t *) &(socket->handle), on_uv_close);

udx_t *udx = socket->udx;

udx->sockets--;
}

static bool
stream_write_wanted (udx_stream_t *stream) {
if (!(stream->status & UDX_STREAM_CONNECTED)) {
Expand Down Expand Up @@ -232,7 +213,7 @@ socket_write_wanted (udx_socket_t *socket) {

static int
update_poll (udx_socket_t *socket) {
if (socket->status & UDX_SOCKET_CLOSING_HANDLES) {
if (socket->status & UDX_SOCKET_CLOSED) {
assert(!uv_is_active((uv_handle_t *) &socket->io_poll));
return 0;
}
Expand Down Expand Up @@ -414,7 +395,6 @@ on_bytes_acked (udx_stream_write_buf_t *wbuf, size_t bytes, bool cancelled) {

static void
clear_outgoing_packets (udx_stream_t *stream) {
debug_printf("close: clearing outgoing packets\n");

// todo: skip the math, and just
// 1. destroy all packets
Expand Down Expand Up @@ -450,8 +430,6 @@ clear_outgoing_packets (udx_stream_t *stream) {
free(pkt);
}

debug_printf("close: cancelling queued writes, queue.len=%u\n", stream->write_queue.len);

while (stream->write_queue.len > 0) {
udx_stream_write_buf_t *wbuf = udx__queue_data(udx__queue_shift(&stream->write_queue), udx_stream_write_buf_t, queue);
assert(wbuf != NULL);
Expand Down Expand Up @@ -1428,16 +1406,10 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
}

static bool
check_if_streams_have_data (udx_socket_t *socket) {
check_for_streams (udx_socket_t *socket) {
for (uint32_t i = 0; i < socket->udx->streams_len; i++) {
udx_stream_t *stream = socket->udx->streams[i];
if (stream->socket == socket) {
if (stream->status & UDX_STREAM_DEAD) {
if (stream->write_wanted & (UDX_STREAM_WRITE_WANT_DESTROY | UDX_STREAM_WRITE_WANT_STATE)) return true;
} else {
if (stream->write_wanted || stream->write_queue.len > 0 || stream->retransmit_queue.len > 0 || stream->unordered_queue.len > 0) return true;
}
}
if (stream->socket == socket) return true;
}
return false;
}
Expand Down Expand Up @@ -1485,7 +1457,7 @@ send_packet (udx_socket_t *socket, udx_packet_t *pkt) {

static bool
send_datagrams (udx_socket_t *socket) {
assert((socket->status & UDX_SOCKET_CLOSING_HANDLES) == 0);
assert((socket->status & UDX_SOCKET_CLOSED) == 0);
while (socket->send_queue.len > 0) {
udx_packet_t *pkt = udx__queue_data(udx__queue_peek(&socket->send_queue), udx_packet_t, queue);
ssize_t rc = send_packet(socket, pkt);
Expand All @@ -1500,7 +1472,7 @@ send_datagrams (udx_socket_t *socket) {
req->on_send(req, 0);
}
// edge case: user calls both udx_socket_close (draining queue) and udx_socket_send (adding to queue)
if (socket->status & UDX_SOCKET_CLOSING_HANDLES) {
if (socket->status & UDX_SOCKET_CLOSED) {
return false;
}
}
Expand Down Expand Up @@ -1870,7 +1842,7 @@ on_uv_poll (uv_poll_t *handle, int status, int events) {
buf.base = (char *) &b;
buf.len = 2048;

while (!(socket->status & UDX_SOCKET_CLOSING_HANDLES) && (size = udx__recvmsg(socket, &buf, (struct sockaddr *) &addr, addr_len)) >= 0) {
while (!(socket->status & UDX_SOCKET_CLOSED) && (size = udx__recvmsg(socket, &buf, (struct sockaddr *) &addr, addr_len)) >= 0) {
if (!process_packet(socket, b, size, (struct sockaddr *) &addr) && socket->on_recv != NULL) {
buf.len = size;

Expand All @@ -1885,17 +1857,13 @@ on_uv_poll (uv_poll_t *handle, int status, int events) {
}
}

if (events & UV_WRITABLE && !(socket->status & UDX_SOCKET_CLOSING_HANDLES)) {
if (events & UV_WRITABLE && !(socket->status & UDX_SOCKET_CLOSED)) {
if (events & UV_READABLE) {
// compensate for potentially long-running read callbacks
uv_update_time(handle->loop);
}

send_packets(socket);

if (socket->status & UDX_SOCKET_CLOSING && socket->send_queue.len == 0 && !check_if_streams_have_data(socket)) {
udx__close_handles(socket);
}
}

update_poll(socket);
Expand Down Expand Up @@ -2110,15 +2078,12 @@ udx_socket_recv_stop (udx_socket_t *socket) {

int
udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb) {
// if (socket->streams_len > 0) return UV_EBUSY;
if (check_for_streams(socket)) return UV_EBUSY;

socket->status |= UDX_SOCKET_CLOSING;
socket->status |= UDX_SOCKET_CLOSED;

socket->on_close = cb;

// allow stream packets to flush, but cancel anything else
// todo: drop all relay packets

while (socket->send_queue.len > 0) {
udx_packet_t *pkt = udx__queue_data(udx__queue_shift(&socket->send_queue), udx_packet_t, queue);
assert(pkt != NULL);
Expand All @@ -2130,10 +2095,18 @@ udx_socket_close (udx_socket_t *socket, udx_socket_close_cb cb) {
}
}

if (!check_if_streams_have_data(socket)) {
udx__close_handles(socket);
if (socket->status & UDX_SOCKET_BOUND) {
socket->pending_closes++;
uv_poll_stop(&(socket->io_poll));
uv_close((uv_handle_t *) &(socket->io_poll), on_uv_close);
}

socket->pending_closes++;
uv_close((uv_handle_t *) &(socket->handle), on_uv_close);

udx_t *udx = socket->udx;
udx->sockets--;

return 0;
}

Expand Down Expand Up @@ -2450,6 +2423,11 @@ udx_stream_relay_to (udx_stream_t *stream, udx_stream_t *destination) {

int
udx_stream_send (udx_stream_send_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_send_cb cb) {

if (!(stream->status & UDX_STREAM_CONNECTED)) {
return UV_ENOTCONN;
}

assert(bufs_len == 1);

req->stream = stream;
Expand Down Expand Up @@ -2512,7 +2490,17 @@ _udx_stream_write (udx_stream_write_t *write, udx_stream_t *stream, const uv_buf

int
udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb) {
assert(bufs_len > 0);
if (!(stream->status & UDX_STREAM_CONNECTED)) {
return UV_ENOTCONN;
}

if (stream->status & UDX_STREAM_ENDING) {
return UV_EPIPE;
}

if (bufs_len == 0) {
return UV_EINVAL;
}

req->nwbufs = bufs_len;

Expand All @@ -2531,6 +2519,15 @@ udx_stream_write (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t

int
udx_stream_write_end (udx_stream_write_t *req, udx_stream_t *stream, const uv_buf_t bufs[], unsigned int bufs_len, udx_stream_ack_cb ack_cb) {

if (!(stream->status & UDX_STREAM_CONNECTED)) {
return UV_ENOTCONN;
}

if (stream->status & UDX_STREAM_ENDING) {
return UV_EPIPE;
}

stream->status |= UDX_STREAM_ENDING;

if (bufs_len > 0) {
Expand Down

0 comments on commit b1a2583

Please sign in to comment.