Skip to content

Commit

Permalink
fix dipatch_data_apply
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Feb 26, 2025
1 parent ffc1f3f commit 9923cf0
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 13 deletions.
14 changes: 8 additions & 6 deletions source/darwin/nw_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,8 @@ struct read_queue_node {
struct aws_allocator *allocator;
dispatch_data_t received_data;
struct aws_linked_list_node node;
size_t current_offset;
size_t total_offset;
size_t region_offset;
};

static void s_destroy_read_queue_node(struct read_queue_node *node) {
Expand Down Expand Up @@ -1947,13 +1948,14 @@ static int s_socket_read_fn(struct aws_socket *socket, struct aws_byte_buf *read
(dispatch_data_applier_t) ^ (dispatch_data_t region, size_t offset, const void *buffer, size_t size) {
(void)region;
(void)offset;
size_t to_copy = aws_min_size(max_to_read, size - read_node->current_offset);
aws_byte_buf_write(read_buffer, (const uint8_t *)buffer + read_node->current_offset, to_copy);
size_t to_copy = aws_min_size(max_to_read, size - read_node->region_offset);
aws_byte_buf_write(read_buffer, (const uint8_t *)buffer + read_node->total_offset, to_copy);
max_to_read -= to_copy;
*amount_read += to_copy;
read_node->current_offset += to_copy;
if (read_node->current_offset == size) {
read_node->current_offset = 0;
read_node->total_offset += to_copy;
read_node->region_offset += to_copy;
if (read_node->region_offset == size) {
read_node->region_offset = 0;
return true;
}
return false;
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ add_test_case(pem_sanitize_comments_around_pem_object_removed)
add_test_case(pem_sanitize_empty_file_rejected)
add_test_case(pem_sanitize_wrong_format_rejected)

add_test_case(socket_data_over_multiple_frames)
add_test_case(socket_handler_echo_and_backpressure)
add_test_case(socket_handler_close)
# These tests fail on Windows due to some bug in our server code where, if the socket is closed
Expand Down
23 changes: 16 additions & 7 deletions tests/read_write_test_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -199,17 +199,26 @@ static void s_rw_handler_write_now(
struct aws_byte_buf *buffer,
aws_channel_on_message_write_completed_fn *on_completion,
void *user_data) {
size_t remaining = buffer->len;
struct aws_byte_cursor write_cursor = aws_byte_cursor_from_buf(buffer);

struct aws_io_message *msg =
aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, buffer->len);
while (remaining > 0) {
size_t chunk_size = remaining;
struct aws_io_message *msg =
aws_channel_acquire_message_from_pool(slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, chunk_size);

chunk_size = aws_min_size(chunk_size, msg->message_data.capacity - msg->message_data.len);

msg->on_completion = on_completion;
msg->user_data = user_data;
msg->on_completion = on_completion;
msg->user_data = user_data;

struct aws_byte_cursor write_buffer = aws_byte_cursor_from_buf(buffer);
AWS_FATAL_ASSERT(aws_byte_buf_append(&msg->message_data, &write_buffer) == AWS_OP_SUCCESS);
struct aws_byte_cursor chunk_cursor = aws_byte_cursor_advance(&write_cursor, chunk_size);
AWS_FATAL_ASSERT(aws_byte_buf_append(&msg->message_data, &chunk_cursor) == AWS_OP_SUCCESS);

AWS_FATAL_ASSERT(aws_channel_slot_send_message(slot, msg, AWS_CHANNEL_DIR_WRITE) == AWS_OP_SUCCESS);
AWS_FATAL_ASSERT(aws_channel_slot_send_message(slot, msg, AWS_CHANNEL_DIR_WRITE) == AWS_OP_SUCCESS);

remaining -= chunk_size;
}
}

static void s_rw_handler_write_task(struct aws_channel_task *task, void *arg, enum aws_task_status task_status) {
Expand Down
125 changes: 125 additions & 0 deletions tests/socket_handler_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,131 @@ static int s_socket_echo_and_backpressure_test(struct aws_allocator *allocator,

AWS_TEST_CASE(socket_handler_echo_and_backpressure, s_socket_echo_and_backpressure_test)

static int s_socket_data_over_multiple_frames_test(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

s_socket_common_tester_init(allocator, &c_tester);

// Create a large message that will be split over multiple frames
const size_t total_bytes_to_send_from_server = g_aws_channel_max_fragment_size * 100;
struct aws_byte_buf msg_from_server;
ASSERT_SUCCESS(aws_byte_buf_init(&msg_from_server, allocator, total_bytes_to_send_from_server));
// Seed the random number generator
srand((unsigned int)time(NULL));

// Fill the buffer with random printable ASCII characters
for (size_t i = 0; i < total_bytes_to_send_from_server; ++i) {
char random_char = 32 + (rand() % 95); // Printable ASCII characters range from 32 to 126
ASSERT_TRUE(aws_byte_buf_write_u8(&msg_from_server, random_char));
}

struct aws_byte_buf client_received_message;
ASSERT_SUCCESS(aws_byte_buf_init(&client_received_message, allocator, total_bytes_to_send_from_server));

struct socket_test_rw_args server_rw_args;
ASSERT_SUCCESS(s_rw_args_init(&server_rw_args, &c_tester, aws_byte_buf_from_empty_array(NULL, 0), 0));

struct socket_test_rw_args client_rw_args;
ASSERT_SUCCESS(s_rw_args_init(&client_rw_args, &c_tester, client_received_message, 0));

struct aws_channel_handler *client_rw_handler =
rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 10000, &client_rw_args);
ASSERT_NOT_NULL(client_rw_handler);

struct aws_channel_handler *server_rw_handler =
rw_handler_new(allocator, s_socket_test_handle_read, s_socket_test_handle_write, true, 10000, &server_rw_args);
ASSERT_NOT_NULL(server_rw_handler);

struct socket_test_args server_args;
ASSERT_SUCCESS(s_socket_test_args_init(&server_args, &c_tester, server_rw_handler));

struct socket_test_args client_args;
ASSERT_SUCCESS(s_socket_test_args_init(&client_args, &c_tester, client_rw_handler));

struct local_server_tester local_server_tester;
ASSERT_SUCCESS(
s_local_server_tester_init(allocator, &local_server_tester, &server_args, &c_tester, AWS_SOCKET_LOCAL, true));

struct aws_client_bootstrap_options client_bootstrap_options = {
.event_loop_group = c_tester.el_group,
.host_resolver = c_tester.resolver,
};
struct aws_client_bootstrap *client_bootstrap = aws_client_bootstrap_new(allocator, &client_bootstrap_options);
ASSERT_NOT_NULL(client_bootstrap);

struct aws_socket_channel_bootstrap_options client_channel_options;
AWS_ZERO_STRUCT(client_channel_options);
client_channel_options.bootstrap = client_bootstrap;
client_channel_options.host_name = local_server_tester.endpoint.address;
client_channel_options.port = local_server_tester.endpoint.port;
client_channel_options.socket_options = &local_server_tester.socket_options;
client_channel_options.setup_callback = s_socket_handler_test_client_setup_callback;
client_channel_options.shutdown_callback = s_socket_handler_test_client_shutdown_callback;
client_channel_options.user_data = &client_args;
client_channel_options.enable_read_back_pressure = true;

ASSERT_SUCCESS(aws_client_bootstrap_new_socket_channel(&client_channel_options));

ASSERT_SUCCESS(aws_mutex_lock(&c_tester.mutex));

/* wait for both ends to setup */
ASSERT_SUCCESS(aws_condition_variable_wait_for_pred(
&c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_setup_predicate, &server_args));
ASSERT_SUCCESS(aws_condition_variable_wait_for_pred(
&c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_setup_predicate, &client_args));

/* send msg from server to client, and wait for some bytes to be received */
rw_handler_write(server_args.rw_handler, server_args.rw_slot, &msg_from_server);
ASSERT_SUCCESS(aws_condition_variable_wait_for_pred(
&c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_socket_test_read_predicate, &client_rw_args));

/* confirm that the initial read window was respected */
ASSERT_SUCCESS(client_rw_args.amount_read == 1000);

client_rw_args.invocation_happened = false;
client_rw_args.expected_read = total_bytes_to_send_from_server;

/* increment the read window on the client side and confirm it receives the remainder of the message */
rw_handler_trigger_increment_read_window(
client_args.rw_handler, client_args.rw_slot, total_bytes_to_send_from_server);
ASSERT_SUCCESS(aws_condition_variable_wait_pred(
&c_tester.condition_variable, &c_tester.mutex, s_socket_test_full_read_predicate, &client_rw_args));

ASSERT_INT_EQUALS(total_bytes_to_send_from_server, client_rw_args.amount_read);

ASSERT_BIN_ARRAYS_EQUALS(
msg_from_server.buffer,
msg_from_server.len,
client_rw_args.received_message.buffer,
client_rw_args.received_message.len);

/* shut down both sides */
ASSERT_SUCCESS(aws_channel_shutdown(server_args.channel, AWS_OP_SUCCESS));
ASSERT_SUCCESS(aws_channel_shutdown(client_args.channel, AWS_OP_SUCCESS));

ASSERT_SUCCESS(aws_condition_variable_wait_for_pred(
&c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &server_args));
ASSERT_SUCCESS(aws_condition_variable_wait_for_pred(
&c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_channel_shutdown_predicate, &client_args));
aws_server_bootstrap_destroy_socket_listener(local_server_tester.server_bootstrap, local_server_tester.listener);
ASSERT_SUCCESS(aws_condition_variable_wait_for_pred(
&c_tester.condition_variable, &c_tester.mutex, TIMEOUT, s_listener_destroy_predicate, &server_args));

aws_mutex_unlock(&c_tester.mutex);

/* clean up */
ASSERT_SUCCESS(s_local_server_tester_clean_up(&local_server_tester));

aws_client_bootstrap_release(client_bootstrap);
ASSERT_SUCCESS(s_socket_common_tester_clean_up(&c_tester));
aws_byte_buf_clean_up(&msg_from_server);
aws_byte_buf_clean_up(&client_received_message);

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(socket_data_over_multiple_frames, s_socket_data_over_multiple_frames_test)

static int s_socket_close_test(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

Expand Down

0 comments on commit 9923cf0

Please sign in to comment.