Skip to content

Commit

Permalink
Event loop pin (#391)
Browse files Browse the repository at this point in the history
* Http connection event loop pinning
* Websocket event loop pinning
  • Loading branch information
bretambrose authored Oct 12, 2022
1 parent 82fb7ed commit dcbc111
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 11 deletions.
7 changes: 7 additions & 0 deletions include/aws/http/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,13 @@ struct aws_http_client_connection_options {
* If connection is HTTP/2 and options were not specified, default values are used.
*/
const struct aws_http2_connection_options *http2_options;

/**
* Optional.
* Requests the channel/connection be bound to a specific event loop rather than chosen sequentially from the
* event loop group associated with the client bootstrap.
*/
struct aws_event_loop *requested_event_loop;
};

/* Predefined settings identifiers (RFC-7540 6.5.2) */
Expand Down
2 changes: 2 additions & 0 deletions include/aws/http/private/proxy_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ struct aws_http_proxy_user_data {
aws_client_bootstrap_on_channel_event_fn *original_channel_on_shutdown;

struct aws_http_proxy_config *proxy_config;

struct aws_event_loop *requested_event_loop;
};

struct aws_http_proxy_system_vtable {
Expand Down
8 changes: 8 additions & 0 deletions include/aws/http/websocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,14 @@ struct aws_websocket_client_connection_options {
* reaches 0, no further data will be received.
*/
bool manual_window_management;

/**
* Optional.
* If set, requests that a specific event loop be used to seat the connection, rather than the next one
* in the event loop group. Useful for serializing all io and external events related to a client onto
* a single thread.
*/
struct aws_event_loop *requested_event_loop;
};

/**
Expand Down
1 change: 1 addition & 0 deletions source/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -1092,6 +1092,7 @@ int aws_http_client_connect_internal(
.shutdown_callback = s_client_bootstrap_on_channel_shutdown,
.enable_read_back_pressure = options.manual_window_management,
.user_data = http_bootstrap,
.requested_event_loop = options.requested_event_loop,
};

err = s_system_vtable_ptr->new_socket_channel(&channel_options);
Expand Down
4 changes: 4 additions & 0 deletions source/proxy_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ struct aws_http_proxy_user_data *aws_http_proxy_user_data_new(
user_data->original_http_on_shutdown = options->on_shutdown;
user_data->original_channel_on_setup = on_channel_setup;
user_data->original_channel_on_shutdown = on_channel_shutdown;
user_data->requested_event_loop = options->requested_event_loop;

/* one and only one setup callback must be valid */
AWS_FATAL_ASSERT((user_data->original_http_on_setup == NULL) != (user_data->original_channel_on_setup == NULL));
Expand Down Expand Up @@ -984,6 +985,7 @@ static int s_aws_http_client_connect_via_forwarding_proxy(const struct aws_http_
options_copy.on_setup = s_aws_http_on_client_connection_http_forwarding_proxy_setup_fn;
options_copy.on_shutdown = s_aws_http_on_client_connection_http_proxy_shutdown_fn;
options_copy.tls_options = options->proxy_options->tls_options;
options_copy.requested_event_loop = options->requested_event_loop;

int result = aws_http_client_connect_internal(&options_copy, s_proxy_http_request_transform);
if (result == AWS_OP_ERR) {
Expand Down Expand Up @@ -1018,6 +1020,7 @@ static int s_create_tunneling_connection(struct aws_http_proxy_user_data *user_d
connect_options.on_shutdown = s_aws_http_on_client_connection_http_proxy_shutdown_fn;
connect_options.http1_options = NULL; /* ToDo */
connect_options.http2_options = NULL; /* ToDo */
connect_options.requested_event_loop = user_data->requested_event_loop;

int result = aws_http_client_connect(&connect_options);
if (result == AWS_OP_ERR) {
Expand Down Expand Up @@ -1565,6 +1568,7 @@ int aws_http_proxy_new_socket_channel(
http_connection_options.user_data = user_data;
http_connection_options.on_setup = NULL; /* use channel callbacks, not http callbacks */
http_connection_options.on_shutdown = NULL; /* use channel callbacks, not http callbacks */
http_connection_options.requested_event_loop = channel_options->requested_event_loop;

if (s_aws_http_client_connect_via_tunneling_proxy(
&http_connection_options, s_http_proxied_socket_channel_setup, s_http_proxied_socket_channel_shutdown)) {
Expand Down
1 change: 1 addition & 0 deletions source/websocket_bootstrap.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ int aws_websocket_client_connect(const struct aws_websocket_client_connection_op
http_options.user_data = ws_bootstrap;
http_options.on_setup = s_ws_bootstrap_on_http_setup;
http_options.on_shutdown = s_ws_bootstrap_on_http_shutdown;
http_options.requested_event_loop = options->requested_event_loop;

/* Infer port, if not explicitly specified in URI */
http_options.port = options->port;
Expand Down
3 changes: 2 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,11 @@ add_test_case(server_new_destroy)
add_test_case(connection_setup_shutdown)
add_test_case(connection_setup_shutdown_tls)
add_test_case(connection_setup_shutdown_proxy_setting_on_ev_not_found)
add_test_case(connection_setup_shutdown_pinned_event_loop)
add_test_case(connection_h2_prior_knowledge)
add_test_case(connection_h2_prior_knowledge_not_work_with_tls)
add_test_case(connection_customized_alpn)
add_test_case(connection_customized_alpn_error_with_unknow_return_string)
add_test_case(connection_customized_alpn_error_with_unknown_return_string)
# These server tests occasionally fail. Resurrect if/when we get back to work on HTTP server.
#add_test_case(connection_destroy_server_with_connection_existing)
#add_test_case(connection_destroy_server_with_multiple_connections_existing)
Expand Down
75 changes: 65 additions & 10 deletions tests/test_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <aws/common/clock.h>
#include <aws/common/condition_variable.h>
#include <aws/common/log_writer.h>
#include <aws/common/logging.h>
#include <aws/common/string.h>
#include <aws/common/thread.h>
#include <aws/common/uuid.h>
Expand Down Expand Up @@ -42,12 +43,14 @@ struct tester_options {
char *server_alpn_list;
char *client_alpn_list;
bool no_connection; /* don't connect server to client */
bool pin_event_loop;
};

/* Singleton used by tests in this file */
struct tester {
struct aws_allocator *alloc;
struct aws_event_loop_group *event_loop_group;
struct aws_event_loop_group *server_event_loop_group;
struct aws_event_loop_group *client_event_loop_group;
struct aws_host_resolver *host_resolver;
struct aws_server_bootstrap *server_bootstrap;
struct aws_http_server *server;
Expand Down Expand Up @@ -239,7 +242,7 @@ static void s_client_connection_options_init_tester(
struct aws_http_client_connection_options *client_options,
struct tester *tester) {
struct aws_client_bootstrap_options bootstrap_options = {
.event_loop_group = tester->event_loop_group,
.event_loop_group = tester->client_event_loop_group,
.host_resolver = tester->host_resolver,
};
tester->client_bootstrap = aws_client_bootstrap_new(tester->alloc, &bootstrap_options);
Expand Down Expand Up @@ -298,15 +301,37 @@ static int s_tester_init(struct tester *tester, const struct tester_options *opt
ASSERT_SUCCESS(aws_mutex_init(&tester->wait_lock));
ASSERT_SUCCESS(aws_condition_variable_init(&tester->wait_cvar));

tester->event_loop_group = aws_event_loop_group_new_default(tester->alloc, 1, NULL);
/*
* The current http testing framework has several issues that hinder testing event loop pinning:
* (1) Server shutdown can crash with memory corruption if the server uses an event loop group with more than one
* thread
* (2) s_tester_wait mixes results from both client and server; once the two no longer share the same
* single-threaded event loop, the test assumptions start breaking due to different serializations of io events.
*
* This leads to a self-defeating situation: in order to test event loop pinning we need event loop groups with
* many threads, but as soon as we use one, existing tests start breaking.
*
* Event loop pinning is a critical blocker for an upcoming release, so rather than trying to figure out the
* underlying race condition within the http testing framework (I suspect it's socket listener related), we
* instead add some complexity to the testing framework such that
* (1) Existing tests continue to use a single event loop group with one thread
* (2) The event loop pinning test uses two event loop groups, the server elg with a single thread and the
* client elg with many threads to actually test pinning.
*/
tester->server_event_loop_group = aws_event_loop_group_new_default(tester->alloc, 1, NULL);
if (options->pin_event_loop) {
tester->client_event_loop_group = aws_event_loop_group_new_default(tester->alloc, 16, NULL);
} else {
tester->client_event_loop_group = aws_event_loop_group_acquire(tester->server_event_loop_group);
}

struct aws_host_resolver_default_options resolver_options = {
.el_group = tester->event_loop_group,
.el_group = tester->client_event_loop_group,
.max_entries = 8,
};

tester->host_resolver = aws_host_resolver_new_default(tester->alloc, &resolver_options);
tester->server_bootstrap = aws_server_bootstrap_new(tester->alloc, tester->event_loop_group);
tester->server_bootstrap = aws_server_bootstrap_new(tester->alloc, tester->server_event_loop_group);
ASSERT_NOT_NULL(tester->server_bootstrap);

struct aws_socket_options socket_options = {
Expand Down Expand Up @@ -360,6 +385,11 @@ static int s_tester_init(struct tester *tester, const struct tester_options *opt
aws_byte_cursor_from_c_str("localhost")));
client_options.tls_options = &tester->client_tls_connection_options;
}

if (options->pin_event_loop) {
client_options.requested_event_loop = aws_event_loop_group_get_next_loop(tester->client_event_loop_group);
}

tester->client_options = client_options;

tester->server_connection_num = 0;
Expand Down Expand Up @@ -395,7 +425,8 @@ static int s_tester_clean_up(struct tester *tester) {
aws_server_bootstrap_release(tester->server_bootstrap);
aws_client_bootstrap_release(tester->client_bootstrap);
aws_host_resolver_release(tester->host_resolver);
aws_event_loop_group_release(tester->event_loop_group);
aws_event_loop_group_release(tester->client_event_loop_group);
aws_event_loop_group_release(tester->server_event_loop_group);

aws_http_library_clean_up();
aws_mutex_clean_up(&tester->wait_lock);
Expand Down Expand Up @@ -645,7 +676,7 @@ static int s_test_connection_customized_alpn(struct aws_allocator *allocator, vo
}
AWS_TEST_CASE(connection_customized_alpn, s_test_connection_customized_alpn);

static int s_test_connection_customized_alpn_error_with_unknow_return_string(
static int s_test_connection_customized_alpn_error_with_unknown_return_string(
struct aws_allocator *allocator,
void *ctx) {
(void)ctx;
Expand Down Expand Up @@ -702,8 +733,8 @@ static int s_test_connection_customized_alpn_error_with_unknow_return_string(
return AWS_OP_SUCCESS;
}
AWS_TEST_CASE(
connection_customized_alpn_error_with_unknow_return_string,
s_test_connection_customized_alpn_error_with_unknow_return_string);
connection_customized_alpn_error_with_unknown_return_string,
s_test_connection_customized_alpn_error_with_unknown_return_string);

static int s_test_connection_destroy_server_with_connection_existing(struct aws_allocator *allocator, void *ctx) {
(void)ctx;
Expand Down Expand Up @@ -856,7 +887,7 @@ static int s_test_connection_server_shutting_down_new_connection_setup_fail(

/* get the first eventloop of the tester's server event loop group, which will be the eventloop for the server
* listener socket, and block the listener socket */
struct aws_event_loop *server_eventloop = aws_event_loop_group_get_loop_at(tester.event_loop_group, 0);
struct aws_event_loop *server_eventloop = aws_event_loop_group_get_loop_at(tester.server_event_loop_group, 0);
struct aws_task *server_block_task = aws_mem_acquire(allocator, sizeof(struct aws_task));
aws_task_init(server_block_task, s_block_task, &tester, "wait_a_bit");
aws_event_loop_schedule_task_now(server_eventloop, server_block_task);
Expand Down Expand Up @@ -916,3 +947,27 @@ static int s_test_connection_server_shutting_down_new_connection_setup_fail(
AWS_TEST_CASE(
connection_server_shutting_down_new_connection_setup_fail,
s_test_connection_server_shutting_down_new_connection_setup_fail);

/*
 * Test: set up the tester with event loop pinning enabled, confirm that the channel of every
 * established client connection reports the event loop that was requested in the client
 * options, then release all connections and wait for shutdown before cleaning up.
 */
static int s_test_connection_setup_shutdown_pinned_event_loop(struct aws_allocator *allocator, void *ctx) {
    (void)ctx;

    struct tester_options options = {
        .alloc = allocator,
        .pin_event_loop = true,
    };

    struct tester tester;
    ASSERT_SUCCESS(s_tester_init(&tester, &options));

    /* Each client connection's channel must be on the requested event loop. */
    int i = 0;
    while (i < tester.client_connection_num) {
        struct aws_http_connection *connection = tester.client_connections[i];
        ASSERT_PTR_EQUALS(
            tester.client_options.requested_event_loop, aws_channel_get_event_loop(connection->channel_slot->channel));
        ++i;
    }

    /* Release both sides, then wait until the shutdown predicate is satisfied. */
    release_all_client_connections(&tester);
    release_all_server_connections(&tester);
    ASSERT_SUCCESS(s_tester_wait(&tester, s_tester_connection_shutdown_pred));

    ASSERT_SUCCESS(s_tester_clean_up(&tester));

    return AWS_OP_SUCCESS;
}
AWS_TEST_CASE(connection_setup_shutdown_pinned_event_loop, s_test_connection_setup_shutdown_pinned_event_loop);

0 comments on commit dcbc111

Please sign in to comment.