From 8de5a0060e593ef04af615b3036d46a950768c84 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Wed, 13 Mar 2024 15:46:16 -0700 Subject: [PATCH] Update subscription manager and associated systems to support durable/indefined streaming operations --- .../request-response/request_response.h | 39 +- source/request-response/protocol_adapter.c | 30 + .../request_response_client.c | 5 +- .../request-response/subscription_manager.c | 181 ++++- tests/CMakeLists.txt | 36 +- .../request-response/protocol_adapter_tests.c | 4 +- .../request_response_client_tests.c | 2 +- .../subscription_manager_tests.c | 638 +++++++++++++++--- 8 files changed, 798 insertions(+), 137 deletions(-) diff --git a/include/aws/mqtt/private/request-response/request_response.h b/include/aws/mqtt/private/request-response/request_response.h index 5d861457..78e7819d 100644 --- a/include/aws/mqtt/private/request-response/request_response.h +++ b/include/aws/mqtt/private/request-response/request_response.h @@ -9,24 +9,49 @@ #include /* - * Describes a change to the state of a request-response client subscription + * Describes a change to the state of a request operation subscription */ enum aws_rr_subscription_event_type { /* - * A subscribe succeeded + * A request subscription subscribe succeeded */ - ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS, /* - * A subscribe failed + * A request subscription subscribe failed */ - ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE, + ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE, /* - * A previously successful subscription has ended (generally due to a failure to resume a session) + * A previously successful request subscription has ended. + * + * Under normal circumstances this can happen when + * + * (1) failure to rejoin a session + * (2) a successful unsubscribe when the subscription is no longer needed */ - ARRSET_SUBSCRIPTION_ENDED + ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED, + + /* + * A streaming subscription subscribe succeeded + */ + ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, + + /* + * The protocol client failed to rejoin a session containing a previously-established streaming subscription + */ + ARRSET_STREAMING_SUBSCRIPTION_LOST, + + /* + * A streaming subscription subscribe attempt resulted in an error or reason code that the client has determined + * will result in indefinite failures to subscribe. In this case, we stop attempting to resubscribe. + * + * Situations that can lead to this: + * (1) Permission failures + * (2) Invalid topic filter + */ + ARRSET_STREAMING_SUBSCRIPTION_HALTED }; #endif /* AWS_MQTT_PRIVATE_REQUEST_RESPONSE_REQUEST_RESPONSE_H */ diff --git a/source/request-response/protocol_adapter.c b/source/request-response/protocol_adapter.c index a59f09dc..fefa7000 100644 --- a/source/request-response/protocol_adapter.c +++ b/source/request-response/protocol_adapter.c @@ -145,6 +145,7 @@ static void s_protocol_adapter_311_subscribe_completion( .topic_filter = aws_byte_cursor_from_buf(&subscribe_data->topic_filter), .event_type = AWS_PASET_SUBSCRIBE, .error_code = error_code, + .retryable = true, }; (*adapter->config.subscription_event_callback)(&subscribe_event, adapter->config.user_data); @@ -487,6 +488,28 @@ static void s_aws_mqtt_protocol_adapter_5_destroy(void *impl) { /* Subscribe */ +static bool s_is_retryable_subscribe(enum aws_mqtt5_suback_reason_code reason_code, int error_code) { + if (error_code == AWS_ERROR_MQTT5_PACKET_VALIDATION || error_code == AWS_ERROR_MQTT5_SUBSCRIBE_OPTIONS_VALIDATION) { + return false; + } else if (error_code != AWS_ERROR_SUCCESS) { + return true; + } + + switch (reason_code) { + case AWS_MQTT5_SARC_GRANTED_QOS_0: + case AWS_MQTT5_SARC_GRANTED_QOS_1: + case AWS_MQTT5_SARC_GRANTED_QOS_2: + case AWS_MQTT5_SARC_UNSPECIFIED_ERROR: + case AWS_MQTT5_SARC_PACKET_IDENTIFIER_IN_USE: + case AWS_MQTT5_SARC_IMPLEMENTATION_SPECIFIC_ERROR: + case AWS_MQTT5_SARC_QUOTA_EXCEEDED: + return true; + + default: + return false; + } +} + static void s_protocol_adapter_5_subscribe_completion( const struct aws_mqtt5_packet_suback_view *suback, int error_code, @@ -498,6 +521,12 @@ static void s_protocol_adapter_5_subscribe_completion( goto done; } + enum aws_mqtt5_suback_reason_code reason_code = AWS_MQTT5_SARC_GRANTED_QOS_0; + if (suback != NULL && suback->reason_code_count > 0) { + reason_code = suback->reason_codes[0]; + } + bool is_retryable = s_is_retryable_subscribe(reason_code, error_code); + if (error_code == AWS_ERROR_SUCCESS) { if (suback == NULL || suback->reason_code_count != 1 || suback->reason_codes[0] >= 128) { error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE; @@ -508,6 +537,7 @@ static void s_protocol_adapter_5_subscribe_completion( .topic_filter = aws_byte_cursor_from_buf(&subscribe_data->topic_filter), .event_type = AWS_PASET_SUBSCRIBE, .error_code = error_code, + .retryable = is_retryable, }; (*adapter->config.subscription_event_callback)(&subscribe_event, adapter->config.user_data); diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index 1f6b255b..110199d6 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -454,10 +454,7 @@ static void s_streaming_operation_on_client_shutdown(struct aws_mqtt_rr_client_o operation->storage.streaming_storage.options.subscription_status_callback; void *user_data = operation->storage.streaming_storage.options.user_data; if (subscription_status_callback != NULL) { - enum aws_rr_subscription_event_type status_type = (operation->state == AWS_MRROS_SUBSCRIBED) - ? ARRSET_SUBSCRIPTION_ENDED - : ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE; - (*subscription_status_callback)(status_type, error_code, user_data); + (*subscription_status_callback)(ARRSET_STREAMING_SUBSCRIPTION_HALTED, error_code, user_data); } } diff --git a/source/request-response/subscription_manager.c b/source/request-response/subscription_manager.c index 402814c3..98a7206c 100644 --- a/source/request-response/subscription_manager.c +++ b/source/request-response/subscription_manager.c @@ -64,6 +64,16 @@ struct aws_rr_subscription_record { enum aws_rr_subscription_pending_action_type pending_action; enum aws_rr_subscription_type type; + + /* + * A poisoned record represents a subscription that we will never try to subscribe to because a previous + * attempt resulted in a failure that we judge to be "terminal." Terminal failures include permission failures + * and validation failures. To remove a poisoned record, all listeners must be removed. For request-response + * operations this will happen naturally. For streaming operations, the operation must be closed by the user (in + * response to the user-facing event we emit on the streaming operation when the failure that poisons the + * record occurs). + */ + bool poisoned; }; static void s_aws_rr_subscription_record_log_invariant_violations(const struct aws_rr_subscription_record *record) { @@ -162,7 +172,7 @@ static void s_subscription_record_unsubscribe( record->pending_action = ARRSPAT_UNSUBSCRIBING; } -/* Only called when shutting down the request-response client */ +/* Only called by the request-response client when shutting down */ static int s_rr_subscription_clean_up_foreach_wrap(void *context, struct aws_hash_element *elem) { struct aws_rr_subscription_manager *manager = context; struct aws_rr_subscription_record *subscription = elem->value; @@ -297,26 +307,55 @@ static void s_cull_unused_subscriptions(struct aws_rr_subscription_manager *mana aws_hash_table_foreach(&manager->subscriptions, s_rr_subscription_cull_unused_subscriptions_wrapper, manager); } -static const char *s_request_response_subscription_event_type_to_c_str(enum aws_rr_subscription_event_type type) { +static const char *s_rr_subscription_event_type_to_c_str(enum aws_rr_subscription_event_type type) { switch (type) { - case ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS: - return "SubscriptionSubscribeSuccess"; + case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS: + return "RequestSubscribeSuccess"; + + case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE: + return "RequestSubscribeFailure"; + + case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED: + return "RequestSubscriptionEnded"; + + case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED: + return "StreamingSubscriptionEstablished"; - case ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE: - return "SubscriptionSubscribeFailure"; + case ARRSET_STREAMING_SUBSCRIPTION_LOST: + return "StreamingSubscriptionLost"; - case ARRSET_SUBSCRIPTION_ENDED: - return "SubscriptionEnded"; + case ARRSET_STREAMING_SUBSCRIPTION_HALTED: + return "StreamingSubscriptionHalted"; } return "Unknown"; } +static bool s_subscription_type_matches_event_type( + enum aws_rr_subscription_type subscription_type, + enum aws_rr_subscription_event_type event_type) { + switch (event_type) { + case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS: + case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE: + case ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED: + return subscription_type == ARRST_REQUEST_RESPONSE; + + case ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED: + case ARRSET_STREAMING_SUBSCRIPTION_LOST: + case ARRSET_STREAMING_SUBSCRIPTION_HALTED: + return subscription_type == ARRST_EVENT_STREAM; + } + + return false; +} + static void s_emit_subscription_event( const struct aws_rr_subscription_manager *manager, const struct aws_rr_subscription_record *record, enum aws_rr_subscription_event_type type) { + AWS_FATAL_ASSERT(s_subscription_type_matches_event_type(record->type, type)); + for (struct aws_hash_iter iter = aws_hash_iter_begin(&record->listeners); !aws_hash_iter_done(&iter); aws_hash_iter_next(&iter)) { @@ -334,7 +373,7 @@ static void s_emit_subscription_event( "request-response subscription manager - subscription event for ('" PRInSTR "'), type: %s, operation: %" PRIu64 "", AWS_BYTE_CURSOR_PRI(record->topic_filter_cursor), - s_request_response_subscription_event_type_to_c_str(type), + s_rr_subscription_event_type_to_c_str(type), listener->operation_id); } } @@ -344,6 +383,10 @@ static int s_rr_activate_idle_subscription( struct aws_rr_subscription_record *record) { int result = AWS_OP_SUCCESS; + if (record->poisoned) { + return AWS_OP_SUCCESS; + } + if (manager->is_protocol_client_connected && aws_hash_table_get_entry_count(&record->listeners) > 0) { if (record->status == ARRSST_NOT_SUBSCRIBED && record->pending_action == ARRSPAT_NOTHING) { struct aws_protocol_adapter_subscribe_options subscribe_options = { @@ -368,7 +411,12 @@ static int s_rr_activate_idle_subscription( error_code, aws_error_debug_str(error_code)); - s_emit_subscription_event(manager, record, ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE); + if (record->type == ARRST_REQUEST_RESPONSE) { + s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE); + } else { + record->poisoned = true; + s_emit_subscription_event(manager, record, ARRSET_STREAMING_SUBSCRIPTION_HALTED); + } } } } @@ -434,6 +482,16 @@ enum aws_acquire_subscription_result_type aws_rr_subscription_manager_acquire_su return AASRT_FAILURE; } + if (existing_record->poisoned) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - acquire subscription for ('" PRInSTR "'), operation %" PRIu64 + " failed - existing subscription is poisoned and has not been released", + AWS_BYTE_CURSOR_PRI(options->topic_filter), + options->operation_id); + return AASRT_FAILURE; + } + s_aws_rr_subscription_record_log_invariant_violations(existing_record); // for simplicity, we require unsubscribes to complete before re-subscribing @@ -489,42 +547,84 @@ void aws_rr_subscription_manager_release_subscription( s_remove_listener_from_subscription_record(manager, options->topic_filter, options->operation_id); } -void aws_rr_subscription_manager_on_protocol_adapter_subscription_event( +static void s_handle_protocol_adapter_request_subscription_event( struct aws_rr_subscription_manager *manager, + struct aws_rr_subscription_record *record, const struct aws_protocol_adapter_subscription_event *event) { - struct aws_rr_subscription_record *record = s_get_subscription_record(manager, event->topic_filter); - if (record == NULL) { - return; - } + if (event->event_type == AWS_PASET_SUBSCRIBE) { + AWS_FATAL_ASSERT(record->pending_action == ARRSPAT_SUBSCRIBING); + record->pending_action = ARRSPAT_NOTHING; - AWS_LOGF_DEBUG( - AWS_LS_MQTT_REQUEST_RESPONSE, - "request-response subscription manager - received a protocol adapter subscription event for ('" PRInSTR - "'), type %s, error_code %d(%s)", - AWS_BYTE_CURSOR_PRI(event->topic_filter), - aws_protocol_adapter_subscription_event_type_to_c_str(event->event_type), - event->error_code, - aws_error_debug_str(event->error_code)); + if (event->error_code == AWS_ERROR_SUCCESS) { + record->status = ARRSST_SUBSCRIBED; + s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS); + } else { + s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE); + } + } else { + AWS_FATAL_ASSERT(event->event_type == AWS_PASET_UNSUBSCRIBE); + AWS_FATAL_ASSERT(record->pending_action == ARRSPAT_UNSUBSCRIBING); + record->pending_action = ARRSPAT_NOTHING; + if (event->error_code == AWS_ERROR_SUCCESS) { + record->status = ARRSST_NOT_SUBSCRIBED; + s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED); + } + } +} + +static void s_handle_protocol_adapter_streaming_subscription_event( + struct aws_rr_subscription_manager *manager, + struct aws_rr_subscription_record *record, + const struct aws_protocol_adapter_subscription_event *event) { if (event->event_type == AWS_PASET_SUBSCRIBE) { AWS_FATAL_ASSERT(record->pending_action == ARRSPAT_SUBSCRIBING); + record->pending_action = ARRSPAT_NOTHING; if (event->error_code == AWS_ERROR_SUCCESS) { record->status = ARRSST_SUBSCRIBED; - s_emit_subscription_event(manager, record, ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS); + s_emit_subscription_event(manager, record, ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED); } else { - s_emit_subscription_event(manager, record, ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE); + if (event->retryable) { + s_rr_activate_idle_subscription(manager, record); + } else { + record->poisoned = true; + s_emit_subscription_event(manager, record, ARRSET_STREAMING_SUBSCRIPTION_HALTED); + } } - } else if (event->event_type == AWS_PASET_UNSUBSCRIBE) { + } else { + AWS_FATAL_ASSERT(event->event_type == AWS_PASET_UNSUBSCRIBE); AWS_FATAL_ASSERT(record->pending_action == ARRSPAT_UNSUBSCRIBING); + record->pending_action = ARRSPAT_NOTHING; if (event->error_code == AWS_ERROR_SUCCESS) { record->status = ARRSST_NOT_SUBSCRIBED; - s_emit_subscription_event(manager, record, ARRSET_SUBSCRIPTION_ENDED); } } +} + +void aws_rr_subscription_manager_on_protocol_adapter_subscription_event( + struct aws_rr_subscription_manager *manager, + const struct aws_protocol_adapter_subscription_event *event) { + struct aws_rr_subscription_record *record = s_get_subscription_record(manager, event->topic_filter); + if (record == NULL) { + return; + } - record->pending_action = ARRSPAT_NOTHING; + AWS_LOGF_DEBUG( + AWS_LS_MQTT_REQUEST_RESPONSE, + "request-response subscription manager - received a protocol adapter subscription event for ('" PRInSTR + "'), type %s, error_code %d(%s)", + AWS_BYTE_CURSOR_PRI(event->topic_filter), + aws_protocol_adapter_subscription_event_type_to_c_str(event->event_type), + event->error_code, + aws_error_debug_str(event->error_code)); + + if (record->type == ARRST_REQUEST_RESPONSE) { + s_handle_protocol_adapter_request_subscription_event(manager, record, event); + } else { + s_handle_protocol_adapter_streaming_subscription_event(manager, record, event); + } s_aws_rr_subscription_record_log_invariant_violations(record); } @@ -550,19 +650,36 @@ static int s_apply_session_lost_wrapper(void *context, struct aws_hash_element * if (record->status == ARRSST_SUBSCRIBED) { record->status = ARRSST_NOT_SUBSCRIBED; - s_emit_subscription_event(manager, record, ARRSET_SUBSCRIPTION_ENDED); - if (record->pending_action != ARRSPAT_UNSUBSCRIBING) { - s_aws_rr_subscription_record_destroy(record); - return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE; + if (record->type == ARRST_REQUEST_RESPONSE) { + s_emit_subscription_event(manager, record, ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED); + + if (record->pending_action != ARRSPAT_UNSUBSCRIBING) { + s_aws_rr_subscription_record_destroy(record); + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE; + } + } else { + s_emit_subscription_event(manager, record, ARRSET_STREAMING_SUBSCRIPTION_LOST); } } return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; } +static int s_apply_streaming_resubscribe_wrapper(void *context, struct aws_hash_element *elem) { + struct aws_rr_subscription_record *record = elem->value; + struct aws_rr_subscription_manager *manager = context; + + if (record->type == ARRST_EVENT_STREAM) { + s_rr_activate_idle_subscription(manager, record); + } + + return AWS_COMMON_HASH_TABLE_ITER_CONTINUE; +} + static void s_apply_session_lost(struct aws_rr_subscription_manager *manager) { aws_hash_table_foreach(&manager->subscriptions, s_apply_session_lost_wrapper, manager); + aws_hash_table_foreach(&manager->subscriptions, s_apply_streaming_resubscribe_wrapper, manager); } void aws_rr_subscription_manager_on_protocol_adapter_connection_event( diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 92e89030..3ba0bb96 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -492,20 +492,32 @@ add_test_case(rrsm_acquire_blocked_eventstream) add_test_case(rrsm_acquire_no_capacity_max1) add_test_case(rrsm_acquire_no_capacity_too_many_event_stream) add_test_case(rrsm_acquire_failure_mixed_subscription_types) -add_test_case(rrsm_acquire_failure_subscribe_sync_failure) -add_test_case(rrsm_acquire_subscribe_failure_event) -add_test_case(rrsm_release_unsubscribes) +add_test_case(rrsm_acquire_failure_poisoned) +add_test_case(rrsm_release_unsubscribes_request) +add_test_case(rrsm_release_unsubscribes_streaming) add_test_case(rrsm_release_unsubscribe_success_clears_space) add_test_case(rrsm_release_unsubscribe_failure_blocked) -add_test_case(rrsm_offline_acquire_online_success) -add_test_case(rrsm_offline_acquire_online_failure) -add_test_case(rrsm_offline_acquire_release_online) -add_test_case(rrsm_acquire_success_offline_release_acquire2_no_unsubscribe) -add_test_case(rrsm_acquire_success_clean_up_unsubscribe_override) -add_test_case(rrsm_acquire_pending_clean_up_unsubscribe_override) -add_test_case(rrsm_offline_acquire_pending_clean_up_unsubscribe_override) -add_test_case(rrsm_acquire_success_offline_online_no_session_subscription_lost_can_reacquire) -add_test_case(rrsm_subscription_lost_while_unsubscribing) +add_test_case(rrsm_acquire_failure_subscribe_sync_failure_request) +add_test_case(rrsm_acquire_failure_subscribe_sync_failure_streaming) +add_test_case(rrsm_acquire_request_subscribe_failure_event) +add_test_case(rrsm_acquire_streaming_subscribe_failure_retryable_resubscribe) +add_test_case(rrsm_offline_acquire_request_online_success) +add_test_case(rrsm_offline_acquire_request_online_failure) +add_test_case(rrsm_offline_acquire_streaming_online_success) +add_test_case(rrsm_offline_acquire_streaming_online_failure) +add_test_case(rrsm_offline_acquire_release_request_online) +add_test_case(rrsm_offline_acquire_release_streaming_online) +add_test_case(rrsm_acquire_request_success_offline_release_acquire2_no_unsubscribe) +add_test_case(rrsm_acquire_streaming_success_offline_release_acquire2_no_unsubscribe) +add_test_case(rrsm_acquire_request_success_clean_up_unsubscribe_override) +add_test_case(rrsm_acquire_streaming_success_clean_up_unsubscribe_override) +add_test_case(rrsm_acquire_request_pending_clean_up_unsubscribe_override) +add_test_case(rrsm_acquire_streaming_pending_clean_up_unsubscribe_override) +add_test_case(rrsm_offline_acquire_request_pending_clean_up_unsubscribe_override) +add_test_case(rrsm_offline_acquire_streaming_pending_clean_up_unsubscribe_override) +add_test_case(rrsm_acquire_request_success_offline_online_no_session_subscription_ended_can_reacquire) +add_test_case(rrsm_request_subscription_ended_while_unsubscribing) +add_test_case(rrsm_streaming_subscription_lost_resubscribe_on_no_session) # "rrc" = request response client add_test_case(rrc_mqtt5_create_destroy) diff --git a/tests/request-response/protocol_adapter_tests.c b/tests/request-response/protocol_adapter_tests.c index ec2d6a1c..5b5bce58 100644 --- a/tests/request-response/protocol_adapter_tests.c +++ b/tests/request-response/protocol_adapter_tests.c @@ -585,7 +585,7 @@ static int s_do_request_response_mqtt5_protocol_adapter_subscribe_test( AWS_PASET_SUBSCRIBE, aws_byte_cursor_from_c_str("hello/world"), expected_error_code, - false); + test_type != PAOTT_FAILURE_REASON_CODE_NOT_RETRYABLE); struct aws_protocol_adapter_subscribe_options subscribe_options = { .topic_filter = aws_byte_cursor_from_buf(&expected_outcome.topic_filter), @@ -1255,7 +1255,7 @@ static int s_do_request_response_mqtt311_protocol_adapter_subscribe_test( AWS_PASET_SUBSCRIBE, aws_byte_cursor_from_c_str("hello/world"), expected_error_code, - false); + true); struct aws_protocol_adapter_subscribe_options subscribe_options = { .topic_filter = aws_byte_cursor_from_buf(&expected_outcome.topic_filter), diff --git a/tests/request-response/request_response_client_tests.c b/tests/request-response/request_response_client_tests.c index 4b2ca361..a7586df1 100644 --- a/tests/request-response/request_response_client_tests.c +++ b/tests/request-response/request_response_client_tests.c @@ -909,7 +909,7 @@ static int s_rrc_submit_streaming_operation_and_shutdown_fn(struct aws_allocator struct aws_rr_client_fixture_streaming_record_subscription_event expected_events[] = { { - .status = ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE, + .status = ARRSET_STREAMING_SUBSCRIPTION_HALTED, .error_code = AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN, }, }; diff --git a/tests/request-response/subscription_manager_tests.c b/tests/request-response/subscription_manager_tests.c index 2ffd58cf..4abd726c 100644 --- a/tests/request-response/subscription_manager_tests.c +++ b/tests/request-response/subscription_manager_tests.c @@ -59,6 +59,7 @@ struct aws_mqtt_protocol_adapter_mock_impl { struct aws_array_list api_records; bool is_connected; + size_t subscribe_count; }; static void s_aws_mqtt_protocol_adapter_mock_destroy(void *impl) { @@ -523,8 +524,8 @@ static int s_rrsm_acquire_existing_subscribing_fn(struct aws_allocator *allocato AWS_TEST_CASE(rrsm_acquire_existing_subscribing, s_rrsm_acquire_existing_subscribing_fn) /* - * Verify: Acquiring an existing, completed subscription does not trigger a protocol client subscribe and returns - * SUBSCRIBED + * Verify: Acquiring an existing, completed request subscription does not trigger a protocol client subscribe and + * returns SUBSCRIBED. Verify request and streaming subscription events are emitted. */ static int s_rrsm_acquire_existing_subscribed_fn(struct aws_allocator *allocator, void *ctx) { (void)ctx; @@ -574,12 +575,12 @@ static int s_rrsm_acquire_existing_subscribed_fn(struct aws_allocator *allocator struct aws_subscription_status_record expected_subscription_events[] = { { - .type = ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + .type = ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world1"), .operation_id = 1, }, { - .type = ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + .type = ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world2"), .operation_id = 2, }}; @@ -818,10 +819,61 @@ static int s_rrsm_acquire_failure_mixed_subscription_types_fn(struct aws_allocat AWS_TEST_CASE(rrsm_acquire_failure_mixed_subscription_types, s_rrsm_acquire_failure_mixed_subscription_types_fn) /* - * Verify: A subscription that resolves successfully invokes callbacks for every operation listener; releasing + * Verify: Acquiring an existing, completed request subscription does not trigger a protocol client subscribe and + * returns SUBSCRIBED. Verify request and streaming subscription events are emitted. + */ +static int s_rrsm_acquire_failure_poisoned_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct aws_subscription_manager_test_fixture fixture; + ASSERT_SUCCESS(s_aws_subscription_manager_test_fixture_init(&fixture, allocator, NULL)); + + struct aws_rr_subscription_manager *manager = &fixture.subscription_manager; + + struct aws_rr_acquire_subscription_options acquire1_options = { + .type = ARRST_EVENT_STREAM, + .topic_filter = aws_byte_cursor_from_c_str("hello/world1"), + .operation_id = 1, + }; + ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); + + struct aws_protocol_adapter_subscription_event unretryable_failure_event = { + .topic_filter = aws_byte_cursor_from_c_str("hello/world1"), + .event_type = AWS_PASET_SUBSCRIBE, + .error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE, + .retryable = false, + }; + aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &unretryable_failure_event); + + struct aws_subscription_status_record expected_subscription_events[] = {{ + .type = ARRSET_STREAMING_SUBSCRIPTION_HALTED, + .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world1"), + .operation_id = 1, + }}; + ASSERT_TRUE(s_contains_subscription_event_sequential_records(&fixture, 1, expected_subscription_events)); + + struct aws_rr_acquire_subscription_options reacquire1_options = { + .type = ARRST_EVENT_STREAM, + .topic_filter = aws_byte_cursor_from_c_str("hello/world1"), + .operation_id = 3, + }; + ASSERT_INT_EQUALS(AASRT_FAILURE, aws_rr_subscription_manager_acquire_subscription(manager, &reacquire1_options)); + + s_aws_subscription_manager_test_fixture_clean_up(&fixture); + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(rrsm_acquire_failure_poisoned, s_rrsm_acquire_failure_poisoned_fn) + +/* + * Verify: A request subscription that resolves successfully invokes callbacks for every operation listener; releasing * both references and calling a new acquire will trigger an unsubscribe of the first subscription */ -static int s_rrsm_release_unsubscribes_fn(struct aws_allocator *allocator, void *ctx) { +static int s_rrsm_release_unsubscribes_request_fn(struct aws_allocator *allocator, void *ctx) { (void)ctx; aws_mqtt_library_init(allocator); @@ -855,12 +907,12 @@ static int s_rrsm_release_unsubscribes_fn(struct aws_allocator *allocator, void // verify two success callbacks struct aws_subscription_status_record expected_subscription_events[] = { { - .type = ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + .type = ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, }, { - .type = ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + .type = ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 2, }}; @@ -907,7 +959,99 @@ static int s_rrsm_release_unsubscribes_fn(struct aws_allocator *allocator, void return AWS_OP_SUCCESS; } -AWS_TEST_CASE(rrsm_release_unsubscribes, s_rrsm_release_unsubscribes_fn) +AWS_TEST_CASE(rrsm_release_unsubscribes_request, s_rrsm_release_unsubscribes_request_fn) + +/* + * Verify: A streaming subscription that resolves successfully invokes callbacks for every operation listener; releasing + * both references and calling a new acquire will trigger an unsubscribe of the first subscription + */ +static int s_rrsm_release_unsubscribes_streaming_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct aws_subscription_manager_test_fixture fixture; + ASSERT_SUCCESS(s_aws_subscription_manager_test_fixture_init(&fixture, allocator, NULL)); + + struct aws_rr_subscription_manager *manager = &fixture.subscription_manager; + + struct aws_rr_acquire_subscription_options acquire1_options = { + .type = ARRST_EVENT_STREAM, + .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .operation_id = 1, + }; + ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); + + struct aws_rr_acquire_subscription_options acquire2_options = { + .type = ARRST_EVENT_STREAM, + .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .operation_id = 2, + }; + ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire2_options)); + + struct aws_protocol_adapter_subscription_event successful_subscription_event = { + .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .event_type = AWS_PASET_SUBSCRIBE, + .error_code = AWS_ERROR_SUCCESS, + }; + aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &successful_subscription_event); + + // verify two success callbacks + struct aws_subscription_status_record expected_subscription_events[] = { + { + .type = ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, + .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .operation_id = 1, + }, + { + .type = ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, + .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .operation_id = 2, + }}; + ASSERT_TRUE(s_contains_subscription_event_records(&fixture, 2, expected_subscription_events)); + + // verify no unsubscribes + struct aws_protocol_adapter_api_record expected_unsubscribe = { + .type = PAAT_UNSUBSCRIBE, + .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .timeout = DEFAULT_SM_TEST_TIMEOUT, + }; + ASSERT_FALSE(s_api_records_contains_record(fixture.mock_protocol_adapter, &expected_unsubscribe)); + + // release once, verify no unsubscribe + struct aws_rr_release_subscription_options release1_options = { + .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .operation_id = 1, + }; + aws_rr_subscription_manager_release_subscription(manager, &release1_options); + ASSERT_FALSE(s_api_records_contains_record(fixture.mock_protocol_adapter, &expected_unsubscribe)); + + // release second + struct aws_rr_release_subscription_options release2_options = { + .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .operation_id = 2, + }; + aws_rr_subscription_manager_release_subscription(manager, &release2_options); + ASSERT_FALSE(s_api_records_contains_record(fixture.mock_protocol_adapter, &expected_unsubscribe)); + + // unsubscribe is lazy, so we need to trigger it by acquiring something else + struct aws_rr_acquire_subscription_options acquire3_options = { + .type = ARRST_EVENT_STREAM, + .topic_filter = aws_byte_cursor_from_c_str("hello/world2"), + .operation_id = 3, + }; + ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire3_options)); + + // now the unsubscribe should be present + ASSERT_TRUE(s_api_records_contains_record(fixture.mock_protocol_adapter, &expected_unsubscribe)); + + s_aws_subscription_manager_test_fixture_clean_up(&fixture); + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE(rrsm_release_unsubscribes_streaming, s_rrsm_release_unsubscribes_streaming_fn) static int s_rrsm_do_unsubscribe_test(struct aws_allocator *allocator, bool should_succeed) { aws_mqtt_library_init(allocator); @@ -1006,25 +1150,31 @@ static int s_rrsm_release_unsubscribe_failure_blocked_fn(struct aws_allocator *a AWS_TEST_CASE(rrsm_release_unsubscribe_failure_blocked, s_rrsm_release_unsubscribe_failure_blocked_fn) -static int s_aws_mqtt_protocol_adapter_mock_subscribe_fails( +static int s_aws_mqtt_protocol_adapter_mock_subscribe_fails_first_time( void *impl, struct aws_protocol_adapter_subscribe_options *options) { (void)impl; (void)options; - return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + struct aws_mqtt_protocol_adapter_mock_impl *mock_impl = impl; + + if (mock_impl->subscribe_count++ == 0) { + return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT); + } else { + return AWS_OP_SUCCESS; + } } /* - * Verify: Acquiring a new subscription but synchronously failing the protocol adapter subscribe returns FAILURE + * Verify: Acquiring a new request subscription but synchronously failing the protocol adapter subscribe returns FAILURE */ -static int s_rrsm_acquire_failure_subscribe_sync_failure_fn(struct aws_allocator *allocator, void *ctx) { +static int s_rrsm_acquire_failure_subscribe_sync_failure_request_fn(struct aws_allocator *allocator, void *ctx) { (void)ctx; aws_mqtt_library_init(allocator); struct aws_mqtt_protocol_adapter_vtable failing_vtable = s_protocol_adapter_mock_vtable; - failing_vtable.aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_mock_subscribe_fails; + failing_vtable.aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_mock_subscribe_fails_first_time; struct aws_subscription_manager_test_fixture_options fixture_config = { .max_subscriptions = 3, @@ -1051,12 +1201,62 @@ static int s_rrsm_acquire_failure_subscribe_sync_failure_fn(struct aws_allocator return AWS_OP_SUCCESS; } -AWS_TEST_CASE(rrsm_acquire_failure_subscribe_sync_failure, s_rrsm_acquire_failure_subscribe_sync_failure_fn) +AWS_TEST_CASE( + rrsm_acquire_failure_subscribe_sync_failure_request, + s_rrsm_acquire_failure_subscribe_sync_failure_request_fn) + +/* + * Verify: Acquiring a new streaming subscription but synchronously failing the protocol adapter subscribe returns + * FAILURE Trying again also fails even though the mock subscribe succeeds after the first try. + */ +static int s_rrsm_acquire_failure_subscribe_sync_failure_streaming_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct aws_mqtt_protocol_adapter_vtable failing_vtable = s_protocol_adapter_mock_vtable; + failing_vtable.aws_mqtt_protocol_adapter_subscribe_fn = s_aws_mqtt_protocol_adapter_mock_subscribe_fails_first_time; + + struct aws_subscription_manager_test_fixture_options fixture_config = { + .max_subscriptions = 3, + .operation_timeout_seconds = DEFAULT_SM_TEST_TIMEOUT, + .start_connected = true, + .adapter_vtable = &failing_vtable, + }; + + struct aws_subscription_manager_test_fixture fixture; + ASSERT_SUCCESS(s_aws_subscription_manager_test_fixture_init(&fixture, allocator, &fixture_config)); + + struct aws_rr_subscription_manager *manager = &fixture.subscription_manager; + + struct aws_rr_acquire_subscription_options acquire_options = { + .type = ARRST_EVENT_STREAM, + .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .operation_id = 1, + }; + ASSERT_INT_EQUALS(AASRT_FAILURE, aws_rr_subscription_manager_acquire_subscription(manager, &acquire_options)); + + struct aws_rr_acquire_subscription_options acquire_options2 = { + .type = ARRST_EVENT_STREAM, + .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .operation_id = 2, + }; + ASSERT_INT_EQUALS(AASRT_FAILURE, aws_rr_subscription_manager_acquire_subscription(manager, &acquire_options2)); + + s_aws_subscription_manager_test_fixture_clean_up(&fixture); + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + rrsm_acquire_failure_subscribe_sync_failure_streaming, + s_rrsm_acquire_failure_subscribe_sync_failure_streaming_fn) /* - * Verify: Completing a subscription-acquire with a failing reason code emits a subscription failed event + * Verify: Completing a request subscription-acquire with a failing reason code emits a subscription failed event */ -static int s_rrsm_acquire_subscribe_failure_event_fn(struct aws_allocator *allocator, void *ctx) { +static int s_rrsm_acquire_request_subscribe_failure_event_fn(struct aws_allocator *allocator, void *ctx) { (void)ctx; aws_mqtt_library_init(allocator); @@ -1083,7 +1283,7 @@ static int s_rrsm_acquire_subscribe_failure_event_fn(struct aws_allocator *alloc // verify subscribe failure event emission struct aws_subscription_status_record expected_subscription_event = { - .type = ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE, + .type = ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, }; @@ -1096,9 +1296,87 @@ static int s_rrsm_acquire_subscribe_failure_event_fn(struct aws_allocator *alloc return AWS_OP_SUCCESS; } -AWS_TEST_CASE(rrsm_acquire_subscribe_failure_event, s_rrsm_acquire_subscribe_failure_event_fn) +AWS_TEST_CASE(rrsm_acquire_request_subscribe_failure_event, s_rrsm_acquire_request_subscribe_failure_event_fn) + +/* + * Verify: Completing a streaming subscription-acquire with a retryable failing failure triggers a resubscribe attempt + */ +static int s_rrsm_acquire_streaming_subscribe_failure_retryable_resubscribe_fn( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; -static int s_do_offline_acquire_online_test(struct aws_allocator *allocator, bool success) { + aws_mqtt_library_init(allocator); + + struct aws_subscription_manager_test_fixture fixture; + ASSERT_SUCCESS(s_aws_subscription_manager_test_fixture_init(&fixture, allocator, NULL)); + + struct aws_rr_subscription_manager *manager = &fixture.subscription_manager; + + struct aws_rr_acquire_subscription_options acquire1_options = { + .type = ARRST_EVENT_STREAM, + .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .operation_id = 1, + }; + ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire1_options)); + + // complete the subscribe with a retryable failure + struct aws_protocol_adapter_subscription_event failed_subscription_event = { + .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .event_type = AWS_PASET_SUBSCRIBE, + .error_code = AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE, + .retryable = true, + }; + aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &failed_subscription_event); + + struct aws_protocol_adapter_api_record expected_subscribes[] = { + { + .type = PAAT_SUBSCRIBE, + .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .timeout = DEFAULT_SM_TEST_TIMEOUT, + }, + { + .type = PAAT_SUBSCRIBE, + .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .timeout = DEFAULT_SM_TEST_TIMEOUT, + }, + }; + + ASSERT_TRUE( + s_api_records_equals(fixture.mock_protocol_adapter, AWS_ARRAY_SIZE(expected_subscribes), expected_subscribes)); + + s_aws_subscription_manager_test_fixture_clean_up(&fixture); + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + rrsm_acquire_streaming_subscribe_failure_retryable_resubscribe, + s_rrsm_acquire_streaming_subscribe_failure_retryable_resubscribe_fn) + +static enum aws_rr_subscription_event_type s_compute_expected_subscription_event_offline_acquire_online( + enum aws_rr_subscription_type subscription_type, + bool success) { + if (subscription_type == ARRST_REQUEST_RESPONSE) { + if (success) { + return ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS; + } else { + return ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_FAILURE; + } + } else { + if (success) { + return ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED; + } else { + return ARRSET_STREAMING_SUBSCRIPTION_HALTED; + } + } +} + +static int s_do_offline_acquire_online_test( + struct aws_allocator *allocator, + enum aws_rr_subscription_type subscription_type, + bool success) { aws_mqtt_library_init(allocator); struct aws_subscription_manager_test_fixture_options fixture_config = { @@ -1113,7 +1391,7 @@ static int s_do_offline_acquire_online_test(struct aws_allocator *allocator, boo struct aws_rr_subscription_manager *manager = &fixture.subscription_manager; struct aws_rr_acquire_subscription_options acquire_options = { - .type = ARRST_REQUEST_RESPONSE, + .type = subscription_type, .topic_filter = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, }; @@ -1147,7 +1425,7 @@ static int s_do_offline_acquire_online_test(struct aws_allocator *allocator, boo aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &subscription_event); struct aws_subscription_status_record expected_subscription_event = { - .type = success ? ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS : ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE, + .type = s_compute_expected_subscription_event_offline_acquire_online(subscription_type, success), .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, }; @@ -1160,36 +1438,56 @@ static int s_do_offline_acquire_online_test(struct aws_allocator *allocator, boo } /* - * Verify: Acquiring a new subscription while offline returns SUBSCRIBING. Going online triggers a protocol adapter - * subscribe. Completing the subscription successfully emits a subscribe success event. + * Verify: Acquiring a new request subscription while offline returns SUBSCRIBING. Going online triggers a protocol + * adapter subscribe. Completing the subscription successfully emits a request subscribe success event. */ -static int s_rrsm_offline_acquire_online_success_fn(struct aws_allocator *allocator, void *ctx) { +static int s_rrsm_offline_acquire_request_online_success_fn(struct aws_allocator *allocator, void *ctx) { (void)ctx; - return s_do_offline_acquire_online_test(allocator, true); + return s_do_offline_acquire_online_test(allocator, ARRST_REQUEST_RESPONSE, true); } -AWS_TEST_CASE(rrsm_offline_acquire_online_success, s_rrsm_offline_acquire_online_success_fn) +AWS_TEST_CASE(rrsm_offline_acquire_request_online_success, s_rrsm_offline_acquire_request_online_success_fn) /* - * Verify: Acquiring a new subscription while offline returns SUBSCRIBING. Going online triggers a protocol adapter - * subscribe. Completing the subscription with a failure emits a subscribe failure event. + * Verify: Acquiring a new request subscription while offline returns SUBSCRIBING. Going online triggers a protocol + * adapter subscribe. Completing the subscription with a failure emits a request subscribe failure event. */ -static int s_rrsm_offline_acquire_online_failure_fn(struct aws_allocator *allocator, void *ctx) { +static int s_rrsm_offline_acquire_request_online_failure_fn(struct aws_allocator *allocator, void *ctx) { (void)ctx; - return s_do_offline_acquire_online_test(allocator, false); + return s_do_offline_acquire_online_test(allocator, ARRST_REQUEST_RESPONSE, false); } -AWS_TEST_CASE(rrsm_offline_acquire_online_failure, s_rrsm_offline_acquire_online_failure_fn) +AWS_TEST_CASE(rrsm_offline_acquire_request_online_failure, s_rrsm_offline_acquire_request_online_failure_fn) /* - * Verify: Acquiring and releasing a subscription while offline and then going online should remove the - * subscription without invoking any protocol adapter APIs. + * Verify: Acquiring a new steaming subscription while offline returns SUBSCRIBING. Going online triggers a protocol + * adapter subscribe. Completing the subscription successfully emits a streaming subscription established event. + */ +static int s_rrsm_offline_acquire_streaming_online_success_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + return s_do_offline_acquire_online_test(allocator, ARRST_EVENT_STREAM, true); +} + +AWS_TEST_CASE(rrsm_offline_acquire_streaming_online_success, s_rrsm_offline_acquire_streaming_online_success_fn) + +/* + * Verify: Acquiring a new request subscription while offline returns SUBSCRIBING. Going online triggers a protocol + * adapter subscribe. Completing the subscription with a failure emits a streaming subscription halted event. */ -static int s_rrsm_offline_acquire_release_online_fn(struct aws_allocator *allocator, void *ctx) { +static int s_rrsm_offline_acquire_streaming_online_failure_fn(struct aws_allocator *allocator, void *ctx) { (void)ctx; + return s_do_offline_acquire_online_test(allocator, ARRST_EVENT_STREAM, false); +} + +AWS_TEST_CASE(rrsm_offline_acquire_streaming_online_failure, s_rrsm_offline_acquire_streaming_online_failure_fn) + +static int s_do_offline_acquire_release_online_test( + struct aws_allocator *allocator, + enum aws_rr_subscription_type subscription_type) { aws_mqtt_library_init(allocator); struct aws_subscription_manager_test_fixture fixture; @@ -1205,7 +1503,7 @@ static int s_rrsm_offline_acquire_release_online_fn(struct aws_allocator *alloca // acquire struct aws_rr_acquire_subscription_options acquire_options = { - .type = ARRST_REQUEST_RESPONSE, + .type = subscription_type, .topic_filter = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, }; @@ -1231,7 +1529,7 @@ static int s_rrsm_offline_acquire_release_online_fn(struct aws_allocator *alloca // trigger a different subscription, verify it's the only thing that has reached the protocol adapter struct aws_rr_acquire_subscription_options acquire2_options = { - .type = ARRST_REQUEST_RESPONSE, + .type = subscription_type, .topic_filter = aws_byte_cursor_from_c_str("hello/world2"), .operation_id = 2, }; @@ -1252,16 +1550,34 @@ static int s_rrsm_offline_acquire_release_online_fn(struct aws_allocator *alloca return AWS_OP_SUCCESS; } -AWS_TEST_CASE(rrsm_offline_acquire_release_online, s_rrsm_offline_acquire_release_online_fn) +/* + * Verify: Acquiring and releasing a subscription while offline and then going online should remove the + * subscription without invoking any protocol adapter APIs. + */ +static int s_rrsm_offline_acquire_release_request_online_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + return s_do_offline_acquire_release_online_test(allocator, ARRST_REQUEST_RESPONSE); +} + +AWS_TEST_CASE(rrsm_offline_acquire_release_request_online, s_rrsm_offline_acquire_release_request_online_fn) /* - * Verify: Releasing an active subscription while offline should not invoke an unsubscribe until back online + * Verify: Acquiring and releasing a subscription while offline and then going online should remove the + * subscription without invoking any protocol adapter APIs. */ -static int s_rrsm_acquire_success_offline_release_acquire2_no_unsubscribe_fn( - struct aws_allocator *allocator, - void *ctx) { +static int s_rrsm_offline_acquire_release_streaming_online_fn(struct aws_allocator *allocator, void *ctx) { (void)ctx; + return s_do_offline_acquire_release_online_test(allocator, ARRST_EVENT_STREAM); +} + +AWS_TEST_CASE(rrsm_offline_acquire_release_streaming_online, s_rrsm_offline_acquire_release_streaming_online_fn) + +static int s_do_acquire_success_offline_release_acquire2_no_unsubscribe_test( + struct aws_allocator *allocator, + enum aws_rr_subscription_type subscription_type) { + aws_mqtt_library_init(allocator); struct aws_subscription_manager_test_fixture fixture; @@ -1271,7 +1587,7 @@ static int s_rrsm_acquire_success_offline_release_acquire2_no_unsubscribe_fn( // acquire struct aws_rr_acquire_subscription_options acquire_options = { - .type = ARRST_REQUEST_RESPONSE, + .type = subscription_type, .topic_filter = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, }; @@ -1286,7 +1602,8 @@ static int s_rrsm_acquire_success_offline_release_acquire2_no_unsubscribe_fn( aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &subscription_event); struct aws_subscription_status_record expected_subscription_event = { - .type = ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + .type = subscription_type == ARRST_REQUEST_RESPONSE ? ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS + : ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, }; @@ -1307,7 +1624,7 @@ static int s_rrsm_acquire_success_offline_release_acquire2_no_unsubscribe_fn( // acquire something different, normally that triggers an unsubscribe, but we're offline struct aws_rr_acquire_subscription_options acquire2_options = { - .type = ARRST_REQUEST_RESPONSE, + .type = subscription_type, .topic_filter = aws_byte_cursor_from_c_str("hello/world2"), .operation_id = 2, }; @@ -1336,14 +1653,42 @@ static int s_rrsm_acquire_success_offline_release_acquire2_no_unsubscribe_fn( return AWS_OP_SUCCESS; } +/* + * Verify: Releasing an active request subscription while offline should not invoke an unsubscribe until back online + */ +static int s_rrsm_acquire_request_success_offline_release_acquire2_no_unsubscribe_fn( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; + + return s_do_acquire_success_offline_release_acquire2_no_unsubscribe_test(allocator, ARRST_REQUEST_RESPONSE); +} + +AWS_TEST_CASE( + rrsm_acquire_request_success_offline_release_acquire2_no_unsubscribe, + s_rrsm_acquire_request_success_offline_release_acquire2_no_unsubscribe_fn) + +/* + * Verify: Releasing an active streaming subscription while offline should not invoke an unsubscribe until back online + */ +static int s_rrsm_acquire_streaming_success_offline_release_acquire2_no_unsubscribe_fn( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; + + return s_do_acquire_success_offline_release_acquire2_no_unsubscribe_test(allocator, ARRST_REQUEST_RESPONSE); +} + AWS_TEST_CASE( - rrsm_acquire_success_offline_release_acquire2_no_unsubscribe, - s_rrsm_acquire_success_offline_release_acquire2_no_unsubscribe_fn) + rrsm_acquire_streaming_success_offline_release_acquire2_no_unsubscribe, + s_rrsm_acquire_streaming_success_offline_release_acquire2_no_unsubscribe_fn) static int s_do_rrsm_acquire_clean_up_test( struct aws_allocator *allocator, + enum aws_rr_subscription_type subscription_type, bool complete_subscribe, bool clean_up_while_connected) { + aws_mqtt_library_init(allocator); struct aws_subscription_manager_test_fixture fixture; @@ -1353,7 +1698,7 @@ static int s_do_rrsm_acquire_clean_up_test( // acquire struct aws_rr_acquire_subscription_options acquire_options = { - .type = ARRST_REQUEST_RESPONSE, + .type = subscription_type, .topic_filter = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, }; @@ -1369,7 +1714,8 @@ static int s_do_rrsm_acquire_clean_up_test( aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &subscription_event); struct aws_subscription_status_record expected_subscription_event = { - .type = ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + .type = subscription_type == ARRST_REQUEST_RESPONSE ? ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS + : ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, }; @@ -1413,45 +1759,92 @@ static int s_do_rrsm_acquire_clean_up_test( } /* - * Verify: Calling clean up while a subscription is active triggers an immediate unsubscribe + * Verify: Calling clean up while a request subscription is active triggers an immediate unsubscribe + */ +static int s_rrsm_acquire_request_success_clean_up_unsubscribe_override_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + return s_do_rrsm_acquire_clean_up_test(allocator, ARRST_REQUEST_RESPONSE, true, true); +} + +AWS_TEST_CASE( + rrsm_acquire_request_success_clean_up_unsubscribe_override, + s_rrsm_acquire_request_success_clean_up_unsubscribe_override_fn) + +/* + * Verify: Calling clean up while a streaming subscription is active triggers an immediate unsubscribe + */ +static int s_rrsm_acquire_streaming_success_clean_up_unsubscribe_override_fn( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; + + return s_do_rrsm_acquire_clean_up_test(allocator, ARRST_EVENT_STREAM, true, true); +} + +AWS_TEST_CASE( + rrsm_acquire_streaming_success_clean_up_unsubscribe_override, + s_rrsm_acquire_streaming_success_clean_up_unsubscribe_override_fn) + +/* + * Verify: Calling clean up while a request subscription is pending triggers an immediate unsubscribe */ -static int s_rrsm_acquire_success_clean_up_unsubscribe_override_fn(struct aws_allocator *allocator, void *ctx) { +static int s_rrsm_acquire_request_pending_clean_up_unsubscribe_override_fn(struct aws_allocator *allocator, void *ctx) { (void)ctx; - return s_do_rrsm_acquire_clean_up_test(allocator, true, true); + return s_do_rrsm_acquire_clean_up_test(allocator, ARRST_REQUEST_RESPONSE, false, true); } AWS_TEST_CASE( - rrsm_acquire_success_clean_up_unsubscribe_override, - s_rrsm_acquire_success_clean_up_unsubscribe_override_fn) + rrsm_acquire_request_pending_clean_up_unsubscribe_override, + s_rrsm_acquire_request_pending_clean_up_unsubscribe_override_fn) /* - * Verify: Calling clean up while a subscription is pending triggers an immediate unsubscribe + * Verify: Calling clean up while a streaming subscription is pending triggers an immediate unsubscribe */ -static int s_rrsm_acquire_pending_clean_up_unsubscribe_override_fn(struct aws_allocator *allocator, void *ctx) { +static int s_rrsm_acquire_streaming_pending_clean_up_unsubscribe_override_fn( + struct aws_allocator *allocator, + void *ctx) { (void)ctx; - return s_do_rrsm_acquire_clean_up_test(allocator, false, true); + return s_do_rrsm_acquire_clean_up_test(allocator, ARRST_EVENT_STREAM, false, true); } AWS_TEST_CASE( - rrsm_acquire_pending_clean_up_unsubscribe_override, - s_rrsm_acquire_pending_clean_up_unsubscribe_override_fn) + rrsm_acquire_streaming_pending_clean_up_unsubscribe_override, + s_rrsm_acquire_streaming_pending_clean_up_unsubscribe_override_fn) /* - * Verify: Calling clean up while offline and a subscription is pending triggers an immediate unsubscribe + * Verify: Calling clean up while offline and a request subscription is pending triggers an immediate unsubscribe */ -static int s_rrsm_offline_acquire_pending_clean_up_unsubscribe_override_fn(struct aws_allocator *allocator, void *ctx) { +static int s_rrsm_offline_acquire_request_pending_clean_up_unsubscribe_override_fn( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; + + return s_do_rrsm_acquire_clean_up_test(allocator, ARRST_REQUEST_RESPONSE, false, false); +} + +AWS_TEST_CASE( + rrsm_offline_acquire_request_pending_clean_up_unsubscribe_override, + s_rrsm_offline_acquire_request_pending_clean_up_unsubscribe_override_fn) + +/* + * Verify: Calling clean up while offline and a streaming subscription is pending triggers an immediate unsubscribe + */ +static int s_rrsm_offline_acquire_streaming_pending_clean_up_unsubscribe_override_fn( + struct aws_allocator *allocator, + void *ctx) { (void)ctx; - return s_do_rrsm_acquire_clean_up_test(allocator, false, false); + return s_do_rrsm_acquire_clean_up_test(allocator, ARRST_EVENT_STREAM, false, false); } AWS_TEST_CASE( - rrsm_offline_acquire_pending_clean_up_unsubscribe_override, - s_rrsm_offline_acquire_pending_clean_up_unsubscribe_override_fn) + rrsm_offline_acquire_streaming_pending_clean_up_unsubscribe_override, + s_rrsm_offline_acquire_streaming_pending_clean_up_unsubscribe_override_fn) -static int s_rrsm_do_no_session_subscription_lost_test( +static int s_rrsm_do_no_session_subscription_ended_test( struct aws_allocator *allocator, bool offline_while_unsubscribing) { aws_mqtt_library_init(allocator); @@ -1478,7 +1871,7 @@ static int s_rrsm_do_no_session_subscription_lost_test( aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &subscription_event); struct aws_subscription_status_record expected_subscription_event = { - .type = ARRSET_SUBSCRIPTION_SUBSCRIBE_SUCCESS, + .type = ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIBE_SUCCESS, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, }; @@ -1527,12 +1920,12 @@ static int s_rrsm_do_no_session_subscription_lost_test( // verify subscription lost emitted if (!offline_while_unsubscribing) { - struct aws_subscription_status_record expected_subscription_lost_event = { - .type = ARRSET_SUBSCRIPTION_ENDED, + struct aws_subscription_status_record expected_subscription_ended_event = { + .type = ARRSET_REQUEST_SUBSCRIPTION_SUBSCRIPTION_ENDED, .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), .operation_id = 1, }; - ASSERT_TRUE(s_contains_subscription_event_record(&fixture, &expected_subscription_lost_event)); + ASSERT_TRUE(s_contains_subscription_event_record(&fixture, &expected_subscription_ended_event)); } // if we were unsubscribing, verify reacquire is blocked and then complete the unsubscribe @@ -1563,29 +1956,116 @@ static int s_rrsm_do_no_session_subscription_lost_test( } /* - * Verify: If the client fails to rejoin a session, a SUBSCRIPTION_ENDED event is emitted for active subscriptions and - * that subscription can successfully be reacquired + * Verify: If the client fails to rejoin a session, a SUBSCRIPTION_ENDED event is emitted for active request + * subscriptions and that subscription can successfully be reacquired */ -static int s_rrsm_acquire_success_offline_online_no_session_subscription_lost_can_reacquire_fn( +static int s_rrsm_acquire_request_success_offline_online_no_session_subscription_ended_can_reacquire_fn( struct aws_allocator *allocator, void *ctx) { (void)ctx; - return s_rrsm_do_no_session_subscription_lost_test(allocator, false); + return s_rrsm_do_no_session_subscription_ended_test(allocator, false); } AWS_TEST_CASE( - rrsm_acquire_success_offline_online_no_session_subscription_lost_can_reacquire, - s_rrsm_acquire_success_offline_online_no_session_subscription_lost_can_reacquire_fn) + rrsm_acquire_request_success_offline_online_no_session_subscription_ended_can_reacquire, + s_rrsm_acquire_request_success_offline_online_no_session_subscription_ended_can_reacquire_fn) /* * Verify: If the client fails to rejoin a session, a SUBSCRIPTION_ENDED event is emitted for unsubscribing - * subscriptions + * request subscriptions */ -static int s_rrsm_subscription_lost_while_unsubscribing_fn(struct aws_allocator *allocator, void *ctx) { +static int s_rrsm_request_subscription_ended_while_unsubscribing_fn(struct aws_allocator *allocator, void *ctx) { (void)ctx; - return s_rrsm_do_no_session_subscription_lost_test(allocator, true); + return s_rrsm_do_no_session_subscription_ended_test(allocator, true); +} + +AWS_TEST_CASE( + rrsm_request_subscription_ended_while_unsubscribing, + s_rrsm_request_subscription_ended_while_unsubscribing_fn) + +/* + * Verify: If the client fails to rejoin a session, a SUBSCRIPTION_LOST event is emitted for streaming subscriptions, + * and a resubscribe is triggered + */ +static int s_rrsm_streaming_subscription_lost_resubscribe_on_no_session_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct aws_subscription_manager_test_fixture fixture; + ASSERT_SUCCESS(s_aws_subscription_manager_test_fixture_init(&fixture, allocator, NULL)); + + struct aws_rr_subscription_manager *manager = &fixture.subscription_manager; + + // acquire + struct aws_rr_acquire_subscription_options acquire_options = { + .type = ARRST_EVENT_STREAM, + .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .operation_id = 1, + }; + ASSERT_INT_EQUALS(AASRT_SUBSCRIBING, aws_rr_subscription_manager_acquire_subscription(manager, &acquire_options)); + + // successfully complete subscription + struct aws_protocol_adapter_subscription_event subscription_event = { + .event_type = AWS_PASET_SUBSCRIBE, + .topic_filter = aws_byte_cursor_from_c_str("hello/world"), + .error_code = AWS_ERROR_SUCCESS, + }; + aws_rr_subscription_manager_on_protocol_adapter_subscription_event(manager, &subscription_event); + + struct aws_subscription_status_record expected_subscription_event = { + .type = ARRSET_STREAMING_SUBSCRIPTION_ESTABLISHED, + .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .operation_id = 1, + }; + ASSERT_TRUE(s_contains_subscription_event_record(&fixture, &expected_subscription_event)); + + // online -> offline + struct aws_protocol_adapter_connection_event offline_event = { + .event_type = AWS_PACET_DISCONNECTED, + }; + aws_rr_subscription_manager_on_protocol_adapter_connection_event(manager, &offline_event); + + // offline -> online (no session) + struct aws_protocol_adapter_connection_event online_event = { + .event_type = AWS_PACET_CONNECTED, + .joined_session = false, + }; + aws_rr_subscription_manager_on_protocol_adapter_connection_event(manager, &online_event); + + // verify subscription lost on rejoin + struct aws_subscription_status_record expected_subscription_ended_event = { + .type = ARRSET_STREAMING_SUBSCRIPTION_LOST, + .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .operation_id = 1, + }; + ASSERT_TRUE(s_contains_subscription_event_record(&fixture, &expected_subscription_ended_event)); + + // verify resubscribe submitted to the protocol adapter + struct aws_protocol_adapter_api_record expected_subscribes[] = { + { + .type = PAAT_SUBSCRIBE, + .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .timeout = DEFAULT_SM_TEST_TIMEOUT, + }, + { + .type = PAAT_SUBSCRIBE, + .topic_filter_cursor = aws_byte_cursor_from_c_str("hello/world"), + .timeout = DEFAULT_SM_TEST_TIMEOUT, + }, + }; + + ASSERT_TRUE( + s_api_records_equals(fixture.mock_protocol_adapter, AWS_ARRAY_SIZE(expected_subscribes), expected_subscribes)); + + s_aws_subscription_manager_test_fixture_clean_up(&fixture); + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; } -AWS_TEST_CASE(rrsm_subscription_lost_while_unsubscribing, s_rrsm_subscription_lost_while_unsubscribing_fn) +AWS_TEST_CASE( + rrsm_streaming_subscription_lost_resubscribe_on_no_session, + s_rrsm_streaming_subscription_lost_resubscribe_on_no_session_fn)