Skip to content

Commit

Permalink
Remove ShouldLog & limit deliv. limit not set logg
Browse files Browse the repository at this point in the history
Removes the usage of a ShouldLog parameter on several functions
and limits the logging of the message warning about the delivery_limit
not being set to the moment of queueDeclaration
  • Loading branch information
LoisSotoLopez committed Oct 24, 2024
1 parent 3b5069f commit 9dc9f97
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 47 deletions.
59 changes: 30 additions & 29 deletions deps/rabbit/src/rabbit_quorum_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -316,26 +316,31 @@ declare_queue_error(Error, Queue, Leader, ActingUser) ->
ra_machine(Q) ->
{module, rabbit_fifo, ra_machine_config(Q)}.

gather_policy_config(Q, ShouldLog) ->
gather_policy_config(Q, IsQueueDeclaration) ->
QName = amqqueue:get_name(Q),
%% take the minimum value of the policy and the queue arg if present
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
OverflowBin = args_policy_lookup(<<"overflow">>, fun policy_has_precedence/2, Q),
Overflow = overflow(OverflowBin, drop_head, QName, ShouldLog),
Overflow = overflow(OverflowBin, drop_head, QName),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
DeliveryLimit = case args_policy_lookup(<<"delivery-limit">>,
fun resolve_delivery_limit/2, Q) of
undefined ->
maybe_log(ShouldLog, info,
"~ts: delivery_limit not set, defaulting to ~b",
[rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]),
case IsQueueDeclaration of
true ->
rabbit_log:info(
"~ts: delivery_limit not set, defaulting to ~b",
[rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]);
false ->
ok
end,
?DEFAULT_DELIVERY_LIMIT;
DL ->
DL
end,
Expires = args_policy_lookup(<<"expires">>, fun min/2, Q),
MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q),
DeadLetterHandler = dead_letter_handler(Q, Overflow, ShouldLog),
DeadLetterHandler = dead_letter_handler(Q, Overflow),
#{dead_letter_handler => DeadLetterHandler,
max_length => MaxLength,
max_bytes => MaxBytes,
Expand All @@ -348,7 +353,7 @@ gather_policy_config(Q, ShouldLog) ->
}.

ra_machine_config(Q) when ?is_amqqueue(Q) ->
PolicyConfig = gather_policy_config(Q, _ShouldLog = true),
PolicyConfig = gather_policy_config(Q, true),
QName = amqqueue:get_name(Q),
{Name, _} = amqqueue:get_pid(Q),
PolicyConfig#{
Expand Down Expand Up @@ -721,15 +726,15 @@ system_recover(quorum_queues) ->
end.

maybe_apply_policies(Q, #{config := CurrentConfig}) ->
NewPolicyConfig = gather_policy_config(Q, _ShoudLog = false),
NewPolicyConfig = gather_policy_config(Q, false),

RelevantKeys = maps:keys(NewPolicyConfig),
CurrentPolicyConfig = maps:with(RelevantKeys, CurrentConfig),

ShouldUpdate = NewPolicyConfig =/= CurrentPolicyConfig,
case ShouldUpdate of
true ->
rabbit_log:debug("Re-applying policies to ~p", [amqqueue:get_name(Q)]),
rabbit_log:debug("Re-applying policies to ~ts", [rabbit_misc:rs(amqqueue:get_name(Q))]),
policy_changed(Q),
ok;
false -> ok
Expand Down Expand Up @@ -1557,35 +1562,35 @@ reclaim_memory(Vhost, QueueName) ->
ra_log_wal:force_roll_over({?RA_WAL_NAME, Node}).

%%----------------------------------------------------------------------------
dead_letter_handler(Q, Overflow, ShouldLog) ->
dead_letter_handler(Q, Overflow) ->
Exchange = args_policy_lookup(<<"dead-letter-exchange">>, fun queue_arg_has_precedence/2, Q),
RoutingKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun queue_arg_has_precedence/2, Q),
Strategy = args_policy_lookup(<<"dead-letter-strategy">>, fun queue_arg_has_precedence/2, Q),
QName = amqqueue:get_name(Q),
dlh(Exchange, RoutingKey, Strategy, Overflow, QName, ShouldLog).
dlh(Exchange, RoutingKey, Strategy, Overflow, QName).

