Skip to content

Commit

Permalink
Proposal for moving websocket frame write completions from on-send-do…
Browse files Browse the repository at this point in the history
…wnstream to on-socket-write-completion (#367)

* Proposal for moving websocket frame write completions from on-send-downstream to on-socket-write-completion

* Add some comments

* Additional test verifying correct websocket frame write completion timing

* Remove unnecessary completion callback
  • Loading branch information
bretambrose authored Aug 4, 2022
1 parent 397cffa commit eb9a90a
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 1 deletion.
41 changes: 40 additions & 1 deletion source/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,19 @@ struct aws_websocket {
/* Data that should only be accessed from the websocket's channel thread. */
struct {
struct aws_websocket_encoder encoder;

/* list of outbound frames that have yet to be encoded and sent to the socket */
struct aws_linked_list outgoing_frame_list;

/* current outbound frame being encoded and sent to the socket */
struct outgoing_frame *current_outgoing_frame;

/*
* list of outbound frames that have been completely written to the io message heading to the socket.
* When the socket write completes we can in turn invoke completion callbacks for all of these frames
*/
struct aws_linked_list write_completion_frames;

struct aws_websocket_decoder decoder;
struct aws_websocket_incoming_frame *current_incoming_frame;
struct aws_websocket_incoming_frame incoming_frame_storage;
Expand Down Expand Up @@ -167,6 +177,7 @@ static int s_decoder_on_user_payload(struct aws_websocket *websocket, struct aws
static int s_decoder_on_midchannel_payload(struct aws_websocket *websocket, struct aws_byte_cursor data);

static void s_destroy_outgoing_frame(struct aws_websocket *websocket, struct outgoing_frame *frame, int error_code);
static void s_complete_frame_list(struct aws_websocket *websocket, struct aws_linked_list *frames, int error_code);
static void s_complete_incoming_frame(struct aws_websocket *websocket, int error_code, bool *out_callback_result);
static void s_finish_shutdown(struct aws_websocket *websocket);
static void s_io_message_write_completed(
Expand Down Expand Up @@ -296,6 +307,7 @@ struct aws_websocket *aws_websocket_handler_new(const struct aws_websocket_handl
aws_channel_task_init(&websocket->close_timeout_task, s_close_timeout_task, websocket, "websocket_close_timeout");

aws_linked_list_init(&websocket->thread_data.outgoing_frame_list);
aws_linked_list_init(&websocket->thread_data.write_completion_frames);

aws_websocket_encoder_init(&websocket->thread_data.encoder, s_encoder_stream_outgoing_payload, websocket);

Expand Down Expand Up @@ -697,7 +709,13 @@ static void s_try_write_outgoing_frames(struct aws_websocket *websocket) {
wrote_close_frame = true;
}

s_destroy_outgoing_frame(websocket, websocket->thread_data.current_outgoing_frame, AWS_ERROR_SUCCESS);
/*
* a completely-written frame gets added to the write completion list so that when the socket write completes
* we can complete all of the outbound frames that were finished as part of the io message
*/
aws_linked_list_push_back(
&websocket->thread_data.write_completion_frames, &websocket->thread_data.current_outgoing_frame->node);

websocket->thread_data.current_outgoing_frame = NULL;

if (wrote_close_frame) {
Expand Down Expand Up @@ -817,6 +835,12 @@ static void s_io_message_write_completed(
struct aws_websocket *websocket = user_data;
AWS_ASSERT(aws_channel_thread_is_callers_thread(channel));

/*
* Invoke the completion callbacks (and then destroy) for all the frames that were completely written as
* part of this message completion at the socket layer
*/
s_complete_frame_list(websocket, &websocket->thread_data.write_completion_frames, err_code);

if (err_code == AWS_ERROR_SUCCESS) {
AWS_LOGF_TRACE(
AWS_LS_HTTP_WEBSOCKET, "id=%p: aws_io_message written to socket, sending more data...", (void *)websocket);
Expand Down Expand Up @@ -914,6 +938,19 @@ static void s_destroy_outgoing_frame(struct aws_websocket *websocket, struct out
aws_mem_release(websocket->alloc, frame);
}

static void s_complete_frame_list(struct aws_websocket *websocket, struct aws_linked_list *frames, int error_code) {
struct aws_linked_list_node *node = aws_linked_list_begin(frames);
while (node != aws_linked_list_end(frames)) {
struct outgoing_frame *frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node);

node = aws_linked_list_next(node);
s_destroy_outgoing_frame(websocket, frame, error_code);
}

/* we've released everything, so reset the list to empty */
aws_linked_list_init(frames);
}

static void s_stop_writing(struct aws_websocket *websocket, int send_frame_error_code) {
AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
AWS_ASSERT(send_frame_error_code != AWS_ERROR_SUCCESS);
Expand Down Expand Up @@ -1196,6 +1233,8 @@ static void s_finish_shutdown(struct aws_websocket *websocket) {
s_unlock_synced_data(websocket);
/* END CRITICAL SECTION */

s_complete_frame_list(websocket, &websocket->thread_data.write_completion_frames, AWS_ERROR_HTTP_CONNECTION_CLOSED);

while (!aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) {
struct aws_linked_list_node *node = aws_linked_list_pop_front(&websocket->thread_data.outgoing_frame_list);
struct outgoing_frame *frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node);
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ add_test_case(websocket_handler_send_high_priority_frame)
add_test_case(websocket_handler_sends_nothing_after_close_frame)
add_test_case(websocket_handler_send_frames_always_complete)
add_test_case(websocket_handler_send_one_io_msg_at_a_time)
add_test_case(websocket_handler_delayed_write_completion)
add_test_case(websocket_handler_send_halts_if_payload_fn_returns_false)
add_test_case(websocket_handler_shutdown_automatically_sends_close_frame)
add_test_case(websocket_handler_shutdown_handles_queued_close_frame)
Expand Down
56 changes: 56 additions & 0 deletions tests/test_websocket_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -1133,6 +1133,62 @@ TEST_CASE(websocket_handler_send_one_io_msg_at_a_time) {
return AWS_OP_SUCCESS;
}

/*
* Verifies that the write completion callbacks for websocket frames are not invoked immediately after relaying
* towards the left end (socket) of the channel
*/
TEST_CASE(websocket_handler_delayed_write_completion) {
(void)ctx;
struct tester tester;
ASSERT_SUCCESS(s_tester_init(&tester, allocator));

struct aws_byte_cursor payload = aws_byte_cursor_from_c_str("bitter butter.");

const size_t count = 2;
struct send_tester *sending = aws_mem_acquire(allocator, sizeof(struct send_tester) * count);
ASSERT_NOT_NULL(sending);
memset(sending, 0, sizeof(struct send_tester) * count);

for (size_t i = 0; i < count; ++i) {
struct send_tester *send = &sending[i];
send->payload = payload;
send->def.opcode = AWS_WEBSOCKET_OPCODE_TEXT;
send->def.fin = true;

ASSERT_SUCCESS(s_send_frame(&tester, send));
}

/* Turn off instant write completion and run the channel */
testing_channel_complete_written_messages_immediately(&tester.testing_channel, false, AWS_OP_SUCCESS);
testing_channel_drain_queued_tasks(&tester.testing_channel);

struct aws_linked_list *io_msgs = testing_channel_get_written_message_queue(&tester.testing_channel);
ASSERT_FALSE(aws_linked_list_empty(io_msgs));

struct aws_linked_list_node *node = aws_linked_list_pop_front(io_msgs);
struct aws_io_message *msg = AWS_CONTAINER_OF(node, struct aws_io_message, queueing_handle);

/* we've relayed the frames, but no frame write completions should have been invoked */
for (size_t i = 0; i < count; ++i) {
struct send_tester *send = &sending[i];
ASSERT_UINT_EQUALS(0, send->on_complete_count);
}

/* manually invoke the write completion on the downstream io message */
msg->on_completion(tester.testing_channel.channel, msg, AWS_ERROR_SUCCESS, msg->user_data);
aws_mem_release(msg->allocator, msg);

/* now all frame write completions should have been invoked exactly once */
for (size_t i = 0; i < count; ++i) {
struct send_tester *send = &sending[i];
ASSERT_UINT_EQUALS(1, send->on_complete_count);
}

ASSERT_SUCCESS(s_tester_clean_up(&tester));
aws_mem_release(allocator, sending);
return AWS_OP_SUCCESS;
}

TEST_CASE(websocket_handler_send_halts_if_payload_fn_returns_false) {
(void)ctx;
struct tester tester;
Expand Down

0 comments on commit eb9a90a

Please sign in to comment.