Skip to content

Commit

Permalink
Merge pull request rabbitmq#11230 from rabbitmq/dead-letter-bcc
Browse files Browse the repository at this point in the history
Remove BCC from x-death routing-keys
  • Loading branch information
ansd authored May 14, 2024
2 parents f122483 + 90a4010 commit 1c4af0c
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 16 deletions.
12 changes: 10 additions & 2 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -365,14 +365,22 @@ record_death(Reason, SourceQueue,
is_binary(SourceQueue) ->
Key = {SourceQueue, Reason},
#{?ANN_EXCHANGE := Exchange,
?ANN_ROUTING_KEYS := RoutingKeys} = Anns0,
?ANN_ROUTING_KEYS := RKeys0} = Anns0,
%% The routing keys that we record in the death history and will
%% report to the client should include CC, but exclude BCC.
RKeys = case Anns0 of
#{bcc := BccKeys} ->
RKeys0 -- BccKeys;
_ ->
RKeys0
end,
Timestamp = os:system_time(millisecond),
Ttl = maps:get(ttl, Anns0, undefined),
DeathAnns = rabbit_misc:maps_put_truthy(
ttl, Ttl, #{first_time => Timestamp,
last_time => Timestamp}),
NewDeath = #death{exchange = Exchange,
routing_keys = RoutingKeys,
routing_keys = RKeys,
count = 1,
anns = DeathAnns},
Anns = case Anns0 of
Expand Down
22 changes: 16 additions & 6 deletions deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@

%% mc implementation
init(#content{} = Content0) ->
Content = rabbit_binary_parser:ensure_content_decoded(Content0),
Content1 = rabbit_binary_parser:ensure_content_decoded(Content0),
%% project essential properties into annotations
Anns = essential_properties(Content),
{strip_header(Content, ?DELETED_HEADER), Anns}.
Anns = essential_properties(Content1),
Content = strip_header(Content1, ?DELETED_HEADER),
{Content, Anns}.

convert_from(mc_amqp, Sections, Env) ->
{H, MAnn, Prop, AProp, BodyRev, Footer} =
Expand Down Expand Up @@ -554,7 +555,7 @@ message(#resource{name = ExchangeNameBin},
Error;
HeaderRoutes ->
{ok, mc:init(?MODULE,
rabbit_basic:strip_bcc_header(Content),
Content,
Anns#{?ANN_ROUTING_KEYS => [RoutingKey | HeaderRoutes],
?ANN_EXCHANGE => ExchangeNameBin})}
end.
Expand Down Expand Up @@ -795,7 +796,8 @@ message_id(undefined, _HKey, H) ->
essential_properties(#content{} = C) ->
#'P_basic'{delivery_mode = Mode,
priority = Priority,
timestamp = TimestampRaw} = Props = C#content.properties,
timestamp = TimestampRaw,
headers = Headers} = Props = C#content.properties,
{ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
Timestamp = case TimestampRaw of
undefined ->
Expand All @@ -805,6 +807,12 @@ essential_properties(#content{} = C) ->
TimestampRaw * 1000
end,
Durable = Mode == 2,
BccKeys = case rabbit_basic:header(<<"BCC">>, Headers) of
{<<"BCC">>, array, Routes} ->
[Route || {longstr, Route} <- Routes];
_ ->
undefined
end,
maps_put_truthy(
?ANN_PRIORITY, Priority,
maps_put_truthy(
Expand All @@ -813,7 +821,9 @@ essential_properties(#content{} = C) ->
?ANN_TIMESTAMP, Timestamp,
maps_put_falsy(
?ANN_DURABLE, Durable,
#{})))).
maps_put_truthy(
bcc, BccKeys,
#{}))))).

%% headers that are added as annotations during conversions
is_internal_header(<<"x-basic-", _/binary>>) ->
Expand Down
29 changes: 21 additions & 8 deletions deps/rabbit/test/dead_lettering_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1379,17 +1379,18 @@ dead_letter_headers_BCC(Config) ->
routing_key = DLXQName}),

P1 = <<"msg1">>,
BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}]},
publish(Ch, QName, [P1], [BCCHeader]),
CCHeader = {<<"CC">>, array, [{longstr, <<"cc 1">>}, {longstr, <<"cc 2">>}]},
BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}, {longstr, <<"bcc 2">>}]},
publish(Ch, QName, [P1], [CCHeader, BCCHeader]),
%% Message is published to both queues because of BCC header and DLX queue bound to both
%% exchanges
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
{#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1,
props = #'P_basic'{headers = Headers1}}} =
amqp_channel:call(Ch, #'basic.get'{queue = QName}),
amqp_channel:call(Ch, #'basic.get'{queue = QName}),
{#'basic.get_ok'{}, #amqp_msg{payload = P1,
props = #'P_basic'{headers = Headers2}}} =
amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
%% We check the headers to ensure no dead lettering has happened
?assertEqual(undefined, header_lookup(Headers1, <<"x-death">>)),
?assertEqual(undefined, header_lookup(Headers2, <<"x-death">>)),
Expand All @@ -1401,10 +1402,15 @@ dead_letter_headers_BCC(Config) ->
wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]),
{#'basic.get_ok'{}, #amqp_msg{payload = P1,
props = #'P_basic'{headers = Headers3}}} =
amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
consume_empty(Ch, QName),
?assertEqual(undefined, rabbit_misc:table_lookup(Headers3, <<"BCC">>)),
?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)).
{array, [{table, Death}]} = rabbit_misc:table_lookup(Headers3, <<"x-death">>),
{array, RKeys0} = rabbit_misc:table_lookup(Death, <<"routing-keys">>),
RKeys = [RKey || {longstr, RKey} <- RKeys0],
%% routing-keys in the death history should include CC but exclude BCC keys
?assertEqual(lists:sort([QName, <<"cc 1">>, <<"cc 2">>]),
lists:sort(RKeys)).

%% Three top-level headers are added for the very first dead-lettering event.
%% They are
Expand Down Expand Up @@ -1669,7 +1675,11 @@ stream(Config) ->
#'basic.publish'{routing_key = Q1},
#amqp_msg{payload = Payload,
props = #'P_basic'{expiration = <<"0">>,
headers = [{<<"CC">>, array, [{longstr, <<"other key">>}]}]}
headers = [{<<"CC">>, array, [{longstr, <<"cc 1">>},
{longstr, <<"cc 2">>}]},
{<<"BCC">>, array, [{longstr, <<"bcc 1">>},
{longstr, <<"bcc 2">>}]}
]}
}),

#'basic.qos_ok'{} = amqp_channel:call(Ch1, #'basic.qos'{prefetch_count = 1}),
Expand Down Expand Up @@ -1710,7 +1720,10 @@ stream(Config) ->
?assertEqual({longstr, Reason}, rabbit_misc:table_lookup(Death1, <<"reason">>)),
?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Death1, <<"exchange">>)),
?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)),
?assertEqual({array, [{longstr, Q1}, {longstr, <<"other key">>}]},
%% routing-keys in the death history should include CC but exclude BCC keys
?assertEqual({array, [{longstr, Q1},
{longstr, <<"cc 1">>},
{longstr, <<"cc 2">>}]},
rabbit_misc:table_lookup(Death1, <<"routing-keys">>)),
?assertEqual({longstr, <<"0">>}, rabbit_misc:table_lookup(Death1, <<"original-expiration">>)),
{timestamp, T1} = rabbit_misc:table_lookup(Death1, <<"time">>),
Expand Down

0 comments on commit 1c4af0c

Please sign in to comment.