From eb9a90aa73fe1caa7d24e3a8a3069c97bdae3f8b Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Thu, 4 Aug 2022 13:37:34 -0700 Subject: [PATCH] Proposal for moving websocket frame write completions from on-send-downstream to on-socket-write-completion (#367) * Proposal for moving websocket frame write completions from on-send-downstream to on-socket-write-completion * Add some comments * Additional test verifying correct websocket frame write completion timing * Remove unnecessary completion callback --- source/websocket.c | 41 ++++++++++++++++++++++++- tests/CMakeLists.txt | 1 + tests/test_websocket_handler.c | 56 ++++++++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 1 deletion(-) diff --git a/source/websocket.c b/source/websocket.c index e840608bf..458f3a53f 100644 --- a/source/websocket.c +++ b/source/websocket.c @@ -61,9 +61,19 @@ struct aws_websocket { /* Data that should only be accessed from the websocket's channel thread. */ struct { struct aws_websocket_encoder encoder; + + /* list of outbound frames that have yet to be encoded and sent to the socket */ struct aws_linked_list outgoing_frame_list; + + /* current outbound frame being encoded and sent to the socket */ struct outgoing_frame *current_outgoing_frame; + /* + * list of outbound frames that have been completely written to the io message heading to the socket. + * When the socket write completes we can in turn invoke completion callbacks for all of these frames + */ + struct aws_linked_list write_completion_frames; + struct aws_websocket_decoder decoder; struct aws_websocket_incoming_frame *current_incoming_frame; struct aws_websocket_incoming_frame incoming_frame_storage; @@ -167,6 +177,7 @@ static int s_decoder_on_user_payload(struct aws_websocket *websocket, struct aws static int s_decoder_on_midchannel_payload(struct aws_websocket *websocket, struct aws_byte_cursor data); static void s_destroy_outgoing_frame(struct aws_websocket *websocket, struct outgoing_frame *frame, int error_code); +static void s_complete_frame_list(struct aws_websocket *websocket, struct aws_linked_list *frames, int error_code); static void s_complete_incoming_frame(struct aws_websocket *websocket, int error_code, bool *out_callback_result); static void s_finish_shutdown(struct aws_websocket *websocket); static void s_io_message_write_completed( @@ -296,6 +307,7 @@ struct aws_websocket *aws_websocket_handler_new(const struct aws_websocket_handl aws_channel_task_init(&websocket->close_timeout_task, s_close_timeout_task, websocket, "websocket_close_timeout"); aws_linked_list_init(&websocket->thread_data.outgoing_frame_list); + aws_linked_list_init(&websocket->thread_data.write_completion_frames); aws_websocket_encoder_init(&websocket->thread_data.encoder, s_encoder_stream_outgoing_payload, websocket); @@ -697,7 +709,13 @@ static void s_try_write_outgoing_frames(struct aws_websocket *websocket) { wrote_close_frame = true; } - s_destroy_outgoing_frame(websocket, websocket->thread_data.current_outgoing_frame, AWS_ERROR_SUCCESS); + /* + * a completely-written frame gets added to the write completion list so that when the socket write completes + * we can complete all of the outbound frames that were finished as part of the io message + */ + aws_linked_list_push_back( + &websocket->thread_data.write_completion_frames, &websocket->thread_data.current_outgoing_frame->node); + websocket->thread_data.current_outgoing_frame = NULL; if (wrote_close_frame) { @@ -817,6 +835,12 @@ static void s_io_message_write_completed( struct aws_websocket *websocket = user_data; AWS_ASSERT(aws_channel_thread_is_callers_thread(channel)); + /* + * Invoke the completion callbacks (and then destroy) for all the frames that were completely written as + * part of this message completion at the socket layer + */ + s_complete_frame_list(websocket, &websocket->thread_data.write_completion_frames, err_code); + if (err_code == AWS_ERROR_SUCCESS) { AWS_LOGF_TRACE( AWS_LS_HTTP_WEBSOCKET, "id=%p: aws_io_message written to socket, sending more data...", (void *)websocket); @@ -914,6 +938,19 @@ static void s_destroy_outgoing_frame(struct aws_websocket *websocket, struct out aws_mem_release(websocket->alloc, frame); } +static void s_complete_frame_list(struct aws_websocket *websocket, struct aws_linked_list *frames, int error_code) { + struct aws_linked_list_node *node = aws_linked_list_begin(frames); + while (node != aws_linked_list_end(frames)) { + struct outgoing_frame *frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node); + + node = aws_linked_list_next(node); + s_destroy_outgoing_frame(websocket, frame, error_code); + } + + /* we've released everything, so reset the list to empty */ + aws_linked_list_init(frames); +} + static void s_stop_writing(struct aws_websocket *websocket, int send_frame_error_code) { AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel)); AWS_ASSERT(send_frame_error_code != AWS_ERROR_SUCCESS); @@ -1196,6 +1233,8 @@ static void s_finish_shutdown(struct aws_websocket *websocket) { s_unlock_synced_data(websocket); /* END CRITICAL SECTION */ + s_complete_frame_list(websocket, &websocket->thread_data.write_completion_frames, AWS_ERROR_HTTP_CONNECTION_CLOSED); + while (!aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) { struct aws_linked_list_node *node = aws_linked_list_pop_front(&websocket->thread_data.outgoing_frame_list); struct outgoing_frame *frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4e07ce3b5..3259fa743 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -188,6 +188,7 @@ add_test_case(websocket_handler_send_high_priority_frame) add_test_case(websocket_handler_sends_nothing_after_close_frame) add_test_case(websocket_handler_send_frames_always_complete) add_test_case(websocket_handler_send_one_io_msg_at_a_time) +add_test_case(websocket_handler_delayed_write_completion) add_test_case(websocket_handler_send_halts_if_payload_fn_returns_false) add_test_case(websocket_handler_shutdown_automatically_sends_close_frame) add_test_case(websocket_handler_shutdown_handles_queued_close_frame) diff --git a/tests/test_websocket_handler.c b/tests/test_websocket_handler.c index e5c56746f..2d7c59f2a 100644 --- a/tests/test_websocket_handler.c +++ b/tests/test_websocket_handler.c @@ -1133,6 +1133,62 @@ TEST_CASE(websocket_handler_send_one_io_msg_at_a_time) { return AWS_OP_SUCCESS; } +/* + * Verifies that the write completion callbacks for websocket frames are not invoked immediately after relaying + * towards the left end (socket) of the channel + */ +TEST_CASE(websocket_handler_delayed_write_completion) { + (void)ctx; + struct tester tester; + ASSERT_SUCCESS(s_tester_init(&tester, allocator)); + + struct aws_byte_cursor payload = aws_byte_cursor_from_c_str("bitter butter."); + + const size_t count = 2; + struct send_tester *sending = aws_mem_acquire(allocator, sizeof(struct send_tester) * count); + ASSERT_NOT_NULL(sending); + memset(sending, 0, sizeof(struct send_tester) * count); + + for (size_t i = 0; i < count; ++i) { + struct send_tester *send = &sending[i]; + send->payload = payload; + send->def.opcode = AWS_WEBSOCKET_OPCODE_TEXT; + send->def.fin = true; + + ASSERT_SUCCESS(s_send_frame(&tester, send)); + } + + /* Turn off instant write completion and run the channel */ + testing_channel_complete_written_messages_immediately(&tester.testing_channel, false, AWS_OP_SUCCESS); + testing_channel_drain_queued_tasks(&tester.testing_channel); + + struct aws_linked_list *io_msgs = testing_channel_get_written_message_queue(&tester.testing_channel); + ASSERT_FALSE(aws_linked_list_empty(io_msgs)); + + struct aws_linked_list_node *node = aws_linked_list_pop_front(io_msgs); + struct aws_io_message *msg = AWS_CONTAINER_OF(node, struct aws_io_message, queueing_handle); + + /* we've relayed the frames, but no frame write completions should have been invoked */ + for (size_t i = 0; i < count; ++i) { + struct send_tester *send = &sending[i]; + ASSERT_UINT_EQUALS(0, send->on_complete_count); + } + + /* manually invoke the write completion on the downstream io message */ + msg->on_completion(tester.testing_channel.channel, msg, AWS_ERROR_SUCCESS, msg->user_data); + aws_mem_release(msg->allocator, msg); + + /* now all frame write completions should have been invoked exactly once */ + for (size_t i = 0; i < count; ++i) { + struct send_tester *send = &sending[i]; + ASSERT_UINT_EQUALS(1, send->on_complete_count); + } + + ASSERT_SUCCESS(s_tester_clean_up(&tester)); + aws_mem_release(allocator, sending); + return AWS_OP_SUCCESS; +} + TEST_CASE(websocket_handler_send_halts_if_payload_fn_returns_false) { (void)ctx; struct tester tester;