From f5fbb8c8b27cc34e3a7b0ab8a8ad59e8cec4d737 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 13 Apr 2023 11:20:51 +0400 Subject: [PATCH] Revert "REVERTED Pass the message to `rabbit_backing_queue:discard` callback " --- deps/rabbit/src/rabbit_amqqueue_process.erl | 10 ++++----- deps/rabbit/src/rabbit_backing_queue.erl | 2 +- .../rabbit/src/rabbit_mirror_queue_master.erl | 13 +++++------ deps/rabbit/src/rabbit_mirror_queue_slave.erl | 5 ++--- deps/rabbit/src/rabbit_priority_queue.erl | 22 ++++++++++++++----- deps/rabbit/src/rabbit_variable_queue.erl | 2 +- 6 files changed, 30 insertions(+), 24 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 6a30e5dc7825..9b8d7ee6db62 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -641,13 +641,12 @@ send_mandatory(#delivery{mandatory = true, discard(#delivery{confirm = Confirm, sender = SenderPid, flow = Flow, - message = Message}, BQ, BQS, MTC, QName) -> - #basic_message{id = MsgId} = Message, + message = #basic_message{id = MsgId}}, BQ, BQS, MTC, QName) -> MTC1 = case Confirm of true -> confirm_messages([MsgId], MTC, QName); false -> MTC end, - BQS1 = BQ:discard(Message, SenderPid, Flow, BQS), + BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS), {BQS1, MTC1}. run_message_queue(State) -> run_message_queue(false, State). @@ -810,7 +809,7 @@ send_reject_publish(#delivery{confirm = true, sender = SenderPid, flow = Flow, msg_seq_no = MsgSeqNo, - message = Message}, + message = #basic_message{id = MsgId}}, _Delivered, State = #q{ q = Q, backing_queue = BQ, @@ -819,9 +818,8 @@ send_reject_publish(#delivery{confirm = true, ok = rabbit_classic_queue:send_rejection(SenderPid, amqqueue:get_name(Q), MsgSeqNo), - #basic_message{id = MsgId} = Message, MTC1 = maps:remove(MsgId, MTC), - BQS1 = BQ:discard(Message, SenderPid, Flow, BQS), + BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS), State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 }; send_reject_publish(#delivery{confirm = false}, _Delivered, State) -> diff --git a/deps/rabbit/src/rabbit_backing_queue.erl b/deps/rabbit/src/rabbit_backing_queue.erl index 4dc8d871aab6..1ce0bca7cde3 100644 --- a/deps/rabbit/src/rabbit_backing_queue.erl +++ b/deps/rabbit/src/rabbit_backing_queue.erl @@ -117,7 +117,7 @@ %% Called to inform the BQ about messages which have reached the %% queue, but are not going to be further passed to BQ. --callback discard(rabbit_types:basic_message(), pid(), flow(), state()) -> state(). +-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state(). %% Return ids of messages which have been confirmed since the last %% invocation of this function (or initialisation). diff --git a/deps/rabbit/src/rabbit_mirror_queue_master.erl b/deps/rabbit/src/rabbit_mirror_queue_master.erl index 316b8b0be739..e6e0efc91d3e 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_master.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_master.erl @@ -293,16 +293,15 @@ batch_publish_delivered(Publishes, ChPid, Flow, State1 = State #state { backing_queue_state = BQS1 }, {AckTags, ensure_monitoring(ChPid, State1)}. -discard(Message = #basic_message{id = MsgId}, - ChPid, Flow, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - seen_status = SS }) -> +discard(MsgId, ChPid, Flow, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS }) -> false = maps:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {discard, ChPid, Flow, Message}), + ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}), ensure_monitoring(ChPid, State #state { backing_queue_state = - BQ:discard(Message, ChPid, Flow, BQS) }). + BQ:discard(MsgId, ChPid, Flow, BQS) }). dropwhile(Pred, State = #state{backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/deps/rabbit/src/rabbit_mirror_queue_slave.erl b/deps/rabbit/src/rabbit_mirror_queue_slave.erl index 9b46298bdc5a..09440eafee99 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_slave.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_slave.erl @@ -957,12 +957,11 @@ process_instruction({batch_publish_delivered, ChPid, Flow, Publishes}, State) -> end, State1 #state { backing_queue_state = BQS1 }, MsgIdsAndAcks), {ok, State2}; -process_instruction({discard, ChPid, Flow, - Msg = #basic_message { id = MsgId }}, State) -> +process_instruction({discard, ChPid, Flow, MsgId}, State) -> maybe_flow_ack(ChPid, Flow), State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(discarded, ChPid, MsgId, State), - BQS1 = BQ:discard(Msg, ChPid, Flow, BQS), + BQS1 = BQ:discard(MsgId, ChPid, Flow, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, diff --git a/deps/rabbit/src/rabbit_priority_queue.erl b/deps/rabbit/src/rabbit_priority_queue.erl index f1477f533675..382b513bec22 100644 --- a/deps/rabbit/src/rabbit_priority_queue.erl +++ b/deps/rabbit/src/rabbit_priority_queue.erl @@ -249,12 +249,22 @@ batch_publish_delivered(Publishes, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough2(batch_publish_delivered(Publishes, ChPid, Flow, BQS)). -discard(Msg, ChPid, Flow, State = #state{bq = BQ}) -> - pick1(fun (_P, BQSN) -> - BQ:discard(Msg, ChPid, Flow, BQSN) - end, Msg, State); -discard(Msg, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> - ?passthrough1(discard(Msg, ChPid, Flow, BQS)). +%% TODO this is a hack. The BQ api does not give us enough information +%% here - if we had the Msg we could look at its priority and forward +%% to the appropriate sub-BQ. But we don't so we are stuck. +%% +%% But fortunately VQ ignores discard/4, so we can too, *assuming we +%% are talking to VQ*. discard/4 is used by HA, but that's "above" us +%% (if in use) so we don't break that either, just some hypothetical +%% alternate BQ implementation. +discard(_MsgId, _ChPid, _Flow, State = #state{}) -> + State; + %% We should have something a bit like this here: + %% pick1(fun (_P, BQSN) -> + %% BQ:discard(MsgId, ChPid, Flow, BQSN) + %% end, Msg, State); +discard(MsgId, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(discard(MsgId, ChPid, Flow, BQS)). drain_confirmed(State = #state{bq = BQ}) -> fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State); diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index d9abe673a6d0..9cf3af8716e5 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -588,7 +588,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, State) -> State2 = ui(State1), {lists:reverse(SeqIds), a(maybe_update_rates(State2))}. -discard(_Msg, _ChPid, _Flow, State) -> State. +discard(_MsgId, _ChPid, _Flow, State) -> State. drain_confirmed(State = #vqstate { confirmed = C }) -> case sets:is_empty(C) of