Skip to content

Commit

Permalink
Support publishing AMQP 1.0 to Event Exchange
Browse files Browse the repository at this point in the history
Add new config to have the Exchange Event Plugin optionally publish AMQP 1.0 messages.
  • Loading branch information
ansd committed Nov 13, 2024
1 parent 2795293 commit 4ff9c8f
Show file tree
Hide file tree
Showing 8 changed files with 506 additions and 200 deletions.
11 changes: 2 additions & 9 deletions deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
-define(AMQP10_FOOTER, <<"x-amqp-1.0-footer">>).
-define(PROTOMOD, rabbit_framing_amqp_0_9_1).
-define(CLASS_ID, 60).
-define(LONGSTR_UTF8_LIMIT, 4096).

-opaque state() :: #content{}.

Expand Down Expand Up @@ -682,19 +681,13 @@ wrap(_Type, undefined) ->
wrap(Type, Val) ->
{Type, Val}.

from_091(longstr, V)
when is_binary(V) andalso
byte_size(V) =< ?LONGSTR_UTF8_LIMIT ->
%% if a longstr is longer than 4096 bytes we just assume it is binary
%% it _may_ still be valid utf8 but checking this for every longstr header
%% value is going to be excessively slow
case mc_util:is_utf8_no_null(V) of
from_091(longstr, V) ->
case mc_util:is_utf8_no_null_limited(V) of
true ->
{utf8, V};
false ->
{binary, V}
end;
from_091(longstr, V) -> {binary, V};
from_091(long, V) -> {long, V};
from_091(unsignedbyte, V) -> {ubyte, V};
from_091(short, V) -> {short, V};
Expand Down
13 changes: 13 additions & 0 deletions deps/rabbit/src/mc_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
-include("mc.hrl").

-export([is_valid_shortstr/1,
is_utf8_no_null_limited/1,
is_utf8_no_null/1,
uuid_to_urn_string/1,
urn_string_to_uuid/1,
Expand All @@ -12,12 +13,24 @@
is_x_header/1
]).

-define(UTF8_LIMIT, 4096).

-spec is_valid_shortstr(term()) -> boolean().
is_valid_shortstr(Bin) when ?IS_SHORTSTR_LEN(Bin) ->
is_utf8_no_null(Bin);
is_valid_shortstr(_) ->
false.

-spec is_utf8_no_null_limited(term()) -> boolean().
is_utf8_no_null_limited(Bin)
when byte_size(Bin) =< ?UTF8_LIMIT ->
is_utf8_no_null(Bin);
is_utf8_no_null_limited(_Term) ->
%% If longer than 4096 bytes, just assume it's not UTF-8.
%% It _may_ still be valid UTF-8 but checking this
%% on the hot path is going to be excessively slow.
false.

-spec is_utf8_no_null(term()) -> boolean().
is_utf8_no_null(Term) ->
utf8_scan(Term, fun (C) -> C > 0 end).
Expand Down
6 changes: 6 additions & 0 deletions deps/rabbitmq_event_exchange/Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
PROJECT = rabbitmq_event_exchange
PROJECT_DESCRIPTION = Event Exchange Type

define PROJECT_ENV
[
{protocol, amqp_0_9_1}
]
endef

define PROJECT_APP_EXTRA_KEYS
{broker_version_requirements, []}
endef
Expand Down
4 changes: 2 additions & 2 deletions deps/rabbitmq_event_exchange/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ the management plugin for stats.

## How it Works

It declares a topic exchange called `amq.rabbitmq.event` **in the default
virtual host**. All events are published to this exchange with routing
It declares a topic exchange called `amq.rabbitmq.event`, by default in the default
virtual host (`/`). All events are published to this exchange with routing
keys like 'exchange.created', 'binding.deleted' etc, so you can
subscribe to only the events you're interested in.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,7 @@
fun(Conf) ->
list_to_binary(cuttlefish:conf_get("event_exchange.vhost", Conf))
end}.

{mapping, "event_exchange.protocol", "rabbitmq_event_exchange.protocol", [
{datatype, {enum, [amqp_0_9_1, amqp_1_0]}}
]}.
195 changes: 144 additions & 51 deletions deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("rabbit/include/mc.hrl").
-include("rabbit_event_exchange.hrl").

-export([register/0, unregister/0]).
Expand All @@ -20,8 +22,11 @@

-export([fmt_proplist/1]). %% testing

