Skip to content

Commit

Permalink
close stream immediately on receive path when end is acked (#215)
Browse files Browse the repository at this point in the history
  • Loading branch information
jthomas43 authored Oct 11, 2024
1 parent be1991b commit e927f3a
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 6 deletions.
5 changes: 5 additions & 0 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,11 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
// TODO: make this work as well, if the ack packet is lost, ie
// have some internal (capped) queue of "gracefully closed" streams (TIME_WAIT)

if ((stream->status & UDX_STREAM_ALL_ENDED) == UDX_STREAM_ALL_ENDED) {
close_stream(stream, 0);
return 1;
}

if (stream->remote_acked == stream->seq) {
uv_timer_stop(&stream->rto_timer);
uv_timer_stop(&stream->tlp_timer);
Expand Down
46 changes: 40 additions & 6 deletions test/stream-write-read.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,38 @@ udx_stream_write_t *req;

bool ack_called = false;
bool read_called = false;
bool eof_received = false;

int nclosed;

void
on_close (udx_stream_t *s, int status) {
assert(status == 0);

nclosed++;

if (nclosed == 2) {
udx_socket_close(&asock, NULL);
udx_socket_close(&bsock, NULL);
}
}

void
on_ack (udx_stream_write_t *req, int status, int unordered) {
assert(status == 0);
assert(unordered == 0);

uv_stop(&loop);

ack_called = true;
}

void
on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {

if (read_len == UV_EOF) {
eof_received = true;
return;
}

assert(buf->len == 5);
assert(buf->len == read_len);
assert(memcmp(buf->base, "hello", 5) == 0);
Expand Down Expand Up @@ -65,10 +84,10 @@ main () {
e = udx_socket_bind(&asock, (struct sockaddr *) &aaddr, 0);
assert(e == 0);

e = udx_stream_init(&udx, &astream, 1, NULL, NULL);
e = udx_stream_init(&udx, &astream, 1, on_close, NULL);
assert(e == 0);

e = udx_stream_init(&udx, &bstream, 2, NULL, NULL);
e = udx_stream_init(&udx, &bstream, 2, on_close, NULL);
assert(e == 0);

e = udx_stream_connect(&astream, &asock, 2, (struct sockaddr *) &baddr);
Expand All @@ -84,9 +103,24 @@ main () {
e = udx_stream_write(req, &bstream, &buf, 1, on_ack);
assert(e && "drained");

uv_run(&loop, UV_RUN_DEFAULT);
udx_stream_write_t *end_request_a = malloc(udx_stream_write_sizeof(1));
udx_stream_write_t *end_request_b = malloc(udx_stream_write_sizeof(1));

e = udx_stream_write_end(end_request_a, &astream, NULL, 0, NULL);
assert(e);
e = udx_stream_write_end(end_request_b, &bstream, NULL, 0, NULL);
assert(e);

e = uv_run(&loop, UV_RUN_DEFAULT);
assert(e == 0);
e = uv_loop_close(&loop);
assert(e == 0);

free(end_request_a);
free(end_request_b);
free(req);

assert(ack_called && read_called);
assert(ack_called && read_called && eof_received);

return 0;
}

0 comments on commit e927f3a

Please sign in to comment.