Skip to content

Commit

Permalink
Merge pull request #7802 from noxdafox/queue_discard
Browse files Browse the repository at this point in the history
Pass the message to `rabbit_backing_queue:discard` callback
  • Loading branch information
michaelklishin authored Apr 4, 2023
2 parents fded279 + 4f45b8d commit 7337689
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 30 deletions.
10 changes: 6 additions & 4 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -641,12 +641,13 @@ send_mandatory(#delivery{mandatory = true,
discard(#delivery{confirm = Confirm,
sender = SenderPid,
flow = Flow,
message = #basic_message{id = MsgId}}, BQ, BQS, MTC, QName) ->
message = Message}, BQ, BQS, MTC, QName) ->
#basic_message{id = MsgId} = Message,
MTC1 = case Confirm of
true -> confirm_messages([MsgId], MTC, QName);
false -> MTC
end,
BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS),
BQS1 = BQ:discard(Message, SenderPid, Flow, BQS),
{BQS1, MTC1}.

run_message_queue(State) -> run_message_queue(false, State).
Expand Down Expand Up @@ -809,7 +810,7 @@ send_reject_publish(#delivery{confirm = true,
sender = SenderPid,
flow = Flow,
msg_seq_no = MsgSeqNo,
message = #basic_message{id = MsgId}},
message = Message},
_Delivered,
State = #q{ q = Q,
backing_queue = BQ,
Expand All @@ -818,8 +819,9 @@ send_reject_publish(#delivery{confirm = true,
ok = rabbit_classic_queue:send_rejection(SenderPid,
amqqueue:get_name(Q), MsgSeqNo),

#basic_message{id = MsgId} = Message,
MTC1 = maps:remove(MsgId, MTC),
BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS),
BQS1 = BQ:discard(Message, SenderPid, Flow, BQS),
State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 };
send_reject_publish(#delivery{confirm = false},
_Delivered, State) ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_backing_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@

%% Called to inform the BQ about messages which have reached the
%% queue, but are not going to be further passed to BQ.
-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state().
-callback discard(rabbit_types:basic_message(), pid(), flow(), state()) -> state().

%% Return ids of messages which have been confirmed since the last
%% invocation of this function (or initialisation).
Expand Down
13 changes: 7 additions & 6 deletions deps/rabbit/src/rabbit_mirror_queue_master.erl
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,16 @@ batch_publish_delivered(Publishes, ChPid, Flow,
State1 = State #state { backing_queue_state = BQS1 },
{AckTags, ensure_monitoring(ChPid, State1)}.

discard(MsgId, ChPid, Flow, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
seen_status = SS }) ->
discard(Message = #basic_message{id = MsgId},
ChPid, Flow, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
seen_status = SS }) ->
false = maps:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}),
ok = gm:broadcast(GM, {discard, ChPid, Flow, Message}),
ensure_monitoring(ChPid,
State #state { backing_queue_state =
BQ:discard(MsgId, ChPid, Flow, BQS) }).
BQ:discard(Message, ChPid, Flow, BQS) }).

dropwhile(Pred, State = #state{backing_queue = BQ,
backing_queue_state = BQS }) ->
Expand Down
5 changes: 3 additions & 2 deletions deps/rabbit/src/rabbit_mirror_queue_slave.erl
Original file line number Diff line number Diff line change
Expand Up @@ -957,11 +957,12 @@ process_instruction({batch_publish_delivered, ChPid, Flow, Publishes}, State) ->
end, State1 #state { backing_queue_state = BQS1 },
MsgIdsAndAcks),
{ok, State2};
process_instruction({discard, ChPid, Flow, MsgId}, State) ->
process_instruction({discard, ChPid, Flow,
Msg = #basic_message { id = MsgId }}, State) ->
maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(discarded, ChPid, MsgId, State),
BQS1 = BQ:discard(MsgId, ChPid, Flow, BQS),
BQS1 = BQ:discard(Msg, ChPid, Flow, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
Expand Down
22 changes: 6 additions & 16 deletions deps/rabbit/src/rabbit_priority_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -249,22 +249,12 @@ batch_publish_delivered(Publishes, ChPid, Flow,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(batch_publish_delivered(Publishes, ChPid, Flow, BQS)).

%% TODO this is a hack. The BQ api does not give us enough information
%% here - if we had the Msg we could look at its priority and forward
%% to the appropriate sub-BQ. But we don't so we are stuck.
%%
%% But fortunately VQ ignores discard/4, so we can too, *assuming we
%% are talking to VQ*. discard/4 is used by HA, but that's "above" us
%% (if in use) so we don't break that either, just some hypothetical
%% alternate BQ implementation.
discard(_MsgId, _ChPid, _Flow, State = #state{}) ->
State;
%% We should have something a bit like this here:
%% pick1(fun (_P, BQSN) ->
%% BQ:discard(MsgId, ChPid, Flow, BQSN)
%% end, Msg, State);
discard(MsgId, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(discard(MsgId, ChPid, Flow, BQS)).
discard(Msg, ChPid, Flow, State = #state{bq = BQ}) ->
pick1(fun (_P, BQSN) ->
BQ:discard(Msg, ChPid, Flow, BQSN)
end, Msg, State);
discard(Msg, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(discard(Msg, ChPid, Flow, BQS)).

drain_confirmed(State = #state{bq = BQ}) ->
fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State);
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_variable_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, State) ->
State2 = ui(State1),
{lists:reverse(SeqIds), a(maybe_update_rates(State2))}.

discard(_MsgId, _ChPid, _Flow, State) -> State.
discard(_Msg, _ChPid, _Flow, State) -> State.

drain_confirmed(State = #vqstate { confirmed = C }) ->
case sets:is_empty(C) of
Expand Down

0 comments on commit 7337689

Please sign in to comment.