Skip to content

Commit

Permalink
H2 monitor (#377)
Browse files Browse the repository at this point in the history
Add support for a connection monitor for HTTP/2
  • Loading branch information
TingDaoK authored Jun 4, 2022
1 parent 1a3614f commit aadc57a
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 36 deletions.
9 changes: 9 additions & 0 deletions include/aws/http/private/h2_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include <aws/http/private/connection_impl.h>
#include <aws/http/private/h2_frames.h>
#include <aws/http/statistics.h>

struct aws_h2_decoder;
struct aws_h2_stream;
Expand Down Expand Up @@ -116,6 +117,14 @@ struct aws_h2_connection {
int channel_shutdown_error_code;
bool channel_shutdown_immediately;
bool channel_shutdown_waiting_for_goaway_to_be_written;

/* TODO: Consider adding stream monitor */
struct aws_crt_statistics_http2_channel stats;

/* Timestamp when connection has data to send, which is when there is an active stream with body to send */
uint64_t outgoing_timestamp_ns;
/* Timestamp when connection has data to receive, which is when there is an active stream */
uint64_t incoming_timestamp_ns;
} thread_data;

/* Any thread may touch this data, but the lock must be held (unless it's an atomic) */
Expand Down
22 changes: 22 additions & 0 deletions include/aws/http/statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

/* Statistics categories emitted by this library's channel handlers.
 * Values start in the statistics-category range reserved for the aws-c-http package. */
enum aws_crt_http_statistics_category {
AWSCRT_STAT_CAT_HTTP1_CHANNEL = AWS_CRT_STATISTICS_CATEGORY_BEGIN_RANGE(AWS_C_HTTP_PACKAGE_ID),
AWSCRT_STAT_CAT_HTTP2_CHANNEL,
};

/**
Expand All @@ -28,6 +29,16 @@ struct aws_crt_statistics_http1_channel {
uint32_t current_incoming_stream_id;
};

/* Usage statistics reported by an HTTP/2 channel handler for one reporting interval. */
struct aws_crt_statistics_http2_channel {
aws_crt_statistics_category_t category;

/* Milliseconds of the interval during which at least one active stream had body data waiting to be sent */
uint64_t pending_outgoing_stream_ms;
/* Milliseconds of the interval during which at least one active stream existed (i.e. data could arrive) */
uint64_t pending_incoming_stream_ms;

/* True if, at any point during the reporting interval, the connection had no active streams */
bool was_inactive;
};

AWS_EXTERN_C_BEGIN

/**
Expand All @@ -48,6 +59,17 @@ void aws_crt_statistics_http1_channel_cleanup(struct aws_crt_statistics_http1_ch
AWS_HTTP_API
void aws_crt_statistics_http1_channel_reset(struct aws_crt_statistics_http1_channel *stats);

/**
* Initializes a HTTP/2 channel handler statistics struct
*/
AWS_HTTP_API
void aws_crt_statistics_http2_channel_init(struct aws_crt_statistics_http2_channel *stats);
/**
* Resets a HTTP/2 channel handler statistics struct's statistics
*/
AWS_HTTP_API
void aws_crt_statistics_http2_channel_reset(struct aws_crt_statistics_http2_channel *stats);

AWS_EXTERN_C_END

#endif /* AWS_HTTP_STATISTICS_H */
46 changes: 32 additions & 14 deletions source/connection_monitor.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ static void s_process_statistics(
uint64_t pending_write_interval_ms = 0;
uint64_t bytes_read = 0;
uint64_t bytes_written = 0;
uint32_t current_outgoing_stream_id = 0;
uint32_t current_incoming_stream_id = 0;
uint32_t h1_current_outgoing_stream_id = 0;
uint32_t h1_current_incoming_stream_id = 0;

/*
* Pull out the data needed to perform the throughput calculation
*/
size_t stats_count = aws_array_list_length(stats_list);
bool h2 = false;
bool h2_was_inactive = false;

for (size_t i = 0; i < stats_count; ++i) {
struct aws_crt_statistics_base *stats_base = NULL;
if (aws_array_list_get_at(stats_list, &stats_base, i)) {
Expand All @@ -54,13 +57,24 @@ static void s_process_statistics(
}

case AWSCRT_STAT_CAT_HTTP1_CHANNEL: {
AWS_ASSERT(!h2);
struct aws_crt_statistics_http1_channel *http1_stats =
(struct aws_crt_statistics_http1_channel *)stats_base;
pending_read_interval_ms = http1_stats->pending_incoming_stream_ms;
pending_write_interval_ms = http1_stats->pending_outgoing_stream_ms;
current_outgoing_stream_id = http1_stats->current_outgoing_stream_id;
current_incoming_stream_id = http1_stats->current_incoming_stream_id;
h1_current_outgoing_stream_id = http1_stats->current_outgoing_stream_id;
h1_current_incoming_stream_id = http1_stats->current_incoming_stream_id;

break;
}

case AWSCRT_STAT_CAT_HTTP2_CHANNEL: {
struct aws_crt_statistics_http2_channel *h2_stats =
(struct aws_crt_statistics_http2_channel *)stats_base;
pending_read_interval_ms = h2_stats->pending_incoming_stream_ms;
pending_write_interval_ms = h2_stats->pending_outgoing_stream_ms;
h2_was_inactive |= h2_stats->was_inactive;
h2 = true;
break;
}

Expand Down Expand Up @@ -110,17 +124,21 @@ static void s_process_statistics(
bytes_per_second);

/*
* Check throughput only if at least one stream exists and was observed in that role previously
*
* ToDo: This logic only makes sense from an h1 perspective. A similar requirement could be placed on
* h2 stats by analyzing/tracking the min and max stream ids (per odd/even) at process timepoints.
* Check throughput only if the connection has active stream and no gap between.
*/
bool check_throughput =
(current_incoming_stream_id != 0 && current_incoming_stream_id == impl->last_incoming_stream_id) ||
(current_outgoing_stream_id != 0 && current_outgoing_stream_id == impl->last_outgoing_stream_id);

impl->last_outgoing_stream_id = current_outgoing_stream_id;
impl->last_incoming_stream_id = current_incoming_stream_id;
bool check_throughput = false;
if (h2) {
/* For HTTP/2, check throughput only if there always has any active stream on the connection */
check_throughput = !h2_was_inactive;
} else {
/* For HTTP/1, check throughput only if at least one stream exists and was observed in that role previously */
check_throughput =
(h1_current_incoming_stream_id != 0 && h1_current_incoming_stream_id == impl->last_incoming_stream_id) ||
(h1_current_outgoing_stream_id != 0 && h1_current_outgoing_stream_id == impl->last_outgoing_stream_id);

impl->last_outgoing_stream_id = h1_current_outgoing_stream_id;
impl->last_incoming_stream_id = h1_current_incoming_stream_id;
}
impl->last_measured_throughput = bytes_per_second;

if (!check_throughput) {
Expand Down
97 changes: 96 additions & 1 deletion source/h2_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ struct aws_h2err s_decoder_on_goaway(
uint32_t error_code,
struct aws_byte_cursor debug_data,
void *userdata);
static void s_reset_statistics(struct aws_channel_handler *handler);
static void s_gather_statistics(struct aws_channel_handler *handler, struct aws_array_list *stats);

static struct aws_http_connection_vtable s_h2_connection_vtable = {
.channel_handler_vtable =
Expand All @@ -157,6 +159,8 @@ static struct aws_http_connection_vtable s_h2_connection_vtable = {
.initial_window_size = s_handler_initial_window_size,
.message_overhead = s_handler_message_overhead,
.destroy = s_handler_destroy,
.reset_statistics = s_reset_statistics,
.gather_statistics = s_gather_statistics,
},

.on_channel_handler_installed = s_handler_installed,
Expand Down Expand Up @@ -219,6 +223,14 @@ static void s_release_stream_and_connection_lock(struct aws_h2_stream *stream, s
(void)err;
}

/* Accumulate the elapsed time between start_ns and end_ns (nanoseconds) into *output_ms
 * (milliseconds). If the interval is empty or inverted (end_ns <= start_ns), the
 * accumulated value is discarded and reset to zero, matching the original behavior. */
static void s_add_time_measurement_to_stats(uint64_t start_ns, uint64_t end_ns, uint64_t *output_ms) {
    if (end_ns <= start_ns) {
        *output_ms = 0;
        return;
    }

    uint64_t elapsed_ns = end_ns - start_ns;
    *output_ms += aws_timestamp_convert(elapsed_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_MILLIS, NULL);
}

/**
* Internal function for bringing connection to a stop.
* Invoked multiple times, including when:
Expand Down Expand Up @@ -373,6 +385,9 @@ static struct aws_h2_connection *s_connection_new(
connection->thread_data.goaway_received_last_stream_id = AWS_H2_STREAM_ID_MAX;
connection->thread_data.goaway_sent_last_stream_id = AWS_H2_STREAM_ID_MAX;

aws_crt_statistics_http2_channel_init(&connection->thread_data.stats);
connection->thread_data.stats.was_inactive = true; /* Start with non active streams */

connection->synced_data.is_open = true;
connection->synced_data.new_stream_error_code = AWS_ERROR_SUCCESS;

Expand Down Expand Up @@ -795,6 +810,9 @@ static int s_encode_data_from_outgoing_streams(struct aws_h2_connection *connect

AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));
struct aws_linked_list *outgoing_streams_list = &connection->thread_data.outgoing_streams_list;
if (aws_linked_list_empty(outgoing_streams_list)) {
return AWS_OP_SUCCESS;
}
struct aws_linked_list *stalled_window_streams_list = &connection->thread_data.stalled_window_streams_list;
struct aws_linked_list *waiting_streams_list = &connection->thread_data.waiting_streams_list;

Expand All @@ -816,7 +834,7 @@ static int s_encode_data_from_outgoing_streams(struct aws_h2_connection *connect
"Peer connection's flow-control window is too small now %zu. Connection will stop sending DATA until "
"WINDOW_UPDATE is received.",
connection->thread_data.window_size_peer);
break;
goto done;
}

/* Stop looping if message is so full it's not worth the bother */
Expand Down Expand Up @@ -885,6 +903,16 @@ static int s_encode_data_from_outgoing_streams(struct aws_h2_connection *connect
return aws_raise_error(aws_error_code);
}

if (aws_linked_list_empty(outgoing_streams_list)) {
/* transition from something to write -> nothing to write */
uint64_t now_ns = 0;
aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns);
s_add_time_measurement_to_stats(
connection->thread_data.outgoing_timestamp_ns,
now_ns,
&connection->thread_data.stats.pending_outgoing_stream_ms);
}

return AWS_OP_SUCCESS;
}

Expand Down Expand Up @@ -1768,6 +1796,19 @@ static void s_stream_complete(struct aws_h2_connection *connection, struct aws_h
aws_linked_list_remove(&stream->node);
}

if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 0 &&
connection->thread_data.incoming_timestamp_ns != 0) {
uint64_t now_ns = 0;
aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns);
/* transition from something to read -> nothing to read and nothing to write */
s_add_time_measurement_to_stats(
connection->thread_data.incoming_timestamp_ns,
now_ns,
&connection->thread_data.stats.pending_incoming_stream_ms);
connection->thread_data.stats.was_inactive = true;
connection->thread_data.incoming_timestamp_ns = 0;
}

