From aadc57a3829cb057acdd6d57ed0991ff61a71192 Mon Sep 17 00:00:00 2001
From: Dengke Tang
Date: Fri, 3 Jun 2022 17:53:11 -0700
Subject: [PATCH] H2 monitor (#377)

Add support for connection monitoring for HTTP/2.
---
 include/aws/http/private/h2_connection.h |  9 +++
 include/aws/http/statistics.h            | 22 ++++++
 source/connection_monitor.c              | 46 +++++++----
 source/h2_connection.c                   | 97 +++++++++++++++++++++++-
 source/statistics.c                      | 11 +++
 tests/CMakeLists.txt                     |  2 +-
 tests/py_localhost/server.py             | 45 +++++++++++
 tests/test_localhost_integ.c             |  5 ++
 tests/test_stream_manager.c              | 67 +++++++++++-----
 9 files changed, 268 insertions(+), 36 deletions(-)

diff --git a/include/aws/http/private/h2_connection.h b/include/aws/http/private/h2_connection.h
index 5be9eb81d..6d42b8316 100644
--- a/include/aws/http/private/h2_connection.h
+++ b/include/aws/http/private/h2_connection.h
@@ -13,6 +13,7 @@
 
 #include <aws/http/private/connection_impl.h>
 #include <aws/http/private/h2_frames.h>
+#include <aws/http/statistics.h>
 
 struct aws_h2_decoder;
 struct aws_h2_stream;
@@ -116,6 +117,14 @@ struct aws_h2_connection {
         int channel_shutdown_error_code;
         bool channel_shutdown_immediately;
         bool channel_shutdown_waiting_for_goaway_to_be_written;
+
+        /* TODO: Consider adding a per-stream monitor */
+        struct aws_crt_statistics_http2_channel stats;
+
+        /* Timestamp when the connection last began having data to send (an active stream with a body to send) */
+        uint64_t outgoing_timestamp_ns;
+        /* Timestamp when the connection last began having data to receive (at least one active stream) */
+        uint64_t incoming_timestamp_ns;
     } thread_data;
 
     /* Any thread may touch this data, but the lock must be held (unless it's an atomic) */
diff --git a/include/aws/http/statistics.h b/include/aws/http/statistics.h
index bc50aa8c2..ecc8c2700 100644
--- a/include/aws/http/statistics.h
+++ b/include/aws/http/statistics.h
@@ -12,6 +12,7 @@
 
 enum aws_crt_http_statistics_category {
     AWSCRT_STAT_CAT_HTTP1_CHANNEL = AWS_CRT_STATISTICS_CATEGORY_BEGIN_RANGE(AWS_C_HTTP_PACKAGE_ID),
+    AWSCRT_STAT_CAT_HTTP2_CHANNEL,
 };
 
 /**
@@ -28,6 +29,16 @@ struct aws_crt_statistics_http1_channel {
     uint32_t current_incoming_stream_id;
 };
 
+struct aws_crt_statistics_http2_channel {
+    aws_crt_statistics_category_t category;
+
+    uint64_t pending_outgoing_stream_ms;
+    uint64_t pending_incoming_stream_ms;
+
+    /* True if, at any point during the reporting interval, there were no active streams on the connection */
+    bool was_inactive;
+};
+
 AWS_EXTERN_C_BEGIN
 
 /**
@@ -48,6 +59,17 @@ void aws_crt_statistics_http1_channel_cleanup(struct aws_crt_statistics_http1_ch
 AWS_HTTP_API
 void aws_crt_statistics_http1_channel_reset(struct aws_crt_statistics_http1_channel *stats);
 
+/**
+ * Initializes an HTTP/2 channel handler statistics struct
+ */
+AWS_HTTP_API
+void aws_crt_statistics_http2_channel_init(struct aws_crt_statistics_http2_channel *stats);
+/**
+ * Resets the statistics in an HTTP/2 channel handler statistics struct
+ */
+AWS_HTTP_API
+void aws_crt_statistics_http2_channel_reset(struct aws_crt_statistics_http2_channel *stats);
+
 AWS_EXTERN_C_END
 
 #endif /* AWS_HTTP_STATISTICS_H */
diff --git a/source/connection_monitor.c b/source/connection_monitor.c
index 9164d67fe..273232551 100644
--- a/source/connection_monitor.c
+++ b/source/connection_monitor.c
@@ -32,13 +32,16 @@ static void s_process_statistics(
     uint64_t pending_read_interval_ms = 0;
     uint64_t pending_write_interval_ms = 0;
     uint64_t bytes_read = 0;
     uint64_t bytes_written = 0;
-    uint32_t current_outgoing_stream_id = 0;
-    uint32_t current_incoming_stream_id = 0;
+    uint32_t h1_current_outgoing_stream_id = 0;
+    uint32_t h1_current_incoming_stream_id = 0;
 
     /*
      * Pull out the data needed to perform the throughput calculation
      */
     size_t stats_count = aws_array_list_length(stats_list);
+    bool h2 = false;
+    bool h2_was_inactive = false;
+
     for (size_t i = 0; i < stats_count; ++i) {
         struct aws_crt_statistics_base *stats_base = NULL;
         if (aws_array_list_get_at(stats_list, &stats_base, i)) {
@@ -54,13 +57,24 @@ static void s_process_statistics(
             }
 
             case AWSCRT_STAT_CAT_HTTP1_CHANNEL: {
+                AWS_ASSERT(!h2);
                 struct aws_crt_statistics_http1_channel *http1_stats =
                     (struct aws_crt_statistics_http1_channel *)stats_base;
                 pending_read_interval_ms = http1_stats->pending_incoming_stream_ms;
                 pending_write_interval_ms = http1_stats->pending_outgoing_stream_ms;
-                current_outgoing_stream_id = http1_stats->current_outgoing_stream_id;
-                current_incoming_stream_id = http1_stats->current_incoming_stream_id;
+                h1_current_outgoing_stream_id = http1_stats->current_outgoing_stream_id;
+                h1_current_incoming_stream_id = http1_stats->current_incoming_stream_id;
+
+                break;
+            }
+
+            case AWSCRT_STAT_CAT_HTTP2_CHANNEL: {
+                struct aws_crt_statistics_http2_channel *h2_stats =
+                    (struct aws_crt_statistics_http2_channel *)stats_base;
+                pending_read_interval_ms = h2_stats->pending_incoming_stream_ms;
+                pending_write_interval_ms = h2_stats->pending_outgoing_stream_ms;
+                h2_was_inactive |= h2_stats->was_inactive;
+                h2 = true;
 
                 break;
             }
@@ -110,17 +124,21 @@ static void s_process_statistics(
         bytes_per_second);
 
     /*
-     * Check throughput only if at least one stream exists and was observed in that role previously
-     *
-     * ToDo: This logic only makes sense from an h1 perspective. A similar requirement could be placed on
-     * h2 stats by analyzing/tracking the min and max stream ids (per odd/even) at process timepoints.
+     * Check throughput only if the connection had at least one active stream the whole time, with no gaps.
      */
-    bool check_throughput =
-        (current_incoming_stream_id != 0 && current_incoming_stream_id == impl->last_incoming_stream_id) ||
-        (current_outgoing_stream_id != 0 && current_outgoing_stream_id == impl->last_outgoing_stream_id);
-
-    impl->last_outgoing_stream_id = current_outgoing_stream_id;
-    impl->last_incoming_stream_id = current_incoming_stream_id;
+    bool check_throughput = false;
+    if (h2) {
+        /* For HTTP/2, check throughput only if the connection always had at least one active stream */
+        check_throughput = !h2_was_inactive;
+    } else {
+        /* For HTTP/1, check throughput only if at least one stream exists and was observed in that role previously */
+        check_throughput =
+            (h1_current_incoming_stream_id != 0 && h1_current_incoming_stream_id == impl->last_incoming_stream_id) ||
+            (h1_current_outgoing_stream_id != 0 && h1_current_outgoing_stream_id == impl->last_outgoing_stream_id);
+
+        impl->last_outgoing_stream_id = h1_current_outgoing_stream_id;
+        impl->last_incoming_stream_id = h1_current_incoming_stream_id;
+    }
 
     impl->last_measured_throughput = bytes_per_second;
 
     if (!check_throughput) {
diff --git a/source/h2_connection.c b/source/h2_connection.c
index b50fdb2ec..b5a1c14d5 100644
--- a/source/h2_connection.c
+++ b/source/h2_connection.c
@@ -146,6 +146,8 @@ struct aws_h2err s_decoder_on_goaway(
     uint32_t error_code,
     struct aws_byte_cursor debug_data,
     void *userdata);
+static void s_reset_statistics(struct aws_channel_handler *handler);
+static void s_gather_statistics(struct aws_channel_handler *handler, struct aws_array_list *stats);
 
 static struct aws_http_connection_vtable s_h2_connection_vtable = {
     .channel_handler_vtable =
@@ -157,6 +159,8 @@ static struct aws_http_connection_vtable s_h2_connection_vtable = {
             .initial_window_size = s_handler_initial_window_size,
             .message_overhead = s_handler_message_overhead,
             .destroy = s_handler_destroy,
+            .reset_statistics = s_reset_statistics,
+            .gather_statistics = s_gather_statistics,
         },
 
     .on_channel_handler_installed = s_handler_installed,
@@ -219,6 +223,14 @@ static void s_release_stream_and_connection_lock(struct aws_h2_stream *stream, s
     (void)err;
 }
 
+static void s_add_time_measurement_to_stats(uint64_t start_ns, uint64_t end_ns, uint64_t *output_ms) {
+    if (end_ns > start_ns) {
+        *output_ms += aws_timestamp_convert(end_ns - start_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
+    } else {
+        *output_ms = 0;
+    }
+}
+
 /**
  * Internal function for bringing connection to a stop.
  * Invoked multiple times, including when:
@@ -373,6 +385,9 @@ static struct aws_h2_connection *s_connection_new(
     connection->thread_data.goaway_received_last_stream_id = AWS_H2_STREAM_ID_MAX;
     connection->thread_data.goaway_sent_last_stream_id = AWS_H2_STREAM_ID_MAX;
 
+    aws_crt_statistics_http2_channel_init(&connection->thread_data.stats);
+    connection->thread_data.stats.was_inactive = true; /* Connection starts with no active streams */
+
     connection->synced_data.is_open = true;
     connection->synced_data.new_stream_error_code = AWS_ERROR_SUCCESS;
@@ -795,6 +810,9 @@ static int s_encode_data_from_outgoing_streams(struct aws_h2_connect
     AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
 
     struct aws_linked_list *outgoing_streams_list = &connection->thread_data.outgoing_streams_list;
+    if (aws_linked_list_empty(outgoing_streams_list)) {
+        return AWS_OP_SUCCESS;
+    }
     struct aws_linked_list *stalled_window_streams_list = &connection->thread_data.stalled_window_streams_list;
     struct aws_linked_list *waiting_streams_list = &connection->thread_data.waiting_streams_list;
@@ -816,7 +834,7 @@ static int s_encode_data_from_outgoing_streams(struct aws_h2_connect
                 "Peer connection's flow-control window is too small now %zu. Connection will stop sending DATA until "
                 "WINDOW_UPDATE is received.",
                 connection->thread_data.window_size_peer);
-            break;
+            goto done;
         }
 
         /* Stop looping if message is so full it's not worth the bother */
@@ -885,6 +903,16 @@ static int s_encode_data_from_outgoing_streams(struct aws_h2_connect
         return aws_raise_error(aws_error_code);
     }
 
+    if (aws_linked_list_empty(outgoing_streams_list)) {
+        /* transition from something to write -> nothing to write */
+        uint64_t now_ns = 0;
+        aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns);
+        s_add_time_measurement_to_stats(
+            connection->thread_data.outgoing_timestamp_ns,
+            now_ns,
+            &connection->thread_data.stats.pending_outgoing_stream_ms);
+    }
+
     return AWS_OP_SUCCESS;
 }
@@ -1768,6 +1796,19 @@ static void s_stream_complete(struct aws_h2_connection *connection, struct aws_h
         aws_linked_list_remove(&stream->node);
     }
 
+    if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 0 &&
+        connection->thread_data.incoming_timestamp_ns != 0) {
+        uint64_t now_ns = 0;
+        aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns);
+        /* transition from something to read -> nothing to read and nothing to write */
+        s_add_time_measurement_to_stats(
+            connection->thread_data.incoming_timestamp_ns,
+            now_ns,
+            &connection->thread_data.stats.pending_incoming_stream_ms);
+        connection->thread_data.stats.was_inactive = true;
+        connection->thread_data.incoming_timestamp_ns = 0;
+    }
+
     aws_h2_stream_complete(stream, error_code);
 
     /* release connection's hold on stream */
@@ -1867,6 +1908,14 @@ static void s_move_stream_to_thread(
     if (aws_h2_stream_on_activated(stream, &body_state)) {
         goto error;
     }
+
+    if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 1) {
+        /* transition from nothing to read -> something to read */
+        uint64_t now_ns = 0;
+        aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns);
+        connection->thread_data.incoming_timestamp_ns = now_ns;
+    }
+
     switch (body_state) {
         case AWS_H2_STREAM_BODY_STATE_WAITING_WRITES:
             aws_linked_list_push_back(&connection->thread_data.waiting_streams_list, &stream->node);
@@ -2753,3 +2802,49 @@ static size_t s_handler_message_overhead(struct aws_channel_handler *handler) {
     /* "All frames begin with a fixed 9-octet header followed by a variable-length payload" (RFC-7540 4.1) */
     return 9;
 }
+
+static void s_reset_statistics(struct aws_channel_handler *handler) {
+    struct aws_h2_connection *connection = handler->impl;
+    aws_crt_statistics_http2_channel_reset(&connection->thread_data.stats);
+    if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 0) {
+        /* No streams are active right now, so the new interval starts out inactive */
+        connection->thread_data.stats.was_inactive = true;
+    }
+    return;
+}
+
+static void s_gather_statistics(struct aws_channel_handler *handler, struct aws_array_list *stats) {
+
+    struct aws_h2_connection *connection = handler->impl;
+    AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
+
+    /* TODO: Need to update the way we calculate statistics, to account for user-controlled pauses.
+     * If the user is adding chunks one by one, there can naturally be a gap in the upload.
+     * If the user lets the stream-window go to zero, there can naturally be a gap in the download.
+     */
+    uint64_t now_ns = 0;
+    if (aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns)) {
+        return;
+    }
+
+    if (!aws_linked_list_empty(&connection->thread_data.outgoing_streams_list)) {
+        s_add_time_measurement_to_stats(
+            connection->thread_data.outgoing_timestamp_ns,
+            now_ns,
+            &connection->thread_data.stats.pending_outgoing_stream_ms);
+
+        connection->thread_data.outgoing_timestamp_ns = now_ns;
+    }
+    if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) != 0) {
+        s_add_time_measurement_to_stats(
+            connection->thread_data.incoming_timestamp_ns,
+            now_ns,
+            &connection->thread_data.stats.pending_incoming_stream_ms);
+
+        connection->thread_data.incoming_timestamp_ns = now_ns;
+    } else {
+        connection->thread_data.stats.was_inactive = true;
+    }
+
+    void *stats_base = &connection->thread_data.stats;
+    aws_array_list_push_back(stats, &stats_base);
+}
diff --git a/source/statistics.c b/source/statistics.c
index 0e3650799..ea4e65c1d 100644
--- a/source/statistics.c
+++ b/source/statistics.c
@@ -22,3 +22,14 @@ void aws_crt_statistics_http1_channel_reset(struct aws_crt_statistics_http1_chan
     stats->current_outgoing_stream_id = 0;
     stats->current_incoming_stream_id = 0;
 }
+
+void aws_crt_statistics_http2_channel_init(struct aws_crt_statistics_http2_channel *stats) {
+    AWS_ZERO_STRUCT(*stats);
+    stats->category = AWSCRT_STAT_CAT_HTTP2_CHANNEL;
+}
+
+void aws_crt_statistics_http2_channel_reset(struct aws_crt_statistics_http2_channel *stats) {
+    stats->pending_outgoing_stream_ms = 0;
+    stats->pending_incoming_stream_ms = 0;
+    stats->was_inactive = false;
+}
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 2096b77a7..33d0bd4bc 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -616,7 +616,6 @@ add_net_test_case(h2_sm_mock_goaway)
 # Tests against real world server
 add_net_test_case(h2_sm_acquire_stream)
 add_net_test_case(h2_sm_acquire_stream_multiple_connections)
-add_net_test_case(h2_sm_acquire_stream_stress)
 add_net_test_case(h2_sm_closing_before_connection_acquired)
 # Tests against local server
 if (ENABLE_LOCALHOST_INTEGRATION_TESTS)
@@ -624,6 +623,7 @@ if (ENABLE_LOCALHOST_INTEGRATION_TESTS)
     add_net_test_case(localhost_integ_h2_sm_prior_knowledge)
     add_net_test_case(localhost_integ_h2_sm_acquire_stream_stress)
     add_net_test_case(localhost_integ_h2_sm_acquire_stream_stress_with_body)
+    add_net_test_case(localhost_integ_h2_sm_connection_monitor_kill_slow_connection)
 endif()
 
 add_test_case(random_access_set_sanitize_test)
diff --git a/tests/py_localhost/server.py b/tests/py_localhost/server.py
index f1e9ed680..b45706e05 100644
--- a/tests/py_localhost/server.py
+++ b/tests/py_localhost/server.py
@@ -16,6 +16,7 @@
 import io
 import json
 import ssl
+import time
 import os
 import collections
 from typing import List, Tuple
@@ -45,6 +46,7 @@ def __init__(self):
         self.num_sentence_received = {}
         self.raw_headers = None
         self.download_test_length = 2500000000
+        self.out_bytes_per_second = 900
 
     def connection_made(self, transport: asyncio.Transport):
         self.transport = transport
@@ -133,6 +135,12 @@ def stream_complete(self, stream_id: int):
             self.conn.send_headers(
                 stream_id, [(':status', '200'), ('content-length', str(length))])
             asyncio.ensure_future(self.send_repeat_data(length, stream_id))
+        elif path == '/slowConnTest':
+            length = self.download_test_length // 1000
+            self.conn.send_headers(
+                stream_id, [(':status', '200'), ('content-length', str(length))])
+            asyncio.ensure_future(
+                self.send_slow_repeat_data(length, stream_id))
         else:
             self.conn.send_headers(stream_id, [(':status', '404')])
             asyncio.ensure_future(self.send_data(b"Not Found", stream_id))
@@ -237,6 +245,43 @@ async def send_repeat_data(self, length, stream_id):
             self.transport.write(self.conn.data_to_send())
             length = length - chunk_size
 
+    async def send_slow_repeat_data(self, length, stream_id):
+        """
+        Send `length` bytes of repeated data slowly (staying under 1000 bytes per second)
+        """
+        while length > 0:
+            while self.conn.local_flow_control_window(stream_id) < 1:
+                try:
+                    await self.wait_for_flow_control(stream_id)
+                except asyncio.CancelledError:
+                    return
+
+            chunk_size = min(
+                self.conn.local_flow_control_window(stream_id),
+                length,
+                self.conn.max_outbound_frame_size,
+                self.out_bytes_per_second
+            )
+            repeated = b"This is CRT HTTP test."
+            data = int(chunk_size / len(repeated)) * repeated + \
+                repeated[:chunk_size % len(repeated)]
+
+            try:
+                # Sleep for a second so the outgoing bytes per second stay below the monitor's minimum
+                time.sleep(1)
+                self.conn.send_data(
+                    stream_id,
+                    data,
+                    end_stream=(chunk_size == length)
+                )
+            except (StreamClosedError, ProtocolError):
+                # The stream got closed and we didn't get told. We're done
+                # here.
+                break
+
+            self.transport.write(self.conn.data_to_send())
+            length = length - chunk_size
+
     async def wait_for_flow_control(self, stream_id):
         """
         Waits for a Future that fires when the flow control window is opened.
diff --git a/tests/test_localhost_integ.c b/tests/test_localhost_integ.c
index a968bcde1..9797db4d9 100644
--- a/tests/test_localhost_integ.c
+++ b/tests/test_localhost_integ.c
@@ -240,6 +240,10 @@ static int s_tester_init(struct tester *tester, struct aws_allocator *allocator,
         .keepalive = false,
         .keep_alive_interval_sec = 0,
     };
+    struct aws_http_connection_monitoring_options monitor_opt = {
+        .allowable_throughput_failure_interval_seconds = 1,
+        .minimum_throughput_bytes_per_second = 1000,
+    };
     struct aws_http_client_connection_options client_options = {
         .self_size = sizeof(struct aws_http_client_connection_options),
         .allocator = allocator,
@@ -251,6 +255,7 @@ static int s_tester_init(struct tester *tester, struct aws_allocator *allocator,
         .tls_options = &tester->tls_connection_options,
         .on_setup = s_on_connection_setup,
         .on_shutdown = s_on_connection_shutdown,
+        .monitoring_options = &monitor_opt,
     };
     ASSERT_SUCCESS(aws_http_client_connect(&client_options));
     struct aws_logger_standard_options logger_options = {
diff --git a/tests/test_stream_manager.c b/tests/test_stream_manager.c
index 8303d8a8c..1354a3177 100644
--- a/tests/test_stream_manager.c
+++ b/tests/test_stream_manager.c
@@ -17,6 +17,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
@@ -43,9 +44,13 @@ struct sm_tester_options {
     size_t ideal_concurrent_streams_per_connection;
     size_t max_concurrent_streams_per_connection;
 
+    const struct aws_http_connection_monitoring_options *monitor_opt;
+
     struct aws_byte_cursor *uri_cursor;
+    const enum aws_log_level *log_level;
 };
 
+static struct aws_logger s_logger;
 struct sm_tester {
     struct aws_allocator *allocator;
     struct aws_event_loop_group *event_loop_group;
@@ -187,6 +192,14 @@ static int s_tester_init(struct sm_tester_options *options) {
 
     s_tester.allocator = alloc;
 
+    struct aws_logger_standard_options logger_options = {
+        .level = options->log_level ? *options->log_level : AWS_LOG_LEVEL_TRACE,
+        .file = stderr,
+    };
+
+    aws_logger_init_standard(&s_logger, alloc, &logger_options);
+    aws_logger_set(&s_logger);
+
     ASSERT_SUCCESS(aws_mutex_init(&s_tester.lock));
     ASSERT_SUCCESS(aws_condition_variable_init(&s_tester.signal));
@@ -264,6 +277,7 @@ static int s_tester_init(struct sm_tester_options *options) {
         .max_connections = options->max_connections,
         .shutdown_complete_user_data = &s_tester,
         .shutdown_complete_callback = s_sm_tester_on_sm_shutdown_complete,
+        .monitoring_options = options->monitor_opt,
     };
     s_tester.stream_manager = aws_http2_stream_manager_new(alloc, &sm_options);
@@ -404,6 +418,7 @@ static int s_tester_clean_up(void) {
     aws_condition_variable_clean_up(&s_tester.signal);
     aws_array_list_clean_up(&s_tester.streams);
     aws_uri_clean_up(&s_tester.endpoint);
+    aws_logger_clean_up(&s_logger);
 
     return AWS_OP_SUCCESS;
 }
@@ -1112,26 +1127,6 @@ TEST_CASE(h2_sm_acquire_stream_multiple_connections) {
     return s_tester_clean_up();
 }
 
-/* Test that makes tons of real streams */
-TEST_CASE(h2_sm_acquire_stream_stress) {
-    (void)ctx;
-    struct sm_tester_options options = {
-        .max_connections = 100,
-        .max_concurrent_streams_per_connection = 100,
-        .alloc = allocator,
-    };
-    ASSERT_SUCCESS(s_tester_init(&options));
-    int num_to_acquire = 200 * 100;
-    /* Because of network and things, we may fail some acquisition. Let's expect 99% success */
-    int expected_success = 198 * 100;
-    ASSERT_SUCCESS(s_sm_stream_acquiring(num_to_acquire));
-    ASSERT_SUCCESS(s_wait_on_streams_completed_count(num_to_acquire));
-    ASSERT_TRUE((int)s_tester.acquiring_stream_errors < (num_to_acquire - expected_success));
-    ASSERT_TRUE((int)s_tester.stream_200_count > expected_success);
-
-    return s_tester_clean_up();
-}
-
 static void s_sm_tester_on_connection_setup(struct aws_http_connection *connection, int error_code, void *user_data) {
     if (s_tester.release_sm_during_connection_acquiring) {
         aws_http2_stream_manager_release(s_tester.stream_manager);
@@ -1194,11 +1189,18 @@ TEST_CASE(localhost_integ_h2_sm_prior_knowledge) {
 TEST_CASE(localhost_integ_h2_sm_acquire_stream_stress) {
     (void)ctx;
     struct aws_byte_cursor uri_cursor = aws_byte_cursor_from_c_str("https://localhost:8443/echo");
+    struct aws_http_connection_monitoring_options monitor_opt = {
+        .allowable_throughput_failure_interval_seconds = 1,
+        .minimum_throughput_bytes_per_second = 1000,
+    };
+    enum aws_log_level log_level = AWS_LOG_LEVEL_DEBUG;
     struct sm_tester_options options = {
         .max_connections = 100,
         .max_concurrent_streams_per_connection = 100,
         .alloc = allocator,
         .uri_cursor = &uri_cursor,
+        .monitor_opt = &monitor_opt,
+        .log_level = &log_level,
     };
     ASSERT_SUCCESS(s_tester_init(&options));
     int num_to_acquire = 500 * 100;
@@ -1300,3 +1302,28 @@ TEST_CASE(localhost_integ_h2_sm_acquire_stream_stress_with_body) {
 
     return s_tester_clean_up();
 }
+
+/* Test that the connection monitor works properly with the HTTP/2 stream manager */
+TEST_CASE(localhost_integ_h2_sm_connection_monitor_kill_slow_connection) {
+    (void)ctx;
+    struct aws_byte_cursor uri_cursor = aws_byte_cursor_from_c_str("https://localhost:8443/slowConnTest");
+    struct aws_http_connection_monitoring_options monitor_opt = {
+        .allowable_throughput_failure_interval_seconds = 1,
+        .minimum_throughput_bytes_per_second = 1000,
+    };
+    struct sm_tester_options options = {
+        .max_connections = 100,
+        .max_concurrent_streams_per_connection = 100,
+        .alloc = allocator,
+        .uri_cursor = &uri_cursor,
+        .monitor_opt = &monitor_opt,
+    };
+    ASSERT_SUCCESS(s_tester_init(&options));
+
+    ASSERT_SUCCESS(s_sm_stream_acquiring(1));
+    ASSERT_SUCCESS(s_wait_on_streams_completed_count(1));
+    /* Check that the connection monitor closed the connection and that the stream completed with the matching error */
+    ASSERT_UINT_EQUALS(s_tester.stream_completed_error_code, AWS_ERROR_HTTP_CONNECTION_CLOSED);
+
+    return s_tester_clean_up();
+}
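
Usage note: the knobs exercised by the new tests are the same aws_http_connection_monitoring_options that already drive the HTTP/1.1 connection monitor; with this patch they are honored on HTTP/2 connections as well, and they reach the HTTP/2 stream manager through its monitoring_options field. Below is a minimal sketch of the client-side wiring, assuming the allocator, bootstrap, socket_options, tls_options (negotiating ALPN "h2"), and the two callbacks already exist in the application; none of those names are defined by this patch.

    /* Sketch: enable the connection monitor on an HTTP/2 connection.
     * `allocator`, `bootstrap`, `socket_options`, `tls_options`, and the
     * callbacks are placeholders for the application's own setup. */
    struct aws_http_connection_monitoring_options monitor_opt = {
        .minimum_throughput_bytes_per_second = 1000, /* below this, a second counts as a failure */
        .allowable_throughput_failure_interval_seconds = 1, /* tolerate at most this many failure seconds */
    };

    struct aws_http_client_connection_options client_options = {
        .self_size = sizeof(struct aws_http_client_connection_options),
        .allocator = allocator,
        .bootstrap = bootstrap,
        .host_name = aws_byte_cursor_from_c_str("localhost"),
        .port = 8443,
        .socket_options = &socket_options,
        .tls_options = &tls_options,
        .on_setup = s_on_connection_setup,
        .on_shutdown = s_on_connection_shutdown,
        .monitoring_options = &monitor_opt, /* now honored for HTTP/2 as well as HTTP/1.1 */
    };

    if (aws_http_client_connect(&client_options)) {
        /* handle error via aws_last_error() */
    }

With the values used in localhost_integ_h2_sm_connection_monitor_kill_slow_connection (1000 bytes/second minimum, 1 second of tolerated failure), the monitor shuts the connection down once measured throughput stays below the minimum for more than a second, and in-flight streams complete with AWS_ERROR_HTTP_CONNECTION_CLOSED.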