Skip to content

Commit

Permalink
Consider QQs may let pass 1st overflowing msg
Browse files Browse the repository at this point in the history
  • Loading branch information
LoisSotoLopez committed Sep 30, 2024
1 parent 299fd31 commit 6d78078
Showing 1 changed file with 49 additions and 33 deletions.
82 changes: 49 additions & 33 deletions deps/rabbit/test/quorum_queue_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1176,7 +1176,7 @@ policy_repair(Config) ->
<<"/">>,
<<QQ/binary, "_1">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength1}],
[{<<"max-length">>, ExpectedMaxLength1}, {<<"overflow">>, <<"reject-publish">>}],
Priority1,
<<"quorum_queues">>,
<<"acting-user">>
Expand All @@ -1194,13 +1194,17 @@ policy_repair(Config) ->
% Checking twice to ensure consistency
%
% Once
publish_many(Ch, QQ, ExpectedMaxLength1 + 1, call),
timer:sleep(100),
ExpectedMaxLength1 = length(consume_all(Ch, QQ)),
{GottenOks1, GottenFails1} = publish_confirm_many(Ch, QQ, ExpectedMaxLength1 + 1),
ct:pal("GottenOks1: ~p, GottenFails1: ~p", [GottenOks1, GottenFails1]),
?assert((GottenOks1 =:= ExpectedMaxLength1) or (GottenOks1 =:= ExpectedMaxLength1 + 1)),
?assert((GottenFails1 =:= 1) or (GottenFails1 =:= 0)),
consume_all(Ch, QQ),
% Twice
publish_many(Ch, QQ, ExpectedMaxLength1 + 10, call),
timer:sleep(100),
ExpectedMaxLength1 = length(consume_all(Ch, QQ)),
{GottenOks2, GottenFails2} = publish_confirm_many(Ch, QQ, ExpectedMaxLength1 + 10),
ct:pal("GottenOks2: ~p, GottenFails2: ~p", [GottenOks2, GottenFails2]),
?assert((GottenOks2 =:= ExpectedMaxLength1) or (GottenOks2 =:= ExpectedMaxLength1 + 1)),
?assert((GottenFails2 =:= 10) or (GottenFails2 =:= 9)),
consume_all(Ch, QQ),

% Set higher priority policy, allowing more messages
ExpectedMaxLength2 = 20,
Expand All @@ -1214,7 +1218,7 @@ policy_repair(Config) ->
<<"/">>,
<<QQ/binary, "_2">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength2}],
[{<<"max-length">>, ExpectedMaxLength2}, {<<"overflow">>, <<"reject-publish">>}],
Priority2,
<<"quorum_queues">>,
<<"acting-user">>
Expand All @@ -1231,13 +1235,17 @@ policy_repair(Config) ->
% Checking twice to ensure consistency.
%
% Once
publish_many(Ch, QQ, ExpectedMaxLength2 + 1),
timer:sleep(100),
ExpectedMaxLength2 = length(consume_all(Ch, QQ)),
{GottenOks3, GottenFails3} = publish_confirm_many(Ch, QQ, ExpectedMaxLength2 + 1),
ct:pal("GottenOks3: ~p, GottenFails3: ~p", [GottenOks3, GottenFails3]),
?assert((GottenOks3 =:= ExpectedMaxLength2) or (GottenOks3 =:= ExpectedMaxLength2 + 1)),
?assert((GottenFails3 =:= 1) or (GottenFails3 =:= 0)),
consume_all(Ch, QQ),
% Twice
publish_many(Ch, QQ, ExpectedMaxLength2 + 10),
timer:sleep(100),
ExpectedMaxLength2 = length(consume_all(Ch, QQ)),
{GottenOks4, GottenFails4} = publish_confirm_many(Ch, QQ, ExpectedMaxLength2 + 10),
ct:pal("GottenOks4: ~p, GottenFails4: ~p", [GottenOks4, GottenFails4]),
?assert((GottenOks4 =:= ExpectedMaxLength2) or (GottenOks4 =:= ExpectedMaxLength2 + 1)),
?assert((GottenFails4 =:= 10) or (GottenFails4 =:= 9)),
consume_all(Ch, QQ),