-record(state, {vhost,
has_any_bindings
-define(APP_NAME, rabbitmq_event_exchange).

-record(state, {protocol :: amqp_0_9_1 | amqp_1_0,
vhost :: rabbit_types:vhost(),
has_any_bindings :: boolean()
}).

-rabbit_boot_step({rabbit_event_exchange,
Expand Down Expand Up @@ -65,41 +70,35 @@ exchange(VHost) ->
%%----------------------------------------------------------------------------

init([]) ->
{ok, Protocol} = application:get_env(?APP_NAME, protocol),
VHost = get_vhost(),
X = rabbit_misc:r(VHost, exchange, ?EXCH_NAME),
HasBindings = case rabbit_binding:list_for_source(X) of
[] -> false;
_ -> true
end,
{ok, #state{vhost = VHost,
[] -> false;
_ -> true
end,
{ok, #state{protocol = Protocol,
vhost = VHost,
has_any_bindings = HasBindings}}.

handle_call(_Request, State) -> {ok, not_understood, State}.

handle_event(_, #state{has_any_bindings = false} = State) ->
{ok, State};
handle_event(#event{type = Type,
props = Props,
timestamp = TS,
reference = none}, #state{vhost = VHost} = State) ->
_ = case key(Type) of
ignore -> ok;
Key ->
Props2 = [{<<"timestamp_in_ms">>, TS} | Props],
PBasic = #'P_basic'{delivery_mode = 2,
headers = fmt_proplist(Props2),
%% 0-9-1 says the timestamp is a
%% "64 bit POSIX
%% timestamp". That's second
%% resolution, not millisecond.
timestamp = erlang:convert_time_unit(
TS, milli_seconds, seconds)},
Content = rabbit_basic:build_content(PBasic, <<>>),
XName = exchange(VHost),
{ok, Msg} = mc_amqpl:message(XName, Key, Content),
rabbit_queue_type:publish_at_most_once(XName, Msg)
end,
{ok, State};
handle_event(#event{type = Type,
props = Props,
reference = none,
timestamp = Timestamp},
#state{protocol = Protocol,
vhost = VHost,
has_any_bindings = true} = State) ->
case key(Type) of
ignore ->
{ok, State};
Key ->
XName = exchange(VHost),
Mc = mc_init(Protocol, XName, Key, Props, Timestamp),
_ = rabbit_queue_type:publish_at_most_once(XName, Mc),
{ok, State}
end;
handle_event(_Event, State) ->
{ok, State}.

Expand Down Expand Up @@ -207,9 +206,110 @@ key(S) ->
Tokens -> list_to_binary(string:join(Tokens, "."))
end.

get_vhost() ->
case application:get_env(?APP_NAME, vhost) of
undefined ->
{ok, V} = application:get_env(rabbit, default_vhost),
V;
{ok, V} ->
V
end.

mc_init(amqp_1_0, XName, Key, Props, Timestamp) ->
Sections = [#'v1_0.message_annotations'{content = props_to_message_annotations(Props)},
#'v1_0.properties'{creation_time = {timestamp, Timestamp}},
#'v1_0.data'{content = <<>>}],
Payload = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]),
Anns = #{?ANN_EXCHANGE => XName#resource.name,
?ANN_ROUTING_KEYS => [Key]},
mc:init(mc_amqp, Payload, Anns);
mc_init(amqp_0_9_1, XName, Key, Props0, TimestampMillis) ->
Props = [{<<"timestamp_in_ms">>, TimestampMillis} | Props0],
Headers = fmt_proplist(Props),
TimestampSecs = erlang:convert_time_unit(TimestampMillis, millisecond, second),
PBasic = #'P_basic'{delivery_mode = 2,
headers = Headers,
timestamp = TimestampSecs},
Content = rabbit_basic:build_content(PBasic, <<>>),
{ok, Mc} = mc_amqpl:message(XName, Key, Content),
Mc.