aws_h2_stream_complete(stream, error_code);

/* release connection's hold on stream */
Expand Down Expand Up @@ -1867,6 +1908,14 @@ static void s_move_stream_to_thread(
if (aws_h2_stream_on_activated(stream, &body_state)) {
goto error;
}

if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) == 1) {
/* transition from nothing to read -> something to read */
uint64_t now_ns = 0;
aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns);
connection->thread_data.incoming_timestamp_ns = now_ns;
}

switch (body_state) {
case AWS_H2_STREAM_BODY_STATE_WAITING_WRITES:
aws_linked_list_push_back(&connection->thread_data.waiting_streams_list, &stream->node);
Expand Down Expand Up @@ -2753,3 +2802,49 @@ static size_t s_handler_message_overhead(struct aws_channel_handler *handler) {
/* "All frames begin with a fixed 9-octet header followed by a variable-length payload" (RFC-7540 4.1) */
return 9;
}

/* Channel-handler vtable callback: clear the per-interval HTTP/2 statistics.
 * If the connection currently has no active streams, the new interval begins
 * in the "inactive" state. */
static void s_reset_statistics(struct aws_channel_handler *handler) {
    struct aws_h2_connection *connection = handler->impl;

    aws_crt_statistics_http2_channel_reset(&connection->thread_data.stats);

    const size_t active_stream_count =
        aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map);
    if (active_stream_count == 0) {
        connection->thread_data.stats.was_inactive = true;
    }
}

