From 3f4e129a22661d85969b947b9e81040997fd88f4 Mon Sep 17 00:00:00 2001 From: Sergey Zakhlypa Date: Thu, 9 May 2024 15:16:17 +0200 Subject: [PATCH 1/6] Docs: fix typo on Trigger Policy page Signed-off-by: Sergey Zakhlypa --- doc/pages/architecture/062-trigger_delivery_policies.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/pages/architecture/062-trigger_delivery_policies.md b/doc/pages/architecture/062-trigger_delivery_policies.md index cb82d30e6..f31f8090f 100644 --- a/doc/pages/architecture/062-trigger_delivery_policies.md +++ b/doc/pages/architecture/062-trigger_delivery_policies.md @@ -47,7 +47,7 @@ A Trigger Delivery Policy is composed of: A single policy does not allow to retry sending events for different amount of times, but different policies may have different numbers. This is optional, but required if the policy specifies at least one handler with retry strategy. -- Event TTL: in orer to further lower the space requirement of the event queue, events may be equipped with a TTL which specifies the amount of +- Event TTL: in order to further lower the space requirement of the event queue, events may be equipped with a TTL which specifies the amount of milliseconds an event is retained in the event queue. When an event expires, it is discarded from the event queue, even if it has not been delivered. This is optional. @@ -61,4 +61,4 @@ not impact the expected behaviour if Trigger Delivery Policies are not used. - If the Astarte Trigger Engine service is replicated, events could be delivered out of order, as data from event queues are delivered to consumers in a round-robin fashion. - If the `retry` strategy is specified, in-order delivery cannot be guaranteed because a > 1 [consumer prefetch count](https://www.rabbitmq.com/consumer-prefetch.html) is being used. This allows for higher throughput at the cost of consistency. In the future, the user will be allowed to choose between having an higher number of - events handled, but out of order, or ordered event handling at a lower rate. \ No newline at end of file + events handled, but out of order, or ordered event handling at a lower rate. From 20233b7f9768048dd97e0ef8e9ad2e6933556b8e Mon Sep 17 00:00:00 2001 From: Sergey Zakhlypa Date: Thu, 9 May 2024 19:21:17 +0200 Subject: [PATCH 2/6] Update Realm Management OpenAPI - fix TriggerDeliveryPolicyErrorHandler schema - add explicit 404 errors Signed-off-by: Sergey Zakhlypa --- .../static/astarte_realm_management_api.yaml | 27 ++++++++++++------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/apps/astarte_realm_management_api/priv/static/astarte_realm_management_api.yaml b/apps/astarte_realm_management_api/priv/static/astarte_realm_management_api.yaml index d03bdbf96..697497836 100644 --- a/apps/astarte_realm_management_api/priv/static/astarte_realm_management_api.yaml +++ b/apps/astarte_realm_management_api/priv/static/astarte_realm_management_api.yaml @@ -325,7 +325,7 @@ paths: '403': $ref: '#/components/responses/Forbidden' '404': - $ref: '#/components/responses/NotFound' + $ref: '#/components/responses/TriggerNotFound' '500': description: Internal Server Error. delete: @@ -347,7 +347,7 @@ paths: '403': $ref: '#/components/responses/Forbidden' '404': - $ref: '#/components/responses/NotFound' + $ref: '#/components/responses/TriggerNotFound' '500': description: Internal Server Error. '/{realm_name}/policies': @@ -417,7 +417,7 @@ paths: '403': $ref: '#/components/responses/Forbidden' '404': - $ref: '#/components/responses/NotFound' + $ref: '#/components/responses/TriggerDeliveryPolicyNotFound' '500': description: Internal Server Error. delete: @@ -439,7 +439,7 @@ paths: '403': $ref: '#/components/responses/Forbidden' '404': - $ref: '#/components/responses/NotFound' + $ref: '#/components/responses/TriggerDeliveryPolicyNotFound' '409': $ref: '#/components/responses/TriggerDeliveryPolicyCurrentlyUsedError' '500': @@ -657,6 +657,15 @@ components: mappings: type: - is invalid + TriggerNotFound: + description: Requested trigger was not found + content: + application/json: + schema: + $ref: '#/components/schemas/GenericError' + example: + errors: + detail: Trigger not found TriggerValidationError: description: The provided trigger configuration is not valid. content: @@ -695,15 +704,15 @@ components: example: errors: detail: is invalid - NotFound: - description: Resource not found + TriggerDeliveryPolicyNotFound: + description: Requested trigger delivery policy was not found content: application/json: schema: $ref: '#/components/schemas/GenericError' example: errors: - detail: Not found + detail: Trigger policy not found Unauthorized: description: Authorization information is missing or invalid. content: @@ -1147,7 +1156,7 @@ components: description: >- The range of errors the handler refers to. Must be one of: 'any_error', 'client_error', 'server_error' or a custom error range (e.g. '[418, 419, 420, 500]'). - retry_strategy: + strategy: type: string enum: - discard @@ -1155,7 +1164,7 @@ components: description: What Astarte must do if an HTTP error occurs when delivering errors the handler refers to. required: - 'on' - - retry_strategy + - strategy example: 'on': any_error strategy: discard From 27bdb185bfc5fb7fe3375eeac33ae0cbca280052 Mon Sep 17 00:00:00 2001 From: Arnaldo Cesco Date: Mon, 27 May 2024 11:44:47 +0200 Subject: [PATCH 3/6] Trigger Engine: trigger policy TTLs are seconds! The message TTL argument for a policy (if set) was passed as-is to AMQP. However, TTLs in AMQP are in milliseconds, so a x1000 factor is added. Signed-off-by: Arnaldo Cesco --- .../amqp_consumer/amqp_message_consumer.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/amqp_consumer/amqp_message_consumer.ex b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/amqp_consumer/amqp_message_consumer.ex index ed850576a..247cf3f5e 100644 --- a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/amqp_consumer/amqp_message_consumer.ex +++ b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/amqp_consumer/amqp_message_consumer.ex @@ -201,7 +201,8 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPMessageConsumer do }) do [] |> put_x_arg_if(maximum_capacity != nil, {"x-max-length", :signedint, maximum_capacity}) - |> put_x_arg_if(event_ttl != nil, {"x-message-ttl", :signedint, event_ttl}) + # AMQP message TTLs are in milliseconds! + |> put_x_arg_if(event_ttl != nil, {"x-message-ttl", :signedint, event_ttl * 1_000}) end defp put_x_arg_if(list, true, x_arg), do: [x_arg | list] From 11b639a53e66601a732c7ad345868161cd94a525 Mon Sep 17 00:00:00 2001 From: Arnaldo Cesco Date: Mon, 27 May 2024 11:49:00 +0200 Subject: [PATCH 4/6] Docs: trigger delivery policy TTL is in seconds! Signed-off-by: Arnaldo Cesco --- doc/pages/architecture/062-trigger_delivery_policies.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/pages/architecture/062-trigger_delivery_policies.md b/doc/pages/architecture/062-trigger_delivery_policies.md index f31f8090f..49aa3689e 100644 --- a/doc/pages/architecture/062-trigger_delivery_policies.md +++ b/doc/pages/architecture/062-trigger_delivery_policies.md @@ -48,7 +48,7 @@ A Trigger Delivery Policy is composed of: This is optional, but required if the policy specifies at least one handler with retry strategy. - Event TTL: in order to further lower the space requirement of the event queue, events may be equipped with a TTL which specifies the amount of - milliseconds an event is retained in the event queue. When an event expires, it is discarded from the event queue, even if it has not been + seconds an event is retained in the event queue. When an event expires, it is discarded from the event queue, even if it has not been delivered. This is optional. From 3c4636d9c8406d6483e8f54258dd2daaac6822f5 Mon Sep 17 00:00:00 2001 From: Arnaldo Cesco Date: Mon, 27 May 2024 14:53:17 +0200 Subject: [PATCH 5/6] Update CHANGELOG.md Signed-off-by: Arnaldo Cesco --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ef31e664..9cd611717 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. in payloads). - [astarte_data_updater_plant] do not leak consumer channels in corner cases. - [astarte_data_updater_plant] do not leak producer channels in corner cases. +- [astarte_trigger_engine] Always treat event TTL for trigger policies in seconds, + not milliseconds. ## [1.1.1] - 2023-11-15 ### Fixed From e9f4290f1d22fd2b4afeb7fde3bf6ac6ef61791d Mon Sep 17 00:00:00 2001 From: Arnaldo Cesco Date: Tue, 28 May 2024 18:16:39 +0200 Subject: [PATCH 6/6] Trigger Engine: do not crash when trigger policy has no ttl [#951](https://github.com/astarte-platform/astarte/pull/951) fixed the TTL of policies, but it introduced a bug due to Elixir's by value evaluation strategy. Fix this by making the queue x-args generation lazy. Signed-off-by: Arnaldo Cesco --- .../amqp_consumer/amqp_message_consumer.ex | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/amqp_consumer/amqp_message_consumer.ex b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/amqp_consumer/amqp_message_consumer.ex index 247cf3f5e..d669b7b04 100644 --- a/apps/astarte_trigger_engine/lib/astarte_trigger_engine/amqp_consumer/amqp_message_consumer.ex +++ b/apps/astarte_trigger_engine/lib/astarte_trigger_engine/amqp_consumer/amqp_message_consumer.ex @@ -196,17 +196,17 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPMessageConsumer do end defp generate_policy_x_args(%PolicyStruct{ - maximum_capacity: maximum_capacity, + maximum_capacity: max_capacity, event_ttl: event_ttl }) do [] - |> put_x_arg_if(maximum_capacity != nil, {"x-max-length", :signedint, maximum_capacity}) + |> put_x_arg_if(max_capacity != nil, fn _ -> {"x-max-length", :signedint, max_capacity} end) # AMQP message TTLs are in milliseconds! - |> put_x_arg_if(event_ttl != nil, {"x-message-ttl", :signedint, event_ttl * 1_000}) + |> put_x_arg_if(event_ttl != nil, fn _ -> {"x-message-ttl", :signedint, event_ttl * 1_000} end) end - defp put_x_arg_if(list, true, x_arg), do: [x_arg | list] - defp put_x_arg_if(list, false, _x_arg), do: list + defp put_x_arg_if(list, true, x_arg_fun), do: [x_arg_fun.() | list] + defp put_x_arg_if(list, false, _x_arg_fun), do: list defp generate_queue_name(realm, policy) do "#{realm}_#{policy}_queue"