diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 9b8d7ee6db62..6a30e5dc7825 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -641,12 +641,13 @@ send_mandatory(#delivery{mandatory = true, discard(#delivery{confirm = Confirm, sender = SenderPid, flow = Flow, - message = #basic_message{id = MsgId}}, BQ, BQS, MTC, QName) -> + message = Message}, BQ, BQS, MTC, QName) -> + #basic_message{id = MsgId} = Message, MTC1 = case Confirm of true -> confirm_messages([MsgId], MTC, QName); false -> MTC end, - BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS), + BQS1 = BQ:discard(Message, SenderPid, Flow, BQS), {BQS1, MTC1}. run_message_queue(State) -> run_message_queue(false, State). @@ -809,7 +810,7 @@ send_reject_publish(#delivery{confirm = true, sender = SenderPid, flow = Flow, msg_seq_no = MsgSeqNo, - message = #basic_message{id = MsgId}}, + message = Message}, _Delivered, State = #q{ q = Q, backing_queue = BQ, @@ -818,8 +819,9 @@ send_reject_publish(#delivery{confirm = true, ok = rabbit_classic_queue:send_rejection(SenderPid, amqqueue:get_name(Q), MsgSeqNo), + #basic_message{id = MsgId} = Message, MTC1 = maps:remove(MsgId, MTC), - BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS), + BQS1 = BQ:discard(Message, SenderPid, Flow, BQS), State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 }; send_reject_publish(#delivery{confirm = false}, _Delivered, State) -> diff --git a/deps/rabbit/src/rabbit_backing_queue.erl b/deps/rabbit/src/rabbit_backing_queue.erl index 1ce0bca7cde3..4dc8d871aab6 100644 --- a/deps/rabbit/src/rabbit_backing_queue.erl +++ b/deps/rabbit/src/rabbit_backing_queue.erl @@ -117,7 +117,7 @@ %% Called to inform the BQ about messages which have reached the %% queue, but are not going to be further passed to BQ. --callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state(). +-callback discard(rabbit_types:basic_message(), pid(), flow(), state()) -> state(). %% Return ids of messages which have been confirmed since the last %% invocation of this function (or initialisation). diff --git a/deps/rabbit/src/rabbit_mirror_queue_master.erl b/deps/rabbit/src/rabbit_mirror_queue_master.erl index e6e0efc91d3e..316b8b0be739 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_master.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_master.erl @@ -293,15 +293,16 @@ batch_publish_delivered(Publishes, ChPid, Flow, State1 = State #state { backing_queue_state = BQS1 }, {AckTags, ensure_monitoring(ChPid, State1)}. -discard(MsgId, ChPid, Flow, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - seen_status = SS }) -> +discard(Message = #basic_message{id = MsgId}, + ChPid, Flow, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS }) -> false = maps:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}), + ok = gm:broadcast(GM, {discard, ChPid, Flow, Message}), ensure_monitoring(ChPid, State #state { backing_queue_state = - BQ:discard(MsgId, ChPid, Flow, BQS) }). + BQ:discard(Message, ChPid, Flow, BQS) }). dropwhile(Pred, State = #state{backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/deps/rabbit/src/rabbit_mirror_queue_slave.erl b/deps/rabbit/src/rabbit_mirror_queue_slave.erl index 09440eafee99..9b46298bdc5a 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_slave.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_slave.erl @@ -957,11 +957,12 @@ process_instruction({batch_publish_delivered, ChPid, Flow, Publishes}, State) -> end, State1 #state { backing_queue_state = BQS1 }, MsgIdsAndAcks), {ok, State2}; -process_instruction({discard, ChPid, Flow, MsgId}, State) -> +process_instruction({discard, ChPid, Flow, + Msg = #basic_message { id = MsgId }}, State) -> maybe_flow_ack(ChPid, Flow), State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(discarded, ChPid, MsgId, State), - BQS1 = BQ:discard(MsgId, ChPid, Flow, BQS), + BQS1 = BQ:discard(Msg, ChPid, Flow, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, diff --git a/deps/rabbit/src/rabbit_priority_queue.erl b/deps/rabbit/src/rabbit_priority_queue.erl index 382b513bec22..f1477f533675 100644 --- a/deps/rabbit/src/rabbit_priority_queue.erl +++ b/deps/rabbit/src/rabbit_priority_queue.erl @@ -249,22 +249,12 @@ batch_publish_delivered(Publishes, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough2(batch_publish_delivered(Publishes, ChPid, Flow, BQS)). -%% TODO this is a hack. The BQ api does not give us enough information -%% here - if we had the Msg we could look at its priority and forward -%% to the appropriate sub-BQ. But we don't so we are stuck. -%% -%% But fortunately VQ ignores discard/4, so we can too, *assuming we -%% are talking to VQ*. discard/4 is used by HA, but that's "above" us -%% (if in use) so we don't break that either, just some hypothetical -%% alternate BQ implementation. -discard(_MsgId, _ChPid, _Flow, State = #state{}) -> - State; - %% We should have something a bit like this here: - %% pick1(fun (_P, BQSN) -> - %% BQ:discard(MsgId, ChPid, Flow, BQSN) - %% end, Msg, State); -discard(MsgId, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> - ?passthrough1(discard(MsgId, ChPid, Flow, BQS)). +discard(Msg, ChPid, Flow, State = #state{bq = BQ}) -> + pick1(fun (_P, BQSN) -> + BQ:discard(Msg, ChPid, Flow, BQSN) + end, Msg, State); +discard(Msg, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(discard(Msg, ChPid, Flow, BQS)). drain_confirmed(State = #state{bq = BQ}) -> fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State); diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index 9cf3af8716e5..d9abe673a6d0 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -588,7 +588,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, State) -> State2 = ui(State1), {lists:reverse(SeqIds), a(maybe_update_rates(State2))}. -discard(_MsgId, _ChPid, _Flow, State) -> State. +discard(_Msg, _ChPid, _Flow, State) -> State. drain_confirmed(State = #vqstate { confirmed = C }) -> case sets:is_empty(C) of