props_to_message_annotations(Props) ->
KVList = lists:foldl(
fun({K, #resource{virtual_host = Vhost, name = Name}}, Acc) ->
Ann0 = {to_message_annotation_key(K), {utf8, Name}},
Ann1 = {{symbol, <<"x-opt-vhost">>}, {utf8, Vhost}},
[Ann0, Ann1 | Acc];
({K, V}, Acc) ->
Ann = {to_message_annotation_key(K),
to_message_annotation_val(V)},
[Ann | Acc]
end, [], Props),
lists:reverse(KVList).

to_message_annotation_key(Key) ->
Key1 = to_binary(Key),
Pattern = try persistent_term:get(cp_underscore)
catch error:badarg ->
Cp = binary:compile_pattern(<<"_">>),
ok = persistent_term:put(cp_underscore, Cp),
Cp
end,
Key2 = binary:replace(Key1, Pattern, <<"-">>, [global]),
Key3 = case Key2 of
<<"x-", _/binary>> ->
Key2;
_ ->
<<"x-opt-", Key2/binary>>
end,
{symbol, Key3}.

to_message_annotation_val(V)
when is_binary(V) ->
case mc_util:is_utf8_no_null_limited(V) of
true ->
{utf8, V};
false ->
{binary, V}
end;
to_message_annotation_val(V)
when is_integer(V) ->
{long, V};
to_message_annotation_val(V)
when is_number(V) ->
%% AMQP double and Erlang float are both 64-bit.
{double, V};
to_message_annotation_val(V)
when is_boolean(V) ->
{boolean, V};
to_message_annotation_val(V)
when is_pid(V) ->
{utf8, to_pid(V)};
to_message_annotation_val(V)
when is_atom(V) ->
{utf8, atom_to_binary(V, utf8)};
to_message_annotation_val([{Key, _} | _] = Proplist)
when is_atom(Key) orelse
is_binary(Key) ->
Map = lists:map(fun({K, V}) ->
{{utf8, to_binary(K)},
to_message_annotation_val(V)}
end, Proplist),
{map, Map};
to_message_annotation_val([{Key, Type, _} | _] = Table)
when is_binary(Key) andalso
is_atom(Type) ->
%% Looks like an AMQP 0.9.1 table
mc_amqpl:from_091(table, Table);
to_message_annotation_val(V)
when is_list(V) ->
{list, [to_message_annotation_val(Val) || Val <- V]};
to_message_annotation_val(V) ->
{utf8, fmt_other(V)}.

fmt_proplist(Props) ->
lists:foldl(fun({K, V}, Acc) ->
case fmt(a2b(K), V) of
case fmt(to_binary(K), V) of
L when is_list(L) -> lists:append(L, Acc);
T -> [T | Acc]
end
Expand All @@ -226,11 +326,8 @@ fmt(K, V) when is_number(V) -> {K, float, V};
fmt(K, V) when is_binary(V) -> {K, longstr, V};
fmt(K, [{_, _}|_] = Vs) -> {K, table, fmt_proplist(Vs)};
fmt(K, Vs) when is_list(Vs) -> {K, array, [fmt(V) || V <- Vs]};
fmt(K, V) when is_pid(V) -> {K, longstr,
list_to_binary(rabbit_misc:pid_to_string(V))};
fmt(K, V) -> {K, longstr,
list_to_binary(
rabbit_misc:format("~1000000000p", [V]))}.
fmt(K, V) when is_pid(V) -> {K, longstr, to_pid(V)};
fmt(K, V) -> {K, longstr, fmt_other(V)}.

%% Exactly the same as fmt/2, duplicated only for performance issues
fmt(true) -> {bool, true};
Expand All @@ -241,20 +338,16 @@ fmt(V) when is_number(V) -> {float, V};
fmt(V) when is_binary(V) -> {longstr, V};
fmt([{_, _}|_] = Vs) -> {table, fmt_proplist(Vs)};
fmt(Vs) when is_list(Vs) -> {array, [fmt(V) || V <- Vs]};
fmt(V) when is_pid(V) -> {longstr,
list_to_binary(rabbit_misc:pid_to_string(V))};
fmt(V) -> {longstr,
list_to_binary(
rabbit_misc:format("~1000000000p", [V]))}.
fmt(V) when is_pid(V) -> {longstr, to_pid(V)};
fmt(V) -> {longstr, fmt_other(V)}.

a2b(A) when is_atom(A) -> atom_to_binary(A, utf8);
a2b(B) when is_binary(B) -> B.
fmt_other(V) ->
list_to_binary(rabbit_misc:format("~1000000000p", [V])).

get_vhost() ->
case application:get_env(rabbitmq_event_exchange, vhost) of
undefined ->
{ok, V} = application:get_env(rabbit, default_vhost),
V;
{ok, V} ->
V
end.
to_binary(Val) when is_atom(Val) ->
atom_to_binary(Val);
to_binary(Val) when is_binary(Val) ->
Val.

to_pid(Val) ->
list_to_binary(rabbit_misc:pid_to_string(Val)).
Original file line number Diff line number Diff line change
@@ -1,19 +1,34 @@
[
{virtual_host1,
"event_exchange.vhost = /",
[
{rabbitmq_event_exchange, [
{vhost, <<"/">>}
]}
], [rabbitmq_event_exchange]
},
{virtual_host1,
"event_exchange.vhost = /",
[{rabbitmq_event_exchange, [
{vhost, <<"/">>}
]}],
[rabbitmq_event_exchange]
},

{virtual_host2,
"event_exchange.vhost = dev",
[
{rabbitmq_event_exchange, [
{vhost, <<"dev">>}
]}
], [rabbitmq_event_exchange]
}
{virtual_host2,
"event_exchange.vhost = dev",
[{rabbitmq_event_exchange, [
{vhost, <<"dev">>}
]}
],
[rabbitmq_event_exchange]
},

{protocol_amqp,
"event_exchange.protocol = amqp_1_0",
[{rabbitmq_event_exchange, [
{protocol, amqp_1_0}
]}],
[rabbitmq_event_exchange]
},

{protocol_amqpl,
"event_exchange.protocol = amqp_0_9_1",
[{rabbitmq_event_exchange, [
{protocol, amqp_0_9_1}
]}],
[rabbitmq_event_exchange]
}
].
Loading

0 comments on commit 4ff9c8f

Please sign in to comment.