diff --git a/src/udx.c b/src/udx.c index d960b1e..1c52ce6 100644 --- a/src/udx.c +++ b/src/udx.c @@ -1364,6 +1364,11 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd // TODO: make this work as well, if the ack packet is lost, ie // have some internal (capped) queue of "gracefully closed" streams (TIME_WAIT) + if ((stream->status & UDX_STREAM_ALL_ENDED) == UDX_STREAM_ALL_ENDED) { + close_stream(stream, 0); + return 1; + } + if (stream->remote_acked == stream->seq) { uv_timer_stop(&stream->rto_timer); uv_timer_stop(&stream->tlp_timer); diff --git a/test/stream-write-read.c b/test/stream-write-read.c index 381a146..1bbdc5d 100644 --- a/test/stream-write-read.c +++ b/test/stream-write-read.c @@ -18,19 +18,38 @@ udx_stream_write_t *req; bool ack_called = false; bool read_called = false; +bool eof_received = false; + +int nclosed; + +void +on_close (udx_stream_t *s, int status) { + assert(status == 0); + + nclosed++; + + if (nclosed == 2) { + udx_socket_close(&asock, NULL); + udx_socket_close(&bsock, NULL); + } +} void on_ack (udx_stream_write_t *req, int status, int unordered) { assert(status == 0); assert(unordered == 0); - uv_stop(&loop); - ack_called = true; } void on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) { + + if (read_len == UV_EOF) { + eof_received = true; + return; + } + assert(buf->len == 5); assert(buf->len == read_len); assert(memcmp(buf->base, "hello", 5) == 0); @@ -65,10 +84,10 @@ main () { e = udx_socket_bind(&asock, (struct sockaddr *) &aaddr, 0); assert(e == 0); - e = udx_stream_init(&udx, &astream, 1, NULL, NULL); + e = udx_stream_init(&udx, &astream, 1, on_close, NULL); assert(e == 0); - e = udx_stream_init(&udx, &bstream, 2, NULL, NULL); + e = udx_stream_init(&udx, &bstream, 2, on_close, NULL); assert(e == 0); e = udx_stream_connect(&astream, &asock, 2, (struct sockaddr *) &baddr); @@ -84,9 +103,24 @@ main () { e = udx_stream_write(req, &bstream, &buf, 1, on_ack); assert(e && "drained"); - uv_run(&loop, UV_RUN_DEFAULT); + udx_stream_write_t *end_request_a = malloc(udx_stream_write_sizeof(1)); + udx_stream_write_t *end_request_b = malloc(udx_stream_write_sizeof(1)); + + e = udx_stream_write_end(end_request_a, &astream, NULL, 0, NULL); + assert(e); + e = udx_stream_write_end(end_request_b, &bstream, NULL, 0, NULL); + assert(e); + + e = uv_run(&loop, UV_RUN_DEFAULT); + assert(e == 0); + e = uv_loop_close(&loop); + assert(e == 0); + + free(end_request_a); + free(end_request_b); + free(req); - assert(ack_called && read_called); + assert(ack_called && read_called && eof_received); return 0; }