Skip to content

Commit

Permalink
update multi_suback behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
xiazhvera committed Sep 22, 2023
1 parent 7c467e4 commit e610e20
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 47 deletions.
96 changes: 51 additions & 45 deletions source/v5/mqtt5_to_mqtt3_adapter.c
Original file line number Diff line number Diff line change
Expand Up @@ -2188,6 +2188,12 @@ static void s_aws_mqtt5_to_mqtt3_adapter_subscribe_completion_fn(
struct aws_mqtt5_to_mqtt3_adapter_operation_subscribe *subscribe_op = complete_ctx;
struct aws_mqtt_client_connection_5_impl *adapter = subscribe_op->base.adapter;
struct aws_mqtt_subscription_set_subscription_record *record = NULL;
size_t reason_code_count = 0;

if (suback != NULL) {
reason_code_count = suback->reason_code_count;
}
size_t subscription_count = aws_array_list_length(&subscribe_op->subscriptions);

if (subscribe_op->on_suback != NULL) {
AWS_LOGF_DEBUG(
Expand All @@ -2200,16 +2206,15 @@ static void s_aws_mqtt5_to_mqtt3_adapter_subscribe_completion_fn(

enum aws_mqtt_qos granted_qos = AWS_MQTT_QOS_AT_MOST_ONCE;

size_t subscription_count = aws_array_list_length(&subscribe_op->subscriptions);
if (subscription_count > 0) {
aws_array_list_get_at(&subscribe_op->subscriptions, &record, 0);
topic_filter = record->subscription_view.topic_filter;
}

if (suback != NULL) {
if (suback->reason_code_count > 0) {
granted_qos = s_convert_mqtt5_suback_reason_code_to_mqtt3_granted_qos(suback->reason_codes[0]);
}
if (reason_code_count > 0) {
granted_qos = s_convert_mqtt5_suback_reason_code_to_mqtt3_granted_qos(suback->reason_codes[0]);
} else if (record) {
granted_qos = (enum aws_mqtt_qos)(record->subscription_view.qos);
} else {
granted_qos = AWS_MQTT_QOS_FAILURE;
}
Expand All @@ -2228,50 +2233,51 @@ static void s_aws_mqtt5_to_mqtt3_adapter_subscribe_completion_fn(
"id=%p: mqtt3-to-5-adapter, completing multi-topic subscribe",
(void *)adapter);

if (suback == NULL) {
(*subscribe_op->on_multi_suback)(
&adapter->base, subscribe_op->base.id, NULL, error_code, subscribe_op->on_multi_suback_user_data);
} else {
AWS_VARIABLE_LENGTH_ARRAY(
struct aws_mqtt_topic_subscription, multi_sub_subscription_buf, suback->reason_code_count);
AWS_VARIABLE_LENGTH_ARRAY(
struct aws_mqtt_topic_subscription *, multi_sub_subscription_ptr_buf, suback->reason_code_count);
struct aws_mqtt_topic_subscription *subscription_ptr =
(struct aws_mqtt_topic_subscription *)multi_sub_subscription_buf;

struct aws_array_list multi_sub_list;
aws_array_list_init_static(
&multi_sub_list,
multi_sub_subscription_ptr_buf,
suback->reason_code_count,
sizeof(struct aws_mqtt_topic_subscription *));

size_t subscription_count = aws_array_list_length(&subscribe_op->subscriptions);

for (size_t i = 0; i < suback->reason_code_count; ++i) {
struct aws_mqtt_topic_subscription *subscription = subscription_ptr + i;
AWS_ZERO_STRUCT(*subscription);

subscription->qos = s_convert_mqtt5_suback_reason_code_to_mqtt3_granted_qos(suback->reason_codes[i]);

if (i < subscription_count) {
aws_array_list_get_at(&subscribe_op->subscriptions, &record, i);
// If the suback does not contains any data, we directly extract the data from subscribe_op
if (reason_code_count == 0) {
reason_code_count = subscription_count;
}

subscription->topic = record->subscription_view.topic_filter;
subscription->on_publish = record->subscription_view.on_publish_received;
subscription->on_publish_ud = record->subscription_view.callback_user_data;
subscription->on_cleanup = record->subscription_view.on_cleanup;
}
AWS_VARIABLE_LENGTH_ARRAY(struct aws_mqtt_topic_subscription, multi_sub_subscription_buf, reason_code_count);
AWS_VARIABLE_LENGTH_ARRAY(
struct aws_mqtt_topic_subscription *, multi_sub_subscription_ptr_buf, reason_code_count);
struct aws_mqtt_topic_subscription *subscription_ptr =
(struct aws_mqtt_topic_subscription *)multi_sub_subscription_buf;

struct aws_array_list multi_sub_list;
aws_array_list_init_static(
&multi_sub_list,
multi_sub_subscription_ptr_buf,
reason_code_count,
sizeof(struct aws_mqtt_topic_subscription *));

for (size_t i = 0; i < reason_code_count; ++i) {
struct aws_mqtt_topic_subscription *subscription = subscription_ptr + i;
AWS_ZERO_STRUCT(*subscription);

if (i < subscription_count) {
aws_array_list_get_at(&subscribe_op->subscriptions, &record, i);

subscription->topic = record->subscription_view.topic_filter;
subscription->on_publish = record->subscription_view.on_publish_received;
subscription->on_publish_ud = record->subscription_view.callback_user_data;
subscription->on_cleanup = record->subscription_view.on_cleanup;
}

aws_array_list_push_back(&multi_sub_list, &subscription);
if (suback != NULL) {
subscription->qos = s_convert_mqtt5_suback_reason_code_to_mqtt3_granted_qos(suback->reason_codes[i]);
} else {
subscription->qos = (enum aws_mqtt_qos)(record->subscription_view.qos);
}
(*subscribe_op->on_multi_suback)(
&adapter->base,
subscribe_op->base.id,
&multi_sub_list,
error_code,
subscribe_op->on_multi_suback_user_data);

aws_array_list_push_back(&multi_sub_list, &subscription);
}
(*subscribe_op->on_multi_suback)(
&adapter->base,
subscribe_op->base.id,
&multi_sub_list,
error_code,
subscribe_op->on_multi_suback_user_data);
}

aws_mqtt5_to_mqtt3_adapter_operation_table_remove_operation(
Expand Down
3 changes: 2 additions & 1 deletion tests/v3/connection_state_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,8 @@ static void s_on_multi_suback(

aws_mutex_lock(&state_test_data->lock);
state_test_data->subscribe_completed = true;
if (!error_code) {
// The suback would return the subscription data regardless of error_code
if (topic_subacks) {
size_t length = aws_array_list_length(topic_subacks);
for (size_t i = 0; i < length; ++i) {
struct aws_mqtt_topic_subscription *subscription = NULL;
Expand Down
9 changes: 8 additions & 1 deletion tests/v5/mqtt5_to_mqtt3_adapter_tests.c
Original file line number Diff line number Diff line change
Expand Up @@ -2785,7 +2785,8 @@ static void s_aws_mqtt5_to_mqtt3_adapter_test_fixture_record_subscribe_multi_com
.error_code = error_code,
};

if (error_code == AWS_ERROR_SUCCESS) {
// The subscription would return subscription data regardless of error code
if (topic_subacks) {
size_t granted_count = aws_array_list_length(topic_subacks);

aws_array_list_init_dynamic(
Expand Down Expand Up @@ -2957,6 +2958,12 @@ static int s_mqtt5to3_adapter_subscribe_multi_null_suback_fn(struct aws_allocato
},
};

aws_array_list_init_static_from_initialized(
&expected_events[0].granted_subscriptions,
(void *)subscriptions,
2,
sizeof(struct aws_mqtt_topic_subscription));

aws_mqtt_client_connection_disconnect(
connection, s_aws_mqtt5_to_mqtt3_adapter_test_fixture_record_disconnection_complete, &fixture);

Expand Down

0 comments on commit e610e20

Please sign in to comment.