Skip to content

Commit

Permalink
Sizet stats (#231)
Browse files Browse the repository at this point in the history
* Convert statistics to atomics, remove lock
* Another flaky test
  • Loading branch information
bretambrose authored Nov 25, 2022
1 parent 0d85286 commit cde0625
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 36 deletions.
41 changes: 31 additions & 10 deletions include/aws/mqtt/private/v5/mqtt5_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,35 @@ struct aws_mqtt5_client_flow_control_state {
struct aws_rate_limiter_token_bucket publish_throttle;
};

/**
 * Internal bookkeeping for the client's operation statistics. Every counter is an aws_atomic_var:
 * the values are adjusted with aws_atomic_fetch_add / aws_atomic_fetch_sub in
 * aws_mqtt5_client_statistics_change_operation_statistic_state() and read back, one field at a
 * time, with aws_atomic_load_int in aws_mqtt5_client_get_stats(), which widens each value to the
 * uint64_t fields of the user-facing struct aws_mqtt5_client_operation_statistics.
 */
struct aws_mqtt5_client_operation_statistics_impl {
/*
 * Total number of operations submitted to the client that have not yet been completed. Unacked
 * operations are a subset of this.
 */
struct aws_atomic_var incomplete_operation_count_atomic;

/*
 * Total packet size of operations submitted to the client that have not yet been completed. Unacked
 * operations are a subset of this. Packet sizes are added and subtracted as size_t values.
 */
struct aws_atomic_var incomplete_operation_size_atomic;

/*
 * Total number of operations that have been sent to the server and are waiting for a corresponding
 * ACK before they can be completed.
 */
struct aws_atomic_var unacked_operation_count_atomic;

/*
 * Total packet size of operations that have been sent to the server and are waiting for a
 * corresponding ACK before they can be completed. Packet sizes are added and subtracted as size_t
 * values.
 */
struct aws_atomic_var unacked_operation_size_atomic;
};

struct aws_mqtt5_client {

struct aws_allocator *allocator;
Expand Down Expand Up @@ -389,7 +418,7 @@ struct aws_mqtt5_client {
*
* clean_disconnect_error_code - the CLEAN_DISCONNECT state takes time to complete and we want to be able
* to pass an error code from a prior event to the channel shutdown. This holds the "override" error code
* that we'd like to shutdown the channel with while CLEAN_DISCONNECT is processed.
* that we'd like to shut down the channel with while CLEAN_DISCONNECT is processed.
*
* handshake exists on websocket-configured clients between the transform completion timepoint and the
* websocket setup callback.
Expand All @@ -402,16 +431,8 @@ struct aws_mqtt5_client {
*/
struct aws_mqtt5_client_operational_state operational_state;

/*
* TODO: topic alias mappings, from-server and to-server have independent mappings
*
* From-server requires a single table
* To-server requires both a table and a list (for LRU)
*/

/* Statistics tracking operational state */
struct aws_mutex operation_statistics_lock;
struct aws_mqtt5_client_operation_statistics operation_statistics;
struct aws_mqtt5_client_operation_statistics_impl operation_statistics_impl;

/*
* Wraps all state related to outbound flow control.
Expand Down
47 changes: 25 additions & 22 deletions source/v5/mqtt5_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ static bool s_aws_mqtt5_operation_is_retainable(struct aws_mqtt5_operation *oper
}
}

/**
 * Puts every operation-statistics counter into its initial (zero) state.
 *
 * This is the first operation performed on these atomics (it is called from aws_mqtt5_client_new
 * before the client is returned to the caller), so it uses aws_atomic_init_int, the initializer
 * that aws-c-common requires to precede any other operation on an atomic variable, rather than an
 * atomic store.
 *
 * @param stats statistics block to initialize; must not be NULL
 */
static void s_init_statistics(struct aws_mqtt5_client_operation_statistics_impl *stats) {
    aws_atomic_init_int(&stats->incomplete_operation_count_atomic, 0);
    aws_atomic_init_int(&stats->incomplete_operation_size_atomic, 0);
    aws_atomic_init_int(&stats->unacked_operation_count_atomic, 0);
    aws_atomic_init_int(&stats->unacked_operation_size_atomic, 0);
}

static bool s_aws_mqtt5_operation_satisfies_offline_queue_retention_policy(
struct aws_mqtt5_operation *operation,
enum aws_mqtt5_client_operation_queue_behavior_type queue_behavior) {
Expand Down Expand Up @@ -291,8 +298,6 @@ static void s_mqtt5_client_final_destroy(struct aws_mqtt5_client *client) {
aws_mqtt5_inbound_topic_alias_resolver_clean_up(&client->inbound_topic_alias_resolver);
aws_mqtt5_outbound_topic_alias_resolver_destroy(client->outbound_topic_alias_resolver);

aws_mutex_clean_up(&client->operation_statistics_lock);

aws_mem_release(client->allocator, client);

if (client_termination_handler != NULL) {
Expand Down Expand Up @@ -2071,7 +2076,7 @@ struct aws_mqtt5_client *aws_mqtt5_client_new(

aws_mqtt5_client_options_storage_log(client->config, AWS_LL_DEBUG);

aws_mutex_init(&client->operation_statistics_lock);
s_init_statistics(&client->operation_statistics_impl);

return client;

Expand Down Expand Up @@ -3262,35 +3267,28 @@ void aws_mqtt5_client_statistics_change_operation_statistic_state(
return;
}

aws_mutex_lock(&client->operation_statistics_lock);
struct aws_mqtt5_client_operation_statistics *stats = &client->operation_statistics;
struct aws_mqtt5_client_operation_statistics_impl *stats = &client->operation_statistics_impl;

if ((old_state_flags & AWS_MQTT5_OSS_INCOMPLETE) != (new_state_flags & AWS_MQTT5_OSS_INCOMPLETE)) {
if ((new_state_flags & AWS_MQTT5_OSS_INCOMPLETE) != 0) {
++stats->incomplete_operation_count;
stats->incomplete_operation_size += packet_size;
aws_atomic_fetch_add(&stats->incomplete_operation_count_atomic, 1);
aws_atomic_fetch_add(&stats->incomplete_operation_size_atomic, (size_t)packet_size);
} else {
AWS_FATAL_ASSERT(stats->incomplete_operation_count > 0 && stats->incomplete_operation_size >= packet_size);

--stats->incomplete_operation_count;
stats->incomplete_operation_size -= packet_size;
aws_atomic_fetch_sub(&stats->incomplete_operation_count_atomic, 1);
aws_atomic_fetch_sub(&stats->incomplete_operation_size_atomic, (size_t)packet_size);
}
}

if ((old_state_flags & AWS_MQTT5_OSS_UNACKED) != (new_state_flags & AWS_MQTT5_OSS_UNACKED)) {
if ((new_state_flags & AWS_MQTT5_OSS_UNACKED) != 0) {
++stats->unacked_operation_count;
stats->unacked_operation_size += packet_size;
aws_atomic_fetch_add(&stats->unacked_operation_count_atomic, 1);
aws_atomic_fetch_add(&stats->unacked_operation_size_atomic, (size_t)packet_size);
} else {
AWS_FATAL_ASSERT(stats->unacked_operation_count > 0 && stats->unacked_operation_size >= packet_size);

--stats->unacked_operation_count;
stats->unacked_operation_size -= packet_size;
aws_atomic_fetch_sub(&stats->unacked_operation_count_atomic, 1);
aws_atomic_fetch_sub(&stats->unacked_operation_size_atomic, (size_t)packet_size);
}
}

aws_mutex_unlock(&client->operation_statistics_lock);

operation->statistic_state_flags = new_state_flags;

if (client->vtable != NULL && client->vtable->on_client_statistics_changed_callback_fn != NULL) {
Expand All @@ -3300,7 +3298,12 @@ void aws_mqtt5_client_statistics_change_operation_statistic_state(
}

/**
 * Reports the client's current operation statistics (incomplete and unacked operation counts and
 * packet sizes) to the caller.
 *
 * @param client client whose statistics are queried; dereferenced without a NULL check
 * @param stats output structure that receives the statistics; dereferenced without a NULL check
 */
void aws_mqtt5_client_get_stats(struct aws_mqtt5_client *client, struct aws_mqtt5_client_operation_statistics *stats) {
aws_mutex_lock(&client->operation_statistics_lock);
*stats = client->operation_statistics;
aws_mutex_unlock(&client->operation_statistics_lock);
stats->incomplete_operation_count =
(uint64_t)aws_atomic_load_int(&client->operation_statistics_impl.incomplete_operation_count_atomic);
stats->incomplete_operation_size =
(uint64_t)aws_atomic_load_int(&client->operation_statistics_impl.incomplete_operation_size_atomic);
stats->unacked_operation_count =
(uint64_t)aws_atomic_load_int(&client->operation_statistics_impl.unacked_operation_count_atomic);
stats->unacked_operation_size =
(uint64_t)aws_atomic_load_int(&client->operation_statistics_impl.unacked_operation_size_atomic);
}
13 changes: 13 additions & 0 deletions tests/v5/mqtt5_client_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,19 @@ static int s_mqtt5_client_ping_sequence_fn(struct aws_allocator *allocator, void
s_wait_for_connected_lifecycle_event(&test_context);
s_wait_for_n_pingreqs(&ping_context);

/*
* There's a really unpleasant race condition where we can stop the client so fast (based on the mock
* server receiving PINGREQs) that the mock server's socket gets closed underneath it as it is trying to
* write the PINGRESP back to the client, which in turn triggers channel shutdown where no further data
* is read from the socket, so we never see the DISCONNECT that the client actually sent.
*
* We're not able to wait on the PINGRESP because we have no insight into when it's received. So for now,
* we'll insert an artificial sleep before stopping the client. We should try and come up with a more
* elegant solution.
*/

aws_thread_current_sleep(aws_timestamp_convert(1, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL));

struct aws_mqtt5_packet_disconnect_view disconnect_view = {
.reason_code = AWS_MQTT5_DRC_NORMAL_DISCONNECTION,
};
Expand Down
4 changes: 0 additions & 4 deletions tests/v5/mqtt5_operation_and_storage_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -2076,8 +2076,6 @@ static void s_aws_mqtt5_operation_processing_test_context_init(
/* this keeps operation processing tests from crashing when dereferencing config options */
test_context->dummy_client.config = &test_context->dummy_client_options;

aws_mutex_init(&test_context->dummy_client.operation_statistics_lock);

aws_array_list_init_dynamic(&test_context->output_io_messages, allocator, 0, sizeof(struct aws_io_message *));

struct aws_mqtt5_encoder_options verification_encoder_options = {
Expand Down Expand Up @@ -2107,8 +2105,6 @@ static void s_aws_mqtt5_operation_processing_test_context_clean_up(
aws_mqtt5_encoder_clean_up(&test_context->dummy_client.encoder);
aws_mqtt5_client_operational_state_clean_up(&test_context->dummy_client.operational_state);

aws_mutex_clean_up(&test_context->dummy_client.operation_statistics_lock);

aws_array_list_clean_up(&test_context->completed_operation_error_codes);
}

Expand Down

0 comments on commit cde0625

Please sign in to comment.