From 9880dbbf2a8629efd5fae4ae4661b76cd3a8a79c Mon Sep 17 00:00:00 2001 From: Yunhao Ling Date: Tue, 15 Dec 2020 23:27:25 -0800 Subject: [PATCH] message footer and delivery annotation should be encoded into the outgoing payload --- src/message_sender.c | 66 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/src/message_sender.c b/src/message_sender.c index 2ee407df..b5627ba4 100644 --- a/src/message_sender.c +++ b/src/message_sender.c @@ -208,6 +208,8 @@ static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message AMQP_VALUE body_amqp_value = NULL; size_t body_data_count = 0; AMQP_VALUE msg_annotations = NULL; + AMQP_VALUE footer = NULL; + AMQP_VALUE delivery_annotations = NULL; bool is_error = false; // message header @@ -301,6 +303,38 @@ static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message } } + // footer + if ((!is_error) && + (message_get_footer(message, &footer) == 0) && + (footer != NULL)) + { + if (amqpvalue_get_encoded_size(footer, &encoded_size) != 0) + { + LogError("Cannot obtain footer encoded size"); + is_error = true; + } + else + { + total_encoded_size += encoded_size; + } + } + + // delivery annotations + if ((!is_error) && + (message_get_delivery_annotations(message, &delivery_annotations) == 0) && + (delivery_annotations != NULL)) + { + if (amqpvalue_get_encoded_size(delivery_annotations, &encoded_size) != 0) + { + LogError("Cannot obtain delivery annotations encoded size"); + is_error = true; + } + else + { + total_encoded_size += encoded_size; + } + } + if (is_error) { result = SEND_ONE_MESSAGE_ERROR; @@ -462,6 +496,28 @@ static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message log_message_chunk(message_sender, "Application properties:", application_properties_value); } + if ((result == SEND_ONE_MESSAGE_OK) && (footer != NULL)) + { + if (amqpvalue_encode(footer, encode_bytes, &payload) != 0) + { + LogError("Cannot encode footer value"); + result = SEND_ONE_MESSAGE_ERROR; + } + + log_message_chunk(message_sender, "Footer:", footer); + } + + if ((result == SEND_ONE_MESSAGE_OK) && (delivery_annotations != NULL)) + { + if (amqpvalue_encode(delivery_annotations, encode_bytes, &payload) != 0) + { + LogError("Cannot encode delivery annotations value"); + result = SEND_ONE_MESSAGE_ERROR; + } + + log_message_chunk(message_sender, "Delivery annotations:", delivery_annotations); + } + if (result == SEND_ONE_MESSAGE_OK) { switch (message_body_type) @@ -594,6 +650,16 @@ static SEND_ONE_MESSAGE_RESULT send_one_message(MESSAGE_SENDER_INSTANCE* message { properties_destroy(properties); } + + if (footer != NULL) + { + annotations_destroy(footer); + } + + if (delivery_annotations != NULL) + { + annotations_destroy(delivery_annotations); + } } return result;