% Make the queue process unavailable.
% Kill the process multiple times until its supervisor stops restarting it.
Expand Down Expand Up @@ -1282,7 +1290,7 @@ policy_repair(Config) ->
<<"/">>,
<<QQ/binary, "_3">>,
QQ,
[{<<"max-length">>, ExpectedMaxLength3}],
[{<<"max-length">>, ExpectedMaxLength3}, {<<"overflow">>, <<"reject-publish">>}],
Priority3,
<<"quorum_queues">>,
<<"acting-user">>
Expand Down Expand Up @@ -1329,22 +1337,28 @@ policy_repair(Config) ->
GetPidUntil()
end,
Servers),

timer:sleep(1000),

% Wait for the policy to apply
?awaitMatch({ok, {_, #{config := #{max_length := ExpectedMaxLength3}}}, _},
rpc:call(Server0, ra, local_query, [RaName, QueryFun]),
?DEFAULT_AWAIT),

% Check the policy has been applied
% Insert MaxLength3 + some messages but after consuming all messages only
% MaxLength3 are retrieved.
% Checking twice to ensure consistency.
%
% Once
publish_many(Ch, QQ, ExpectedMaxLength3 + 1, call),
timer:sleep(100),
ExpectedMaxLength3 = length(consume_all(Ch, QQ)),
{GottenOks5, GottenFails5} = publish_confirm_many(Ch, QQ, ExpectedMaxLength3 + 1),
ct:pal("GottenOks5: ~p, GottenFails5: ~p", [GottenOks5, GottenFails5]),
?assert((GottenOks5 =:= ExpectedMaxLength3) or (GottenOks5 =:= ExpectedMaxLength3 + 1)),
?assert((GottenFails5 =:= 1) or (GottenFails5 =:= 0)),
consume_all(Ch, QQ),
% Twice
publish_many(Ch, QQ, ExpectedMaxLength3 + 10, call),
timer:sleep(100),
ExpectedMaxLength3 = length(consume_all(Ch, QQ)).
{GottenOks6, GottenFails6} = publish_confirm_many(Ch, QQ, ExpectedMaxLength3 + 10),
ct:pal("GottenOks6: ~p, GottenFails6: ~p", [GottenOks6, GottenFails6]),
?assert((GottenOks6 =:= ExpectedMaxLength3) or (GottenOks6 =:= ExpectedMaxLength3 + 1)),
?assert((GottenFails6 =:= 10) or (GottenFails6 =:= 9)).

priority_queue_fifo(Config) ->
%% testing: if hi priority messages are published before lo priority
Expand Down Expand Up @@ -4183,19 +4197,13 @@ count_online_nodes(Server, VHost, Q0) ->
length(proplists:get_value(online, Info, [])).

publish_many(Ch, Queue, Count) ->
publish_many(Ch, Queue, Count, cast).

publish_many(Ch, Queue, Count, Method) ->
[publish(Ch, Queue, Method) || _ <- lists:seq(1, Count)].
[publish(Ch, Queue) || _ <- lists:seq(1, Count)].

publish(Ch, Queue) ->
publish(Ch, Queue, cast).
publish(Ch, Queue, <<"msg">>).

publish(Ch, Queue, Method) ->
publish(Ch, Queue, <<"msg">>, Method).

publish(Ch, Queue, Msg, Method) when Method =:= cast; Method =:= call ->
ok = amqp_channel:Method(Ch,
publish(Ch, Queue, Msg) ->
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = Queue},
#amqp_msg{props = #'P_basic'{delivery_mode = 2},
payload = Msg}).
Expand Down Expand Up @@ -4354,6 +4362,14 @@ lists_interleave([Item | Items], List)
{Left, Right} = lists:split(2, List),
Left ++ [Item | lists_interleave(Items, Right)].

publish_confirm_many(Ch, Queue, Count) ->
lists:foldl(fun(_, {Oks, Fails}) ->
case publish_confirm(Ch, Queue) of
ok -> {Oks + 1, Fails};
_ -> {Oks, Fails + 1}
end
end, {0,0}, lists:seq(1, Count)).

consume_all(Ch, QQ) ->
Consume = fun C(Acc) ->
case amqp_channel:call(Ch, #'basic.get'{queue = QQ}) of
Expand Down

0 comments on commit 6d78078

Please sign in to comment.