Skip to content

Commit

Permalink
Merge pull request #953 from eddbbt/fward-master
Browse files Browse the repository at this point in the history
Forward-port changes from release-1.1 to master
  • Loading branch information
Annopaolo authored May 31, 2024
2 parents fac70ec + 8a8e114 commit 1a9c203
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
in payloads).
- [astarte_data_updater_plant] do not leak consumer channels in corner cases.
- [astarte_data_updater_plant] do not leak producer channels in corner cases.
- [astarte_trigger_engine] Always treat event TTL for trigger policies in seconds,
not milliseconds.

## [1.1.1] - 2023-11-15
### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ paths:
'403':
$ref: '#/components/responses/Forbidden'
'404':
$ref: '#/components/responses/NotFound'
$ref: '#/components/responses/TriggerNotFound'
'500':
description: Internal Server Error.
delete:
Expand All @@ -387,7 +387,7 @@ paths:
'403':
$ref: '#/components/responses/Forbidden'
'404':
$ref: '#/components/responses/NotFound'
$ref: '#/components/responses/TriggerNotFound'
'500':
description: Internal Server Error.
'/{realm_name}/policies':
Expand Down Expand Up @@ -457,7 +457,7 @@ paths:
'403':
$ref: '#/components/responses/Forbidden'
'404':
$ref: '#/components/responses/NotFound'
$ref: '#/components/responses/TriggerDeliveryPolicyNotFound'
'500':
description: Internal Server Error.
delete:
Expand All @@ -479,7 +479,7 @@ paths:
'403':
$ref: '#/components/responses/Forbidden'
'404':
$ref: '#/components/responses/NotFound'
$ref: '#/components/responses/TriggerDeliveryPolicyNotFound'
'409':
$ref: '#/components/responses/TriggerDeliveryPolicyCurrentlyUsedError'
'500':
Expand Down Expand Up @@ -747,6 +747,15 @@ components:
mappings:
type:
- is invalid
TriggerNotFound:
description: Requested trigger was not found
content:
application/json:
schema:
$ref: '#/components/schemas/GenericError'
example:
errors:
detail: Trigger not found
TriggerValidationError:
description: The provided trigger configuration is not valid.
content:
Expand Down Expand Up @@ -785,15 +794,15 @@ components:
example:
errors:
detail: is invalid
NotFound:
description: Resource not found
TriggerDeliveryPolicyNotFound:
description: Requested trigger delivery policy was not found
content:
application/json:
schema:
$ref: '#/components/schemas/GenericError'
example:
errors:
detail: Not found
detail: Trigger policy not found
Unauthorized:
description: Authorization information is missing or invalid.
content:
Expand Down Expand Up @@ -1252,15 +1261,15 @@ components:
description: >-
The range of errors the handler refers to. Must be one of: 'any_error',
'client_error', 'server_error' or a custom error range (e.g. '[418, 419, 420, 500]').
retry_strategy:
strategy:
type: string
enum:
- discard
- retry
description: What Astarte must do if an HTTP error occurs when delivering errors the handler refers to.
required:
- 'on'
- retry_strategy
- strategy
example:
'on': any_error
strategy: discard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,17 +196,17 @@ defmodule Astarte.TriggerEngine.AMQPConsumer.AMQPMessageConsumer do
end

defp generate_policy_x_args(%PolicyStruct{
maximum_capacity: maximum_capacity,
maximum_capacity: max_capacity,
event_ttl: event_ttl
}) do
[]
|> put_x_arg_if(maximum_capacity != nil, {"x-max-length", :signedint, maximum_capacity})
|> put_x_arg_if(max_capacity != nil, fn _ -> {"x-max-length", :signedint, max_capacity} end)
# AMQP message TTLs are in milliseconds!
|> put_x_arg_if(event_ttl != nil, {"x-message-ttl", :signedint, event_ttl * 1_000})
|> put_x_arg_if(event_ttl != nil, fn _ -> {"x-message-ttl", :signedint, event_ttl * 1_000} end)
end

defp put_x_arg_if(list, true, x_arg), do: [x_arg | list]
defp put_x_arg_if(list, false, _x_arg), do: list
defp put_x_arg_if(list, true, x_arg_fun), do: [x_arg_fun.() | list]
defp put_x_arg_if(list, false, _x_arg_fun), do: list

defp generate_queue_name(realm, policy) do
"#{realm}_#{policy}_queue"
Expand Down
4 changes: 2 additions & 2 deletions doc/pages/architecture/062-trigger_delivery_policies.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ A Trigger Delivery Policy is composed of:
A single policy does not allow to retry sending events for different amount of times, but different policies may have different numbers.
This is optional, but required if the policy specifies at least one handler with retry strategy.

- Event TTL: in orer to further lower the space requirement of the event queue, events may be equipped with a TTL which specifies the amount of
- Event TTL: in order to further lower the space requirement of the event queue, events may be equipped with a TTL which specifies the amount of
seconds an event is retained in the event queue. When an event expires, it is discarded from the event queue, even if it has not been
delivered. This is optional.

Expand All @@ -61,4 +61,4 @@ not impact the expected behaviour if Trigger Delivery Policies are not used.
- If the Astarte Trigger Engine service is replicated, events could be delivered out of order, as data from event queues are delivered to consumers in a round-robin fashion.
- If the `retry` strategy is specified, in-order delivery cannot be guaranteed because a > 1 [consumer prefetch count](https://www.rabbitmq.com/consumer-prefetch.html) is being used.
This allows for higher throughput at the cost of consistency. In the future, the user will be allowed to choose between having an higher number of
events handled, but out of order, or ordered event handling at a lower rate.
events handled, but out of order, or ordered event handling at a lower rate.

0 comments on commit 1a9c203

Please sign in to comment.