Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

311 listener2 #345

Closed
wants to merge 20 commits into from
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ file(GLOB AWS_MQTT_PRIV_EXPOSED_HEADERS
file(GLOB AWS_MQTT_SRC
"source/*.c"
"source/v5/*.c"
"source/request-response/*.c"
)

file(GLOB MQTT_HEADERS
Expand Down
1 change: 1 addition & 0 deletions include/aws/mqtt/mqtt.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ enum aws_mqtt_error {
AWS_ERROR_MQTT_CONNECTION_RESET_FOR_ADAPTER_CONNECT,
AWS_ERROR_MQTT_CONNECTION_RESUBSCRIBE_NO_TOPICS,
AWS_ERROR_MQTT_CONNECTION_SUBSCRIBE_FAILURE,
AWS_ERROR_MQTT_PROTOCOL_ADAPTER_FAILING_REASON_CODE,

AWS_ERROR_END_MQTT_RANGE = AWS_ERROR_ENUM_END_RANGE(AWS_C_MQTT_PACKAGE_ID),
};
Expand Down
4 changes: 4 additions & 0 deletions include/aws/mqtt/private/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <aws/mqtt/private/client_impl_shared.h>
#include <aws/mqtt/private/fixed_header.h>
#include <aws/mqtt/private/mqtt311_decoder.h>
#include <aws/mqtt/private/mqtt311_listener.h>
#include <aws/mqtt/private/topic_tree.h>

#include <aws/common/hash_table.h>
Expand Down Expand Up @@ -255,6 +256,9 @@ struct aws_mqtt_client_connection_311_impl {
aws_mqtt_on_operation_statistics_fn *on_any_operation_statistics;
void *on_any_operation_statistics_ud;

/* listener callbacks */
struct aws_mqtt311_callback_set_manager callback_manager;

/* Connection tasks. */
struct aws_mqtt_reconnect_task *reconnect_task;
struct aws_channel_task ping_task;
Expand Down
19 changes: 19 additions & 0 deletions include/aws/mqtt/private/client_impl_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@

struct aws_mqtt_client_connection;

/*
* Internal enum that indicates what type of struct the underlying impl pointer actually is. We use this
* to safely interact with private APIs on the implementation or extract the adapted 5 client directly, as
* necessary.
*/
enum aws_mqtt311_impl_type {

/* 311 connection impl can be cast to `struct aws_mqtt_client_connection_311_impl` */
AWS_MQTT311_IT_311_CONNECTION,

/* 311 connection impl can be cast to `struct aws_mqtt_client_connection_5_impl`*/
AWS_MQTT311_IT_5_ADAPTER,
};

struct aws_mqtt_client_connection_vtable {

struct aws_mqtt_client_connection *(*acquire_fn)(void *impl);
Expand Down Expand Up @@ -107,13 +121,18 @@ struct aws_mqtt_client_connection_vtable {
void *userdata);

int (*get_stats_fn)(void *impl, struct aws_mqtt_connection_operation_statistics *stats);

enum aws_mqtt311_impl_type (*get_impl_type)(const void *impl);
};

struct aws_mqtt_client_connection {
struct aws_mqtt_client_connection_vtable *vtable;
void *impl;
};

AWS_MQTT_API enum aws_mqtt311_impl_type aws_mqtt_client_connection_get_impl_type(
const struct aws_mqtt_client_connection *connection);

AWS_MQTT_API uint64_t aws_mqtt_hash_uint16_t(const void *item);

AWS_MQTT_API bool aws_mqtt_compare_uint16_t_eq(const void *a, const void *b);
Expand Down
180 changes: 180 additions & 0 deletions include/aws/mqtt/private/mqtt311_listener.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

#ifndef AWS_MQTT_MQTT311_LISTENER_H
#define AWS_MQTT_MQTT311_LISTENER_H

#include <aws/mqtt/mqtt.h>

#include <aws/common/rw_lock.h>
#include <aws/mqtt/client.h>

AWS_PUSH_SANE_WARNING_LEVEL

/**
* Callback signature for when an mqtt311 listener has completely destroyed itself.
*/
typedef void(aws_mqtt311_listener_termination_completion_fn)(void *complete_ctx);

/**
* A record that tracks MQTT311 client connection callbacks which can be dynamically injected via a listener.
*
* All the callbacks that are supported here are invoked only on the 311 connection's event loop. With the
* add/remove callback set also on the event loop, everything is correctly serialized without data races.
*
* If binding additional callbacks, they must only be invoked from the connection's event loop.
*
* We only listen to connection-success because the only connection-level event we care about is a failure
* to rejoin a session (which invalidates all subscriptions that were considered valid)
*/
struct aws_mqtt311_callback_set {

/* Called from s_packet_handler_publish which is event-loop invoked */
aws_mqtt_client_publish_received_fn *publish_received_handler;

/* Called from s_packet_handler_connack which is event-loop invoked */
aws_mqtt_client_on_connection_success_fn *connection_success_handler;

void *user_data;
};

/**
* An internal type for managing chains of callbacks attached to an mqtt311 client connection. Supports chains for
* lifecycle events and incoming publish packet handling.
*
* Assumed to be owned and used only by an MQTT311 client connection.
*/
struct aws_mqtt311_callback_set_manager {
struct aws_allocator *allocator;

struct aws_mqtt_client_connection *connection;

struct aws_linked_list callback_set_entries;

uint64_t next_callback_set_entry_id;
};

/**
* Configuration options for MQTT311 listener objects.
*/
struct aws_mqtt311_listener_config {

/**
* MQTT311 client connection to listen to events on
*/
struct aws_mqtt_client_connection *connection;

/**
* Callbacks to invoke when events occur on the MQTT311 client connection
*/
struct aws_mqtt311_callback_set listener_callbacks;

/**
* Listener destruction is asynchronous and thus requires a termination callback and associated user data
* to notify the user that the listener has been fully destroyed and no further events will be received.
*/
aws_mqtt311_listener_termination_completion_fn *termination_callback;
void *termination_callback_user_data;
};

AWS_EXTERN_C_BEGIN

/**
* Creates a new MQTT311 listener object. For as long as the listener lives, incoming publishes and lifecycle events
* will be forwarded to the callbacks configured on the listener.
*
* @param allocator allocator to use
* @param config listener configuration
* @return a new aws_mqtt311_listener object
*/
AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_new(
struct aws_allocator *allocator,
struct aws_mqtt311_listener_config *config);

/**
* Adds a reference to an mqtt311 listener.
*
* @param listener listener to add a reference to
* @return the listener object
*/
AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_acquire(struct aws_mqtt311_listener *listener);

/**
* Removes a reference to an mqtt311 listener. When the reference count drops to zero, the listener's asynchronous
* destruction will be started.
*
* @param listener listener to remove a reference from
* @return NULL
*/
AWS_MQTT_API struct aws_mqtt311_listener *aws_mqtt311_listener_release(struct aws_mqtt311_listener *listener);

/**
* Initializes a callback set manager
*/
AWS_MQTT_API
void aws_mqtt311_callback_set_manager_init(
struct aws_mqtt311_callback_set_manager *manager,
struct aws_allocator *allocator,
struct aws_mqtt_client_connection *connection);

/**
* Cleans up a callback set manager.
*
* aws_mqtt311_callback_set_manager_init must have been previously called or this will crash.
*/
AWS_MQTT_API
void aws_mqtt311_callback_set_manager_clean_up(struct aws_mqtt311_callback_set_manager *manager);

/**
* Adds a callback set to the front of the handler chain. Returns an integer id that can be used to selectively
* remove the callback set from the manager.
*
* May only be called on the client's event loop thread.
*/
AWS_MQTT_API
uint64_t aws_mqtt311_callback_set_manager_push_front(
struct aws_mqtt311_callback_set_manager *manager,
struct aws_mqtt311_callback_set *callback_set);

/**
* Removes a callback set from the handler chain.
*
* May only be called on the client's event loop thread.
*/
AWS_MQTT_API
void aws_mqtt311_callback_set_manager_remove(
struct aws_mqtt311_callback_set_manager *manager,
uint64_t callback_set_id);

/**
* Walks the incoming publish handler chain for an MQTT311 connection, invoking each in sequence.
*
* May only be called on the client's event loop thread.
*/
AWS_MQTT_API
void aws_mqtt311_callback_set_manager_on_publish_received(
struct aws_mqtt311_callback_set_manager *manager,
const struct aws_byte_cursor *topic,
const struct aws_byte_cursor *payload,
bool dup,
enum aws_mqtt_qos qos,
bool retain);

/**
* Invokes a connection success event on each listener in the manager's collection of callback sets.
*
* May only be called on the client's event loop thread.
*/
AWS_MQTT_API
void aws_mqtt311_callback_set_manager_on_connection_success(
struct aws_mqtt311_callback_set_manager *manager,
enum aws_mqtt_connect_return_code return_code,
bool rejoined_session);

AWS_EXTERN_C_END

AWS_POP_SANE_WARNING_LEVEL

#endif /* AWS_MQTT_MQTT311_LISTENER_H */
Loading
Loading