diff --git a/deps/rabbit/src/rabbit_quorum_queue.erl b/deps/rabbit/src/rabbit_quorum_queue.erl index ac9ba48f3411..f63edc9a2449 100644 --- a/deps/rabbit/src/rabbit_quorum_queue.erl +++ b/deps/rabbit/src/rabbit_quorum_queue.erl @@ -316,26 +316,31 @@ declare_queue_error(Error, Queue, Leader, ActingUser) -> ra_machine(Q) -> {module, rabbit_fifo, ra_machine_config(Q)}. -gather_policy_config(Q, ShouldLog) -> +gather_policy_config(Q, IsQueueDeclaration) -> QName = amqqueue:get_name(Q), %% take the minimum value of the policy and the queue arg if present MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q), OverflowBin = args_policy_lookup(<<"overflow">>, fun policy_has_precedence/2, Q), - Overflow = overflow(OverflowBin, drop_head, QName, ShouldLog), + Overflow = overflow(OverflowBin, drop_head, QName), MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), DeliveryLimit = case args_policy_lookup(<<"delivery-limit">>, fun resolve_delivery_limit/2, Q) of undefined -> - maybe_log(ShouldLog, info, - "~ts: delivery_limit not set, defaulting to ~b", - [rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]), + case IsQueueDeclaration of + true -> + rabbit_log:info( + "~ts: delivery_limit not set, defaulting to ~b", + [rabbit_misc:rs(QName), ?DEFAULT_DELIVERY_LIMIT]); + false -> + ok + end, ?DEFAULT_DELIVERY_LIMIT; DL -> DL end, Expires = args_policy_lookup(<<"expires">>, fun min/2, Q), MsgTTL = args_policy_lookup(<<"message-ttl">>, fun min/2, Q), - DeadLetterHandler = dead_letter_handler(Q, Overflow, ShouldLog), + DeadLetterHandler = dead_letter_handler(Q, Overflow), #{dead_letter_handler => DeadLetterHandler, max_length => MaxLength, max_bytes => MaxBytes, @@ -348,7 +353,7 @@ gather_policy_config(Q, ShouldLog) -> }. ra_machine_config(Q) when ?is_amqqueue(Q) -> - PolicyConfig = gather_policy_config(Q, _ShouldLog = true), + PolicyConfig = gather_policy_config(Q, true), QName = amqqueue:get_name(Q), {Name, _} = amqqueue:get_pid(Q), PolicyConfig#{ @@ -721,7 +726,7 @@ system_recover(quorum_queues) -> end. maybe_apply_policies(Q, #{config := CurrentConfig}) -> - NewPolicyConfig = gather_policy_config(Q, _ShoudLog = false), + NewPolicyConfig = gather_policy_config(Q, false), RelevantKeys = maps:keys(NewPolicyConfig), CurrentPolicyConfig = maps:with(RelevantKeys, CurrentConfig), @@ -729,7 +734,7 @@ maybe_apply_policies(Q, #{config := CurrentConfig}) -> ShouldUpdate = NewPolicyConfig =/= CurrentPolicyConfig, case ShouldUpdate of true -> - rabbit_log:debug("Re-applying policies to ~p", [amqqueue:get_name(Q)]), + rabbit_log:debug("Re-applying policies to ~ts", [rabbit_misc:rs(amqqueue:get_name(Q))]), policy_changed(Q), ok; false -> ok @@ -1557,35 +1562,35 @@ reclaim_memory(Vhost, QueueName) -> ra_log_wal:force_roll_over({?RA_WAL_NAME, Node}). %%---------------------------------------------------------------------------- -dead_letter_handler(Q, Overflow, ShouldLog) -> +dead_letter_handler(Q, Overflow) -> Exchange = args_policy_lookup(<<"dead-letter-exchange">>, fun queue_arg_has_precedence/2, Q), RoutingKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun queue_arg_has_precedence/2, Q), Strategy = args_policy_lookup(<<"dead-letter-strategy">>, fun queue_arg_has_precedence/2, Q), QName = amqqueue:get_name(Q), - dlh(Exchange, RoutingKey, Strategy, Overflow, QName, ShouldLog). + dlh(Exchange, RoutingKey, Strategy, Overflow, QName). -dlh(undefined, undefined, undefined, _, _, _) -> +dlh(undefined, undefined, undefined, _, _) -> undefined; -dlh(undefined, RoutingKey, undefined, _, QName, ShouldLog) -> - maybe_log(ShouldLog, warning, "Disabling dead-lettering for ~ts despite configured dead-letter-routing-key '~ts' " +dlh(undefined, RoutingKey, undefined, _, QName) -> + rabbit_log:warning("Disabling dead-lettering for ~ts despite configured dead-letter-routing-key '~ts' " "because dead-letter-exchange is not configured.", [rabbit_misc:rs(QName), RoutingKey]), undefined; -dlh(undefined, _, Strategy, _, QName, ShouldLog) -> - maybe_log(ShouldLog, warning, "Disabling dead-lettering for ~ts despite configured dead-letter-strategy '~ts' " +dlh(undefined, _, Strategy, _, QName) -> + rabbit_log:warning("Disabling dead-lettering for ~ts despite configured dead-letter-strategy '~ts' " "because dead-letter-exchange is not configured.", [rabbit_misc:rs(QName), Strategy]), undefined; -dlh(_, _, <<"at-least-once">>, reject_publish, _, _) -> +dlh(_, _, <<"at-least-once">>, reject_publish, _) -> at_least_once; -dlh(Exchange, RoutingKey, <<"at-least-once">>, drop_head, QName, ShouldLog) -> - maybe_log(ShouldLog, warning, "Falling back to dead-letter-strategy at-most-once for ~ts " +dlh(Exchange, RoutingKey, <<"at-least-once">>, drop_head, QName) -> + rabbit_log:warning("Falling back to dead-letter-strategy at-most-once for ~ts " "because configured dead-letter-strategy at-least-once is incompatible with " "effective overflow strategy drop-head. To enable dead-letter-strategy " "at-least-once, set overflow strategy to reject-publish.", [rabbit_misc:rs(QName)]), dlh_at_most_once(Exchange, RoutingKey, QName); -dlh(Exchange, RoutingKey, _, _, QName, _) -> +dlh(Exchange, RoutingKey, _, _, QName) -> dlh_at_most_once(Exchange, RoutingKey, QName). dlh_at_most_once(Exchange, RoutingKey, QName) -> @@ -1940,11 +1945,11 @@ update_type_state(Q, Fun) when ?is_amqqueue(Q) -> Ts = amqqueue:get_type_state(Q), amqqueue:set_type_state(Q, Fun(Ts)). -overflow(undefined, Def, _QName, _ShouldLog) -> Def; -overflow(<<"reject-publish">>, _Def, _QName, _ShouldLog) -> reject_publish; -overflow(<<"drop-head">>, _Def, _QName, _ShouldLog) -> drop_head; -overflow(<<"reject-publish-dlx">> = V, Def, QName, ShouldLog) -> - maybe_log(ShouldLog, warning, "Invalid overflow strategy ~tp for quorum queue: ~ts", +overflow(undefined, Def, _QName) -> Def; +overflow(<<"reject-publish">>, _Def, _QName) -> reject_publish; +overflow(<<"drop-head">>, _Def, _QName) -> drop_head; +overflow(<<"reject-publish-dlx">> = V, Def, QName) -> + rabbit_log:warning("Invalid overflow strategy ~tp for quorum queue: ~ts", [V, rabbit_misc:rs(QName)]), Def. @@ -2092,7 +2097,3 @@ file_handle_other_reservation() -> file_handle_release_reservation() -> ok. -maybe_log(true, Level, Msg, Args) -> - rabbit_log:Level(Msg, Args); -maybe_log(false, _, _, _) -> - ok. diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index b2829f267c27..718754cd4eb8 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -1425,25 +1425,18 @@ policy_repair(Config) -> % Wait for the queue to be available again. lists:foreach(fun(Srv) -> - GetPidUntil = fun GetPidUntil() -> - case - rabbit_ct_broker_helpers:rpc( - Config, - Srv, - erlang, - whereis, - [RaName]) - of - undefined -> - timer:sleep(500), - GetPidUntil(); - Pid when is_pid(Pid) -> - ok - end + rabbit_ct_helpers:await_condition( + fun () -> + is_pid( + rabbit_ct_broker_helpers:rpc( + Config, + Srv, + erlang, + whereis, + [RaName])) + end) end, - GetPidUntil() - end, - Servers), + Servers), % Wait for the policy to apply ?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength3}}}, _},