Skip to content

Commit

Permalink
Update callback scheme to the one we extensively discussed (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
ColdenCullen authored Jan 8, 2019
1 parent b1675aa commit 2537e40
Show file tree
Hide file tree
Showing 8 changed files with 404 additions and 175 deletions.
116 changes: 86 additions & 30 deletions include/aws/mqtt/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,41 +38,51 @@ struct aws_mqtt_client {

struct aws_mqtt_client_connection;

struct aws_mqtt_client_connection_callbacks {
/* Called if the connection to the server is not completed.
* Note that if a CONNACK is received, this function will not be called no matter what the return code is */
void (*on_connection_failed)(struct aws_mqtt_client_connection *connection, int error_code, void *user_data);
/* Called when a connection acknowlegement is received.
* If return_code is not ACCEPT, the connetion is automatically closed. */
void (*on_connack)(
struct aws_mqtt_client_connection *connection,
enum aws_mqtt_connect_return_code return_code,
bool session_present,
void *user_data);
/* Called when a connection is closed, right before any resources are deleted.
* Return true to attempt a reconnect. */
bool (*on_disconnect)(struct aws_mqtt_client_connection *connection, int error_code, void *user_data);

void *user_data;
};

/** Callback called when a request roundtrip is complete (QoS0 immediately, QoS1 on PUBACK, QoS2 on PUBCOMP). */
typedef void(aws_mqtt_op_complete_fn)(
struct aws_mqtt_client_connection *connection,
uint16_t packet_id,
int error_code,
void *userdata);

/**
* Called when a connection attempt is completed, either in success or error.
*
* If error code is AWS_OP_SUCCESS, then a CONNACK has been received from the server and return_code and session_present
* contain the values received.
* If error_code is not AWS_OP_SUCCESS, it refers to the internal error that occured during connection, and return_code
* and session_present are invalid.
*/
typedef void(aws_mqtt_client_on_connection_complete_fn)(
struct aws_mqtt_client_connection *connection,
int error_code,
enum aws_mqtt_connect_return_code return_code,
bool session_present,
void *userdata);

/* Called if the connection to the server is lost. */
typedef void(aws_mqtt_client_on_connection_interrupted_fn)(
struct aws_mqtt_client_connection *connection,
int error_code,
void *userdata);

/* Called when a connection to the server is resumed. */
typedef void(aws_mqtt_client_on_connection_resumed_fn)(
struct aws_mqtt_client_connection *connection,
enum aws_mqtt_connect_return_code return_code,
bool session_present,
void *userdata);

/** Called when a multi-topic subscription request is complete */
typedef void(aws_mqtt_suback_fn)(
typedef void(aws_mqtt_suback_multi_fn)(
struct aws_mqtt_client_connection *connection,
uint16_t packet_id,
const struct aws_array_list *topic_subacks, /* contains aws_mqtt_topic_subscription pointers */
int error_code,
void *userdata);

/** Called when a single-topic subscription request is complete */
typedef void(aws_mqtt_suback_single_fn)(
typedef void(aws_mqtt_suback_fn)(
struct aws_mqtt_client_connection *connection,
uint16_t packet_id,
const struct aws_byte_cursor *topic,
Expand All @@ -85,7 +95,10 @@ typedef void(aws_mqtt_client_publish_received_fn)(
struct aws_mqtt_client_connection *connection,
const struct aws_byte_cursor *topic,
const struct aws_byte_cursor *payload,
void *user_data);
void *userdata);

/* Called when a connection is closed, right before any resources are deleted */
typedef void(aws_mqtt_client_on_disconnect_fn)(struct aws_mqtt_client_connection *connection, void *userdata);

/** Passed to subscribe() and suback callbacks */
struct aws_mqtt_topic_subscription {
Expand Down Expand Up @@ -141,7 +154,6 @@ void aws_mqtt_client_clean_up(struct aws_mqtt_client *client);
AWS_MQTT_API
struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(
struct aws_mqtt_client *client,
struct aws_mqtt_client_connection_callbacks callbacks,
const struct aws_byte_cursor *host_name,
uint16_t port,
struct aws_socket_options *socket_options,
Expand Down Expand Up @@ -196,14 +208,33 @@ int aws_mqtt_client_connection_set_reconnect_timeout(
uint64_t min_timeout,
uint64_t max_timeout);

/**
* Sets the callbacks to call when a connection is interrupted and resumed.
*
* \param[in] connection The connection object
* \param[in] on_interrupted The function to call when a connection is lost
* \param[in] on_interrupted_ud Userdata for on_interrupted
* \param[in] on_resumed The function to call when a connection is resumed
* \param[in] on_resumed_ud Userdata for on_resumed
*/
AWS_MQTT_API
int aws_mqtt_client_connection_set_connection_interruption_handlers(
struct aws_mqtt_client_connection *connection,
aws_mqtt_client_on_connection_interrupted_fn *on_interrupted,
void *on_interrupted_ud,
aws_mqtt_client_on_connection_resumed_fn *on_resumed,
void *on_resumed_ud);

/**
* Opens the actual connection defined by aws_mqtt_client_connection_new.
* Once the connection is opened, on_connack will be called.
*
* \param[in] connection The connection object
* \param[in] client_id The clientid to place in the CONNECT packet. May be NULL to reuse last set client_id.
* \param[in] clean_session True to discard all server session data and start fresh
* \param[in] keep_alive_time The keep alive value to place in the CONNECT PACKET
* \param[in] connection The connection object
* \param[in] client_id The clientid to place in the CONNECT packet.
* \param[in] clean_session True to discard all server session data and start fresh
* \param[in] keep_alive_time The keep alive value to place in the CONNECT PACKET
* \param[in] on_connection_complete The callback to fire when the connection attempt completes
* \param[in] userdata Passed to the userdata param of on_connection_complete
*
* \returns AWS_OP_SUCCESS if the connection has been successfully initiated,
* otherwise AWS_OP_ERR and aws_last_error() will be set.
Expand All @@ -213,7 +244,29 @@ int aws_mqtt_client_connection_connect(
struct aws_mqtt_client_connection *connection,
const struct aws_byte_cursor *client_id,
bool clean_session,
uint16_t keep_alive_time);
uint16_t keep_alive_time,
aws_mqtt_client_on_connection_complete_fn *on_connection_complete,
void *userdata);

/**
* Opens the actual connection defined by aws_mqtt_client_connection_new.
* Once the connection is opened, on_connack will be called.
*
* Must be called on a connection that has previously been open,
* as the parameters passed during the last connection will be reused.
*
* \param[in] connection The connection object
* \param[in] on_connection_complete The callback to fire when the connection attempt completes
* \param[in] userdata Passed to the userdata param of on_connection_complete
*
* \returns AWS_OP_SUCCESS if the connection has been successfully initiated,
* otherwise AWS_OP_ERR and aws_last_error() will be set.
*/
AWS_MQTT_API
int aws_mqtt_client_connection_reconnect(
struct aws_mqtt_client_connection *connection,
aws_mqtt_client_on_connection_complete_fn *on_connection_complete,
void *userdata);

/**
* Closes the connection asyncronously, calls the on_disconnect callback, and destroys the connection object.
Expand All @@ -224,7 +277,10 @@ int aws_mqtt_client_connection_connect(
* otherwise AWS_OP_ERR and aws_last_error() is set.
*/
AWS_MQTT_API
int aws_mqtt_client_connection_disconnect(struct aws_mqtt_client_connection *connection);
int aws_mqtt_client_connection_disconnect(
struct aws_mqtt_client_connection *connection,
aws_mqtt_client_on_disconnect_fn *on_disconnect,
void *userdata);

/**
* Subscribe to topic filters. on_publish will be called when a PUBLISH matching each topic_filter is received.
Expand All @@ -240,7 +296,7 @@ AWS_MQTT_API
uint16_t aws_mqtt_client_connection_subscribe_multiple(
struct aws_mqtt_client_connection *connection,
const struct aws_array_list *topic_filters,
aws_mqtt_suback_fn *on_suback,
aws_mqtt_suback_multi_fn *on_suback,
void *on_suback_ud);

/**
Expand All @@ -265,7 +321,7 @@ uint16_t aws_mqtt_client_connection_subscribe(
aws_mqtt_client_publish_received_fn *on_publish,
void *on_publish_ud,
aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
aws_mqtt_suback_single_fn *on_suback,
aws_mqtt_suback_fn *on_suback,
void *on_suback_ud);

/**
Expand Down
2 changes: 2 additions & 0 deletions include/aws/mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ enum aws_mqtt_error {
AWS_ERROR_MQTT_INVALID_TOPIC,
AWS_ERROR_MQTT_TIMEOUT,
AWS_ERROR_MQTT_PROTOCOL_ERROR,
AWS_ERROR_MQTT_NOT_CONNECTED,
AWS_ERROR_MQTT_ALREADY_CONNECTED,

AWS_ERROR_END_MQTT_RANGE = 0x1800,
};
Expand Down
40 changes: 31 additions & 9 deletions include/aws/mqtt/private/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,25 @@
#include <aws/io/message_pool.h>
#include <aws/io/tls_channel_handler.h>

#define MQTT_CLIENT_CALL_CALLBACK(client_ptr, callback, ...) \
#define MQTT_CLIENT_CALL_CALLBACK(client_ptr, callback) \
do { \
if (client_ptr->callbacks.callback) { \
client_ptr->callbacks.callback(client_ptr, __VA_ARGS__, client_ptr->callbacks.user_data); \
if ((client_ptr)->callback) { \
(client_ptr)->callback((client_ptr), (client_ptr)->callback##_ud); \
} \
} while (false)
#define MQTT_CLIENT_CALL_CALLBACK_ARGS(client_ptr, callback, ...) \
do { \
if ((client_ptr)->callback) { \
(client_ptr)->callback((client_ptr), __VA_ARGS__, (client_ptr)->callback##_ud); \
} \
} while (false)

enum aws_mqtt_client_connection_state {
AWS_MQTT_CLIENT_STATE_INIT,
AWS_MQTT_CLIENT_STATE_CONNECTING,
AWS_MQTT_CLIENT_STATE_CONNECTED,
AWS_MQTT_CLIENT_STATE_RECONNECTING,
AWS_MQTT_CLIENT_STATE_DISCONNECTING,
AWS_MQTT_CLIENT_STATE_DISCONNECTED,
};

enum aws_mqtt_client_request_state {
Expand Down Expand Up @@ -77,6 +84,12 @@ struct aws_mqtt_outstanding_request {
void *on_complete_ud;
};

struct aws_mqtt_reconnect_task {
struct aws_task task;
struct aws_atomic_var connection_ptr;
struct aws_allocator *allocator;
};

struct aws_mqtt_client_connection {

struct aws_allocator *allocator;
Expand All @@ -89,8 +102,15 @@ struct aws_mqtt_client_connection {
struct aws_tls_connection_options *tls_options;
struct aws_socket_options *socket_options;

/* User callbacks */
struct aws_mqtt_client_connection_callbacks callbacks;
/* User connection callbacks */
aws_mqtt_client_on_connection_complete_fn *on_connection_complete;
void *on_connection_complete_ud;
aws_mqtt_client_on_disconnect_fn *on_disconnect;
void *on_disconnect_ud;
aws_mqtt_client_on_connection_interrupted_fn *on_interrupted;
void *on_interrupted_ud;
aws_mqtt_client_on_connection_resumed_fn *on_resumed;
void *on_resumed_ud;

/* The state of the connection */
enum aws_mqtt_client_connection_state state;
Expand All @@ -114,13 +134,15 @@ struct aws_mqtt_client_connection {
struct aws_linked_list list;
struct aws_mutex mutex;
} pending_requests;
struct aws_mqtt_reconnect_task *reconnect_task;

uint64_t last_pingresp_timestamp;

struct {
uint64_t current;
uint64_t min;
uint64_t max;
uint64_t current; /* seconds */
uint64_t min; /* seconds */
uint64_t max; /* seconds */
uint64_t next_attempt; /* milliseconds */
} reconnect_timeouts;

/* If an incomplete packet arrives, store the data here. */
Expand Down
Loading

0 comments on commit 2537e40

Please sign in to comment.