Skip to content

Commit

Permalink
Connection shutdown with buffered data (#482)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingDaoK authored Aug 7, 2024
1 parent b5684c7 commit 7db2452
Show file tree
Hide file tree
Showing 4 changed files with 295 additions and 10 deletions.
10 changes: 9 additions & 1 deletion include/aws/http/private/h1_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@
# pragma warning(disable : 4214) /* nonstandard extension used: bit field types other than int */
#endif

enum aws_h1_connection_read_state {
AWS_CONNECTION_READ_OPEN,
AWS_CONNECTION_READ_SHUTTING_DOWN,
AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE,
};

struct aws_h1_connection {
struct aws_http_connection base;

Expand Down Expand Up @@ -96,8 +102,10 @@ struct aws_h1_connection {
uint64_t outgoing_stream_timestamp_ns;
uint64_t incoming_stream_timestamp_ns;

int pending_shutdown_error_code;
enum aws_h1_connection_read_state read_state;

/* True when read and/or writing has stopped, whether due to errors or normal channel shutdown. */
bool is_reading_stopped : 1;
bool is_writing_stopped : 1;

/* If true, the connection has upgraded to another protocol.
Expand Down
77 changes: 68 additions & 9 deletions source/h1_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,17 @@ static void s_stop(
AWS_ASSERT(stop_reading || stop_writing || schedule_shutdown); /* You are required to stop at least 1 thing */

if (stop_reading) {
connection->thread_data.is_reading_stopped = true;
if (connection->thread_data.read_state == AWS_CONNECTION_READ_OPEN) {
connection->thread_data.read_state = AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE;
} else if (connection->thread_data.read_state == AWS_CONNECTION_READ_SHUTTING_DOWN) {
/* Shutdown after pending */
if (connection->thread_data.pending_shutdown_error_code != 0) {
error_code = connection->thread_data.pending_shutdown_error_code;
}
connection->thread_data.read_state = AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE;
aws_channel_slot_on_handler_shutdown_complete(
connection->base.channel_slot, AWS_CHANNEL_DIR_READ, error_code, false);
}
}

if (stop_writing) {
Expand All @@ -167,6 +177,7 @@ static void s_stop(
aws_error_name(error_code));

aws_channel_shutdown(connection->base.channel_slot->channel, error_code);

if (stop_reading) {
/* Increase the window size after shutdown starts, to prevent deadlock when data still pending in the TLS
* handler. */
Expand Down Expand Up @@ -324,7 +335,7 @@ static size_t s_calculate_stream_mode_desired_connection_window(struct aws_h1_co
static int s_update_connection_window(struct aws_h1_connection *connection) {
AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));

if (connection->thread_data.is_reading_stopped) {
if (connection->thread_data.read_state == AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE) {
return AWS_OP_SUCCESS;
}

Expand Down Expand Up @@ -778,7 +789,7 @@ static void s_set_incoming_stream_ptr(
static void s_client_update_incoming_stream_ptr(struct aws_h1_connection *connection) {
struct aws_linked_list *list = &connection->thread_data.stream_list;
struct aws_h1_stream *desired;
if (connection->thread_data.is_reading_stopped) {
if (connection->thread_data.read_state == AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE) {
desired = NULL;
} else if (aws_linked_list_empty(list)) {
desired = NULL;
Expand Down Expand Up @@ -1663,7 +1674,7 @@ static void s_handler_installed(struct aws_channel_handler *handler, struct aws_
static int s_try_process_next_midchannel_read_message(struct aws_h1_connection *connection, bool *out_stop_processing) {
AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
AWS_ASSERT(connection->thread_data.has_switched_protocols);
AWS_ASSERT(!connection->thread_data.is_reading_stopped);
AWS_ASSERT(connection->thread_data.read_state != AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE);
AWS_ASSERT(!aws_linked_list_empty(&connection->thread_data.read_buffer.messages));

*out_stop_processing = false;
Expand Down Expand Up @@ -1839,7 +1850,7 @@ static int s_handler_process_read_message(

AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION, "id=%p: Incoming message of size %zu.", (void *)&connection->base, message_size);
if (connection->thread_data.is_reading_stopped) {
if (connection->thread_data.read_state == AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE) {
/* Read has stopped, ignore the data, shutdown the channel incase it has not started yet. */
aws_mem_release(message->allocator, message); /* Release the message as we return success. */
s_shutdown_due_to_error(connection, AWS_ERROR_HTTP_CONNECTION_CLOSED);
Expand Down Expand Up @@ -1868,7 +1879,7 @@ static int s_handler_process_read_message(
}

void aws_h1_connection_try_process_read_messages(struct aws_h1_connection *connection) {

int error_code = 0;
/* Protect against this function being called recursively. */
if (connection->thread_data.is_processing_read_messages) {
return;
Expand All @@ -1877,7 +1888,7 @@ void aws_h1_connection_try_process_read_messages(struct aws_h1_connection *conne

/* Process queued messages */
while (!aws_linked_list_empty(&connection->thread_data.read_buffer.messages)) {
if (connection->thread_data.is_reading_stopped) {
if (connection->thread_data.read_state == AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE) {
AWS_LOGF_ERROR(
AWS_LS_HTTP_CONNECTION,
"id=%p: Cannot process message because connection is shutting down.",
Expand Down Expand Up @@ -1908,6 +1919,13 @@ void aws_h1_connection_try_process_read_messages(struct aws_h1_connection *conne
}
}

if (connection->thread_data.read_state == AWS_CONNECTION_READ_SHUTTING_DOWN &&
connection->thread_data.read_buffer.pending_bytes == 0) {
/* Done processing the pending buffer. */
aws_raise_error(connection->thread_data.pending_shutdown_error_code);
goto shutdown;
}

/* Increment connection window, if necessary */
if (s_update_connection_window(connection)) {
goto shutdown;
Expand All @@ -1917,15 +1935,25 @@ void aws_h1_connection_try_process_read_messages(struct aws_h1_connection *conne
return;

shutdown:
s_shutdown_due_to_error(connection, aws_last_error());
error_code = aws_last_error();
if (connection->thread_data.read_state == AWS_CONNECTION_READ_SHUTTING_DOWN &&
connection->thread_data.pending_shutdown_error_code != 0) {
error_code = connection->thread_data.pending_shutdown_error_code;
}
if (error_code == 0) {
/* Graceful shutdown, don't stop writing yet. */
s_stop(connection, true /*stop_reading*/, false /*stop_writing*/, true /*schedule_shutdown*/, error_code);
} else {
s_shutdown_due_to_error(connection, aws_last_error());
}
}

/* Try to process the next queued aws_io_message as normal HTTP data for an aws_http_stream.
* This MUST NOT be called if the connection has switched protocols and become a midchannel handler. */
static int s_try_process_next_stream_read_message(struct aws_h1_connection *connection, bool *out_stop_processing) {
AWS_ASSERT(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
AWS_ASSERT(!connection->thread_data.has_switched_protocols);
AWS_ASSERT(!connection->thread_data.is_reading_stopped);
AWS_ASSERT(connection->thread_data.read_state != AWS_CONNECTION_READ_SHUT_DOWN_COMPLETE);
AWS_ASSERT(!aws_linked_list_empty(&connection->thread_data.read_buffer.messages));

*out_stop_processing = false;
Expand Down Expand Up @@ -2122,6 +2150,31 @@ static int s_handler_increment_read_window(
return AWS_OP_SUCCESS;
}

static void s_initialize_read_delay_shutdown(struct aws_h1_connection *connection, int error_code) {

AWS_LOGF_DEBUG(
AWS_LS_HTTP_CONNECTION,
"id=%p: Connection still have pending data to be delivered during shutdown. Wait until downstream "
"reads the data.",
(void *)&connection->base);

AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION,
"id=%p: Current window stats: connection=%zu, stream=%" PRIu64 " buffer=%zu/%zu",
(void *)&connection->base,
connection->thread_data.connection_window,
connection->thread_data.incoming_stream ? connection->thread_data.incoming_stream->thread_data.stream_window
: 0,
connection->thread_data.read_buffer.pending_bytes,
connection->thread_data.read_buffer.capacity);

/* Still have data buffered in connection, wait for it to be processed */
connection->thread_data.read_state = AWS_CONNECTION_READ_SHUTTING_DOWN;
connection->thread_data.pending_shutdown_error_code = error_code;
/* Try to process messages in queue */
aws_h1_connection_try_process_read_messages(connection);
}

static int s_handler_shutdown(
struct aws_channel_handler *handler,
struct aws_channel_slot *slot,
Expand All @@ -2142,6 +2195,12 @@ static int s_handler_shutdown(

if (dir == AWS_CHANNEL_DIR_READ) {
/* This call ensures that no further streams will be created or worked on. */
if (!free_scarce_resources_immediately && connection->thread_data.read_state == AWS_CONNECTION_READ_OPEN &&
connection->thread_data.read_buffer.pending_bytes > 0) {
s_initialize_read_delay_shutdown(connection, error_code);
/* Return success, and wait for the buffered data to be processed to propagate the shutdown. */
return AWS_OP_SUCCESS;
}
s_stop(connection, true /*stop_reading*/, false /*stop_writing*/, false /*schedule_shutdown*/, error_code);
} else /* dir == AWS_CHANNEL_DIR_WRITE */ {

Expand Down
4 changes: 4 additions & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ add_test_case(h1_client_stream_cancel)
add_test_case(h1_client_response_close_connection_before_request_finishes)
add_test_case(h1_client_response_first_byte_timeout_connection)
add_test_case(h1_client_response_first_byte_timeout_request_override)
add_test_case(h1_client_connection_close_before_request_finishes_with_buffer)
add_test_case(h1_client_connection_close_before_request_finishes_with_buffer_incomplete_response)
add_test_case(h1_client_connection_close_before_request_finishes_with_buffer_force_shutdown)
add_test_case(h1_client_connection_close_before_request_finishes_with_buffer_stream_cancel)

add_test_case(strutil_trim_http_whitespace)
add_test_case(strutil_is_http_token)
Expand Down
Loading

0 comments on commit 7db2452

Please sign in to comment.