From 2537e4098bcaf3e401c9348db22cd93c3e44b5b8 Mon Sep 17 00:00:00 2001 From: Colden Cullen Date: Tue, 8 Jan 2019 15:37:29 -0800 Subject: [PATCH] Update callback scheme to the one we extensively discussed (#45) --- include/aws/mqtt/client.h | 116 ++++++++---- include/aws/mqtt/mqtt.h | 2 + include/aws/mqtt/private/client_impl.h | 40 ++++- source/client.c | 237 +++++++++++++++++-------- source/client_channel_handler.c | 31 +++- source/mqtt.c | 6 + tests/aws_iot_client_test.c | 91 +++++----- tests/paho_client_test.c | 56 ++++-- 8 files changed, 404 insertions(+), 175 deletions(-) diff --git a/include/aws/mqtt/client.h b/include/aws/mqtt/client.h index b9c13697..3b602e05 100644 --- a/include/aws/mqtt/client.h +++ b/include/aws/mqtt/client.h @@ -38,24 +38,6 @@ struct aws_mqtt_client { struct aws_mqtt_client_connection; -struct aws_mqtt_client_connection_callbacks { - /* Called if the connection to the server is not completed. - * Note that if a CONNACK is received, this function will not be called no matter what the return code is */ - void (*on_connection_failed)(struct aws_mqtt_client_connection *connection, int error_code, void *user_data); - /* Called when a connection acknowlegement is received. - * If return_code is not ACCEPT, the connetion is automatically closed. */ - void (*on_connack)( - struct aws_mqtt_client_connection *connection, - enum aws_mqtt_connect_return_code return_code, - bool session_present, - void *user_data); - /* Called when a connection is closed, right before any resources are deleted. - * Return true to attempt a reconnect. */ - bool (*on_disconnect)(struct aws_mqtt_client_connection *connection, int error_code, void *user_data); - - void *user_data; -}; - /** Callback called when a request roundtrip is complete (QoS0 immediately, QoS1 on PUBACK, QoS2 on PUBCOMP). */ typedef void(aws_mqtt_op_complete_fn)( struct aws_mqtt_client_connection *connection, @@ -63,8 +45,36 @@ typedef void(aws_mqtt_op_complete_fn)( int error_code, void *userdata); +/** + * Called when a connection attempt is completed, either in success or error. + * + * If error code is AWS_OP_SUCCESS, then a CONNACK has been received from the server and return_code and session_present + * contain the values received. + * If error_code is not AWS_OP_SUCCESS, it refers to the internal error that occured during connection, and return_code + * and session_present are invalid. + */ +typedef void(aws_mqtt_client_on_connection_complete_fn)( + struct aws_mqtt_client_connection *connection, + int error_code, + enum aws_mqtt_connect_return_code return_code, + bool session_present, + void *userdata); + +/* Called if the connection to the server is lost. */ +typedef void(aws_mqtt_client_on_connection_interrupted_fn)( + struct aws_mqtt_client_connection *connection, + int error_code, + void *userdata); + +/* Called when a connection to the server is resumed. */ +typedef void(aws_mqtt_client_on_connection_resumed_fn)( + struct aws_mqtt_client_connection *connection, + enum aws_mqtt_connect_return_code return_code, + bool session_present, + void *userdata); + /** Called when a multi-topic subscription request is complete */ -typedef void(aws_mqtt_suback_fn)( +typedef void(aws_mqtt_suback_multi_fn)( struct aws_mqtt_client_connection *connection, uint16_t packet_id, const struct aws_array_list *topic_subacks, /* contains aws_mqtt_topic_subscription pointers */ @@ -72,7 +82,7 @@ typedef void(aws_mqtt_suback_fn)( void *userdata); /** Called when a single-topic subscription request is complete */ -typedef void(aws_mqtt_suback_single_fn)( +typedef void(aws_mqtt_suback_fn)( struct aws_mqtt_client_connection *connection, uint16_t packet_id, const struct aws_byte_cursor *topic, @@ -85,7 +95,10 @@ typedef void(aws_mqtt_client_publish_received_fn)( struct aws_mqtt_client_connection *connection, const struct aws_byte_cursor *topic, const struct aws_byte_cursor *payload, - void *user_data); + void *userdata); + +/* Called when a connection is closed, right before any resources are deleted */ +typedef void(aws_mqtt_client_on_disconnect_fn)(struct aws_mqtt_client_connection *connection, void *userdata); /** Passed to subscribe() and suback callbacks */ struct aws_mqtt_topic_subscription { @@ -141,7 +154,6 @@ void aws_mqtt_client_clean_up(struct aws_mqtt_client *client); AWS_MQTT_API struct aws_mqtt_client_connection *aws_mqtt_client_connection_new( struct aws_mqtt_client *client, - struct aws_mqtt_client_connection_callbacks callbacks, const struct aws_byte_cursor *host_name, uint16_t port, struct aws_socket_options *socket_options, @@ -196,14 +208,33 @@ int aws_mqtt_client_connection_set_reconnect_timeout( uint64_t min_timeout, uint64_t max_timeout); +/** + * Sets the callbacks to call when a connection is interrupted and resumed. + * + * \param[in] connection The connection object + * \param[in] on_interrupted The function to call when a connection is lost + * \param[in] on_interrupted_ud Userdata for on_interrupted + * \param[in] on_resumed The function to call when a connection is resumed + * \param[in] on_resumed_ud Userdata for on_resumed + */ +AWS_MQTT_API +int aws_mqtt_client_connection_set_connection_interruption_handlers( + struct aws_mqtt_client_connection *connection, + aws_mqtt_client_on_connection_interrupted_fn *on_interrupted, + void *on_interrupted_ud, + aws_mqtt_client_on_connection_resumed_fn *on_resumed, + void *on_resumed_ud); + /** * Opens the actual connection defined by aws_mqtt_client_connection_new. * Once the connection is opened, on_connack will be called. * - * \param[in] connection The connection object - * \param[in] client_id The clientid to place in the CONNECT packet. May be NULL to reuse last set client_id. - * \param[in] clean_session True to discard all server session data and start fresh - * \param[in] keep_alive_time The keep alive value to place in the CONNECT PACKET + * \param[in] connection The connection object + * \param[in] client_id The clientid to place in the CONNECT packet. + * \param[in] clean_session True to discard all server session data and start fresh + * \param[in] keep_alive_time The keep alive value to place in the CONNECT PACKET + * \param[in] on_connection_complete The callback to fire when the connection attempt completes + * \param[in] userdata Passed to the userdata param of on_connection_complete * * \returns AWS_OP_SUCCESS if the connection has been successfully initiated, * otherwise AWS_OP_ERR and aws_last_error() will be set. @@ -213,7 +244,29 @@ int aws_mqtt_client_connection_connect( struct aws_mqtt_client_connection *connection, const struct aws_byte_cursor *client_id, bool clean_session, - uint16_t keep_alive_time); + uint16_t keep_alive_time, + aws_mqtt_client_on_connection_complete_fn *on_connection_complete, + void *userdata); + +/** + * Opens the actual connection defined by aws_mqtt_client_connection_new. + * Once the connection is opened, on_connack will be called. + * + * Must be called on a connection that has previously been open, + * as the parameters passed during the last connection will be reused. + * + * \param[in] connection The connection object + * \param[in] on_connection_complete The callback to fire when the connection attempt completes + * \param[in] userdata Passed to the userdata param of on_connection_complete + * + * \returns AWS_OP_SUCCESS if the connection has been successfully initiated, + * otherwise AWS_OP_ERR and aws_last_error() will be set. + */ +AWS_MQTT_API +int aws_mqtt_client_connection_reconnect( + struct aws_mqtt_client_connection *connection, + aws_mqtt_client_on_connection_complete_fn *on_connection_complete, + void *userdata); /** * Closes the connection asyncronously, calls the on_disconnect callback, and destroys the connection object. @@ -224,7 +277,10 @@ int aws_mqtt_client_connection_connect( * otherwise AWS_OP_ERR and aws_last_error() is set. */ AWS_MQTT_API -int aws_mqtt_client_connection_disconnect(struct aws_mqtt_client_connection *connection); +int aws_mqtt_client_connection_disconnect( + struct aws_mqtt_client_connection *connection, + aws_mqtt_client_on_disconnect_fn *on_disconnect, + void *userdata); /** * Subscribe to topic filters. on_publish will be called when a PUBLISH matching each topic_filter is received. @@ -240,7 +296,7 @@ AWS_MQTT_API uint16_t aws_mqtt_client_connection_subscribe_multiple( struct aws_mqtt_client_connection *connection, const struct aws_array_list *topic_filters, - aws_mqtt_suback_fn *on_suback, + aws_mqtt_suback_multi_fn *on_suback, void *on_suback_ud); /** @@ -265,7 +321,7 @@ uint16_t aws_mqtt_client_connection_subscribe( aws_mqtt_client_publish_received_fn *on_publish, void *on_publish_ud, aws_mqtt_userdata_cleanup_fn *on_ud_cleanup, - aws_mqtt_suback_single_fn *on_suback, + aws_mqtt_suback_fn *on_suback, void *on_suback_ud); /** diff --git a/include/aws/mqtt/mqtt.h b/include/aws/mqtt/mqtt.h index 52d6758c..df3aaac9 100644 --- a/include/aws/mqtt/mqtt.h +++ b/include/aws/mqtt/mqtt.h @@ -52,6 +52,8 @@ enum aws_mqtt_error { AWS_ERROR_MQTT_INVALID_TOPIC, AWS_ERROR_MQTT_TIMEOUT, AWS_ERROR_MQTT_PROTOCOL_ERROR, + AWS_ERROR_MQTT_NOT_CONNECTED, + AWS_ERROR_MQTT_ALREADY_CONNECTED, AWS_ERROR_END_MQTT_RANGE = 0x1800, }; diff --git a/include/aws/mqtt/private/client_impl.h b/include/aws/mqtt/private/client_impl.h index 92e3eced..6fc71dcf 100644 --- a/include/aws/mqtt/private/client_impl.h +++ b/include/aws/mqtt/private/client_impl.h @@ -30,18 +30,25 @@ #include #include -#define MQTT_CLIENT_CALL_CALLBACK(client_ptr, callback, ...) \ +#define MQTT_CLIENT_CALL_CALLBACK(client_ptr, callback) \ do { \ - if (client_ptr->callbacks.callback) { \ - client_ptr->callbacks.callback(client_ptr, __VA_ARGS__, client_ptr->callbacks.user_data); \ + if ((client_ptr)->callback) { \ + (client_ptr)->callback((client_ptr), (client_ptr)->callback##_ud); \ + } \ + } while (false) +#define MQTT_CLIENT_CALL_CALLBACK_ARGS(client_ptr, callback, ...) \ + do { \ + if ((client_ptr)->callback) { \ + (client_ptr)->callback((client_ptr), __VA_ARGS__, (client_ptr)->callback##_ud); \ } \ } while (false) enum aws_mqtt_client_connection_state { - AWS_MQTT_CLIENT_STATE_INIT, AWS_MQTT_CLIENT_STATE_CONNECTING, AWS_MQTT_CLIENT_STATE_CONNECTED, + AWS_MQTT_CLIENT_STATE_RECONNECTING, AWS_MQTT_CLIENT_STATE_DISCONNECTING, + AWS_MQTT_CLIENT_STATE_DISCONNECTED, }; enum aws_mqtt_client_request_state { @@ -77,6 +84,12 @@ struct aws_mqtt_outstanding_request { void *on_complete_ud; }; +struct aws_mqtt_reconnect_task { + struct aws_task task; + struct aws_atomic_var connection_ptr; + struct aws_allocator *allocator; +}; + struct aws_mqtt_client_connection { struct aws_allocator *allocator; @@ -89,8 +102,15 @@ struct aws_mqtt_client_connection { struct aws_tls_connection_options *tls_options; struct aws_socket_options *socket_options; - /* User callbacks */ - struct aws_mqtt_client_connection_callbacks callbacks; + /* User connection callbacks */ + aws_mqtt_client_on_connection_complete_fn *on_connection_complete; + void *on_connection_complete_ud; + aws_mqtt_client_on_disconnect_fn *on_disconnect; + void *on_disconnect_ud; + aws_mqtt_client_on_connection_interrupted_fn *on_interrupted; + void *on_interrupted_ud; + aws_mqtt_client_on_connection_resumed_fn *on_resumed; + void *on_resumed_ud; /* The state of the connection */ enum aws_mqtt_client_connection_state state; @@ -114,13 +134,15 @@ struct aws_mqtt_client_connection { struct aws_linked_list list; struct aws_mutex mutex; } pending_requests; + struct aws_mqtt_reconnect_task *reconnect_task; uint64_t last_pingresp_timestamp; struct { - uint64_t current; - uint64_t min; - uint64_t max; + uint64_t current; /* seconds */ + uint64_t min; /* seconds */ + uint64_t max; /* seconds */ + uint64_t next_attempt; /* milliseconds */ } reconnect_timeouts; /* If an incomplete packet arrives, store the data here. */ diff --git a/source/client.c b/source/client.c index 4cdbbdff..73a3c932 100644 --- a/source/client.c +++ b/source/client.c @@ -73,13 +73,19 @@ static void s_mqtt_client_init( (void)bootstrap; + struct aws_mqtt_client_connection *connection = user_data; + if (error_code != AWS_OP_SUCCESS) { - /* No need to call error callback, s_mqtt_client_shutdown will be called, which will call to user */ + if (connection->state == AWS_MQTT_CLIENT_STATE_RECONNECTING) { + /* If reconnect attempt failed, schedule the next attempt */ + struct aws_event_loop *el = + aws_event_loop_group_get_next_loop(connection->client->bootstrap->event_loop_group); + aws_event_loop_schedule_task_future( + el, &connection->reconnect_task->task, connection->reconnect_timeouts.next_attempt); + } return; } - struct aws_mqtt_client_connection *connection = user_data; - /* Reset the current timeout timer */ connection->reconnect_timeouts.current = connection->reconnect_timeouts.min; @@ -142,7 +148,7 @@ static void s_mqtt_client_init( return; handle_error: - MQTT_CLIENT_CALL_CALLBACK(connection, on_connection_failed, aws_last_error()); + MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_connection_complete, aws_last_error(), 0, false); if (message) { aws_mem_release(message->allocator, message); @@ -150,30 +156,38 @@ static void s_mqtt_client_init( } static void s_attempt_reconect(struct aws_task *task, void *userdata, enum aws_task_status status) { - struct aws_mqtt_client_connection *connection = userdata; - if (status == AWS_TASK_STATUS_RUN_READY && !connection->slot) { - /* If the task is not cancelled and a connection has not succeeded, attempt reconnect */ + (void)task; - aws_mqtt_client_connection_connect(connection, NULL, connection->clean_session, connection->keep_alive_time); + struct aws_mqtt_reconnect_task *reconnect = userdata; + struct aws_mqtt_client_connection *connection = aws_atomic_load_ptr(&reconnect->connection_ptr); - struct aws_event_loop *el = aws_event_loop_group_get_next_loop(connection->client->bootstrap->event_loop_group); + if (status == AWS_TASK_STATUS_RUN_READY && connection) { + /* If the task is not cancelled and a connection has not succeeded, attempt reconnect */ - uint64_t ttr = 0; - aws_event_loop_current_clock_time(el, &ttr); - ttr += aws_timestamp_convert( + aws_high_res_clock_get_ticks(&connection->reconnect_timeouts.next_attempt); + connection->reconnect_timeouts.next_attempt += aws_timestamp_convert( connection->reconnect_timeouts.current, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL); - connection->reconnect_timeouts.current *= 2; - if (connection->reconnect_timeouts.current > connection->reconnect_timeouts.max) { + /* Check before multipying to avoid potential overflow */ + if (connection->reconnect_timeouts.current > connection->reconnect_timeouts.max / 2) { connection->reconnect_timeouts.current = connection->reconnect_timeouts.max; + } else { + connection->reconnect_timeouts.current *= 2; } - /* Schedule checkup task */ - aws_event_loop_schedule_task_future(el, task, ttr); + if (aws_mqtt_client_connection_reconnect( + connection, connection->on_connection_complete, connection->on_connection_complete_ud)) { + + /* If reconnect attempt failed, schedule the next attempt */ + struct aws_event_loop *el = + aws_event_loop_group_get_next_loop(connection->client->bootstrap->event_loop_group); + aws_event_loop_schedule_task_future( + el, &connection->reconnect_task->task, connection->reconnect_timeouts.next_attempt); + } } else { - aws_mem_release(connection->allocator, task); + aws_mem_release(reconnect->allocator, reconnect); } } @@ -194,22 +208,47 @@ static void s_mqtt_client_shutdown( connection->slot = NULL; } - /* Alert the connection we've shutdown */ - bool attempt_reconnect = - connection->state != AWS_MQTT_CLIENT_STATE_DISCONNECTING && error_code != AWS_IO_TLS_ERROR_NEGOTIATION_FAILURE; - if (connection->callbacks.on_disconnect) { - attempt_reconnect = - connection->callbacks.on_disconnect(connection, error_code, connection->callbacks.user_data); - } + /* Call appropriate callback. */ + if (connection->state == AWS_MQTT_CLIENT_STATE_DISCONNECTING) { + + connection->state = AWS_MQTT_CLIENT_STATE_DISCONNECTED; + + /* Successfully shutdown, so clear the outstanding requests */ + aws_hash_table_clean_up(&connection->outstanding_requests.table); + + MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect); + + } else if (connection->state == AWS_MQTT_CLIENT_STATE_CONNECTING) { + + connection->state = AWS_MQTT_CLIENT_STATE_DISCONNECTED; + MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_connection_complete, error_code, 0, false); + + } else { + + assert( + connection->state == AWS_MQTT_CLIENT_STATE_CONNECTED || + connection->state == AWS_MQTT_CLIENT_STATE_RECONNECTING); + + if (connection->state == AWS_MQTT_CLIENT_STATE_CONNECTED) { + + connection->state = AWS_MQTT_CLIENT_STATE_RECONNECTING; + MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_interrupted, error_code); + } - if (attempt_reconnect) { - /* Unintentionally disconnecting, reconnect */ + assert( + connection->state == AWS_MQTT_CLIENT_STATE_RECONNECTING || + connection->state == AWS_MQTT_CLIENT_STATE_DISCONNECTING); - struct aws_task *reconnect_task = aws_mem_acquire(connection->allocator, sizeof(struct aws_task)); - aws_task_init(reconnect_task, s_attempt_reconect, connection); + /* This will only be true if the user called disconnect from the on_interrupted callback */ + if (connection->state == AWS_MQTT_CLIENT_STATE_DISCONNECTING) { + connection->state = AWS_MQTT_CLIENT_STATE_DISCONNECTED; + MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect); - /* Attempt the reconnect immediately */ - reconnect_task->fn(reconnect_task, reconnect_task->arg, AWS_TASK_STATUS_RUN_READY); + } else { + /* Attempt the reconnect immediately, which will schedule a task to retry if it doesn't succeed */ + connection->reconnect_task->task.fn( + &connection->reconnect_task->task, connection->reconnect_task->task.arg, AWS_TASK_STATUS_RUN_READY); + } } } @@ -226,7 +265,7 @@ static void s_outstanding_request_destroy(void *item) { if (request->cancelled) { /* Task ran as cancelled already, clean up the memory */ - aws_mem_release(request->allocator, request); + aws_memory_pool_release(&request->connection->requests_pool, request); } else { /* Signal task to clean up request */ request->cancelled = true; @@ -235,7 +274,6 @@ static void s_outstanding_request_destroy(void *item) { struct aws_mqtt_client_connection *aws_mqtt_client_connection_new( struct aws_mqtt_client *client, - struct aws_mqtt_client_connection_callbacks callbacks, const struct aws_byte_cursor *host_name, uint16_t port, struct aws_socket_options *socket_options, @@ -258,8 +296,7 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new( connection->port = port; connection->tls_options = tls_options; connection->socket_options = socket_options; - connection->callbacks = callbacks; - connection->state = AWS_MQTT_CLIENT_STATE_INIT; + connection->state = AWS_MQTT_CLIENT_STATE_DISCONNECTED; connection->reconnect_timeouts.min = 1; connection->reconnect_timeouts.max = 128; aws_mutex_init(&connection->outstanding_requests.mutex); @@ -275,17 +312,6 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new( goto handle_error; } - if (aws_hash_table_init( - &connection->outstanding_requests.table, - connection->allocator, - sizeof(struct aws_mqtt_outstanding_request *), - s_hash_uint16_t, - s_uint16_t_eq, - NULL, - &s_outstanding_request_destroy)) { - - goto handle_error; - } if (aws_memory_pool_init( &connection->requests_pool, connection->allocator, 32, sizeof(struct aws_mqtt_outstanding_request))) { @@ -328,6 +354,7 @@ struct aws_mqtt_client_connection *aws_mqtt_client_connection_new( void aws_mqtt_client_connection_destroy(struct aws_mqtt_client_connection *connection) { assert(connection); + assert(connection->state == AWS_MQTT_CLIENT_STATE_DISCONNECTED); aws_string_destroy(connection->host_name); @@ -352,7 +379,7 @@ void aws_mqtt_client_connection_destroy(struct aws_mqtt_client_connection *conne aws_mqtt_topic_tree_clean_up(&connection->subscriptions); /* Cleanup outstanding requests */ - aws_hash_table_clean_up(&connection->outstanding_requests.table); + assert(connection->outstanding_requests.table.p_impl == NULL); aws_memory_pool_clean_up(&connection->requests_pool); if (connection->slot) { @@ -436,6 +463,21 @@ int aws_mqtt_client_connection_set_reconnect_timeout( return AWS_OP_SUCCESS; } +int aws_mqtt_client_connection_set_connection_interruption_handlers( + struct aws_mqtt_client_connection *connection, + aws_mqtt_client_on_connection_interrupted_fn *on_interrupted, + void *on_interrupted_ud, + aws_mqtt_client_on_connection_resumed_fn *on_resumed, + void *on_resumed_ud) { + + connection->on_interrupted = on_interrupted; + connection->on_interrupted_ud = on_interrupted_ud; + connection->on_resumed = on_resumed; + connection->on_resumed_ud = on_resumed_ud; + + return AWS_OP_SUCCESS; +} + /******************************************************************************* * Connect ******************************************************************************/ @@ -444,35 +486,81 @@ int aws_mqtt_client_connection_connect( struct aws_mqtt_client_connection *connection, const struct aws_byte_cursor *client_id, bool clean_session, - uint16_t keep_alive_time) { + uint16_t keep_alive_time, + aws_mqtt_client_on_connection_complete_fn *on_connection_complete, + void *userdata) { + + if (connection->state != AWS_MQTT_CLIENT_STATE_DISCONNECTED) { + return aws_raise_error(AWS_ERROR_MQTT_ALREADY_CONNECTED); + } connection->state = AWS_MQTT_CLIENT_STATE_CONNECTING; connection->clean_session = clean_session; connection->keep_alive_time = keep_alive_time; - if (client_id) { - /* Clean up old client_id */ - if (connection->client_id.buffer) { - aws_byte_buf_clean_up(&connection->client_id); - } + /* Clean up old client_id */ + if (connection->client_id.buffer) { + aws_byte_buf_clean_up(&connection->client_id); + } - /* Only set connection->client_id if a new one was provided */ - struct aws_byte_buf client_id_buf = aws_byte_buf_from_array(client_id->ptr, client_id->len); - if (aws_byte_buf_init_copy(&connection->client_id, connection->allocator, &client_id_buf)) { - return AWS_OP_ERR; - } - } else { - /* If client_id not passed, one must already be set. */ - assert(connection->client_id.buffer); + /* Create the reconnect task for use later (probably) */ + assert(!connection->reconnect_task); + connection->reconnect_task = aws_mem_acquire(connection->allocator, sizeof(struct aws_mqtt_reconnect_task)); + if (!connection->reconnect_task) { + return AWS_OP_ERR; + } + aws_atomic_init_ptr(&connection->reconnect_task->connection_ptr, connection); + connection->reconnect_task->allocator = connection->allocator; + aws_task_init(&connection->reconnect_task->task, s_attempt_reconect, connection->reconnect_task); + + /* Only set connection->client_id if a new one was provided */ + struct aws_byte_buf client_id_buf = aws_byte_buf_from_array(client_id->ptr, client_id->len); + if (aws_byte_buf_init_copy(&connection->client_id, connection->allocator, &client_id_buf)) { + aws_mem_release(connection->allocator, connection->reconnect_task); + return AWS_OP_ERR; + } + + if (aws_mqtt_client_connection_reconnect(connection, on_connection_complete, userdata)) { + aws_mem_release(connection->allocator, connection->reconnect_task); + aws_byte_buf_clean_up(&connection->client_id); + return AWS_OP_ERR; } - if (clean_session) { + return AWS_OP_SUCCESS; +} + +/******************************************************************************* + * Reconnect + ******************************************************************************/ + +int aws_mqtt_client_connection_reconnect( + struct aws_mqtt_client_connection *connection, + aws_mqtt_client_on_connection_complete_fn *on_connection_complete, + void *userdata) { + + connection->on_connection_complete = on_connection_complete; + connection->on_connection_complete_ud = userdata; + + if (connection->clean_session) { /* If clean_session is set, all subscriptions will be reset by the server, so we can clean the local tree out too. */ aws_mqtt_topic_tree_clean_up(&connection->subscriptions); aws_mqtt_topic_tree_init(&connection->subscriptions, connection->allocator); } + /* Init the outstanding requests hash table, the lifetime is limited to that of the socket connection */ + if (aws_hash_table_init( + &connection->outstanding_requests.table, + connection->allocator, + sizeof(struct aws_mqtt_outstanding_request *), + s_hash_uint16_t, + s_uint16_t_eq, + NULL, + &s_outstanding_request_destroy)) { + + return AWS_OP_ERR; + } + int result = 0; if (connection->tls_options) { result = aws_client_bootstrap_new_tls_socket_channel( @@ -496,7 +584,6 @@ int aws_mqtt_client_connection_connect( } if (result) { /* Connection attempt failed */ - MQTT_CLIENT_CALL_CALLBACK(connection, on_connection_failed, aws_last_error()); return AWS_OP_ERR; } @@ -507,13 +594,23 @@ int aws_mqtt_client_connection_connect( * Disconnect ******************************************************************************/ -int aws_mqtt_client_connection_disconnect(struct aws_mqtt_client_connection *connection) { +int aws_mqtt_client_connection_disconnect( + struct aws_mqtt_client_connection *connection, + aws_mqtt_client_on_disconnect_fn *on_disconnect, + void *userdata) { + + if (connection->state == AWS_MQTT_CLIENT_STATE_CONNECTED || + connection->state == AWS_MQTT_CLIENT_STATE_RECONNECTING) { + + connection->on_disconnect = on_disconnect; + connection->on_disconnect_ud = userdata; - if (connection && connection->slot) { mqtt_disconnect_impl(connection, AWS_OP_SUCCESS); + + return AWS_OP_SUCCESS; } - return AWS_OP_SUCCESS; + return aws_raise_error(AWS_ERROR_MQTT_NOT_CONNECTED); } /******************************************************************************* @@ -542,7 +639,7 @@ struct subscribe_task_arg { /* true if transaction was committed to the topic tree, false requires a retry */ bool tree_updated; - aws_mqtt_suback_fn *on_suback; + aws_mqtt_suback_multi_fn *on_suback; void *on_suback_ud; }; @@ -673,7 +770,7 @@ static void s_subscribe_complete( uint16_t aws_mqtt_client_connection_subscribe_multiple( struct aws_mqtt_client_connection *connection, const struct aws_array_list *topic_filters, - aws_mqtt_suback_fn *on_suback, + aws_mqtt_suback_multi_fn *on_suback, void *on_suback_ud) { assert(connection); @@ -778,7 +875,7 @@ static void s_subscribe_single_complete( assert(result == AWS_OP_SUCCESS); /* There needs to be exactly 1 topic in this list */ (void)result; - aws_mqtt_suback_single_fn *suback = (aws_mqtt_suback_single_fn *)task_arg->on_suback; + aws_mqtt_suback_fn *suback = (aws_mqtt_suback_fn *)task_arg->on_suback; suback(connection, packet_id, &topic->request.topic, topic->request.qos, error_code, task_arg->on_suback_ud); } @@ -794,7 +891,7 @@ uint16_t aws_mqtt_client_connection_subscribe( aws_mqtt_client_publish_received_fn *on_publish, void *on_publish_ud, aws_mqtt_userdata_cleanup_fn *on_ud_cleanup, - aws_mqtt_suback_single_fn *on_suback, + aws_mqtt_suback_fn *on_suback, void *on_suback_ud) { assert(connection); @@ -822,7 +919,7 @@ uint16_t aws_mqtt_client_connection_subscribe( AWS_ZERO_STRUCT(*task_arg); task_arg->connection = connection; - task_arg->on_suback = (aws_mqtt_suback_fn *)on_suback; + task_arg->on_suback = (aws_mqtt_suback_multi_fn *)on_suback; task_arg->on_suback_ud = on_suback_ud; aws_array_list_init_static(&task_arg->topics, task_topic_storage, 1, sizeof(void *)); diff --git a/source/client_channel_handler.c b/source/client_channel_handler.c index e8ca8446..f760a0bc 100644 --- a/source/client_channel_handler.c +++ b/source/client_channel_handler.c @@ -50,9 +50,21 @@ static int s_packet_handler_connack( return AWS_OP_ERR; } + /* User requested disconnect, don't do anything */ + if (connection->state >= AWS_MQTT_CLIENT_STATE_DISCONNECTING) { + return AWS_OP_SUCCESS; + } + + const bool was_reconnecting = connection->state == AWS_MQTT_CLIENT_STATE_RECONNECTING; + connection->state = AWS_MQTT_CLIENT_STATE_CONNECTED; - MQTT_CLIENT_CALL_CALLBACK(connection, on_connack, connack.connect_return_code, connack.session_present); + if (was_reconnecting) { + MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_resumed, connack.connect_return_code, connack.session_present); + } else { + MQTT_CLIENT_CALL_CALLBACK_ARGS( + connection, on_connection_complete, AWS_OP_SUCCESS, connack.connect_return_code, connack.session_present); + } if (connack.connect_return_code == AWS_MQTT_CONNECT_ACCEPTED) { /* If successfully connected, schedule all pending tasks */ @@ -79,7 +91,7 @@ static int s_packet_handler_connack( } } else { /* If error code returned, disconnect */ - aws_mqtt_client_connection_disconnect(connection); + mqtt_disconnect_impl(connection, AWS_ERROR_MQTT_PROTOCOL_ERROR); } return AWS_OP_SUCCESS; @@ -244,7 +256,7 @@ static int s_process_mqtt_packet( /* [MQTT-3.2.0-1] The first packet sent from the Server to the Client MUST be a CONNACK Packet */ if (connection->state == AWS_MQTT_CLIENT_STATE_CONNECTING && packet_type != AWS_MQTT_PACKET_CONNACK) { - aws_mqtt_client_connection_disconnect(connection); + mqtt_disconnect_impl(connection, AWS_ERROR_MQTT_PROTOCOL_ERROR); return aws_raise_error(AWS_ERROR_MQTT_PROTOCOL_ERROR); } @@ -463,7 +475,7 @@ static void s_request_timeout_task(struct aws_channel_task *task, void *arg, enu if (request->cancelled) { /* If the request was cancelled, assume all containers are gone and just free */ - aws_mem_release(request->allocator, request); + aws_memory_pool_release(&request->connection->requests_pool, request); return; } @@ -512,7 +524,7 @@ static void s_request_timeout_task(struct aws_channel_task *task, void *arg, enu assert(was_present); aws_memory_pool_release(&request->connection->requests_pool, elem.value); - } else if (request->connection->slot) { + } else if (request->connection->state == AWS_MQTT_CLIENT_STATE_CONNECTED) { /* If not complete and online, schedule retry task */ uint64_t ttr = 0; @@ -581,7 +593,7 @@ uint16_t mqtt_create_request( aws_channel_task_init(&next_request->timeout_task, s_request_timeout_task, next_request); /* Send the request now if on channel's thread, otherwise schedule a task */ - if (!connection->slot) { + if (connection->state != AWS_MQTT_CLIENT_STATE_CONNECTED) { aws_mutex_lock(&connection->pending_requests.mutex); aws_linked_list_push_back(&connection->pending_requests.list, &next_request->list_node); aws_mutex_unlock(&connection->pending_requests.mutex); @@ -617,6 +629,13 @@ void mqtt_request_complete(struct aws_mqtt_client_connection *connection, int er void mqtt_disconnect_impl(struct aws_mqtt_client_connection *connection, int error_code) { connection->state = AWS_MQTT_CLIENT_STATE_DISCONNECTING; + + /* If there is an outstanding reconnect task, cancel it */ + if (connection->reconnect_task) { + aws_atomic_store_ptr(&connection->reconnect_task->connection_ptr, NULL); + connection->reconnect_task = NULL; + } + if (connection->slot) { aws_channel_shutdown(connection->slot->channel, error_code); } diff --git a/source/mqtt.c b/source/mqtt.c index 922b8841..27e7fd00 100644 --- a/source/mqtt.c +++ b/source/mqtt.c @@ -137,6 +137,12 @@ void aws_mqtt_load_error_strings() { AWS_DEFINE_ERROR_INFO_MQTT( AWS_ERROR_MQTT_PROTOCOL_ERROR, "Protocol error occured."), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_NOT_CONNECTED, + "The requested operation is invalid as the connection is not open."), + AWS_DEFINE_ERROR_INFO_MQTT( + AWS_ERROR_MQTT_ALREADY_CONNECTED, + "The requested operation is invalid as the connection is already open."), }; /* clang-format on */ #undef AWS_DEFINE_ERROR_INFO_MQTT diff --git a/tests/aws_iot_client_test.c b/tests/aws_iot_client_test.c index ac675094..17919cd0 100644 --- a/tests/aws_iot_client_test.c +++ b/tests/aws_iot_client_test.c @@ -48,7 +48,7 @@ AWS_STATIC_STRING_FROM_LITERAL(s_hostname, "a1ba5f1mpna9k5-ats.iot.us-east-1.ama enum { PUBLISHES = 20 }; -enum { PAYLOAD_LEN = 20000 }; +enum { PAYLOAD_LEN = 200 }; static uint8_t s_payload[PAYLOAD_LEN]; static uint8_t s_will_payload[] = "The client has gone offline!"; @@ -99,66 +99,69 @@ static void s_on_packet_recieved( ++args->packets_gotten; if (args->packets_gotten == PUBLISHES) { - aws_mutex_lock(args->mutex); aws_condition_variable_notify_one(args->condition_variable); aws_mutex_unlock(args->mutex); } } -static void s_mqtt_on_connack( +static bool s_all_packets_received_cond(void *userdata) { + + struct connection_args *args = userdata; + return args->packets_gotten == PUBLISHES; +} + +static void s_mqtt_on_connection_complete( struct aws_mqtt_client_connection *connection, + int error_code, enum aws_mqtt_connect_return_code return_code, bool session_present, - void *user_data) { + void *userdata) { (void)connection; + (void)error_code; (void)return_code; (void)session_present; + assert(error_code == AWS_OP_SUCCESS); assert(return_code == AWS_MQTT_CONNECT_ACCEPTED); assert(session_present == false); - struct connection_args *args = user_data; - - struct aws_byte_cursor subscribe_topic_cur = aws_byte_cursor_from_string(s_subscribe_topic); - - aws_mqtt_client_connection_subscribe( - args->connection, - &subscribe_topic_cur, - AWS_MQTT_QOS_AT_LEAST_ONCE, - &s_on_packet_recieved, - args, - NULL, - NULL, - NULL); + struct connection_args *args = userdata; aws_mutex_lock(args->mutex); aws_condition_variable_notify_one(args->condition_variable); aws_mutex_unlock(args->mutex); } -static void s_mqtt_on_connection_failed( +static void s_mqtt_on_suback( struct aws_mqtt_client_connection *connection, + uint16_t packet_id, + const struct aws_byte_cursor *topic, + enum aws_mqtt_qos qos, int error_code, - void *user_data) { + void *userdata) { (void)connection; - (void)user_data; + (void)packet_id; + (void)topic; + (void)qos; + (void)error_code; + + assert(error_code == AWS_OP_SUCCESS); - fprintf(stderr, "Error recieved: %s\n", aws_error_debug_str(error_code)); + struct connection_args *args = userdata; + + aws_mutex_lock(args->mutex); + aws_condition_variable_notify_one(args->condition_variable); + aws_mutex_unlock(args->mutex); } -static bool s_mqtt_on_disconnect(struct aws_mqtt_client_connection *connection, int error_code, void *user_data) { +static void s_mqtt_on_disconnect(struct aws_mqtt_client_connection *connection, void *userdata) { (void)connection; - if (error_code != AWS_OP_SUCCESS) { - fprintf(stderr, "Disconnected, error: %s\n", aws_error_debug_str(error_code)); - return false; - } - - struct connection_args *args = user_data; + struct connection_args *args = userdata; aws_mqtt_client_connection_destroy(args->connection); args->connection = NULL; @@ -166,8 +169,6 @@ static bool s_mqtt_on_disconnect(struct aws_mqtt_client_connection *connection, aws_mutex_lock(args->mutex); aws_condition_variable_notify_one(args->condition_variable); aws_mutex_unlock(args->mutex); - - return false; } int main(int argc, char **argv) { @@ -218,25 +219,31 @@ int main(int argc, char **argv) { socket_options.type = AWS_SOCKET_STREAM; socket_options.domain = AWS_SOCKET_IPV6; - struct aws_mqtt_client_connection_callbacks callbacks; - AWS_ZERO_STRUCT(callbacks); - callbacks.on_connack = &s_mqtt_on_connack; - callbacks.on_connection_failed = &s_mqtt_on_connection_failed; - callbacks.on_disconnect = &s_mqtt_on_disconnect; - callbacks.user_data = &args; - struct aws_mqtt_client client; aws_mqtt_client_init(&client, args.allocator, bootstrap); struct aws_byte_cursor host_name_cur = aws_byte_cursor_from_string(s_hostname); - args.connection = - aws_mqtt_client_connection_new(&client, callbacks, &host_name_cur, 8883, &socket_options, &tls_con_opt); + args.connection = aws_mqtt_client_connection_new(&client, &host_name_cur, 8883, &socket_options, &tls_con_opt); struct aws_byte_cursor will_cur = aws_byte_cursor_from_array(s_will_payload, WILL_PAYLOAD_LEN); aws_mqtt_client_connection_set_will(args.connection, &subscribe_topic_cur, 1, false, &will_cur); struct aws_byte_cursor client_id_cur = aws_byte_cursor_from_string(s_client_id); - aws_mqtt_client_connection_connect(args.connection, &client_id_cur, true, 0); + aws_mqtt_client_connection_connect(args.connection, &client_id_cur, true, 0, s_mqtt_on_connection_complete, &args); + + aws_mutex_lock(&mutex); + ASSERT_SUCCESS(aws_condition_variable_wait(&condition_variable, &mutex)); + aws_mutex_unlock(&mutex); + + aws_mqtt_client_connection_subscribe( + args.connection, + &subscribe_topic_cur, + AWS_MQTT_QOS_AT_LEAST_ONCE, + &s_on_packet_recieved, + &args, + NULL, + s_mqtt_on_suback, + &args); aws_mutex_lock(&mutex); ASSERT_SUCCESS(aws_condition_variable_wait(&condition_variable, &mutex)); @@ -261,12 +268,12 @@ int main(int argc, char **argv) { } aws_mutex_lock(&mutex); - ASSERT_SUCCESS(aws_condition_variable_wait(&condition_variable, &mutex)); + ASSERT_SUCCESS(aws_condition_variable_wait_pred(&condition_variable, &mutex, &s_all_packets_received_cond, &args)); aws_mutex_unlock(&mutex); ASSERT_UINT_EQUALS(PUBLISHES, args.packets_gotten); - aws_mqtt_client_connection_disconnect(args.connection); + aws_mqtt_client_connection_disconnect(args.connection, s_mqtt_on_disconnect, &args); aws_mutex_lock(&mutex); ASSERT_SUCCESS(aws_condition_variable_wait(&condition_variable, &mutex)); diff --git a/tests/paho_client_test.c b/tests/paho_client_test.c index b63ebafd..9aeb43f7 100644 --- a/tests/paho_client_test.c +++ b/tests/paho_client_test.c @@ -114,16 +114,19 @@ static void s_mqtt_on_puback( aws_mutex_unlock(args->mutex); } -static void s_mqtt_on_connack( +static void s_mqtt_on_connection_complete( struct aws_mqtt_client_connection *connection, + int error_code, enum aws_mqtt_connect_return_code return_code, bool session_present, void *user_data) { (void)connection; + (void)error_code; (void)return_code; (void)session_present; + assert(error_code == AWS_OP_SUCCESS); assert(return_code == AWS_MQTT_CONNECT_ACCEPTED); assert(session_present == false); @@ -136,6 +139,29 @@ static void s_mqtt_on_connack( aws_mutex_unlock(args->mutex); } +static void s_mqtt_on_interrupted(struct aws_mqtt_client_connection *connection, int error_code, void *userdata) { + + (void)connection; + (void)error_code; + (void)userdata; + + printf("Connection offline\n"); +} + +static void s_mqtt_on_resumed( + struct aws_mqtt_client_connection *connection, + enum aws_mqtt_connect_return_code return_code, + bool session_present, + void *userdata) { + + (void)connection; + (void)return_code; + (void)session_present; + (void)userdata; + + printf("Connection resumed\n"); +} + static void s_mqtt_on_unsuback( struct aws_mqtt_client_connection *connection, uint16_t packet_id, @@ -157,12 +183,9 @@ static void s_mqtt_on_unsuback( aws_mutex_unlock(args->mutex); } -static bool s_mqtt_on_disconnect(struct aws_mqtt_client_connection *connection, int error_code, void *user_data) { +static void s_mqtt_on_disconnect(struct aws_mqtt_client_connection *connection, void *user_data) { (void)connection; - (void)error_code; - - assert(error_code == AWS_OP_SUCCESS); struct connection_args *args = user_data; @@ -171,8 +194,6 @@ static bool s_mqtt_on_disconnect(struct aws_mqtt_client_connection *connection, aws_mutex_lock(args->mutex); aws_condition_variable_notify_one(args->condition_variable); aws_mutex_unlock(args->mutex); - - return false; } int main(int argc, char **argv) { @@ -207,20 +228,18 @@ int main(int argc, char **argv) { options.type = AWS_SOCKET_STREAM; options.domain = AWS_SOCKET_IPV4; - struct aws_mqtt_client_connection_callbacks callbacks; - AWS_ZERO_STRUCT(callbacks); - callbacks.on_connack = &s_mqtt_on_connack; - callbacks.on_disconnect = &s_mqtt_on_disconnect; - callbacks.user_data = &args; - struct aws_mqtt_client client; ASSERT_SUCCESS(aws_mqtt_client_init(&client, args.allocator, bootstrap)); struct aws_byte_cursor host_name_cur = aws_byte_cursor_from_string(s_hostname); - args.connection = aws_mqtt_client_connection_new(&client, callbacks, &host_name_cur, 1883, &options, NULL); + args.connection = aws_mqtt_client_connection_new(&client, &host_name_cur, 1883, &options, NULL); ASSERT_NOT_NULL(args.connection); - ASSERT_SUCCESS(aws_mqtt_client_connection_connect(args.connection, &s_client_id_1, true, 0)); + aws_mqtt_client_connection_set_connection_interruption_handlers( + args.connection, s_mqtt_on_interrupted, NULL, s_mqtt_on_resumed, NULL); + + ASSERT_SUCCESS(aws_mqtt_client_connection_connect( + args.connection, &s_client_id_1, true, 0, s_mqtt_on_connection_complete, &args)); /* Wait for connack */ aws_mutex_lock(&mutex); @@ -240,7 +259,7 @@ int main(int argc, char **argv) { printf("2 done\n"); - aws_mqtt_client_connection_disconnect(args.connection); + aws_mqtt_client_connection_disconnect(args.connection, s_mqtt_on_disconnect, &args); /* Wait for disconnack */ aws_mutex_lock(&mutex); @@ -249,7 +268,8 @@ int main(int argc, char **argv) { printf("3 done\n"); - ASSERT_SUCCESS(aws_mqtt_client_connection_connect(args.connection, &s_client_id_2, true, 0)); + ASSERT_SUCCESS(aws_mqtt_client_connection_connect( + args.connection, &s_client_id_2, true, 0, s_mqtt_on_connection_complete, &args)); /* Wait for connack */ aws_mutex_lock(&mutex); @@ -297,7 +317,7 @@ int main(int argc, char **argv) { size_t outstanding_subs = aws_hash_table_get_entry_count(&args.connection->subscriptions.root->subtopics); ASSERT_UINT_EQUALS(0, outstanding_subs); - aws_mqtt_client_connection_disconnect(args.connection); + aws_mqtt_client_connection_disconnect(args.connection, s_mqtt_on_disconnect, &args); aws_mutex_lock(&mutex); ASSERT_SUCCESS(aws_condition_variable_wait(&condition_variable, &mutex));