Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt5 operation timeout #333

Merged
merged 7 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions include/aws/mqtt/private/v5/mqtt5_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ struct aws_mqtt5_client_operational_state {
struct aws_linked_list unacked_operations;
struct aws_linked_list write_completion_operations;

/*
* heap of operation pointers where the timeout is the sort value. Elements are added/removed from this
* data structure in exact synchronization with unacked_operations_table.
*/
struct aws_priority_queue operations_by_ack_timeout;

/*
* Is there an io message in transit (to the socket) that has not invoked its write completion callback yet?
* The client implementation only allows one in-transit message at a time, and so if this is true, we don't
Expand Down
7 changes: 6 additions & 1 deletion include/aws/mqtt/private/v5/mqtt5_options_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ struct aws_mqtt5_operation_vtable {
int (*aws_mqtt5_operation_validate_vs_connection_settings_fn)(
const void *operation_packet_view,
const struct aws_mqtt5_client *client);

uint32_t (*aws_mqtt5_operation_get_ack_timeout_override_fn)(const struct aws_mqtt5_operation *operation);
};

/* Flags that indicate the way in which an operation is currently affecting the statistics of the client */
Expand All @@ -64,6 +66,7 @@ struct aws_mqtt5_operation {
const struct aws_mqtt5_operation_vtable *vtable;
struct aws_ref_count ref_count;
uint64_t ack_timeout_timepoint_ns;
struct aws_priority_queue_node priority_queue_node;
struct aws_linked_list_node node;

enum aws_mqtt5_packet_type packet_type;
Expand Down Expand Up @@ -163,7 +166,7 @@ struct aws_mqtt5_client_options_storage {
uint64_t max_reconnect_delay_ms;
uint64_t min_connected_time_to_reset_reconnect_delay_ms;

uint64_t ack_timeout_seconds;
uint32_t ack_timeout_seconds;

uint32_t ping_timeout_ms;
uint32_t connack_timeout_ms;
Expand Down Expand Up @@ -208,6 +211,8 @@ AWS_MQTT_API int aws_mqtt5_operation_validate_vs_connection_settings(
const struct aws_mqtt5_operation *operation,
const struct aws_mqtt5_client *client);

AWS_MQTT_API uint32_t aws_mqtt5_operation_get_ack_timeout_override(const struct aws_mqtt5_operation *operation);

/* Connect */

AWS_MQTT_API struct aws_mqtt5_operation_connect *aws_mqtt5_operation_connect_new(
Expand Down
14 changes: 10 additions & 4 deletions include/aws/mqtt/v5/mqtt5_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -344,31 +344,37 @@ typedef void(aws_mqtt5_client_termination_completion_fn)(void *complete_ctx);
/* operation completion options structures */

/**
* Completion callback options for the Publish operation
* Completion options for the Publish operation
*/
struct aws_mqtt5_publish_completion_options {
aws_mqtt5_publish_completion_fn *completion_callback;
void *completion_user_data;

uint32_t ack_timeout_seconds_override;
};

/**
* Completion callback options for the Subscribe operation
* Completion options for the Subscribe operation
*/
struct aws_mqtt5_subscribe_completion_options {
aws_mqtt5_subscribe_completion_fn *completion_callback;
void *completion_user_data;

uint32_t ack_timeout_seconds_override;
};

/**
* Completion callback options for the Unsubscribe operation
* Completion options for the Unsubscribe operation
*/
struct aws_mqtt5_unsubscribe_completion_options {
aws_mqtt5_unsubscribe_completion_fn *completion_callback;
void *completion_user_data;

uint32_t ack_timeout_seconds_override;
};

/**
* Public completion callback options for the a DISCONNECT operation
* Completion options for the a DISCONNECT operation
*/
struct aws_mqtt5_disconnect_completion_options {
aws_mqtt5_disconnect_completion_fn *completion_callback;
Expand Down
172 changes: 116 additions & 56 deletions source/v5/mqtt5_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ static void s_complete_operation(
const void *view) {
if (client != NULL) {
aws_mqtt5_client_statistics_change_operation_statistic_state(client, operation, AWS_MQTT5_OSS_NONE);
if (aws_priority_queue_node_is_in_queue(&operation->priority_queue_node)) {
struct aws_mqtt5_operation *queued_operation = NULL;
aws_priority_queue_remove(
&client->operational_state.operations_by_ack_timeout,
&queued_operation,
&operation->priority_queue_node);
}
}

aws_mqtt5_operation_complete(operation, error_code, packet_type, view);
Expand All @@ -197,43 +204,47 @@ static void s_complete_operation_list(
}

static void s_check_timeouts(struct aws_mqtt5_client *client, uint64_t now) {
if (client->config->ack_timeout_seconds == 0) {
return;
}
struct aws_priority_queue *timeout_queue = &client->operational_state.operations_by_ack_timeout;

bool done = aws_priority_queue_size(timeout_queue) == 0;
while (!done) {
struct aws_mqtt5_operation **next_operation_by_timeout_ptr = NULL;
aws_priority_queue_top(timeout_queue, (void **)&next_operation_by_timeout_ptr);
AWS_FATAL_ASSERT(next_operation_by_timeout_ptr != NULL);
struct aws_mqtt5_operation *next_operation_by_timeout = *next_operation_by_timeout_ptr;
AWS_FATAL_ASSERT(next_operation_by_timeout != NULL);

// If the top of the heap hasn't timed out than nothing has
if (next_operation_by_timeout->ack_timeout_timepoint_ns > now) {
break;
}

struct aws_linked_list_node *node = aws_linked_list_begin(&client->operational_state.unacked_operations);
while (node != aws_linked_list_end(&client->operational_state.unacked_operations)) {
struct aws_mqtt5_operation *operation = AWS_CONTAINER_OF(node, struct aws_mqtt5_operation, node);
node = aws_linked_list_next(node);
if (operation->ack_timeout_timepoint_ns < now) {
/* Timeout for this packet has been reached */
aws_mqtt5_packet_id_t packet_id = aws_mqtt5_operation_get_packet_id(operation);
AWS_LOGF_INFO(
AWS_LS_MQTT5_CLIENT,
"id=%p: %s packet with id:%d has timed out",
(void *)client,
aws_mqtt5_packet_type_to_c_string(operation->packet_type),
(int)packet_id);

struct aws_hash_element *elem = NULL;
aws_hash_table_find(&client->operational_state.unacked_operations_table, &packet_id, &elem);

if (elem == NULL || elem->value == NULL) {
AWS_LOGF_ERROR(
AWS_LS_MQTT5_CLIENT,
"id=%p: timeout for unknown operation with id %d",
(void *)client,
(int)packet_id);
return;
}
/* Ack timeout for this operation has been reached */
aws_priority_queue_pop(timeout_queue, &next_operation_by_timeout);

aws_linked_list_remove(&operation->node);
aws_hash_table_remove(&client->operational_state.unacked_operations_table, &packet_id, NULL, NULL);
aws_mqtt5_packet_id_t packet_id = aws_mqtt5_operation_get_packet_id(next_operation_by_timeout);
AWS_LOGF_INFO(
AWS_LS_MQTT5_CLIENT,
"id=%p: %s packet with id:%d has timed out",
(void *)client,
aws_mqtt5_packet_type_to_c_string(next_operation_by_timeout->packet_type),
(int)packet_id);

s_complete_operation(client, operation, AWS_ERROR_MQTT_TIMEOUT, AWS_MQTT5_PT_NONE, NULL);
} else {
break;
struct aws_hash_element *elem = NULL;
aws_hash_table_find(&client->operational_state.unacked_operations_table, &packet_id, &elem);

if (elem == NULL || elem->value == NULL) {
AWS_LOGF_ERROR(
AWS_LS_MQTT5_CLIENT, "id=%p: timeout for unknown operation with id %d", (void *)client, (int)packet_id);
return;
}

aws_linked_list_remove(&next_operation_by_timeout->node);
aws_hash_table_remove(&client->operational_state.unacked_operations_table, &packet_id, NULL, NULL);

s_complete_operation(client, next_operation_by_timeout, AWS_ERROR_MQTT_TIMEOUT, AWS_MQTT5_PT_NONE, NULL);

done = aws_priority_queue_size(timeout_queue) == 0;
}
}

Expand Down Expand Up @@ -412,7 +423,11 @@ static uint64_t s_compute_next_service_time_client_mqtt_connect(struct aws_mqtt5
return aws_min_u64(client->next_mqtt_connect_packet_timeout_time, operation_processing_time);
}

static uint64_t s_min_non_0_64(uint64_t a, uint64_t b) {
/*
* Returns the minimum of two numbers, ignoring zero. Zero is returned only if both are zero. Useful when we're
* computing (next service) timepoints and zero means "no timepoint"
*/
static uint64_t s_min_non_zero_u64(uint64_t a, uint64_t b) {
if (a == 0) {
return b;
}
Expand All @@ -424,6 +439,19 @@ static uint64_t s_min_non_0_64(uint64_t a, uint64_t b) {
return aws_min_u64(a, b);
}

/*
* If there are unacked operations, returns the earliest point in time that one could timeout.
*/
static uint64_t s_get_unacked_operation_timeout_for_next_service_time(struct aws_mqtt5_client *client) {
if (aws_priority_queue_size(&client->operational_state.operations_by_ack_timeout) > 0) {
struct aws_mqtt5_operation **operation = NULL;
aws_priority_queue_top(&client->operational_state.operations_by_ack_timeout, (void **)&operation);
return (*operation)->ack_timeout_timepoint_ns;
}

return 0;
}

static uint64_t s_compute_next_service_time_client_connected(struct aws_mqtt5_client *client, uint64_t now) {

/* ping and ping timeout */
Expand All @@ -432,13 +460,8 @@ static uint64_t s_compute_next_service_time_client_connected(struct aws_mqtt5_cl
next_service_time = aws_min_u64(next_service_time, client->next_ping_timeout_time);
}

/* unacked operations timeout */
if (client->config->ack_timeout_seconds != 0 &&
!aws_linked_list_empty(&client->operational_state.unacked_operations)) {
struct aws_linked_list_node *node = aws_linked_list_begin(&client->operational_state.unacked_operations);
struct aws_mqtt5_operation *operation = AWS_CONTAINER_OF(node, struct aws_mqtt5_operation, node);
next_service_time = aws_min_u64(next_service_time, operation->ack_timeout_timepoint_ns);
}
next_service_time =
s_min_non_zero_u64(next_service_time, s_get_unacked_operation_timeout_for_next_service_time(client));

if (client->desired_state != AWS_MCS_CONNECTED) {
next_service_time = now;
Expand All @@ -447,29 +470,21 @@ static uint64_t s_compute_next_service_time_client_connected(struct aws_mqtt5_cl
uint64_t operation_processing_time =
s_aws_mqtt5_client_compute_operational_state_service_time(&client->operational_state, now);

next_service_time = s_min_non_0_64(operation_processing_time, next_service_time);
next_service_time = s_min_non_zero_u64(operation_processing_time, next_service_time);

/* reset reconnect delay interval */
next_service_time = s_min_non_0_64(client->next_reconnect_delay_reset_time_ns, next_service_time);
next_service_time = s_min_non_zero_u64(client->next_reconnect_delay_reset_time_ns, next_service_time);

return next_service_time;
}

static uint64_t s_compute_next_service_time_client_clean_disconnect(struct aws_mqtt5_client *client, uint64_t now) {
uint64_t ack_timeout_time = 0;

/* unacked operations timeout */
if (client->config->ack_timeout_seconds != 0 &&
!aws_linked_list_empty(&client->operational_state.unacked_operations)) {
struct aws_linked_list_node *node = aws_linked_list_begin(&client->operational_state.unacked_operations);
struct aws_mqtt5_operation *operation = AWS_CONTAINER_OF(node, struct aws_mqtt5_operation, node);
ack_timeout_time = operation->ack_timeout_timepoint_ns;
}
uint64_t ack_timeout_time = s_get_unacked_operation_timeout_for_next_service_time(client);

uint64_t operation_processing_time =
s_aws_mqtt5_client_compute_operational_state_service_time(&client->operational_state, now);

return s_min_non_0_64(ack_timeout_time, operation_processing_time);
return s_min_non_zero_u64(ack_timeout_time, operation_processing_time);
}

static uint64_t s_compute_next_service_time_client_channel_shutdown(struct aws_mqtt5_client *client, uint64_t now) {
Expand Down Expand Up @@ -587,8 +602,10 @@ static void s_aws_mqtt5_client_operational_state_reset(
s_complete_operation_list(client, &client_operational_state->unacked_operations, completion_error_code);

if (is_final) {
aws_priority_queue_clean_up(&client_operational_state->operations_by_ack_timeout);
aws_hash_table_clean_up(&client_operational_state->unacked_operations_table);
} else {
aws_priority_queue_clear(&client->operational_state.operations_by_ack_timeout);
aws_hash_table_clear(&client_operational_state->unacked_operations_table);
}
}
Expand Down Expand Up @@ -2497,6 +2514,25 @@ int aws_mqtt5_operation_bind_packet_id(
return AWS_OP_ERR;
}

/*
* Priority queue comparison function for ack timeout processing
*/
static int s_compare_operation_timeouts(const void *a, const void *b) {
const struct aws_mqtt5_operation **operation_a_ptr = (void *)a;
const struct aws_mqtt5_operation *operation_a = *operation_a_ptr;

const struct aws_mqtt5_operation **operation_b_ptr = (void *)b;
const struct aws_mqtt5_operation *operation_b = *operation_b_ptr;

if (operation_a->ack_timeout_timepoint_ns < operation_b->ack_timeout_timepoint_ns) {
return -1;
} else if (operation_a->ack_timeout_timepoint_ns > operation_b->ack_timeout_timepoint_ns) {
return 1;
} else {
return 0;
}
}

int aws_mqtt5_client_operational_state_init(
struct aws_mqtt5_client_operational_state *client_operational_state,
struct aws_allocator *allocator,
Expand All @@ -2517,6 +2553,15 @@ int aws_mqtt5_client_operational_state_init(
return AWS_OP_ERR;
}

if (aws_priority_queue_init_dynamic(
&client_operational_state->operations_by_ack_timeout,
allocator,
100,
sizeof(struct aws_mqtt5_operation *),
s_compare_operation_timeouts)) {
return AWS_OP_ERR;
}

client_operational_state->next_mqtt_packet_id = 1;
client_operational_state->current_operation = NULL;
client_operational_state->client = client;
Expand Down Expand Up @@ -2631,6 +2676,7 @@ void aws_mqtt5_client_on_disconnection_update_operational_state(struct aws_mqtt5
client, &operations_to_fail, AWS_ERROR_MQTT5_OPERATION_FAILED_DUE_TO_OFFLINE_QUEUE_POLICY);

aws_hash_table_clear(&client->operational_state.unacked_operations_table);
aws_priority_queue_clear(&client->operational_state.operations_by_ack_timeout);

/*
* Prevents inbound resolution on the highly unlikely, illegal server behavior of sending a PUBLISH before
Expand Down Expand Up @@ -3065,10 +3111,24 @@ int aws_mqtt5_client_service_operational_state(struct aws_mqtt5_client_operation
break;
}

if (client->config->ack_timeout_seconds != 0) {
uint32_t ack_timeout_seconds = aws_mqtt5_operation_get_ack_timeout_override(current_operation);
if (ack_timeout_seconds == 0) {
ack_timeout_seconds = client->config->ack_timeout_seconds;
}

if (ack_timeout_seconds > 0) {
current_operation->ack_timeout_timepoint_ns =
now + aws_timestamp_convert(
client->config->ack_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
now + aws_timestamp_convert(ack_timeout_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
} else {
current_operation->ack_timeout_timepoint_ns = UINT64_MAX;
}

if (aws_priority_queue_push_ref(
&client_operational_state->operations_by_ack_timeout,
(void *)&current_operation,
&current_operation->priority_queue_node)) {
operational_error_code = aws_last_error();
break;
}

aws_linked_list_push_back(&client_operational_state->unacked_operations, &current_operation->node);
Expand Down
Loading