static void s_gather_statistics(struct aws_channel_handler *handler, struct aws_array_list *stats) {

struct aws_h2_connection *connection = handler->impl;
AWS_PRECONDITION(aws_channel_thread_is_callers_thread(connection->base.channel_slot->channel));

/* TODO: Need update the way we calculate statistics, to account for user-controlled pauses.
* If user is adding chunks 1 by 1, there can naturally be a gap in the upload.
* If the user lets the stream-window go to zero, there can naturally be a gap in the download. */
uint64_t now_ns = 0;
if (aws_channel_current_clock_time(connection->base.channel_slot->channel, &now_ns)) {
return;
}

if (!aws_linked_list_empty(&connection->thread_data.outgoing_streams_list)) {
s_add_time_measurement_to_stats(
connection->thread_data.outgoing_timestamp_ns,
now_ns,
&connection->thread_data.stats.pending_outgoing_stream_ms);

connection->thread_data.outgoing_timestamp_ns = now_ns;
}
if (aws_hash_table_get_entry_count(&connection->thread_data.active_streams_map) != 0) {
s_add_time_measurement_to_stats(
connection->thread_data.incoming_timestamp_ns,
now_ns,
&connection->thread_data.stats.pending_incoming_stream_ms);

connection->thread_data.incoming_timestamp_ns = now_ns;
} else {
connection->thread_data.stats.was_inactive = true;
}

void *stats_base = &connection->thread_data.stats;
aws_array_list_push_back(stats, &stats_base);
}
11 changes: 11 additions & 0 deletions source/statistics.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,14 @@ void aws_crt_statistics_http1_channel_reset(struct aws_crt_statistics_http1_chan
stats->current_outgoing_stream_id = 0;
stats->current_incoming_stream_id = 0;
}

