diff --git a/include/aws/http/connection.h b/include/aws/http/connection.h index ede27c362..61a23c6c9 100644 --- a/include/aws/http/connection.h +++ b/include/aws/http/connection.h @@ -391,6 +391,13 @@ struct aws_http_client_connection_options { * If connection is HTTP/2 and options were not specified, default values are used. */ const struct aws_http2_connection_options *http2_options; + + /** + * Optional. + * Requests the channel/connection be bound to a specific event loop rather than chosen sequentially from the + * event loop group associated with the client bootstrap. + */ + struct aws_event_loop *requested_event_loop; }; /* Predefined settings identifiers (RFC-7540 6.5.2) */ diff --git a/include/aws/http/private/proxy_impl.h b/include/aws/http/private/proxy_impl.h index 6d99f6ee8..06d57e844 100644 --- a/include/aws/http/private/proxy_impl.h +++ b/include/aws/http/private/proxy_impl.h @@ -120,6 +120,8 @@ struct aws_http_proxy_user_data { aws_client_bootstrap_on_channel_event_fn *original_channel_on_shutdown; struct aws_http_proxy_config *proxy_config; + + struct aws_event_loop *requested_event_loop; }; struct aws_http_proxy_system_vtable { diff --git a/include/aws/http/websocket.h b/include/aws/http/websocket.h index 7b124bd18..6686798d5 100644 --- a/include/aws/http/websocket.h +++ b/include/aws/http/websocket.h @@ -248,6 +248,14 @@ struct aws_websocket_client_connection_options { * reaches 0, no further data will be received. */ bool manual_window_management; + + /** + * Optional + * If set, requests that a specific event loop be used to seat the connection, rather than the next one + * in the event loop group. Useful for serializing all io and external events related to a client onto + * a single thread. 
+ */ + struct aws_event_loop *requested_event_loop; }; /** diff --git a/source/connection.c b/source/connection.c index 9c7de96c8..7e7767c54 100644 --- a/source/connection.c +++ b/source/connection.c @@ -1092,6 +1092,7 @@ int aws_http_client_connect_internal( .shutdown_callback = s_client_bootstrap_on_channel_shutdown, .enable_read_back_pressure = options.manual_window_management, .user_data = http_bootstrap, + .requested_event_loop = options.requested_event_loop, }; err = s_system_vtable_ptr->new_socket_channel(&channel_options); diff --git a/source/proxy_connection.c b/source/proxy_connection.c index 36bcfa4ce..44e65362d 100644 --- a/source/proxy_connection.c +++ b/source/proxy_connection.c @@ -136,6 +136,7 @@ struct aws_http_proxy_user_data *aws_http_proxy_user_data_new( user_data->original_http_on_shutdown = options->on_shutdown; user_data->original_channel_on_setup = on_channel_setup; user_data->original_channel_on_shutdown = on_channel_shutdown; + user_data->requested_event_loop = options->requested_event_loop; /* one and only one setup callback must be valid */ AWS_FATAL_ASSERT((user_data->original_http_on_setup == NULL) != (user_data->original_channel_on_setup == NULL)); @@ -984,6 +985,7 @@ static int s_aws_http_client_connect_via_forwarding_proxy(const struct aws_http_ options_copy.on_setup = s_aws_http_on_client_connection_http_forwarding_proxy_setup_fn; options_copy.on_shutdown = s_aws_http_on_client_connection_http_proxy_shutdown_fn; options_copy.tls_options = options->proxy_options->tls_options; + options_copy.requested_event_loop = options->requested_event_loop; int result = aws_http_client_connect_internal(&options_copy, s_proxy_http_request_transform); if (result == AWS_OP_ERR) { @@ -1018,6 +1020,7 @@ static int s_create_tunneling_connection(struct aws_http_proxy_user_data *user_d connect_options.on_shutdown = s_aws_http_on_client_connection_http_proxy_shutdown_fn; connect_options.http1_options = NULL; /* ToDo */ connect_options.http2_options = 
NULL; /* ToDo */ + connect_options.requested_event_loop = user_data->requested_event_loop; int result = aws_http_client_connect(&connect_options); if (result == AWS_OP_ERR) { @@ -1565,6 +1568,7 @@ int aws_http_proxy_new_socket_channel( http_connection_options.user_data = user_data; http_connection_options.on_setup = NULL; /* use channel callbacks, not http callbacks */ http_connection_options.on_shutdown = NULL; /* use channel callbacks, not http callbacks */ + http_connection_options.requested_event_loop = channel_options->requested_event_loop; if (s_aws_http_client_connect_via_tunneling_proxy( &http_connection_options, s_http_proxied_socket_channel_setup, s_http_proxied_socket_channel_shutdown)) { diff --git a/source/websocket_bootstrap.c b/source/websocket_bootstrap.c index 96e1059a3..8071a89ce 100644 --- a/source/websocket_bootstrap.c +++ b/source/websocket_bootstrap.c @@ -196,6 +196,7 @@ int aws_websocket_client_connect(const struct aws_websocket_client_connection_op http_options.user_data = ws_bootstrap; http_options.on_setup = s_ws_bootstrap_on_http_setup; http_options.on_shutdown = s_ws_bootstrap_on_http_shutdown; + http_options.requested_event_loop = options->requested_event_loop; /* Infer port, if not explicitly specified in URI */ http_options.port = options->port; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 02781c61a..ce2e17dc9 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -470,10 +470,11 @@ add_test_case(server_new_destroy) add_test_case(connection_setup_shutdown) add_test_case(connection_setup_shutdown_tls) add_test_case(connection_setup_shutdown_proxy_setting_on_ev_not_found) +add_test_case(connection_setup_shutdown_pinned_event_loop) add_test_case(connection_h2_prior_knowledge) add_test_case(connection_h2_prior_knowledge_not_work_with_tls) add_test_case(connection_customized_alpn) -add_test_case(connection_customized_alpn_error_with_unknow_return_string) 
+add_test_case(connection_customized_alpn_error_with_unknown_return_string) # These server tests occasionally fail. Resurrect if/when we get back to work on HTTP server. #add_test_case(connection_destroy_server_with_connection_existing) #add_test_case(connection_destroy_server_with_multiple_connections_existing) diff --git a/tests/test_connection.c b/tests/test_connection.c index df98cbb59..5848f89ef 100644 --- a/tests/test_connection.c +++ b/tests/test_connection.c @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -42,12 +43,14 @@ struct tester_options { char *server_alpn_list; char *client_alpn_list; bool no_connection; /* don't connect server to client */ + bool pin_event_loop; }; /* Singleton used by tests in this file */ struct tester { struct aws_allocator *alloc; - struct aws_event_loop_group *event_loop_group; + struct aws_event_loop_group *server_event_loop_group; + struct aws_event_loop_group *client_event_loop_group; struct aws_host_resolver *host_resolver; struct aws_server_bootstrap *server_bootstrap; struct aws_http_server *server; @@ -239,7 +242,7 @@ static void s_client_connection_options_init_tester( struct aws_http_client_connection_options *client_options, struct tester *tester) { struct aws_client_bootstrap_options bootstrap_options = { - .event_loop_group = tester->event_loop_group, + .event_loop_group = tester->client_event_loop_group, .host_resolver = tester->host_resolver, }; tester->client_bootstrap = aws_client_bootstrap_new(tester->alloc, &bootstrap_options); @@ -298,15 +301,37 @@ static int s_tester_init(struct tester *tester, const struct tester_options *opt ASSERT_SUCCESS(aws_mutex_init(&tester->wait_lock)); ASSERT_SUCCESS(aws_condition_variable_init(&tester->wait_cvar)); - tester->event_loop_group = aws_event_loop_group_new_default(tester->alloc, 1, NULL); + /* + * The current http testing framework has several issues that hinder testing event loop pinning: + * (1) Server shutdown can crash with 
memory corruption if the server uses an event loop group with more than one + * thread + * (2) s_tester_wait mixes results from both client and server, and once the two no longer share the same + * single-threaded event loop, the test assumptions start breaking due to different serializations of I/O events. + * + * This leads to a self-defeating situation: in order to test event loop pinning we need event loop groups with + * many threads, but as soon as we use one, existing tests start breaking. + * + * Event loop pinning is a critical blocker for an upcoming release, so rather than trying to figure out the + * underlying race condition within the http testing framework (I suspect it's socket listener related), we + * instead add some complexity to the testing framework such that: + * (1) Existing tests continue to use a single event loop group with one thread + * (2) The event loop pinning test uses two event loop groups: the server group with a single thread and the + * client group with many threads, so that pinning is actually exercised.
+ */ + tester->server_event_loop_group = aws_event_loop_group_new_default(tester->alloc, 1, NULL); + if (options->pin_event_loop) { + tester->client_event_loop_group = aws_event_loop_group_new_default(tester->alloc, 16, NULL); + } else { + tester->client_event_loop_group = aws_event_loop_group_acquire(tester->server_event_loop_group); + } struct aws_host_resolver_default_options resolver_options = { - .el_group = tester->event_loop_group, + .el_group = tester->client_event_loop_group, .max_entries = 8, }; tester->host_resolver = aws_host_resolver_new_default(tester->alloc, &resolver_options); - tester->server_bootstrap = aws_server_bootstrap_new(tester->alloc, tester->event_loop_group); + tester->server_bootstrap = aws_server_bootstrap_new(tester->alloc, tester->server_event_loop_group); ASSERT_NOT_NULL(tester->server_bootstrap); struct aws_socket_options socket_options = { @@ -360,6 +385,11 @@ static int s_tester_init(struct tester *tester, const struct tester_options *opt aws_byte_cursor_from_c_str("localhost"))); client_options.tls_options = &tester->client_tls_connection_options; } + + if (options->pin_event_loop) { + client_options.requested_event_loop = aws_event_loop_group_get_next_loop(tester->client_event_loop_group); + } + tester->client_options = client_options; tester->server_connection_num = 0; @@ -395,7 +425,8 @@ static int s_tester_clean_up(struct tester *tester) { aws_server_bootstrap_release(tester->server_bootstrap); aws_client_bootstrap_release(tester->client_bootstrap); aws_host_resolver_release(tester->host_resolver); - aws_event_loop_group_release(tester->event_loop_group); + aws_event_loop_group_release(tester->client_event_loop_group); + aws_event_loop_group_release(tester->server_event_loop_group); aws_http_library_clean_up(); aws_mutex_clean_up(&tester->wait_lock); @@ -645,7 +676,7 @@ static int s_test_connection_customized_alpn(struct aws_allocator *allocator, vo } AWS_TEST_CASE(connection_customized_alpn, 
s_test_connection_customized_alpn); -static int s_test_connection_customized_alpn_error_with_unknow_return_string( +static int s_test_connection_customized_alpn_error_with_unknown_return_string( struct aws_allocator *allocator, void *ctx) { (void)ctx; @@ -702,8 +733,8 @@ static int s_test_connection_customized_alpn_error_with_unknow_return_string( return AWS_OP_SUCCESS; } AWS_TEST_CASE( - connection_customized_alpn_error_with_unknow_return_string, - s_test_connection_customized_alpn_error_with_unknow_return_string); + connection_customized_alpn_error_with_unknown_return_string, + s_test_connection_customized_alpn_error_with_unknown_return_string); static int s_test_connection_destroy_server_with_connection_existing(struct aws_allocator *allocator, void *ctx) { (void)ctx; @@ -856,7 +887,7 @@ static int s_test_connection_server_shutting_down_new_connection_setup_fail( /* get the first eventloop of tester, which will be the eventloop for server listener socket, block the listener * socket */ - struct aws_event_loop *server_eventloop = aws_event_loop_group_get_loop_at(tester.event_loop_group, 0); + struct aws_event_loop *server_eventloop = aws_event_loop_group_get_loop_at(tester.server_event_loop_group, 0); struct aws_task *server_block_task = aws_mem_acquire(allocator, sizeof(struct aws_task)); aws_task_init(server_block_task, s_block_task, &tester, "wait_a_bit"); aws_event_loop_schedule_task_now(server_eventloop, server_block_task); @@ -916,3 +947,27 @@ static int s_test_connection_server_shutting_down_new_connection_setup_fail( AWS_TEST_CASE( connection_server_shutting_down_new_connection_setup_fail, s_test_connection_server_shutting_down_new_connection_setup_fail); + +static int s_test_connection_setup_shutdown_pinned_event_loop(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + struct tester_options options = { + .alloc = allocator, + .pin_event_loop = true, + }; + struct tester tester; + ASSERT_SUCCESS(s_tester_init(&tester, &options)); + + for (int i = 
0; i < tester.client_connection_num; i++) { + struct aws_http_connection *connection = tester.client_connections[i]; + ASSERT_PTR_EQUALS( + tester.client_options.requested_event_loop, aws_channel_get_event_loop(connection->channel_slot->channel)); + } + + release_all_client_connections(&tester); + release_all_server_connections(&tester); + ASSERT_SUCCESS(s_tester_wait(&tester, s_tester_connection_shutdown_pred)); + + ASSERT_SUCCESS(s_tester_clean_up(&tester)); + return AWS_OP_SUCCESS; +} +AWS_TEST_CASE(connection_setup_shutdown_pinned_event_loop, s_test_connection_setup_shutdown_pinned_event_loop);