From 013168bc598d57f345ea76eae8503645aca68194 Mon Sep 17 00:00:00 2001 From: Bret Ambrose Date: Wed, 17 Jan 2024 17:26:29 -0800 Subject: [PATCH] Checkpoint --- .../request-response/protocol_adapter.h | 31 +- source/request-response/protocol_adapter.c | 340 +++++++++--------- tests/CMakeLists.txt | 5 + tests/v5/mqtt5_client_tests.c | 28 -- tests/v5/mqtt5_testing_utils.c | 26 ++ tests/v5/mqtt5_testing_utils.h | 5 + .../request_response_protocol_adapter_tests.c | 133 +++++++ 7 files changed, 363 insertions(+), 205 deletions(-) create mode 100644 tests/v5/request_response_protocol_adapter_tests.c diff --git a/include/aws/mqtt/private/request-response/protocol_adapter.h b/include/aws/mqtt/private/request-response/protocol_adapter.h index 90c4f0c4..3cf39a07 100644 --- a/include/aws/mqtt/private/request-response/protocol_adapter.h +++ b/include/aws/mqtt/private/request-response/protocol_adapter.h @@ -28,20 +28,21 @@ struct aws_protocol_adapter_publish_options { struct aws_byte_cursor topic; struct aws_byte_cursor payload; - void (*completion_callback_fn)(int, void *); + void (*completion_callback_fn)(bool, void *); void *user_data; uint32_t ack_timeout_seconds; }; -enum aws_protocol_adapter_subscription_status_update { - AWS_PASS_ESTABLISHMENT_SUCCESS, - AWS_PASS_ESTABLISHMENT_FAILURE, - AWS_PASS_REMOVED +enum aws_protocol_adapter_subscription_event_type { + AWS_PASET_SUBSCRIBE_SUCCESS, + AWS_PASET_SUBSCRIBE_FAILURE, + AWS_PASET_UNSUBSCRIBE_SUCCESS, + AWS_PASET_UNSUBSCRIBE_FAILURE, }; -struct aws_protocol_adapter_subscription_status_update_event { +struct aws_protocol_adapter_subscription_event { struct aws_byte_cursor topic_filter; - enum aws_protocol_adapter_subscription_status_update status_update; + enum aws_protocol_adapter_subscription_event_type event_type; }; struct aws_protocol_adapter_incoming_publish_event { @@ -49,14 +50,26 @@ struct aws_protocol_adapter_incoming_publish_event { struct aws_byte_cursor payload; }; -typedef void(aws_protocol_adapter_subscription_status_fn)(struct aws_protocol_adapter_subscription_status_update_event *update, void *user_data); +enum aws_protocol_adapter_connection_event_type { + AWS_PACET_OFFLINE, + AWS_PACET_ONLINE, +}; + +struct aws_protocol_adapter_connection_event { + enum aws_protocol_adapter_connection_event_type event_type; + bool rejoined_session; +}; + +typedef void(aws_protocol_adapter_subscription_event_fn)(struct aws_protocol_adapter_subscription_event *event, void *user_data); typedef void(aws_protocol_adapter_incoming_publish_fn)(struct aws_protocol_adapter_incoming_publish_event *publish, void *user_data); typedef void(aws_protocol_adapter_terminate_callback_fn)(void *user_data); +typedef void(aws_protocol_adapter_connection_event_fn)(struct aws_protocol_adapter_connection_event *event, void *user_data); struct aws_mqtt_protocol_adapter_options { - aws_protocol_adapter_subscription_status_fn *subscription_status_update_callback; + aws_protocol_adapter_subscription_event_fn *subscription_event_callback; aws_protocol_adapter_incoming_publish_fn *incoming_publish_callback; aws_protocol_adapter_terminate_callback_fn *terminate_callback; + aws_protocol_adapter_connection_event_fn *connection_event_callback; void *user_data; }; diff --git a/source/request-response/protocol_adapter.c b/source/request-response/protocol_adapter.c index f3494c86..4aae75cd 100644 --- a/source/request-response/protocol_adapter.c +++ b/source/request-response/protocol_adapter.c @@ -12,102 +12,6 @@ #include #include -enum aws_protocol_adapter_subscription_status { - PASS_NONE, - PASS_SUBSCRIBING, - PASS_SUBSCRIBED, - PASS_UNSUBSCRIBING, -}; - -struct aws_mqtt_protocol_adapter_subscription { - struct aws_allocator *allocator; - - struct aws_byte_cursor topic_filter; - struct aws_byte_buf topic_filter_buf; - - enum aws_protocol_adapter_subscription_status status; -}; - -static struct aws_mqtt_protocol_adapter_subscription *s_aws_mqtt_protocol_adapter_subscription_new(struct aws_allocator *allocator, struct aws_byte_cursor topic_filter) { - struct aws_mqtt_protocol_adapter_subscription *subscription = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_subscription)); - - subscription->allocator = allocator; - aws_byte_buf_init_copy_from_cursor(&subscription->topic_filter_buf, allocator, topic_filter); - subscription->topic_filter = aws_byte_cursor_from_buf(&subscription->topic_filter_buf); - - return subscription; -} - -static void s_aws_mqtt_protocol_adapter_subscription_destroy(struct aws_mqtt_protocol_adapter_subscription *subscription) { - aws_byte_buf_clean_up(&subscription->topic_filter_buf); - aws_mem_release(subscription->allocator, subscription); -} - -/******************************************************************************************************************/ - -struct aws_mqtt_protocol_adapter_subscription_set { - struct aws_allocator *allocator; - struct aws_mqtt_protocol_adapter *owner; // not an acquired reference due to the parent-child relationship - struct aws_hash_table subscriptions; // aws_byte_cursor * -> aws_mqtt_protocol_adapter_subscription * - - aws_protocol_adapter_subscription_status_fn *subscription_status_update_callback; - void *callback_user_data; -}; - -static int s_aws_mqtt_protocol_adapter_subscription_set_init(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set, struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter *owner, struct aws_mqtt_protocol_adapter_options *options) { - subscription_set->allocator = allocator; - subscription_set->owner = owner; - subscription_set->subscription_status_update_callback = options->subscription_status_update_callback; - subscription_set->callback_user_data = options->user_data; - - return aws_hash_table_init(&subscription_set->subscriptions, allocator, 0, aws_hash_byte_cursor_ptr, aws_mqtt_byte_cursor_hash_equality, NULL, NULL); -} - -static int s_aws_mqtt_protocol_adapter_subscription_set_subscription_destroy(void *context, struct aws_hash_element *elem) { - struct aws_mqtt_protocol_adapter_subscription_set *subscription_set = context; - struct aws_mqtt_protocol_adapter *adapter = subscription_set->owner; - - struct aws_mqtt_protocol_adapter_subscription *subscription = elem->value; - if (subscription->status != PASS_UNSUBSCRIBING) { - struct aws_protocol_adapter_unsubscribe_options options = { - .topic_filter = subscription->topic_filter, - }; - - aws_mqtt_protocol_adapter_unsubscribe(adapter, &options); - } - - s_aws_mqtt_protocol_adapter_subscription_destroy(subscription); - - return AWS_COMMON_HASH_TABLE_ITER_CONTINUE | AWS_COMMON_HASH_TABLE_ITER_DELETE; -} - -static void s_aws_mqtt_protocol_adapter_subscription_set_clean_up(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set) { - struct aws_hash_table subscriptions; - AWS_ZERO_STRUCT(subscriptions); - - aws_hash_table_swap(&subscription_set->subscriptions, &subscriptions); - - aws_hash_table_foreach(&subscriptions, s_aws_mqtt_protocol_adapter_subscription_set_subscription_destroy, subscription_set); - - aws_hash_table_clean_up(&subscriptions); -} - -static void s_aws_mqtt_protocol_adapter_subscription_set_update_subscription(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set, struct aws_byte_cursor topic_filter, enum aws_protocol_adapter_subscription_status status) { - (void)subscription_set; - (void)topic_filter; - (void)status; - - // TODO -} - -static void s_aws_mqtt_protocol_adapter_subscription_set_create_or_update_subscription(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set, struct aws_byte_cursor topic_filter, enum aws_protocol_adapter_subscription_status status) { - (void)subscription_set; - (void)topic_filter; - (void)status; - - // TODO -} - /* * New API contract * @@ -122,61 +26,13 @@ static void s_aws_mqtt_protocol_adapter_subscription_set_create_or_update_subscr * hold of the adapter). * * How do we know not to retry unsubscribe failures because a subscribe came in? Well, we don't retry failures; let - * the manager make that decision. Only retry (maybe) if the manager is gone (ie failure against a zeroed weak ref). + * the manager make that decision. No retry when the weak ref is zeroed either. The potential for things to go wrong + * is worse than the potential of a subscription "leaking." * * On subscribe failures with zeroed weak ref, trust that an Unsubscribe was sent that will resolve later and let it * decide what to do. */ -/* - * On subscribe success: if there's not an entry, is this possible? No because we're called only by a function that checks for adapter weak->strong first, so the adapter exists and we don't allow subscription removal without an unsubscribe complete and we don't allow the subscribe until the unsubscribe has completed. But what - * if - * On subscribe success: if there's an entry, transition | subscribing -> subscribed, send an update - * On subscribe failure: if there's not an entry, is this possible? - * On subscribe failure: if there's an entry, transition -> unsubscribing, send an update - * - * Should we just blindly add if the adapter exists? Yes: simplest. No: represents undefined behavior if it shouldn't be happening - * - * In the design we said that the subscription set is just a dumb reflection of the ordered sequence of operations - * from the rr client which implies we should just create_or_update. The only time we don't want to create_or_update - * is if we're in/post destruction but then there's no adapter and we early out - */ -static void s_aws_mqtt_protocol_adapter_subscription_set_on_subscribe_completion(struct aws_mqtt_protocol_adapter_subscription_set *subscription_set, struct aws_byte_cursor topic_filter, bool success) { - if (success) { - s_aws_mqtt_protocol_adapter_subscription_set_update_subscription(subscription_set, topic_filter, PASS_SUBSCRIBED); - } else { - struct aws_protocol_adapter_unsubscribe_options options = { - .topic_filter = topic_filter, - }; - - aws_mqtt_protocol_adapter_unsubscribe(subscription_set->owner, &options); - } - - - - struct aws_hash_element *hash_element = NULL; - if (!aws_hash_table_find(&subscription_set->subscriptions, &topic_filter, &hash_element) || hash_element == NULL) { - return; - } - - struct aws_mqtt_protocol_adapter_subscription *subscription = hash_element->value; - AWS_FATAL_ASSERT(subscription != NULL); - - switch (subscription->status) { - case PASS_SUBSCRIBING: { - if (success) { - subscription->status = PASS_SUBSCRIBED; - } - } - - default: - break; - } - - // TODO -} - -/******************************************************************************************************************/ struct aws_mqtt_protocol_adapter *aws_mqtt_protocol_adapter_new_from_311(struct aws_allocator *allocator, struct aws_mqtt_protocol_adapter_options *options, struct aws_mqtt_client_connection *connection) { (void)allocator; @@ -198,27 +54,25 @@ struct aws_mqtt_protocol_adapter_5_impl { struct aws_event_loop *loop; struct aws_mqtt5_client *client; struct aws_mqtt5_listener *listener; - - struct aws_mqtt_protocol_adapter_subscription_set subscriptions; }; static void s_aws_mqtt_protocol_adapter_5_delete(void *impl) { struct aws_mqtt_protocol_adapter_5_impl *adapter = impl; + // all the real cleanup is done in the listener termination callback aws_mqtt5_listener_release(adapter->listener); } -/* Subscribe */ - -struct aws_mqtt_protocol_adapter_5_subscribe_data { +// used by both subscribe and unsubscribe +struct aws_mqtt_protocol_adapter_5_subscription_op_data { struct aws_allocator *allocator; struct aws_byte_buf topic_filter; struct aws_protocol_adapter_weak_ref *callback_ref; }; -static struct aws_mqtt_protocol_adapter_5_subscribe_data *aws_mqtt_protocol_adapter_5_subscribe_data_new(struct aws_allocator *allocator, struct aws_byte_cursor topic_filter, struct aws_protocol_adapter_weak_ref *callback_ref) { - struct aws_mqtt_protocol_adapter_5_subscribe_data *subscribe_data = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_subscribe_data)); +static struct aws_mqtt_protocol_adapter_5_subscription_op_data *s_aws_mqtt_protocol_adapter_5_subscription_op_data_new(struct aws_allocator *allocator, struct aws_byte_cursor topic_filter, struct aws_protocol_adapter_weak_ref *callback_ref) { + struct aws_mqtt_protocol_adapter_5_subscription_op_data *subscribe_data = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_subscription_op_data)); subscribe_data->allocator = allocator; subscribe_data->callback_ref = aws_weak_ref_acquire(callback_ref); @@ -227,17 +81,19 @@ static struct aws_mqtt_protocol_adapter_5_subscribe_data *aws_mqtt_protocol_adap return subscribe_data; } -static void aws_mqtt_protocol_adapter_5_subscribe_data_delete(struct aws_mqtt_protocol_adapter_5_subscribe_data *subscribe_data) { +static void s_aws_mqtt_protocol_adapter_5_subscription_op_data_delete(struct aws_mqtt_protocol_adapter_5_subscription_op_data *subscribe_data) { aws_weak_ref_release(subscribe_data->callback_ref); aws_byte_buf_clean_up(&subscribe_data->topic_filter); aws_mem_release(subscribe_data->allocator, subscribe_data); } +/* Subscribe */ + static void s_protocol_adapter_5_subscribe_completion(const struct aws_mqtt5_packet_suback_view *suback, int error_code, void *complete_ctx) { - struct aws_mqtt_protocol_adapter_5_subscribe_data *subscribe_data = complete_ctx; + struct aws_mqtt_protocol_adapter_5_subscription_op_data *subscribe_data = complete_ctx; struct aws_mqtt_protocol_adapter_5_impl *adapter = aws_weak_ref_get_reference(subscribe_data->callback_ref); if (adapter == NULL) { @@ -245,17 +101,23 @@ static void s_protocol_adapter_5_subscribe_completion(const struct aws_mqtt5_pac } bool success = error_code == AWS_ERROR_SUCCESS && suback != NULL && suback->reason_code_count == 1 && suback->reason_codes[0] <= AWS_MQTT5_SARC_GRANTED_QOS_2; - s_aws_mqtt_protocol_adapter_subscription_set_on_subscribe_completion(&adapter->subscriptions, aws_byte_cursor_from_buf(&subscribe_data->topic_filter), success); + + struct aws_protocol_adapter_subscription_event subscribe_event = { + .topic_filter = aws_byte_cursor_from_buf(&subscribe_data->topic_filter), + .event_type = success ? AWS_PASET_SUBSCRIBE_SUCCESS : AWS_PASET_SUBSCRIBE_FAILURE, + }; + + (*adapter->config.subscription_event_callback)(&subscribe_event, adapter->config.user_data); done: - aws_mqtt_protocol_adapter_5_subscribe_data_delete(subscribe_data); + s_aws_mqtt_protocol_adapter_5_subscription_op_data_delete(subscribe_data); } int s_aws_mqtt_protocol_adapter_5_subscribe(void *impl, struct aws_protocol_adapter_subscribe_options *options) { struct aws_mqtt_protocol_adapter_5_impl *adapter = impl; - struct aws_mqtt_protocol_adapter_5_subscribe_data *subscribe_data = aws_mqtt_protocol_adapter_5_subscribe_data_new(adapter->allocator, options->topic_filter, adapter->callback_ref); + struct aws_mqtt_protocol_adapter_5_subscription_op_data *subscribe_data = s_aws_mqtt_protocol_adapter_5_subscription_op_data_new(adapter->allocator, options->topic_filter, adapter->callback_ref); struct aws_mqtt5_subscription_view subscription_view = { .qos = AWS_MQTT5_QOS_AT_LEAST_ONCE, @@ -281,38 +143,180 @@ int s_aws_mqtt_protocol_adapter_5_subscribe(void *impl, struct aws_protocol_adap error: - aws_mqtt_protocol_adapter_5_subscribe_data_delete(subscribe_data); + s_aws_mqtt_protocol_adapter_5_subscription_op_data_delete(subscribe_data); return AWS_OP_ERR; } /* Unsubscribe */ +static void s_protocol_adapter_5_unsubscribe_completion(const struct aws_mqtt5_packet_unsuback_view *unsuback, + int error_code, + void *complete_ctx) { + struct aws_mqtt_protocol_adapter_5_subscription_op_data *unsubscribe_data = complete_ctx; + struct aws_mqtt_protocol_adapter_5_impl *adapter = aws_weak_ref_get_reference(unsubscribe_data->callback_ref); + + if (adapter == NULL) { + goto done; + } + + bool success = error_code == AWS_ERROR_SUCCESS && unsuback != NULL && unsuback->reason_code_count == 1 && unsuback->reason_codes[0] < 128; + + struct aws_protocol_adapter_subscription_event unsubscribe_event = { + .topic_filter = aws_byte_cursor_from_buf(&unsubscribe_data->topic_filter), + .event_type = success ? AWS_PASET_UNSUBSCRIBE_SUCCESS : AWS_PASET_UNSUBSCRIBE_FAILURE, + }; + + (*adapter->config.subscription_event_callback)(&unsubscribe_event, adapter->config.user_data); + +done: + + s_aws_mqtt_protocol_adapter_5_subscription_op_data_delete(unsubscribe_data); +} + int s_aws_mqtt_protocol_adapter_5_unsubscribe(void *impl, struct aws_protocol_adapter_unsubscribe_options *options) { struct aws_mqtt_protocol_adapter_5_impl *adapter = impl; - (void)adapter; - (void)options; - return aws_raise_error(AWS_ERROR_UNIMPLEMENTED); + struct aws_mqtt_protocol_adapter_5_subscription_op_data *unsubscribe_data = s_aws_mqtt_protocol_adapter_5_subscription_op_data_new(adapter->allocator, options->topic_filter, adapter->callback_ref); + + struct aws_mqtt5_packet_unsubscribe_view unsubscribe_view = { + .topic_filters = &options->topic_filter, + .topic_filter_count = 1, + }; + + struct aws_mqtt5_unsubscribe_completion_options completion_options = { + .ack_timeout_seconds_override = options->ack_timeout_seconds, + .completion_callback = s_protocol_adapter_5_unsubscribe_completion, + .completion_user_data = unsubscribe_data, + }; + + if (aws_mqtt5_client_unsubscribe(adapter->client, &unsubscribe_view, &completion_options)) { + goto error; + } + + return AWS_OP_SUCCESS; + +error: + + s_aws_mqtt_protocol_adapter_5_subscription_op_data_delete(unsubscribe_data); + + return AWS_OP_ERR; } /* Publish */ +struct aws_mqtt_protocol_adapter_5_publish_op_data { + struct aws_allocator *allocator; + struct aws_protocol_adapter_weak_ref *callback_ref; + + void (*completion_callback_fn)(bool, void *); + void *user_data; +}; + +static struct aws_mqtt_protocol_adapter_5_publish_op_data *s_aws_mqtt_protocol_adapter_5_publish_op_data_new(struct aws_allocator *allocator, const struct aws_protocol_adapter_publish_options *publish_options, struct aws_protocol_adapter_weak_ref *callback_ref) { + struct aws_mqtt_protocol_adapter_5_publish_op_data *publish_data = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_protocol_adapter_5_publish_op_data)); + + publish_data->allocator = allocator; + publish_data->callback_ref = aws_weak_ref_acquire(callback_ref); + publish_data->completion_callback_fn = publish_options->completion_callback_fn; + publish_data->user_data = publish_options->user_data; + + return publish_data; +} + +static void s_aws_mqtt_protocol_adapter_5_publish_op_data_delete(struct aws_mqtt_protocol_adapter_5_publish_op_data *publish_data) { + aws_weak_ref_release(publish_data->callback_ref); + + aws_mem_release(publish_data->allocator, publish_data); +} + +static void s_protocol_adapter_5_publish_completion( + enum aws_mqtt5_packet_type packet_type, + const void *packet, + int error_code, + void *complete_ctx) { + struct aws_mqtt_protocol_adapter_5_publish_op_data *publish_data = complete_ctx; + struct aws_mqtt_protocol_adapter_5_impl *adapter = aws_weak_ref_get_reference(publish_data->callback_ref); + + if (adapter == NULL) { + goto done; + } + + bool success = false; + if (error_code == AWS_ERROR_SUCCESS && packet_type == AWS_MQTT5_PT_PUBACK) { + const struct aws_mqtt5_packet_puback_view *puback = packet; + if (puback->reason_code < 128) { + success = true; + } + } + + (*publish_data->completion_callback_fn)(success, publish_data->user_data); + +done: + + s_aws_mqtt_protocol_adapter_5_publish_op_data_delete(publish_data); +} + int s_aws_mqtt_protocol_adapter_5_publish(void *impl, struct aws_protocol_adapter_publish_options *options) { struct aws_mqtt_protocol_adapter_5_impl *adapter = impl; - (void)adapter; - (void)options; + struct aws_mqtt_protocol_adapter_5_publish_op_data *publish_data = s_aws_mqtt_protocol_adapter_5_publish_op_data_new(adapter->allocator, options, adapter->callback_ref); - return aws_raise_error(AWS_ERROR_UNIMPLEMENTED); + struct aws_mqtt5_packet_publish_view publish_view = { + .topic = options->topic, + .qos = AWS_MQTT5_QOS_AT_LEAST_ONCE, + .payload = options->payload + }; + + struct aws_mqtt5_publish_completion_options completion_options = { + .ack_timeout_seconds_override = options->ack_timeout_seconds, + .completion_callback = s_protocol_adapter_5_publish_completion, + .completion_user_data = publish_data, + }; + + if (aws_mqtt5_client_publish(adapter->client, &publish_view, &completion_options)) { + goto error; + } + + return AWS_OP_SUCCESS; + +error: + + s_aws_mqtt_protocol_adapter_5_publish_op_data_delete(publish_data); + + return AWS_OP_ERR; } static bool s_protocol_adapter_mqtt5_listener_publish_received(const struct aws_mqtt5_packet_publish_view *publish, void *user_data) { - (void)publish; - (void)user_data; + struct aws_mqtt_protocol_adapter_5_impl *adapter = user_data; + + struct aws_protocol_adapter_incoming_publish_event publish_event = { + .topic = publish->topic, + .payload = publish->payload + }; + + (*adapter->config.incoming_publish_callback)(&publish_event, adapter->config.user_data); + + return false; } static void s_protocol_adapter_mqtt5_lifecycle_event_callback(const struct aws_mqtt5_client_lifecycle_event *event) { - (void)event; + struct aws_mqtt_protocol_adapter_5_impl *adapter = event->user_data; + + if (event->event_type != AWS_MQTT5_CLET_CONNECTION_SUCCESS && event->event_type != AWS_MQTT5_CLET_DISCONNECTION) { + return; + } + + bool is_connection_success = event->event_type == AWS_MQTT5_CLET_CONNECTION_SUCCESS; + + struct aws_protocol_adapter_connection_event connection_event = { + .event_type = is_connection_success ? AWS_PACET_ONLINE : AWS_PACET_OFFLINE, + }; + + if (is_connection_success) { + connection_event.rejoined_session = event->settings->rejoined_session; + } + + (*adapter->config.connection_event_callback)(&connection_event, adapter->config.user_data); } static void s_protocol_adapter_mqtt5_listener_termination_callback(void *user_data) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b8ef2587..f1d90e22 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -442,6 +442,11 @@ add_test_case(mqtt_subscription_set_publish_single_level_wildcards) add_test_case(mqtt_subscription_set_publish_multi_level_wildcards) add_test_case(mqtt_subscription_set_get_subscriptions) +add_test_case(request_response_mqtt5_protocol_adapter_subscribe_success) +#add_test_case(request_response_mqtt5_protocol_adapter_subscribe_failure_error_code) +#add_test_case(request_response_mqtt5_protocol_adapter_subscribe_failure_reason_code) +#add_test_case(request_response_mqtt5_protocol_adapter_subscribe_failure_timeout) + generate_test_driver(${PROJECT_NAME}-tests) set(TEST_PAHO_CLIENT_BINARY_NAME ${PROJECT_NAME}-paho-client) diff --git a/tests/v5/mqtt5_client_tests.c b/tests/v5/mqtt5_client_tests.c index 2d436f85..06367d3d 100644 --- a/tests/v5/mqtt5_client_tests.c +++ b/tests/v5/mqtt5_client_tests.c @@ -10,7 +10,6 @@ #include #include #include -#include #include #include #include @@ -18,40 +17,13 @@ #include -#include #include -#define TEST_IO_MESSAGE_LENGTH 4096 - static bool s_is_within_percentage_of(uint64_t expected_time, uint64_t actual_time, double percentage) { double actual_percent = 1.0 - (double)actual_time / (double)expected_time; return fabs(actual_percent) <= percentage; } -int aws_mqtt5_mock_server_send_packet( - struct aws_mqtt5_server_mock_connection_context *connection, - enum aws_mqtt5_packet_type packet_type, - void *packet) { - aws_mqtt5_encoder_append_packet_encoding(&connection->encoder, packet_type, packet); - - struct aws_io_message *message = aws_channel_acquire_message_from_pool( - connection->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, TEST_IO_MESSAGE_LENGTH); - if (message == NULL) { - return AWS_OP_ERR; - } - - enum aws_mqtt5_encoding_result result = - aws_mqtt5_encoder_encode_to_buffer(&connection->encoder, &message->message_data); - AWS_FATAL_ASSERT(result == AWS_MQTT5_ER_FINISHED); - - if (aws_channel_slot_send_message(connection->slot, message, AWS_CHANNEL_DIR_WRITE)) { - aws_mem_release(message->allocator, message); - return AWS_OP_ERR; - } - - return AWS_OP_SUCCESS; -} - int aws_mqtt5_mock_server_handle_connect_always_succeed( void *packet, struct aws_mqtt5_server_mock_connection_context *connection, diff --git a/tests/v5/mqtt5_testing_utils.c b/tests/v5/mqtt5_testing_utils.c index e8e72f3f..4b682d83 100644 --- a/tests/v5/mqtt5_testing_utils.c +++ b/tests/v5/mqtt5_testing_utils.c @@ -1740,3 +1740,29 @@ size_t aws_mqtt5_linked_list_length(struct aws_linked_list *list) { return length; } + +#define TEST_IO_MESSAGE_LENGTH 4096 + +int aws_mqtt5_mock_server_send_packet( + struct aws_mqtt5_server_mock_connection_context *connection, + enum aws_mqtt5_packet_type packet_type, + void *packet) { + aws_mqtt5_encoder_append_packet_encoding(&connection->encoder, packet_type, packet); + + struct aws_io_message *message = aws_channel_acquire_message_from_pool( + connection->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, TEST_IO_MESSAGE_LENGTH); + if (message == NULL) { + return AWS_OP_ERR; + } + + enum aws_mqtt5_encoding_result result = + aws_mqtt5_encoder_encode_to_buffer(&connection->encoder, &message->message_data); + AWS_FATAL_ASSERT(result == AWS_MQTT5_ER_FINISHED); + + if (aws_channel_slot_send_message(connection->slot, message, AWS_CHANNEL_DIR_WRITE)) { + aws_mem_release(message->allocator, message); + return AWS_OP_ERR; + } + + return AWS_OP_SUCCESS; +} \ No newline at end of file diff --git a/tests/v5/mqtt5_testing_utils.h b/tests/v5/mqtt5_testing_utils.h index e4fa6de8..0e30f246 100644 --- a/tests/v5/mqtt5_testing_utils.h +++ b/tests/v5/mqtt5_testing_utils.h @@ -218,6 +218,11 @@ int aws_mqtt5_mock_server_handle_unsubscribe_unsuback_success( struct aws_mqtt5_server_mock_connection_context *connection, void *user_data); +int aws_mqtt5_mock_server_send_packet( + struct aws_mqtt5_server_mock_connection_context *connection, + enum aws_mqtt5_packet_type packet_type, + void *packet); + extern const struct aws_string *g_default_client_id; #define RECONNECT_TEST_MIN_BACKOFF 500 diff --git a/tests/v5/request_response_protocol_adapter_tests.c b/tests/v5/request_response_protocol_adapter_tests.c new file mode 100644 index 00000000..29abcd99 --- /dev/null +++ b/tests/v5/request_response_protocol_adapter_tests.c @@ -0,0 +1,133 @@ +/** +* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +* SPDX-License-Identifier: Apache-2.0. +*/ + +#include "mqtt5_testing_utils.h" +#include + +struct request_response_protocol_adapter_incoming_publish_event_record { + struct aws_byte_buf topic; + struct aws_byte_buf payload; +}; + +static void s_request_response_protocol_adapter_incoming_publish_event_record_init( + struct request_response_protocol_adapter_incoming_publish_event_record *record, + struct aws_allocator *allocator, + struct aws_byte_cursor topic, + struct aws_byte_cursor payload) { + + aws_byte_buf_init_copy_from_cursor(&record->topic, allocator, topic); + aws_byte_buf_init_copy_from_cursor(&record->payload, allocator, payload); +} + +static void s_request_response_protocol_adapter_incoming_publish_event_record_cleanup(struct request_response_protocol_adapter_incoming_publish_event_record *record) { + aws_byte_buf_clean_up(&record->topic); + aws_byte_buf_clean_up(&record->payload); +} + +struct request_response_protocol_adapter_connection_event_record { + enum aws_protocol_adapter_connection_event_type event_type; + bool rejoined_session; +}; + +struct request_response_protocol_adapter_subscription_event_record { + enum aws_protocol_adapter_subscription_event_type event_type; + struct aws_byte_buf topic_filter; +}; + +static void s_request_response_protocol_adapter_incoming_subscription_event_record_init( + struct request_response_protocol_adapter_subscription_event_record *record, + struct aws_allocator *allocator, + struct aws_byte_cursor topic_filter) { + + aws_byte_buf_init_copy_from_cursor(&record->topic_filter, allocator, topic_filter); +} + +static void s_request_response_protocol_adapter_incoming_subscription_event_record_cleanup(struct request_response_protocol_adapter_subscription_event_record *record) { + aws_byte_buf_clean_up(&record->topic_filter); +} + +struct aws_request_response_mqtt5_adapter_test_fixture { + struct aws_allocator *allocator; + struct aws_mqtt5_client_mock_test_fixture mqtt5_fixture; + + struct aws_mqtt_protocol_adapter *protocol_adapter; + + struct aws_array_list incoming_publish_events; + struct aws_array_list connection_events; + struct aws_array_list subscription_events; + + struct aws_mutex lock; + struct aws_condition_variable signal; +}; + + +static void s_rr_mqtt5_protocol_adapter_subscription_event(struct aws_protocol_adapter_subscription_event *event, void *user_data) { + struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data; + + struct request_response_protocol_adapter_subscription_event_record record = { + .event_type = event->event_type + }; + s_request_response_protocol_adapter_incoming_subscription_event_record_init(&record, fixture->allocator, event->topic_filter); + + aws_mutex_lock(&fixture->lock); + aws_array_list_push_back(&fixture->subscription_events, &record); + aws_mutex_unlock(&fixture->lock); + aws_condition_variable_notify_all(&fixture->signal); +} + +static void s_rr_mqtt5_protocol_adapter_incoming_publish(struct aws_protocol_adapter_incoming_publish_event *publish, void *user_data) { + struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data; + +} + +static void s_rr_mqtt5_protocol_adapter_terminate_callback(void *user_data) { + struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data; +} + +static void s_rr_mqtt5_protocol_adapter_connection_event(struct aws_protocol_adapter_connection_event *event, void *user_data) { + struct aws_request_response_mqtt5_adapter_test_fixture *fixture = user_data; +} + +static int s_aws_request_response_mqtt5_adapter_test_fixture_init( + struct aws_request_response_mqtt5_adapter_test_fixture *fixture, + struct aws_allocator *allocator, + struct aws_mqtt5_client_mqtt5_mock_test_fixture_options *mqtt5_fixture_config) { + + AWS_ZERO_STRUCT(*fixture); + + fixture->allocator = allocator; + + if (aws_mqtt5_client_mock_test_fixture_init(&fixture->mqtt5_fixture, allocator, mqtt5_fixture_config)) { + return AWS_OP_ERR; + } + + struct aws_mqtt_protocol_adapter_options protocol_adapter_options = { + .subscription_event_callback = s_rr_mqtt5_protocol_adapter_subscription_event, + .incoming_publish_callback = s_rr_mqtt5_protocol_adapter_incoming_publish, + .terminate_callback = s_rr_mqtt5_protocol_adapter_terminate_callback, + .connection_event_callback = s_rr_mqtt5_protocol_adapter_connection_event, + .user_data = fixture + }; + + fixture->protocol_adapter = aws_mqtt_protocol_adapter_new_from_5(allocator, &protocol_adapter_options, fixture->mqtt5_fixture.client); + AWS_FATAL_ASSERT(fixture->protocol_adapter != NULL); + + aws_array_list_init_dynamic(&fixture->incoming_publish_events, allocator, 10, sizeof(struct request_response_protocol_adapter_incoming_publish_event_record)); + aws_array_list_init_dynamic(&fixture->connection_events, allocator, 10, sizeof(struct request_response_protocol_adapter_connection_event_record)); + aws_array_list_init_dynamic(&fixture->subscription_events, allocator, 10, sizeof(struct request_response_protocol_adapter_subscription_event_record)); + + aws_mutex_init(&fixture->lock); + aws_condition_variable_init(&fixture->signal); + + return AWS_OP_SUCCESS; +} + +static void s_aws_mqtt5_to_mqtt3_adapter_test_fixture_clean_up(struct aws_request_response_mqtt5_adapter_test_fixture *fixture) { + + aws_mqtt5_client_mock_test_fixture_clean_up(&fixture->mqtt5_fixture); + + aws_mutex_clean_up(&fixture->lock); + aws_condition_variable_clean_up(&fixture->signal); +}