Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass the message to rabbit_backing_queue:discard callback (backport #7802) #7816

Merged
merged 2 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -641,12 +641,13 @@ send_mandatory(#delivery{mandatory = true,
discard(#delivery{confirm = Confirm,
sender = SenderPid,
flow = Flow,
message = #basic_message{id = MsgId}}, BQ, BQS, MTC, QName) ->
message = Message}, BQ, BQS, MTC, QName) ->
#basic_message{id = MsgId} = Message,
MTC1 = case Confirm of
true -> confirm_messages([MsgId], MTC, QName);
false -> MTC
end,
BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS),
BQS1 = BQ:discard(Message, SenderPid, Flow, BQS),
{BQS1, MTC1}.

run_message_queue(State) -> run_message_queue(false, State).
Expand Down Expand Up @@ -809,7 +810,7 @@ send_reject_publish(#delivery{confirm = true,
sender = SenderPid,
flow = Flow,
msg_seq_no = MsgSeqNo,
message = #basic_message{id = MsgId}},
message = Message},
_Delivered,
State = #q{ q = Q,
backing_queue = BQ,
Expand All @@ -818,8 +819,9 @@ send_reject_publish(#delivery{confirm = true,
ok = rabbit_classic_queue:send_rejection(SenderPid,
amqqueue:get_name(Q), MsgSeqNo),

#basic_message{id = MsgId} = Message,
MTC1 = maps:remove(MsgId, MTC),
BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS),
BQS1 = BQ:discard(Message, SenderPid, Flow, BQS),
State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 };
send_reject_publish(#delivery{confirm = false},
_Delivered, State) ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_backing_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@

%% Called to inform the BQ about messages which have reached the
%% queue, but are not going to be further passed to BQ.
-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state().
-callback discard(rabbit_types:basic_message(), pid(), flow(), state()) -> state().

%% Return ids of messages which have been confirmed since the last
%% invocation of this function (or initialisation).
Expand Down
13 changes: 7 additions & 6 deletions deps/rabbit/src/rabbit_mirror_queue_master.erl
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,16 @@ batch_publish_delivered(Publishes, ChPid, Flow,
State1 = State #state { backing_queue_state = BQS1 },
{AckTags, ensure_monitoring(ChPid, State1)}.

discard(MsgId, ChPid, Flow, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
seen_status = SS }) ->
discard(Message = #basic_message{id = MsgId},
ChPid, Flow, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
seen_status = SS }) ->
false = maps:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}),
ok = gm:broadcast(GM, {discard, ChPid, Flow, Message}),
ensure_monitoring(ChPid,
State #state { backing_queue_state =
BQ:discard(MsgId, ChPid, Flow, BQS) }).
BQ:discard(Message, ChPid, Flow, BQS) }).

dropwhile(Pred, State = #state{backing_queue = BQ,
backing_queue_state = BQS }) ->
Expand Down
5 changes: 3 additions & 2 deletions deps/rabbit/src/rabbit_mirror_queue_slave.erl
Original file line number Diff line number Diff line change
Expand Up @@ -957,11 +957,12 @@ process_instruction({batch_publish_delivered, ChPid, Flow, Publishes}, State) ->
end, State1 #state { backing_queue_state = BQS1 },
MsgIdsAndAcks),
{ok, State2};
process_instruction({discard, ChPid, Flow, MsgId}, State) ->
process_instruction({discard, ChPid, Flow,
Msg = #basic_message { id = MsgId }}, State) ->
maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(discarded, ChPid, MsgId, State),
BQS1 = BQ:discard(MsgId, ChPid, Flow, BQS),
BQS1 = BQ:discard(Msg, ChPid, Flow, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
Expand Down
22 changes: 6 additions & 16 deletions deps/rabbit/src/rabbit_priority_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -249,22 +249,12 @@ batch_publish_delivered(Publishes, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(batch_publish_delivered(Publishes, ChPid, Flow, BQS)).

%% TODO this is a hack. The BQ api does not give us enough information
%% here - if we had the Msg we could look at its priority and forward
%% to the appropriate sub-BQ. But we don't so we are stuck.
%%
%% But fortunately VQ ignores discard/4, so we can too, *assuming we
%% are talking to VQ*. discard/4 is used by HA, but that's "above" us
%% (if in use) so we don't break that either, just some hypothetical
%% alternate BQ implementation.
discard(_MsgId, _ChPid, _Flow, State = #state{}) ->
State;
%% We should have something a bit like this here:
%% pick1(fun (_P, BQSN) ->
%% BQ:discard(MsgId, ChPid, Flow, BQSN)
%% end, Msg, State);
discard(MsgId, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(discard(MsgId, ChPid, Flow, BQS)).
discard(Msg, ChPid, Flow, State = #state{bq = BQ}) ->
pick1(fun (_P, BQSN) ->
BQ:discard(Msg, ChPid, Flow, BQSN)
end, Msg, State);
discard(Msg, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(discard(Msg, ChPid, Flow, BQS)).

drain_confirmed(State = #state{bq = BQ}) ->
fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State);
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_variable_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, State) ->
State2 = ui(State1),
{lists:reverse(SeqIds), a(maybe_update_rates(State2))}.

discard(_MsgId, _ChPid, _Flow, State) -> State.
discard(_Msg, _ChPid, _Flow, State) -> State.

drain_confirmed(State = #vqstate { confirmed = C }) ->
case sets:is_empty(C) of
Expand Down