Skip to content

Commit

Permalink
Always rejoin session option (#242)
Browse files Browse the repository at this point in the history
* Always rejoin session option
* Log a warning on non-compliant session resumption
  • Loading branch information
bretambrose authored Dec 15, 2022
1 parent 0e55e37 commit c2bc310
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 9 deletions.
8 changes: 8 additions & 0 deletions include/aws/mqtt/v5/mqtt5_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ enum aws_mqtt5_client_session_behavior_type {
* Always attempt to rejoin an existing session after an initial connection success.
*/
AWS_MQTT5_CSBT_REJOIN_POST_SUCCESS,

/**
* Always attempt to rejoin an existing session. Since the client does not support durable session persistence,
* this option is not guaranteed to be spec compliant because any unacknowledged qos1 publishes (which are
* part of the client session state) will not be present on the initial connection. Until we support
* durable session resumption, this option is technically spec-breaking, but useful.
*/
AWS_MQTT5_CSBT_REJOIN_ALWAYS,
};

/**
Expand Down
16 changes: 12 additions & 4 deletions source/v5/mqtt5_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,8 @@ static bool s_should_resume_session(const struct aws_mqtt5_client *client) {
enum aws_mqtt5_client_session_behavior_type session_behavior =
aws_mqtt5_client_session_behavior_type_to_non_default(client->config->session_behavior);

return session_behavior == AWS_MQTT5_CSBT_REJOIN_POST_SUCCESS && client->has_connected_successfully;
return (session_behavior == AWS_MQTT5_CSBT_REJOIN_POST_SUCCESS && client->has_connected_successfully) ||
(session_behavior == AWS_MQTT5_CSBT_REJOIN_ALWAYS);
}

static void s_change_current_state_to_mqtt_connect(struct aws_mqtt5_client *client) {
Expand Down Expand Up @@ -1716,13 +1717,20 @@ static void s_aws_mqtt5_client_on_connack(
/* Check if a session is being rejoined and perform associated rejoin connect logic here */
if (client->negotiated_settings.rejoined_session) {
/* Disconnect if the server is attempting to connect the client to an unexpected session */
if (aws_mqtt5_client_session_behavior_type_to_non_default(client->config->session_behavior) ==
AWS_MQTT5_CSBT_CLEAN ||
client->has_connected_successfully == false) {
if (!s_should_resume_session(client)) {
s_aws_mqtt5_client_emit_final_lifecycle_event(
client, AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION, connack_view, NULL);
s_aws_mqtt5_client_shutdown_channel(client, AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION);
return;
} else if (!client->has_connected_successfully) {
/*
* We were configured with REJOIN_ALWAYS and this is the first connection. This is technically not safe
* and so let's log a warning for future diagnostics should it cause the user problems.
*/
AWS_LOGF_WARN(
AWS_LS_MQTT5_CLIENT,
"id=%p: initial connection rejoined existing session. This may cause packet id collisions.",
(void *)client);
}
}

Expand Down
2 changes: 2 additions & 0 deletions source/v5/mqtt5_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ const char *aws_mqtt5_client_session_behavior_type_to_c_string(
return "Clean session always";
case AWS_MQTT5_CSBT_REJOIN_POST_SUCCESS:
return "Attempt to resume a session after initial connection success";
case AWS_MQTT5_CSBT_REJOIN_ALWAYS:
return "Always attempt to resume a session";
default:
return "Unknown session behavior";
}
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ add_test_case(mqtt5_client_flow_control_iot_core_throughput)
add_test_case(mqtt5_client_flow_control_iot_core_publish_tps)
add_test_case(mqtt5_client_session_resumption_clean_start)
add_test_case(mqtt5_client_session_resumption_post_success)
add_test_case(mqtt5_client_session_resumption_always)
add_test_case(mqtt5_client_receive_qos1_return_puback_test)
add_test_case(mqtt5_client_receive_nonexisting_session_state)
add_test_case(mqtt5_client_receive_assigned_client_id)
Expand Down
39 changes: 34 additions & 5 deletions tests/v5/mqtt5_client_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -2732,7 +2732,7 @@ static int s_mqtt5_client_flow_control_iot_core_publish_tps_fn(struct aws_alloca

AWS_TEST_CASE(mqtt5_client_flow_control_iot_core_publish_tps, s_mqtt5_client_flow_control_iot_core_publish_tps_fn)

static int s_aws_mqtt5_mock_server_handle_connect_honor_session(
static int s_aws_mqtt5_mock_server_handle_connect_honor_session_after_success(
void *packet,
struct aws_mqtt5_server_mock_connection_context *connection,
void *user_data) {
Expand All @@ -2753,6 +2753,24 @@ static int s_aws_mqtt5_mock_server_handle_connect_honor_session(
return s_aws_mqtt5_mock_server_send_packet(connection, AWS_MQTT5_PT_CONNACK, &connack_view);
}

static int s_aws_mqtt5_mock_server_handle_connect_honor_session_unconditional(
void *packet,
struct aws_mqtt5_server_mock_connection_context *connection,
void *user_data) {
(void)packet;
(void)user_data;

struct aws_mqtt5_packet_connect_view *connect_packet = packet;

struct aws_mqtt5_packet_connack_view connack_view;
AWS_ZERO_STRUCT(connack_view);

connack_view.reason_code = AWS_MQTT5_CRC_SUCCESS;
connack_view.session_present = !connect_packet->clean_start;

return s_aws_mqtt5_mock_server_send_packet(connection, AWS_MQTT5_PT_CONNACK, &connack_view);
}

struct aws_mqtt5_wait_for_n_lifecycle_events_context {
struct aws_mqtt5_client_mock_test_fixture *test_fixture;
enum aws_mqtt5_client_lifecycle_event_type event_type;
Expand Down Expand Up @@ -2803,6 +2821,7 @@ static bool s_compute_expected_rejoined_session(
case AWS_MQTT5_CSBT_REJOIN_POST_SUCCESS:
return connect_index > 0;

case AWS_MQTT5_CSBT_REJOIN_ALWAYS:
default:
return true;
}
Expand Down Expand Up @@ -2833,7 +2852,7 @@ static int s_do_mqtt5_client_session_resumption_test(

test_options.client_options.session_behavior = session_behavior;
test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_CONNECT] =
s_aws_mqtt5_mock_server_handle_connect_honor_session;
s_aws_mqtt5_mock_server_handle_connect_honor_session_unconditional;

struct aws_mqtt5_client_mqtt5_mock_test_fixture_options test_fixture_options = {
.client_options = &test_options.client_options,
Expand Down Expand Up @@ -2908,6 +2927,16 @@ static int s_mqtt5_client_session_resumption_post_success_fn(struct aws_allocato

AWS_TEST_CASE(mqtt5_client_session_resumption_post_success, s_mqtt5_client_session_resumption_post_success_fn)

static int s_mqtt5_client_session_resumption_always_fn(struct aws_allocator *allocator, void *ctx) {
(void)ctx;

ASSERT_SUCCESS(s_do_mqtt5_client_session_resumption_test(allocator, AWS_MQTT5_CSBT_REJOIN_ALWAYS));

return AWS_OP_SUCCESS;
}

AWS_TEST_CASE(mqtt5_client_session_resumption_always, s_mqtt5_client_session_resumption_always_fn)

static uint8_t s_sub_pub_unsub_topic_filter[] = "hello/+";

static struct aws_mqtt5_subscription_view s_sub_pub_unsub_subscriptions[] = {
Expand Down Expand Up @@ -3832,7 +3861,7 @@ static int mqtt5_client_no_session_after_client_stop_fn(struct aws_allocator *al
s_aws_mqtt5_mock_server_handle_publish_no_puback_on_first_connect;
/* Simulate reconnecting to an existing connection */
test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_CONNECT] =
s_aws_mqtt5_mock_server_handle_connect_honor_session;
s_aws_mqtt5_mock_server_handle_connect_honor_session_after_success;

struct aws_mqtt5_client_mock_test_fixture test_context;

Expand Down Expand Up @@ -3922,7 +3951,7 @@ static int mqtt5_client_restore_session_on_ping_timeout_reconnect_fn(struct aws_
s_aws_mqtt5_mock_server_handle_publish_no_puback_on_first_connect;
/* Simulate reconnecting to an existing connection */
test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_CONNECT] =
s_aws_mqtt5_mock_server_handle_connect_honor_session;
s_aws_mqtt5_mock_server_handle_connect_honor_session_after_success;

struct aws_mqtt5_client_mock_test_fixture test_context;

Expand Down Expand Up @@ -4458,7 +4487,7 @@ static int s_mqtt5_client_statistics_publish_qos1_requeue_fn(struct aws_allocato
test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_PUBLISH] =
s_aws_mqtt5_server_disconnect_on_first_publish_puback_after;
test_options.server_function_table.packet_handlers[AWS_MQTT5_PT_CONNECT] =
s_aws_mqtt5_mock_server_handle_connect_honor_session;
s_aws_mqtt5_mock_server_handle_connect_honor_session_after_success;

struct aws_mqtt5_client_mock_test_fixture test_context;
struct aws_mqtt5_sub_pub_unsub_context full_test_context = {
Expand Down

0 comments on commit c2bc310

Please sign in to comment.