void aws_crt_statistics_http2_channel_init(struct aws_crt_statistics_http2_channel *stats) {
AWS_ZERO_STRUCT(*stats);
stats->category = AWSCRT_STAT_CAT_HTTP2_CHANNEL;
}

/* Resets the per-interval measurements of an HTTP/2 channel handler statistics
 * struct. The category field is deliberately left untouched. */
void aws_crt_statistics_http2_channel_reset(struct aws_crt_statistics_http2_channel *stats) {
    stats->was_inactive = false;
    stats->pending_incoming_stream_ms = 0;
    stats->pending_outgoing_stream_ms = 0;
}
2 changes: 1 addition & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -616,14 +616,14 @@ add_net_test_case(h2_sm_mock_goaway)
# Tests against real world server
add_net_test_case(h2_sm_acquire_stream)
add_net_test_case(h2_sm_acquire_stream_multiple_connections)
add_net_test_case(h2_sm_acquire_stream_stress)
add_net_test_case(h2_sm_closing_before_connection_acquired)
# Tests against local server
if (ENABLE_LOCALHOST_INTEGRATION_TESTS)
# Tests should be named with localhost_integ_*
add_net_test_case(localhost_integ_h2_sm_prior_knowledge)
add_net_test_case(localhost_integ_h2_sm_acquire_stream_stress)
add_net_test_case(localhost_integ_h2_sm_acquire_stream_stress_with_body)
add_net_test_case(localhost_integ_h2_sm_connection_monitor_kill_slow_connection)
endif()

add_test_case(random_access_set_sanitize_test)
Expand Down
Loading

0 comments on commit aadc57a

Please sign in to comment.