diff --git a/source/darwin/nw_socket.c b/source/darwin/nw_socket.c index 062ab95fc..a7775e476 100644 --- a/source/darwin/nw_socket.c +++ b/source/darwin/nw_socket.c @@ -502,7 +502,8 @@ struct read_queue_node { struct aws_allocator *allocator; dispatch_data_t received_data; struct aws_linked_list_node node; - size_t current_offset; + size_t total_offset; + size_t region_offset; }; static void s_destroy_read_queue_node(struct read_queue_node *node) { @@ -1947,13 +1948,14 @@ static int s_socket_read_fn(struct aws_socket *socket, struct aws_byte_buf *read (dispatch_data_applier_t) ^ (dispatch_data_t region, size_t offset, const void *buffer, size_t size) { (void)region; (void)offset; - size_t to_copy = aws_min_size(max_to_read, size - read_node->current_offset); - aws_byte_buf_write(read_buffer, (const uint8_t *)buffer + read_node->current_offset, to_copy); + size_t to_copy = aws_min_size(max_to_read, size - read_node->region_offset); + aws_byte_buf_write(read_buffer, (const uint8_t *)buffer + read_node->total_offset, to_copy); max_to_read -= to_copy; *amount_read += to_copy; - read_node->current_offset += to_copy; - if (read_node->current_offset == size) { - read_node->current_offset = 0; + read_node->total_offset += to_copy; + read_node->region_offset += to_copy; + if (read_node->region_offset == size) { + read_node->region_offset = 0; return true; } return false; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index dd28d712c..13f7a8c79 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -146,6 +146,7 @@ add_test_case(pem_sanitize_comments_around_pem_object_removed) add_test_case(pem_sanitize_empty_file_rejected) add_test_case(pem_sanitize_wrong_format_rejected) +add_test_case(socket_data_over_multiple_frames) add_test_case(socket_handler_echo_and_backpressure) add_test_case(socket_handler_close) # These tests fail on Windows due to some bug in our server code where, if the socket is closed diff --git a/tests/read_write_test_handler.c b/tests/read_write_test_handler.c index 959158c96..9f249754f 100644 --- a/tests/read_write_test_handler.c +++ b/tests/read_write_test_handler.c @@ -199,17 +199,26 @@ static void s_rw_handler_write_now( struct aws_byte_buf *buffer, aws_channel_on_message_write_completed_fn *on_completion, void *user_data) { + size_t remaining = buffer->len; + struct aws_byte_cursor write_cursor = aws_byte_cursor_from_buf(buffer); - struct aws_io_message *msg = - aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, buffer->len); + while (remaining > 0) { + size_t chunk_size = remaining; + struct aws_io_message *msg = + aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, chunk_size); + + chunk_size = aws_min_size(chunk_size, msg->message_data.capacity - msg->message_data.len); - msg->on_completion = on_completion; - msg->user_data = user_data; + msg->on_completion = on_completion; + msg->user_data = user_data; - struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(buffer); - AWS_FATAL_ASSERT(aws_byte_buf_append(&msg->message_data, &write_buffer) == AWS_OP_SUCCESS); + struct aws_byte_cursor chunk_cursor = aws_byte_cursor_advance(&write_cursor, chunk_size); + AWS_FATAL_ASSERT(aws_byte_buf_append(&msg->message_data, &chunk_cursor) == AWS_OP_SUCCESS); - AWS_FATAL_ASSERT(aws_channel_slot_send_message(slot, msg, AWS_CHANNEL_DIR_WRITE) == AWS_OP_SUCCESS); + AWS_FATAL_ASSERT(aws_channel_slot_send_message(slot, msg, AWS_CHANNEL_DIR_WRITE) == AWS_OP_SUCCESS); + + remaining -= chunk_size; + } } static void s_rw_handler_write_task(struct aws_channel_task *task, void *arg, enum aws_task_status task_status) { diff --git a/tests/socket_handler_test.c b/tests/socket_handler_test.c index 896790379..875ac39e8 100644 --- a/tests/socket_handler_test.c +++ b/tests/socket_handler_test.c @@ -738,6 +738,131 @@ static int s_socket_echo_and_backpressure_test(struct aws_allocator *allocator, AWS_TEST_CASE(socket_handler_echo_and_backpressure, s_socket_echo_and_backpressure_test) +static int s_socket_data_over_multiple_frames_test(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + s_socket_common_tester_init(allocator, &c_tester); + + // Create a large message that will be split over multiple frames + const size_t total_bytes_to_send_from_server = g_aws_channel_max_fragment_size * 100; + struct aws_byte_buf msg_from_server; + ASSERT_SUCCESS(aws_byte_buf_init(&msg_from_server, allocator, total_bytes_to_send_from_server)); + // Seed the random number generator + srand((unsigned int)time(NULL)); + + // Fill the buffer with random printable ASCII characters + for (size_t i = 0; i < total_bytes_to_send_from_server; ++i) { + char random_char = 32 + (rand() % 95); // Printable ASCII characters range from 32 to 126 + ASSERT_TRUE(aws_byte_buf_write_u8(&msg_from_server, random_char)); + } + + struct aws_byte_buf client_received_message; + ASSERT_SUCCESS(aws_byte_buf_init(&client_received_message, allocator, total_bytes_to_send_from_server)); + + struct socket_test_rw_args server_rw_args; + ASSERT_SUCCESS(s_rw_args_init(&server_rw_args, &c_tester, aws_byte_buf_from_empty_array(NULL, 0), 0)); + + struct socket_test_rw_args client_rw_args; + ASSERT_SUCCESS(s_rw_args_init(&client_rw_args, &c_tester, client_received_message, 0)); + + struct aws_channel_handler *client_rw_handler = + rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 10000, &client_rw_args); + ASSERT_NOT_NULL(client_rw_handler); + + struct aws_channel_handler *server_rw_handler = + rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 10000, &server_rw_args); + ASSERT_NOT_NULL(server_rw_handler); + + struct socket_test_args server_args; + ASSERT_SUCCESS(s_socket_test_args_init(&server_args, &c_tester, server_rw_handler)); + + struct socket_test_args client_args; + ASSERT_SUCCESS(s_socket_test_args_init(&client_args, &c_tester, client_rw_handler)); + + struct local_server_tester local_server_tester; + ASSERT_SUCCESS( + s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, AWS_SOCKET_LOCAL, true)); + + struct aws_client_bootstrap_options client_bootstrap_options = { + .event_loop_group = c_tester.el_group, + .host_resolver = c_tester.resolver, + }; + struct aws_client_bootstrap *client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options); + ASSERT_NOT_NULL(client_bootstrap); + + struct aws_socket_channel_bootstrap_options client_channel_options; + AWS_ZERO_STRUCT(client_channel_options); + client_channel_options.bootstrap = client_bootstrap; + client_channel_options.host_name = local_server_tester.endpoint.address; + client_channel_options.port = local_server_tester.endpoint.port; + client_channel_options.socket_options = &local_server_tester.socket_options; + client_channel_options.setup_callback = s_socket_handler_test_client_setup_callback; + client_channel_options.shutdown_callback = s_socket_handler_test_client_shutdown_callback; + client_channel_options.user_data = &client_args; + client_channel_options.enable_read_back_pressure = true; + + ASSERT_SUCCESS(aws_client_bootstrap_new_socket_channel(&client_channel_options)); + + ASSERT_SUCCESS(aws_mutex_lock(&c_tester.mutex)); + + /* wait for both ends to setup */ + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_setup_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_setup_predicate, &client_args)); + + /* send msg from server to client, and wait for some bytes to be received */ + rw_handler_write(server_args.rw_handler, server_args.rw_slot, &msg_from_server); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_socket_test_read_predicate, &client_rw_args)); + + /* confirm that the initial read window was respected */ + ASSERT_SUCCESS(client_rw_args.amount_read == 1000); + + client_rw_args.invocation_happened = false; + client_rw_args.expected_read = total_bytes_to_send_from_server; + + /* increment the read window on the client side and confirm it receives the remainder of the message */ + rw_handler_trigger_increment_read_window( + client_args.rw_handler, client_args.rw_slot, total_bytes_to_send_from_server); + ASSERT_SUCCESS(aws_condition_variable_wait_pred( + &c_tester.condition_variable, &c_tester.mutex, s_socket_test_full_read_predicate, &client_rw_args)); + + ASSERT_INT_EQUALS(total_bytes_to_send_from_server, client_rw_args.amount_read); + + ASSERT_BIN_ARRAYS_EQUALS( + msg_from_server.buffer, + msg_from_server.len, + client_rw_args.received_message.buffer, + client_rw_args.received_message.len); + + /* shut down both sides */ + ASSERT_SUCCESS(aws_channel_shutdown(server_args.channel, AWS_OP_SUCCESS)); + ASSERT_SUCCESS(aws_channel_shutdown(client_args.channel, AWS_OP_SUCCESS)); + + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &server_args)); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &client_args)); + aws_server_bootstrap_destroy_socket_listener(local_server_tester.server_bootstrap, local_server_tester.listener); + ASSERT_SUCCESS(aws_condition_variable_wait_for_pred( + &c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_listener_destroy_predicate, &server_args)); + + aws_mutex_unlock(&c_tester.mutex); + + /* clean up */ + ASSERT_SUCCESS(s_local_server_tester_clean_up(&local_server_tester)); + + aws_client_bootstrap_release(client_bootstrap); + ASSERT_SUCCESS(s_socket_common_tester_clean_up(&c_tester)); + aws_byte_buf_clean_up(&msg_from_server); + aws_byte_buf_clean_up(&client_received_message); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(socket_data_over_multiple_frames, s_socket_data_over_multiple_frames_test) + static int s_socket_close_test(struct aws_allocator *allocator, void *ctx) { (void)ctx;