diff --git a/source/request-response/request_response_client.c b/source/request-response/request_response_client.c index 9a7af51a..8ca6d88a 100644 --- a/source/request-response/request_response_client.c +++ b/source/request-response/request_response_client.c @@ -632,6 +632,8 @@ static struct aws_mqtt_request_response_client *s_aws_mqtt_request_response_clie NULL, NULL); + aws_linked_list_init(&rr_client->operation_queue); + aws_task_init( &rr_client->external_shutdown_task, s_mqtt_request_response_client_external_shutdown_task_fn, @@ -840,6 +842,14 @@ static bool s_are_request_operation_options_valid( AWS_BYTE_CURSOR_PRI(path->topic)); return false; } + + if (path->correlation_token_json_path.len == 0) { + AWS_LOGF_ERROR( + AWS_LS_MQTT_REQUEST_RESPONSE, + "(%p) rr client request options - empty correlation token json path", + (void *)client); + return false; + } } if (request_options->correlation_token.len == 0) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 4b9141b8..92e89030 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -511,6 +511,17 @@ add_test_case(rrsm_subscription_lost_while_unsubscribing) add_test_case(rrc_mqtt5_create_destroy) add_test_case(rrc_mqtt311_create_destroy) +add_test_case(rrc_submit_request_operation_failure_no_response_paths) +add_test_case(rrc_submit_request_operation_failure_invalid_response_topic) +add_test_case(rrc_submit_request_operation_failure_invalid_response_correlation_token_path) +add_test_case(rrc_submit_request_operation_failure_no_correlation_token) +add_test_case(rrc_submit_request_operation_failure_invalid_publish_topic) +add_test_case(rrc_submit_request_operation_failure_empty_request) +add_test_case(rrc_submit_streaming_operation_failure_invalid_subscription_topic_filter) + +add_test_case(rrc_submit_request_operation_failure_by_shutdown) +add_test_case(rrc_submit_streaming_operation_and_shutdown) + generate_test_driver(${PROJECT_NAME}-tests) set(TEST_PAHO_CLIENT_BINARY_NAME ${PROJECT_NAME}-paho-client) diff --git a/tests/request-response/request_response_client_tests.c b/tests/request-response/request_response_client_tests.c index 633ee42b..99a67e7e 100644 --- a/tests/request-response/request_response_client_tests.c +++ b/tests/request-response/request_response_client_tests.c @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0. */ +#include #include #include @@ -34,8 +35,333 @@ struct aws_rr_client_test_fixture { bool client_initialized; bool client_destroyed; + + struct aws_hash_table request_response_records; + struct aws_hash_table streaming_records; }; +struct aws_rr_client_fixture_request_response_record { + struct aws_allocator *allocator; + + struct aws_rr_client_test_fixture *fixture; + + struct aws_byte_cursor payload_cursor; + + struct aws_byte_buf payload; + + bool completed; + int error_code; + struct aws_byte_buf response; +}; + +struct aws_rr_client_fixture_request_response_record *s_aws_rr_client_fixture_request_response_record_new( + struct aws_allocator *allocator, + struct aws_rr_client_test_fixture *fixture, + struct aws_byte_cursor request_payload) { + struct aws_rr_client_fixture_request_response_record *record = + aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_client_fixture_request_response_record)); + + record->allocator = allocator; + record->fixture = fixture; + + aws_byte_buf_init_copy_from_cursor(&record->payload, allocator, request_payload); + record->payload_cursor = aws_byte_cursor_from_buf(&record->payload); + + return record; +} + +void s_aws_rr_client_fixture_request_response_record_delete( + struct aws_rr_client_fixture_request_response_record *record) { + aws_byte_buf_clean_up(&record->payload); + aws_byte_buf_clean_up(&record->response); + + aws_mem_release(record->allocator, record); +} + +static void s_aws_rr_client_fixture_request_response_record_hash_destroy(void *element) { + struct aws_rr_client_fixture_request_response_record *record = element; + + s_aws_rr_client_fixture_request_response_record_delete(record); +} + +static void s_rrc_fixture_request_completion_callback( + struct aws_byte_cursor *payload, + int error_code, + void *user_data) { + struct aws_rr_client_fixture_request_response_record *record = user_data; + struct aws_rr_client_test_fixture *fixture = record->fixture; + + aws_mutex_lock(&fixture->lock); + + if (payload != NULL) { + AWS_FATAL_ASSERT(error_code == AWS_ERROR_SUCCESS); + + aws_byte_buf_init_copy_from_cursor(&record->response, fixture->allocator, *payload); + } else { + AWS_FATAL_ASSERT(error_code != AWS_ERROR_SUCCESS); + record->error_code = error_code; + } + + record->completed = true; + + aws_mutex_unlock(&fixture->lock); + aws_condition_variable_notify_all(&fixture->signal); +} + +static struct aws_rr_client_fixture_request_response_record *s_rrc_fixture_add_request_record( + struct aws_rr_client_test_fixture *fixture, + struct aws_byte_cursor request_payload) { + struct aws_rr_client_fixture_request_response_record *record = + s_aws_rr_client_fixture_request_response_record_new(fixture->allocator, fixture, request_payload); + + aws_hash_table_put(&fixture->request_response_records, &record->payload_cursor, record, NULL); + + return record; +} + +struct rrc_operation_completion_context { + struct aws_byte_cursor key; + struct aws_rr_client_test_fixture *fixture; +}; + +static bool s_is_request_complete(void *context) { + struct rrc_operation_completion_context *completion_context = context; + + struct aws_hash_element *element = NULL; + aws_hash_table_find(&completion_context->fixture->request_response_records, &completion_context->key, &element); + + AWS_FATAL_ASSERT(element != NULL && element->value != NULL); + + struct aws_rr_client_fixture_request_response_record *record = element->value; + + return record->completed; +} + +static void s_rrc_wait_on_request_completion( + struct aws_rr_client_test_fixture *fixture, + struct aws_byte_cursor request_payload) { + struct rrc_operation_completion_context context = { + .key = request_payload, + .fixture = fixture, + }; + + aws_mutex_lock(&fixture->lock); + aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_is_request_complete, &context); + aws_mutex_unlock(&fixture->lock); +} + +static int s_rrc_verify_request_completion( + struct aws_rr_client_test_fixture *fixture, + struct aws_byte_cursor request_payload, + int expected_error_code, + struct aws_byte_cursor *expected_response) { + aws_mutex_lock(&fixture->lock); + + struct aws_hash_element *element = NULL; + aws_hash_table_find(&fixture->request_response_records, &request_payload, &element); + + AWS_FATAL_ASSERT(element != NULL && element->value != NULL); + + struct aws_rr_client_fixture_request_response_record *record = element->value; + + ASSERT_INT_EQUALS(expected_error_code, record->error_code); + + if (expected_response != NULL) { + struct aws_byte_cursor actual_payload = aws_byte_cursor_from_buf(&record->response); + ASSERT_TRUE(aws_byte_cursor_eq(expected_response, &actual_payload)); + } else { + ASSERT_INT_EQUALS(0, record->response.len); + } + + aws_mutex_unlock(&fixture->lock); + + return AWS_OP_SUCCESS; +} + +struct aws_rr_client_fixture_streaming_record { + struct aws_allocator *allocator; + + struct aws_rr_client_test_fixture *fixture; + + struct aws_byte_cursor record_key_cursor; + struct aws_byte_buf record_key; + + struct aws_array_list publishes; + struct aws_array_list subscription_events; + + bool terminated; +}; + +struct aws_rr_client_fixture_streaming_record_subscription_event { + enum aws_rr_subscription_event_type status; + int error_code; +}; + +struct aws_rr_client_fixture_streaming_record *s_aws_rr_client_fixture_streaming_record_new( + struct aws_allocator *allocator, + struct aws_rr_client_test_fixture *fixture, + struct aws_byte_cursor record_key) { + struct aws_rr_client_fixture_streaming_record *record = + aws_mem_calloc(allocator, 1, sizeof(struct aws_rr_client_fixture_streaming_record)); + + record->allocator = allocator; + record->fixture = fixture; + + aws_byte_buf_init_copy_from_cursor(&record->record_key, allocator, record_key); + record->record_key_cursor = aws_byte_cursor_from_buf(&record->record_key); + + aws_array_list_init_dynamic(&record->publishes, allocator, 10, sizeof(struct aws_byte_buf)); + aws_array_list_init_dynamic( + &record->subscription_events, + allocator, + 10, + sizeof(struct aws_rr_client_fixture_streaming_record_subscription_event)); + + return record; +} + +void s_aws_rr_client_fixture_streaming_record_delete(struct aws_rr_client_fixture_streaming_record *record) { + aws_byte_buf_clean_up(&record->record_key); + + size_t publish_count = aws_array_list_length(&record->publishes); + for (size_t i = 0; i < publish_count; ++i) { + struct aws_byte_buf publish_payload; + aws_array_list_get_at(&record->publishes, &publish_payload, i); + + aws_byte_buf_clean_up(&publish_payload); + } + + aws_array_list_clean_up(&record->publishes); + aws_array_list_clean_up(&record->subscription_events); + + aws_mem_release(record->allocator, record); +} + +static void s_aws_rr_client_fixture_streaming_record_hash_destroy(void *element) { + struct aws_rr_client_fixture_streaming_record *record = element; + + s_aws_rr_client_fixture_streaming_record_delete(record); +} + +static void s_rrc_fixture_streaming_operation_subscription_status_callback( + enum aws_rr_subscription_event_type status, + int error_code, + void *user_data) { + + struct aws_rr_client_fixture_streaming_record *record = user_data; + struct aws_rr_client_test_fixture *fixture = record->fixture; + + aws_mutex_lock(&fixture->lock); + + struct aws_rr_client_fixture_streaming_record_subscription_event event = { + .status = status, + .error_code = error_code, + }; + aws_array_list_push_back(&record->subscription_events, &event); + + aws_mutex_unlock(&fixture->lock); + aws_condition_variable_notify_all(&fixture->signal); +} + +static void s_rrc_fixture_streaming_operation_incoming_publish_callback( + struct aws_byte_cursor payload, + void *user_data) { + struct aws_rr_client_fixture_streaming_record *record = user_data; + struct aws_rr_client_test_fixture *fixture = record->fixture; + + aws_mutex_lock(&fixture->lock); + + struct aws_byte_buf payload_buffer; + aws_byte_buf_init_copy_from_cursor(&payload_buffer, fixture->allocator, payload); + + aws_array_list_push_back(&record->publishes, &payload_buffer); + + aws_mutex_unlock(&fixture->lock); + aws_condition_variable_notify_all(&fixture->signal); +} + +static void s_rrc_fixture_streaming_operation_terminated_callback(void *user_data) { + struct aws_rr_client_fixture_streaming_record *record = user_data; + struct aws_rr_client_test_fixture *fixture = record->fixture; + + aws_mutex_lock(&fixture->lock); + + record->terminated = true; + + aws_mutex_unlock(&fixture->lock); + aws_condition_variable_notify_all(&fixture->signal); +} + +static struct aws_rr_client_fixture_streaming_record *s_rrc_fixture_add_streaming_record( + struct aws_rr_client_test_fixture *fixture, + struct aws_byte_cursor key) { + struct aws_rr_client_fixture_streaming_record *record = + s_aws_rr_client_fixture_streaming_record_new(fixture->allocator, fixture, key); + + aws_hash_table_put(&fixture->streaming_records, &record->record_key, record, NULL); + + return record; +} + +static bool s_is_stream_terminated(void *context) { + struct rrc_operation_completion_context *completion_context = context; + + struct aws_hash_element *element = NULL; + aws_hash_table_find(&completion_context->fixture->streaming_records, &completion_context->key, &element); + + AWS_FATAL_ASSERT(element != NULL && element->value != NULL); + + struct aws_rr_client_fixture_streaming_record *record = element->value; + + return record->terminated; +} + +static void s_rrc_wait_on_streaming_termination( + struct aws_rr_client_test_fixture *fixture, + struct aws_byte_cursor key) { + struct rrc_operation_completion_context context = { + .key = key, + .fixture = fixture, + }; + + aws_mutex_lock(&fixture->lock); + aws_condition_variable_wait_pred(&fixture->signal, &fixture->lock, s_is_stream_terminated, &context); + aws_mutex_unlock(&fixture->lock); +} + +static int s_rrc_verify_streaming_record_subscription_events( + struct aws_rr_client_test_fixture *fixture, + struct aws_byte_cursor key, + size_t expected_subscription_event_count, + struct aws_rr_client_fixture_streaming_record_subscription_event *expected_subscription_events) { + aws_mutex_lock(&fixture->lock); + + struct aws_hash_element *element = NULL; + aws_hash_table_find(&fixture->streaming_records, &key, &element); + + AWS_FATAL_ASSERT(element != NULL && element->value != NULL); + + struct aws_rr_client_fixture_streaming_record *record = element->value; + + size_t actual_subscription_event_count = aws_array_list_length(&record->subscription_events); + ASSERT_INT_EQUALS(expected_subscription_event_count, actual_subscription_event_count); + + for (size_t i = 0; i < actual_subscription_event_count; ++i) { + struct aws_rr_client_fixture_streaming_record_subscription_event actual_event; + aws_array_list_get_at(&record->subscription_events, &actual_event, i); + + struct aws_rr_client_fixture_streaming_record_subscription_event *expected_event = + &expected_subscription_events[i]; + + ASSERT_INT_EQUALS(expected_event->status, actual_event.status); + ASSERT_INT_EQUALS(expected_event->error_code, actual_event.error_code); + } + + aws_mutex_unlock(&fixture->lock); + + return AWS_OP_SUCCESS; +} + static void s_aws_rr_client_test_fixture_on_initialized(void *user_data) { struct aws_rr_client_test_fixture *fixture = user_data; @@ -80,6 +406,24 @@ static int s_aws_rr_client_test_fixture_init_from_mqtt5( aws_condition_variable_init(&fixture->signal); fixture->test_context = test_context; + aws_hash_table_init( + &fixture->request_response_records, + allocator, + 10, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_client_fixture_request_response_record_hash_destroy); + + aws_hash_table_init( + &fixture->streaming_records, + allocator, + 10, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_client_fixture_streaming_record_hash_destroy); + if (aws_mqtt5_client_mock_test_fixture_init( &fixture->client_test_fixture.mqtt5_test_fixture, allocator, client_test_fixture_options)) { return AWS_OP_ERR; @@ -123,6 +467,24 @@ static int s_aws_rr_client_test_fixture_init_from_mqtt311( aws_condition_variable_init(&fixture->signal); fixture->test_context = test_context; + aws_hash_table_init( + &fixture->request_response_records, + allocator, + 10, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_client_fixture_request_response_record_hash_destroy); + + aws_hash_table_init( + &fixture->streaming_records, + allocator, + 10, + aws_hash_byte_cursor_ptr, + aws_mqtt_byte_cursor_hash_equality, + NULL, + s_aws_rr_client_fixture_streaming_record_hash_destroy); + aws_test311_setup_mqtt_server_fn(allocator, &fixture->client_test_fixture.mqtt311_test_fixture); struct aws_mqtt_request_response_client_options client_options = { @@ -187,6 +549,9 @@ static void s_aws_rr_client_test_fixture_clean_up(struct aws_rr_client_test_fixt aws_mutex_clean_up(&fixture->lock); aws_condition_variable_clean_up(&fixture->signal); + + aws_hash_table_clean_up(&fixture->request_response_records); + aws_hash_table_clean_up(&fixture->streaming_records); } static int s_rrc_mqtt5_create_destroy_fn(struct aws_allocator *allocator, void *ctx) { @@ -230,4 +595,330 @@ static int s_rrc_mqtt311_create_destroy_fn(struct aws_allocator *allocator, void return AWS_OP_SUCCESS; } -AWS_TEST_CASE(rrc_mqtt311_create_destroy, s_rrc_mqtt311_create_destroy_fn) \ No newline at end of file +AWS_TEST_CASE(rrc_mqtt311_create_destroy, s_rrc_mqtt311_create_destroy_fn) + +static int s_rrc_do_submit_request_operation_failure_test( + struct aws_allocator *allocator, + void (*request_mutator_fn)(struct aws_mqtt_request_operation_options *)) { + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options client_test_options; + aws_mqtt5_client_test_init_default_options(&client_test_options); + + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options client_test_fixture_options = { + .client_options = &client_test_options.client_options, + .server_function_table = &client_test_options.server_function_table, + }; + + struct aws_rr_client_test_fixture fixture; + ASSERT_SUCCESS( + s_aws_rr_client_test_fixture_init_from_mqtt5(&fixture, allocator, NULL, &client_test_fixture_options, NULL)); + + struct aws_mqtt_request_operation_response_path response_paths[] = { + { + .topic = aws_byte_cursor_from_c_str("response/filter/accepted"), + .correlation_token_json_path = aws_byte_cursor_from_c_str("client_token"), + }, + { + .topic = aws_byte_cursor_from_c_str("response/filter/rejected"), + .correlation_token_json_path = aws_byte_cursor_from_c_str("client_token"), + }, + }; + struct aws_mqtt_request_operation_options good_request = { + .subscription_topic_filter = aws_byte_cursor_from_c_str("response/filter/+"), + .response_paths = response_paths, + .response_path_count = AWS_ARRAY_SIZE(response_paths), + .publish_topic = aws_byte_cursor_from_c_str("get/shadow"), + .serialized_request = aws_byte_cursor_from_c_str("{}"), + .correlation_token = aws_byte_cursor_from_c_str("MyRequest#1"), + }; + ASSERT_SUCCESS(aws_mqtt_request_response_client_submit_request(fixture.client, &good_request)); + + struct aws_mqtt_request_operation_options bad_request = good_request; + (*request_mutator_fn)(&bad_request); + + ASSERT_FAILS(aws_mqtt_request_response_client_submit_request(fixture.client, &bad_request)); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +static void s_no_response_paths_mutator(struct aws_mqtt_request_operation_options *request_options) { + request_options->response_path_count = 0; + request_options->response_paths = NULL; +} + +static int s_rrc_submit_request_operation_failure_no_response_paths_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + return s_rrc_do_submit_request_operation_failure_test(allocator, s_no_response_paths_mutator); +} + +AWS_TEST_CASE( + rrc_submit_request_operation_failure_no_response_paths, + s_rrc_submit_request_operation_failure_no_response_paths_fn) + +static void s_invalid_response_topic_mutator(struct aws_mqtt_request_operation_options *request_options) { + request_options->response_paths[0].topic = aws_byte_cursor_from_c_str("a/b/#"); +} + +static int s_rrc_submit_request_operation_failure_invalid_response_topic_fn( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; + + return s_rrc_do_submit_request_operation_failure_test(allocator, s_invalid_response_topic_mutator); +} + +AWS_TEST_CASE( + rrc_submit_request_operation_failure_invalid_response_topic, + s_rrc_submit_request_operation_failure_invalid_response_topic_fn) + +static void s_invalid_response_correlation_token_path_mutator( + struct aws_mqtt_request_operation_options *request_options) { + request_options->response_paths[0].correlation_token_json_path = aws_byte_cursor_from_c_str(""); +} + +static int s_rrc_submit_request_operation_failure_invalid_response_correlation_token_path_fn( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; + + return s_rrc_do_submit_request_operation_failure_test(allocator, s_invalid_response_correlation_token_path_mutator); +} + +AWS_TEST_CASE( + rrc_submit_request_operation_failure_invalid_response_correlation_token_path, + s_rrc_submit_request_operation_failure_invalid_response_correlation_token_path_fn) + +static void s_no_correlation_token_mutator(struct aws_mqtt_request_operation_options *request_options) { + request_options->correlation_token = aws_byte_cursor_from_c_str(""); +} + +static int s_rrc_submit_request_operation_failure_no_correlation_token_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + return s_rrc_do_submit_request_operation_failure_test(allocator, s_no_correlation_token_mutator); +} + +AWS_TEST_CASE( + rrc_submit_request_operation_failure_no_correlation_token, + s_rrc_submit_request_operation_failure_no_correlation_token_fn) + +static void s_invalid_publish_topic_mutator(struct aws_mqtt_request_operation_options *request_options) { + request_options->publish_topic = aws_byte_cursor_from_c_str("a/b/#"); +} + +static int s_rrc_submit_request_operation_failure_invalid_publish_topic_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + return s_rrc_do_submit_request_operation_failure_test(allocator, s_invalid_publish_topic_mutator); +} + +AWS_TEST_CASE( + rrc_submit_request_operation_failure_invalid_publish_topic, + s_rrc_submit_request_operation_failure_invalid_publish_topic_fn) + +static void s_empty_request_mutator(struct aws_mqtt_request_operation_options *request_options) { + request_options->serialized_request = aws_byte_cursor_from_c_str(""); +} + +static int s_rrc_submit_request_operation_failure_empty_request_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + return s_rrc_do_submit_request_operation_failure_test(allocator, s_empty_request_mutator); +} + +AWS_TEST_CASE( + rrc_submit_request_operation_failure_empty_request, + s_rrc_submit_request_operation_failure_empty_request_fn) + +static int s_rrc_submit_streaming_operation_failure_invalid_subscription_topic_filter_fn( + struct aws_allocator *allocator, + void *ctx) { + (void)ctx; + + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options client_test_options; + aws_mqtt5_client_test_init_default_options(&client_test_options); + + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options client_test_fixture_options = { + .client_options = &client_test_options.client_options, + .server_function_table = &client_test_options.server_function_table, + }; + + struct aws_rr_client_test_fixture fixture; + ASSERT_SUCCESS( + s_aws_rr_client_test_fixture_init_from_mqtt5(&fixture, allocator, NULL, &client_test_fixture_options, NULL)); + + struct aws_mqtt_streaming_operation_options good_options = { + .topic_filter = aws_byte_cursor_from_c_str("a/b"), + }; + + struct aws_mqtt_rr_client_operation *good_operation = + aws_mqtt_request_response_client_create_streaming_operation(fixture.client, &good_options); + ASSERT_NOT_NULL(good_operation); + + aws_mqtt_rr_client_operation_release(good_operation); + + struct aws_mqtt_streaming_operation_options bad_options = good_options; + bad_options.topic_filter = aws_byte_cursor_from_c_str(""); + + struct aws_mqtt_rr_client_operation *bad_operation = + aws_mqtt_request_response_client_create_streaming_operation(fixture.client, &bad_options); + ASSERT_NULL(bad_operation); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +AWS_TEST_CASE( + rrc_submit_streaming_operation_failure_invalid_subscription_topic_filter, + s_rrc_submit_streaming_operation_failure_invalid_subscription_topic_filter_fn) + +static int s_do_rrc_single_request_operation_test_fn( + struct aws_allocator *allocator, + struct aws_mqtt_request_operation_options *request_options, + int expected_error_code, + struct aws_byte_cursor *expected_payload, + bool shutdown_after_submit) { + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options client_test_options; + aws_mqtt5_client_test_init_default_options(&client_test_options); + + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options client_test_fixture_options = { + .client_options = &client_test_options.client_options, + .server_function_table = &client_test_options.server_function_table, + }; + + struct aws_rr_client_test_fixture fixture; + ASSERT_SUCCESS( + s_aws_rr_client_test_fixture_init_from_mqtt5(&fixture, allocator, NULL, &client_test_fixture_options, NULL)); + + struct aws_rr_client_fixture_request_response_record *record = + s_rrc_fixture_add_request_record(&fixture, request_options->serialized_request); + + request_options->completion_callback = s_rrc_fixture_request_completion_callback; + request_options->user_data = record; + + ASSERT_SUCCESS(aws_mqtt_request_response_client_submit_request(fixture.client, request_options)); + + if (shutdown_after_submit) { + aws_mqtt_request_response_client_release(fixture.client); + fixture.client = NULL; + } + + s_rrc_wait_on_request_completion(&fixture, request_options->serialized_request); + + ASSERT_SUCCESS(s_rrc_verify_request_completion( + &fixture, request_options->serialized_request, expected_error_code, expected_payload)); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +static int s_rrc_submit_request_operation_failure_by_shutdown_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_mqtt_request_operation_response_path response_paths[] = { + { + .topic = aws_byte_cursor_from_c_str("response/filter/accepted"), + .correlation_token_json_path = aws_byte_cursor_from_c_str("client_token"), + }, + }; + + struct aws_mqtt_request_operation_options request = { + .subscription_topic_filter = aws_byte_cursor_from_c_str("response/filter/+"), + .response_paths = response_paths, + .response_path_count = AWS_ARRAY_SIZE(response_paths), + .publish_topic = aws_byte_cursor_from_c_str("get/shadow"), + .serialized_request = aws_byte_cursor_from_c_str("request1"), + .correlation_token = aws_byte_cursor_from_c_str("MyRequest#1"), + }; + + return s_do_rrc_single_request_operation_test_fn( + allocator, &request, AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN, NULL, true); +} + +AWS_TEST_CASE(rrc_submit_request_operation_failure_by_shutdown, s_rrc_submit_request_operation_failure_by_shutdown_fn) + +static int s_do_rrc_single_streaming_operation_test_fn( + struct aws_allocator *allocator, + struct aws_mqtt_streaming_operation_options *streaming_options, + size_t expected_subscription_event_count, + struct aws_rr_client_fixture_streaming_record_subscription_event *expected_subscription_events, + bool shutdown_after_submit) { + aws_mqtt_library_init(allocator); + + struct mqtt5_client_test_options client_test_options; + aws_mqtt5_client_test_init_default_options(&client_test_options); + + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options client_test_fixture_options = { + .client_options = &client_test_options.client_options, + .server_function_table = &client_test_options.server_function_table, + }; + + struct aws_rr_client_test_fixture fixture; + ASSERT_SUCCESS( + s_aws_rr_client_test_fixture_init_from_mqtt5(&fixture, allocator, NULL, &client_test_fixture_options, NULL)); + + struct aws_byte_cursor streaming_id = aws_byte_cursor_from_c_str("streaming1"); + struct aws_rr_client_fixture_streaming_record *record = s_rrc_fixture_add_streaming_record(&fixture, streaming_id); + + streaming_options->incoming_publish_callback = s_rrc_fixture_streaming_operation_incoming_publish_callback; + streaming_options->subscription_status_callback = s_rrc_fixture_streaming_operation_subscription_status_callback; + streaming_options->terminated_callback = s_rrc_fixture_streaming_operation_terminated_callback; + streaming_options->user_data = record; + + struct aws_mqtt_rr_client_operation *streaming_operation = + aws_mqtt_request_response_client_create_streaming_operation(fixture.client, streaming_options); + ASSERT_NOT_NULL(streaming_operation); + + if (shutdown_after_submit) { + aws_mqtt_request_response_client_release(fixture.client); + fixture.client = NULL; + aws_mqtt_rr_client_operation_release(streaming_operation); + } + + s_rrc_wait_on_streaming_termination(&fixture, streaming_id); + + ASSERT_SUCCESS(s_rrc_verify_streaming_record_subscription_events( + &fixture, streaming_id, expected_subscription_event_count, expected_subscription_events)); + + s_aws_rr_client_test_fixture_clean_up(&fixture); + + aws_mqtt_library_clean_up(); + + return AWS_OP_SUCCESS; +} + +static int s_rrc_submit_streaming_operation_and_shutdown_fn(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + + struct aws_rr_client_fixture_streaming_record_subscription_event expected_events[] = { + { + .status = ARRSET_SUBSCRIPTION_SUBSCRIBE_FAILURE, + .error_code = AWS_ERROR_MQTT_REQUEST_RESPONSE_CLIENT_SHUT_DOWN, + }, + }; + + struct aws_mqtt_streaming_operation_options streaming_options = { + .topic_filter = aws_byte_cursor_from_c_str("derp/filter"), + }; + + return s_do_rrc_single_streaming_operation_test_fn( + allocator, &streaming_options, AWS_ARRAY_SIZE(expected_events), expected_events, true); +} + +AWS_TEST_CASE(rrc_submit_streaming_operation_and_shutdown, s_rrc_submit_streaming_operation_and_shutdown_fn)