dlh(undefined, undefined, undefined, _, _, _) ->
dlh(undefined, undefined, undefined, _, _) ->
undefined;
dlh(undefined, RoutingKey, undefined, _, QName, ShouldLog) ->
maybe_log(ShouldLog, warning, "Disabling dead-lettering for ~ts despite configured dead-letter-routing-key '~ts' "
dlh(undefined, RoutingKey, undefined, _, QName) ->
rabbit_log:warning("Disabling dead-lettering for ~ts despite configured dead-letter-routing-key '~ts' "
"because dead-letter-exchange is not configured.",
[rabbit_misc:rs(QName), RoutingKey]),
undefined;
dlh(undefined, _, Strategy, _, QName, ShouldLog) ->
maybe_log(ShouldLog, warning, "Disabling dead-lettering for ~ts despite configured dead-letter-strategy '~ts' "
dlh(undefined, _, Strategy, _, QName) ->
rabbit_log:warning("Disabling dead-lettering for ~ts despite configured dead-letter-strategy '~ts' "
"because dead-letter-exchange is not configured.",
[rabbit_misc:rs(QName), Strategy]),
undefined;
dlh(_, _, <<"at-least-once">>, reject_publish, _, _) ->
dlh(_, _, <<"at-least-once">>, reject_publish, _) ->
at_least_once;
dlh(Exchange, RoutingKey, <<"at-least-once">>, drop_head, QName, ShouldLog) ->
maybe_log(ShouldLog, warning, "Falling back to dead-letter-strategy at-most-once for ~ts "
dlh(Exchange, RoutingKey, <<"at-least-once">>, drop_head, QName) ->
rabbit_log:warning("Falling back to dead-letter-strategy at-most-once for ~ts "
"because configured dead-letter-strategy at-least-once is incompatible with "
"effective overflow strategy drop-head. To enable dead-letter-strategy "
"at-least-once, set overflow strategy to reject-publish.",
[rabbit_misc:rs(QName)]),
dlh_at_most_once(Exchange, RoutingKey, QName);
dlh(Exchange, RoutingKey, _, _, QName, _) ->
dlh(Exchange, RoutingKey, _, _, QName) ->
dlh_at_most_once(Exchange, RoutingKey, QName).

dlh_at_most_once(Exchange, RoutingKey, QName) ->
Expand Down Expand Up @@ -1940,11 +1945,11 @@ update_type_state(Q, Fun) when ?is_amqqueue(Q) ->
Ts = amqqueue:get_type_state(Q),
amqqueue:set_type_state(Q, Fun(Ts)).

overflow(undefined, Def, _QName, _ShouldLog) -> Def;
overflow(<<"reject-publish">>, _Def, _QName, _ShouldLog) -> reject_publish;
overflow(<<"drop-head">>, _Def, _QName, _ShouldLog) -> drop_head;
overflow(<<"reject-publish-dlx">> = V, Def, QName, ShouldLog) ->
maybe_log(ShouldLog, warning, "Invalid overflow strategy ~tp for quorum queue: ~ts",
overflow(undefined, Def, _QName) -> Def;
overflow(<<"reject-publish">>, _Def, _QName) -> reject_publish;
overflow(<<"drop-head">>, _Def, _QName) -> drop_head;
overflow(<<"reject-publish-dlx">> = V, Def, QName) ->
rabbit_log:warning("Invalid overflow strategy ~tp for quorum queue: ~ts",
[V, rabbit_misc:rs(QName)]),
Def.

Expand Down Expand Up @@ -2092,7 +2097,3 @@ file_handle_other_reservation() ->
file_handle_release_reservation() ->
ok.

maybe_log(true, Level, Msg, Args) ->
rabbit_log:Level(Msg, Args);
maybe_log(false, _, _, _) ->
ok.
29 changes: 11 additions & 18 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1425,25 +1425,18 @@ policy_repair(Config) ->

% Wait for the queue to be available again.
lists:foreach(fun(Srv) ->
GetPidUntil = fun GetPidUntil() ->
case
rabbit_ct_broker_helpers:rpc(
Config,
Srv,
erlang,
whereis,
[RaName])
of
undefined ->
timer:sleep(500),
GetPidUntil();
Pid when is_pid(Pid) ->
ok
end
rabbit_ct_helpers:await_condition(
fun () ->
is_pid(
rabbit_ct_broker_helpers:rpc(
Config,
Srv,
erlang,
whereis,
[RaName]))
end)
end,
GetPidUntil()
end,
Servers),
Servers),

% Wait for the policy to apply
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength3}}}, _},
Expand Down

0 comments on commit 9dc9f97

Please sign in to comment.