From 4ff9c8f3f2af44279e87c68779a0bf4522ca4880 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Tue, 12 Nov 2024 12:16:15 +0100 Subject: [PATCH] Support publishing AMQP 1.0 to Event Exchange Add new config to have the Exchange Event Plugin optionally publish AMQP 1.0 messages. --- deps/rabbit/src/mc_amqpl.erl | 11 +- deps/rabbit/src/mc_util.erl | 13 + deps/rabbitmq_event_exchange/Makefile | 6 + deps/rabbitmq_event_exchange/README.md | 4 +- .../schema/rabbitmq_event_exchange.schema | 4 + .../src/rabbit_exchange_type_event.erl | 195 +++++--- .../rabbitmq_event_exchange.snippets | 47 +- .../test/system_SUITE.erl | 426 +++++++++++++----- 8 files changed, 506 insertions(+), 200 deletions(-) diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 936a1b130d89..cac190e2cb5e 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -43,7 +43,6 @@ -define(AMQP10_FOOTER, <<"x-amqp-1.0-footer">>). -define(PROTOMOD, rabbit_framing_amqp_0_9_1). -define(CLASS_ID, 60). --define(LONGSTR_UTF8_LIMIT, 4096). -opaque state() :: #content{}. @@ -682,19 +681,13 @@ wrap(_Type, undefined) -> wrap(Type, Val) -> {Type, Val}. -from_091(longstr, V) - when is_binary(V) andalso - byte_size(V) =< ?LONGSTR_UTF8_LIMIT -> - %% if a longstr is longer than 4096 bytes we just assume it is binary - %% it _may_ still be valid utf8 but checking this for every longstr header - %% value is going to be excessively slow - case mc_util:is_utf8_no_null(V) of +from_091(longstr, V) -> + case mc_util:is_utf8_no_null_limited(V) of true -> {utf8, V}; false -> {binary, V} end; -from_091(longstr, V) -> {binary, V}; from_091(long, V) -> {long, V}; from_091(unsignedbyte, V) -> {ubyte, V}; from_091(short, V) -> {short, V}; diff --git a/deps/rabbit/src/mc_util.erl b/deps/rabbit/src/mc_util.erl index 9ec7928de9b7..cab28289a2ad 100644 --- a/deps/rabbit/src/mc_util.erl +++ b/deps/rabbit/src/mc_util.erl @@ -3,6 +3,7 @@ -include("mc.hrl"). -export([is_valid_shortstr/1, + is_utf8_no_null_limited/1, is_utf8_no_null/1, uuid_to_urn_string/1, urn_string_to_uuid/1, @@ -12,12 +13,24 @@ is_x_header/1 ]). +-define(UTF8_LIMIT, 4096). + -spec is_valid_shortstr(term()) -> boolean(). is_valid_shortstr(Bin) when ?IS_SHORTSTR_LEN(Bin) -> is_utf8_no_null(Bin); is_valid_shortstr(_) -> false. +-spec is_utf8_no_null_limited(term()) -> boolean(). +is_utf8_no_null_limited(Bin) + when byte_size(Bin) =< ?UTF8_LIMIT -> + is_utf8_no_null(Bin); +is_utf8_no_null_limited(_Term) -> + %% If longer than 4096 bytes, just assume it's not UTF-8. + %% It _may_ still be valid UTF-8 but checking this + %% on the hot path is going to be excessively slow. + false. + -spec is_utf8_no_null(term()) -> boolean(). is_utf8_no_null(Term) -> utf8_scan(Term, fun (C) -> C > 0 end). diff --git a/deps/rabbitmq_event_exchange/Makefile b/deps/rabbitmq_event_exchange/Makefile index fdac1be67e6e..72d6367dd744 100644 --- a/deps/rabbitmq_event_exchange/Makefile +++ b/deps/rabbitmq_event_exchange/Makefile @@ -1,6 +1,12 @@ PROJECT = rabbitmq_event_exchange PROJECT_DESCRIPTION = Event Exchange Type +define PROJECT_ENV + [ + {protocol, amqp_0_9_1} + ] +endef + define PROJECT_APP_EXTRA_KEYS {broker_version_requirements, []} endef diff --git a/deps/rabbitmq_event_exchange/README.md b/deps/rabbitmq_event_exchange/README.md index 1380a4d30f72..0562e2218203 100644 --- a/deps/rabbitmq_event_exchange/README.md +++ b/deps/rabbitmq_event_exchange/README.md @@ -11,8 +11,8 @@ the management plugin for stats. ## How it Works -It declares a topic exchange called `amq.rabbitmq.event` **in the default -virtual host**. All events are published to this exchange with routing +It declares a topic exchange called `amq.rabbitmq.event`, by default in the default +virtual host (`/`). All events are published to this exchange with routing keys like 'exchange.created', 'binding.deleted' etc, so you can subscribe to only the events you're interested in. diff --git a/deps/rabbitmq_event_exchange/priv/schema/rabbitmq_event_exchange.schema b/deps/rabbitmq_event_exchange/priv/schema/rabbitmq_event_exchange.schema index c8b2efe5acdd..62de27e820c7 100644 --- a/deps/rabbitmq_event_exchange/priv/schema/rabbitmq_event_exchange.schema +++ b/deps/rabbitmq_event_exchange/priv/schema/rabbitmq_event_exchange.schema @@ -5,3 +5,7 @@ fun(Conf) -> list_to_binary(cuttlefish:conf_get("event_exchange.vhost", Conf)) end}. + +{mapping, "event_exchange.protocol", "rabbitmq_event_exchange.protocol", [ + {datatype, {enum, [amqp_0_9_1, amqp_1_0]}} +]}. diff --git a/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl b/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl index 70251406b20c..e1e321016978 100644 --- a/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl +++ b/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl @@ -11,6 +11,8 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit_framing.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). +-include_lib("rabbit/include/mc.hrl"). -include("rabbit_event_exchange.hrl"). -export([register/0, unregister/0]). @@ -20,8 +22,11 @@ -export([fmt_proplist/1]). %% testing --record(state, {vhost, - has_any_bindings +-define(APP_NAME, rabbitmq_event_exchange). + +-record(state, {protocol :: amqp_0_9_1 | amqp_1_0, + vhost :: rabbit_types:vhost(), + has_any_bindings :: boolean() }). -rabbit_boot_step({rabbit_event_exchange, @@ -65,41 +70,35 @@ exchange(VHost) -> %%---------------------------------------------------------------------------- init([]) -> + {ok, Protocol} = application:get_env(?APP_NAME, protocol), VHost = get_vhost(), X = rabbit_misc:r(VHost, exchange, ?EXCH_NAME), HasBindings = case rabbit_binding:list_for_source(X) of - [] -> false; - _ -> true - end, - {ok, #state{vhost = VHost, + [] -> false; + _ -> true + end, + {ok, #state{protocol = Protocol, + vhost = VHost, has_any_bindings = HasBindings}}. handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event(_, #state{has_any_bindings = false} = State) -> - {ok, State}; -handle_event(#event{type = Type, - props = Props, - timestamp = TS, - reference = none}, #state{vhost = VHost} = State) -> - _ = case key(Type) of - ignore -> ok; - Key -> - Props2 = [{<<"timestamp_in_ms">>, TS} | Props], - PBasic = #'P_basic'{delivery_mode = 2, - headers = fmt_proplist(Props2), - %% 0-9-1 says the timestamp is a - %% "64 bit POSIX - %% timestamp". That's second - %% resolution, not millisecond. - timestamp = erlang:convert_time_unit( - TS, milli_seconds, seconds)}, - Content = rabbit_basic:build_content(PBasic, <<>>), - XName = exchange(VHost), - {ok, Msg} = mc_amqpl:message(XName, Key, Content), - rabbit_queue_type:publish_at_most_once(XName, Msg) - end, - {ok, State}; +handle_event(#event{type = Type, + props = Props, + reference = none, + timestamp = Timestamp}, + #state{protocol = Protocol, + vhost = VHost, + has_any_bindings = true} = State) -> + case key(Type) of + ignore -> + {ok, State}; + Key -> + XName = exchange(VHost), + Mc = mc_init(Protocol, XName, Key, Props, Timestamp), + _ = rabbit_queue_type:publish_at_most_once(XName, Mc), + {ok, State} + end; handle_event(_Event, State) -> {ok, State}. @@ -207,9 +206,110 @@ key(S) -> Tokens -> list_to_binary(string:join(Tokens, ".")) end. +get_vhost() -> + case application:get_env(?APP_NAME, vhost) of + undefined -> + {ok, V} = application:get_env(rabbit, default_vhost), + V; + {ok, V} -> + V + end. + +mc_init(amqp_1_0, XName, Key, Props, Timestamp) -> + Sections = [#'v1_0.message_annotations'{content = props_to_message_annotations(Props)}, + #'v1_0.properties'{creation_time = {timestamp, Timestamp}}, + #'v1_0.data'{content = <<>>}], + Payload = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]), + Anns = #{?ANN_EXCHANGE => XName#resource.name, + ?ANN_ROUTING_KEYS => [Key]}, + mc:init(mc_amqp, Payload, Anns); +mc_init(amqp_0_9_1, XName, Key, Props0, TimestampMillis) -> + Props = [{<<"timestamp_in_ms">>, TimestampMillis} | Props0], + Headers = fmt_proplist(Props), + TimestampSecs = erlang:convert_time_unit(TimestampMillis, millisecond, second), + PBasic = #'P_basic'{delivery_mode = 2, + headers = Headers, + timestamp = TimestampSecs}, + Content = rabbit_basic:build_content(PBasic, <<>>), + {ok, Mc} = mc_amqpl:message(XName, Key, Content), + Mc. + +props_to_message_annotations(Props) -> + KVList = lists:foldl( + fun({K, #resource{virtual_host = Vhost, name = Name}}, Acc) -> + Ann0 = {to_message_annotation_key(K), {utf8, Name}}, + Ann1 = {{symbol, <<"x-opt-vhost">>}, {utf8, Vhost}}, + [Ann0, Ann1 | Acc]; + ({K, V}, Acc) -> + Ann = {to_message_annotation_key(K), + to_message_annotation_val(V)}, + [Ann | Acc] + end, [], Props), + lists:reverse(KVList). + +to_message_annotation_key(Key) -> + Key1 = to_binary(Key), + Pattern = try persistent_term:get(cp_underscore) + catch error:badarg -> + Cp = binary:compile_pattern(<<"_">>), + ok = persistent_term:put(cp_underscore, Cp), + Cp + end, + Key2 = binary:replace(Key1, Pattern, <<"-">>, [global]), + Key3 = case Key2 of + <<"x-", _/binary>> -> + Key2; + _ -> + <<"x-opt-", Key2/binary>> + end, + {symbol, Key3}. + +to_message_annotation_val(V) + when is_binary(V) -> + case mc_util:is_utf8_no_null_limited(V) of + true -> + {utf8, V}; + false -> + {binary, V} + end; +to_message_annotation_val(V) + when is_integer(V) -> + {long, V}; +to_message_annotation_val(V) + when is_number(V) -> + %% AMQP double and Erlang float are both 64-bit. + {double, V}; +to_message_annotation_val(V) + when is_boolean(V) -> + {boolean, V}; +to_message_annotation_val(V) + when is_pid(V) -> + {utf8, to_pid(V)}; +to_message_annotation_val(V) + when is_atom(V) -> + {utf8, atom_to_binary(V, utf8)}; +to_message_annotation_val([{Key, _} | _] = Proplist) + when is_atom(Key) orelse + is_binary(Key) -> + Map = lists:map(fun({K, V}) -> + {{utf8, to_binary(K)}, + to_message_annotation_val(V)} + end, Proplist), + {map, Map}; +to_message_annotation_val([{Key, Type, _} | _] = Table) + when is_binary(Key) andalso + is_atom(Type) -> + %% Looks like an AMQP 0.9.1 table + mc_amqpl:from_091(table, Table); +to_message_annotation_val(V) + when is_list(V) -> + {list, [to_message_annotation_val(Val) || Val <- V]}; +to_message_annotation_val(V) -> + {utf8, fmt_other(V)}. + fmt_proplist(Props) -> lists:foldl(fun({K, V}, Acc) -> - case fmt(a2b(K), V) of + case fmt(to_binary(K), V) of L when is_list(L) -> lists:append(L, Acc); T -> [T | Acc] end @@ -226,11 +326,8 @@ fmt(K, V) when is_number(V) -> {K, float, V}; fmt(K, V) when is_binary(V) -> {K, longstr, V}; fmt(K, [{_, _}|_] = Vs) -> {K, table, fmt_proplist(Vs)}; fmt(K, Vs) when is_list(Vs) -> {K, array, [fmt(V) || V <- Vs]}; -fmt(K, V) when is_pid(V) -> {K, longstr, - list_to_binary(rabbit_misc:pid_to_string(V))}; -fmt(K, V) -> {K, longstr, - list_to_binary( - rabbit_misc:format("~1000000000p", [V]))}. +fmt(K, V) when is_pid(V) -> {K, longstr, to_pid(V)}; +fmt(K, V) -> {K, longstr, fmt_other(V)}. %% Exactly the same as fmt/2, duplicated only for performance issues fmt(true) -> {bool, true}; @@ -241,20 +338,16 @@ fmt(V) when is_number(V) -> {float, V}; fmt(V) when is_binary(V) -> {longstr, V}; fmt([{_, _}|_] = Vs) -> {table, fmt_proplist(Vs)}; fmt(Vs) when is_list(Vs) -> {array, [fmt(V) || V <- Vs]}; -fmt(V) when is_pid(V) -> {longstr, - list_to_binary(rabbit_misc:pid_to_string(V))}; -fmt(V) -> {longstr, - list_to_binary( - rabbit_misc:format("~1000000000p", [V]))}. +fmt(V) when is_pid(V) -> {longstr, to_pid(V)}; +fmt(V) -> {longstr, fmt_other(V)}. -a2b(A) when is_atom(A) -> atom_to_binary(A, utf8); -a2b(B) when is_binary(B) -> B. +fmt_other(V) -> + list_to_binary(rabbit_misc:format("~1000000000p", [V])). -get_vhost() -> - case application:get_env(rabbitmq_event_exchange, vhost) of - undefined -> - {ok, V} = application:get_env(rabbit, default_vhost), - V; - {ok, V} -> - V - end. +to_binary(Val) when is_atom(Val) -> + atom_to_binary(Val); +to_binary(Val) when is_binary(Val) -> + Val. + +to_pid(Val) -> + list_to_binary(rabbit_misc:pid_to_string(Val)). diff --git a/deps/rabbitmq_event_exchange/test/config_schema_SUITE_data/rabbitmq_event_exchange.snippets b/deps/rabbitmq_event_exchange/test/config_schema_SUITE_data/rabbitmq_event_exchange.snippets index 2fceed017a96..70eb722731b9 100644 --- a/deps/rabbitmq_event_exchange/test/config_schema_SUITE_data/rabbitmq_event_exchange.snippets +++ b/deps/rabbitmq_event_exchange/test/config_schema_SUITE_data/rabbitmq_event_exchange.snippets @@ -1,19 +1,34 @@ [ - {virtual_host1, - "event_exchange.vhost = /", - [ - {rabbitmq_event_exchange, [ - {vhost, <<"/">>} - ]} - ], [rabbitmq_event_exchange] - }, +{virtual_host1, + "event_exchange.vhost = /", + [{rabbitmq_event_exchange, [ + {vhost, <<"/">>} + ]}], + [rabbitmq_event_exchange] +}, - {virtual_host2, - "event_exchange.vhost = dev", - [ - {rabbitmq_event_exchange, [ - {vhost, <<"dev">>} - ]} - ], [rabbitmq_event_exchange] - } +{virtual_host2, + "event_exchange.vhost = dev", + [{rabbitmq_event_exchange, [ + {vhost, <<"dev">>} + ]} + ], + [rabbitmq_event_exchange] +}, + +{protocol_amqp, + "event_exchange.protocol = amqp_1_0", + [{rabbitmq_event_exchange, [ + {protocol, amqp_1_0} + ]}], + [rabbitmq_event_exchange] +}, + +{protocol_amqpl, + "event_exchange.protocol = amqp_0_9_1", + [{rabbitmq_event_exchange, [ + {protocol, amqp_0_9_1} + ]}], + [rabbitmq_event_exchange] +} ]. diff --git a/deps/rabbitmq_event_exchange/test/system_SUITE.erl b/deps/rabbitmq_event_exchange/test/system_SUITE.erl index 4610378131ea..e78966ce1c64 100644 --- a/deps/rabbitmq_event_exchange/test/system_SUITE.erl +++ b/deps/rabbitmq_event_exchange/test/system_SUITE.erl @@ -13,74 +13,83 @@ -compile(export_all). --define(TAG, <<"user_who_performed_action">>). - all() -> [ - {group, amqp}, - {group, amqpl} + {group, amqp_1_0}, + {group, amqp_0_9_1} ]. groups() -> [ - {amqp, [shuffle], + {amqp_1_0, [shuffle], + shared_tests() ++ [ - amqp_connection + amqp_1_0_amqp_connection, + amqp_1_0_queue_created, + headers_exchange ]}, - {amqpl, [], + {amqp_0_9_1, [], + shared_tests() ++ [ - queue_created, - authentication, - audit_queue, - audit_exchange, - audit_exchange_internal_parameter, - audit_binding, - audit_vhost, - audit_vhost_deletion, - audit_channel, - audit_connection, - audit_direct_connection, - audit_consumer, - audit_parameter, - audit_policy, - audit_vhost_limit, - audit_user, - audit_user_password, - audit_user_tags, - audit_permission, - audit_topic_permission, - resource_alarm, + amqp_0_9_1_amqp_connection, + amqp_0_9_1_queue_created, unregister ]} ]. +shared_tests() -> + [ + authentication_success, + authentication_failure, + audit_queue, + audit_exchange, + audit_exchange_internal_parameter, + audit_binding, + audit_vhost, + audit_vhost_deletion, + audit_channel, + audit_connection, + audit_direct_connection, + audit_consumer, + audit_parameter, + audit_policy, + audit_vhost_limit, + audit_user, + audit_user_password, + audit_user_tags, + audit_permission, + audit_topic_permission, + resource_alarm + ]. + %% ------------------------------------------------------------------- %% Testsuite setup/teardown. %% ------------------------------------------------------------------- init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), rabbit_ct_helpers:log_environment(), - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, ?MODULE} - ]), - Config2 = rabbit_ct_helpers:run_setup_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - Config2. + Config. end_per_suite(Config) -> - rabbit_ct_helpers:run_teardown_steps(Config, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()). - -init_per_group(amqp, Config) -> - {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), - Config; -init_per_group(_, Config) -> Config. -end_per_group(_, Config) -> - Config. +init_per_group(Group, Config) -> + Config1 = rabbit_ct_helpers:merge_app_env( + Config, + {rabbitmq_event_exchange, [{protocol, Group}]}), + Config2 = rabbit_ct_helpers:set_config( + Config1, [{rmq_nodename_suffix, ?MODULE}]), + rabbit_ct_helpers:run_setup_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). @@ -88,34 +97,50 @@ init_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). - %% ------------------------------------------------------------------- %% Testsuite cases %% ------------------------------------------------------------------- -%% Only really tests that we're not completely broken. -queue_created(Config) -> - Now = os:system_time(seconds), +amqp_1_0_queue_created(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Headers = queue_created(QName, Config), + ?assertEqual({longstr, QName}, + rabbit_misc:table_lookup(Headers, <<"x-opt-name">>)), + ?assertEqual({table, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + rabbit_misc:table_lookup(Headers, <<"x-opt-arguments">>)). - Ch = declare_event_queue(Config, <<"queue.*">>), +amqp_0_9_1_queue_created(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Headers = queue_created(QName,Config), + ?assertEqual({longstr, QName}, + rabbit_misc:table_lookup(Headers, <<"name">>)), + {array, QArgs} = rabbit_misc:table_lookup(Headers, <<"arguments">>), + ?assertEqual(<<"{<<\"x-queue-type\">>,longstr,<<\"classic\">>}">>, + proplists:get_value(longstr, QArgs)). - #'queue.declare_ok'{queue = Q2} = - amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), +queue_created(QName, Config) -> + Ch = declare_event_queue(Config, <<"queue.created">>), + + Now = os:system_time(seconds), + #'queue.declare_ok'{} = amqp_channel:call( + Ch, #'queue.declare'{ + queue = QName, + exclusive = true, + arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}] + }), receive {#'basic.deliver'{routing_key = Key}, - #amqp_msg{props = #'P_basic'{headers = Headers, timestamp = TS}}} -> + #amqp_msg{props = #'P_basic'{headers = Headers, + timestamp = TS}}} -> %% timestamp is within the last 5 seconds - true = ((TS - Now) =< 5), - <<"queue.created">> = Key, - {longstr, Q2} = rabbit_misc:table_lookup(Headers, <<"name">>) - end, - - rabbit_ct_client_helpers:close_channel(Ch), - ok. - + ?assert(((TS - Now) =< 5)), + ?assertEqual(<<"queue.created">>, Key), + rabbit_ct_client_helpers:close_channel(Ch), + Headers + end. -authentication(Config) -> +authentication_success(Config) -> Ch = declare_event_queue(Config, <<"user.#">>), Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0), @@ -123,14 +148,41 @@ authentication(Config) -> {#'basic.deliver'{routing_key = Key}, #amqp_msg{props = #'P_basic'{headers = Headers}}} -> <<"user.authentication.success">> = Key, - undefined = rabbit_misc:table_lookup(Headers, <<"vhost">>), - {longstr, _PeerHost} = rabbit_misc:table_lookup(Headers, <<"peer_host">>), - {bool, false} = rabbit_misc:table_lookup(Headers, <<"ssl">>) + {Vhost, PeerHost, Ssl} = + case group_name(Config) of + amqp_0_9_1 -> + {<<"vhost">>, <<"peer_host">>, <<"ssl">>}; + amqp_1_0 -> + {<<"x-opt-vhost">>, <<"x-opt-peer-host">>, <<"x-opt-ssl">>} + end, + undefined = rabbit_misc:table_lookup(Headers, Vhost), + {longstr, _PeerHost} = rabbit_misc:table_lookup(Headers, PeerHost), + {bool, false} = rabbit_misc:table_lookup(Headers, Ssl) + after 5000 -> missing_deliver end, - amqp_connection:close(Conn2), - rabbit_ct_client_helpers:close_channel(Ch), - ok. + ok = amqp_connection:close(Conn2), + ok = rabbit_ct_client_helpers:close_channel(Ch). + +authentication_failure(Config) -> + Ch = declare_event_queue(Config, <<"user.authentication.*">>), + {error, _} = rabbit_ct_client_helpers:open_unmanaged_connection( + Config, 0, <<"fake user">>, <<"fake password">>), + + receive + {#'basic.deliver'{routing_key = Key}, + #amqp_msg{props = #'P_basic'{headers = Headers}}} -> + ?assertEqual(<<"user.authentication.failure">>, Key), + User = case group_name(Config) of + amqp_0_9_1 -> <<"name">>; + amqp_1_0 -> <<"x-opt-name">> + end, + ?assertEqual({longstr, <<"fake user">>}, + rabbit_misc:table_lookup(Headers, User)) + after 5000 -> missing_deliver + end, + + ok = rabbit_ct_client_helpers:close_channel(Ch). audit_queue(Config) -> Ch = declare_event_queue(Config, <<"queue.*">>), @@ -138,13 +190,12 @@ audit_queue(Config) -> #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), - User = proplists:get_value(rmq_username, Config), - receive_user_in_event(<<"queue.created">>, User), + receive_user_in_event(<<"queue.created">>, Config), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = Q}), - receive_user_in_event(<<"queue.deleted">>, User), + receive_user_in_event(<<"queue.deleted">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -157,13 +208,12 @@ audit_exchange(Config) -> amqp_channel:call(Ch, #'exchange.declare'{exchange = X, type = <<"topic">>}), - User = proplists:get_value(rmq_username, Config), - receive_user_in_event(<<"exchange.created">>, User), + receive_user_in_event(<<"exchange.created">>, Config), #'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = X}), - receive_user_in_event(<<"exchange.deleted">>, User), + receive_user_in_event(<<"exchange.deleted">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -171,8 +221,7 @@ audit_exchange(Config) -> audit_binding(Config) -> Ch = declare_event_queue(Config, <<"binding.*">>), %% The binding to the event exchange itself is the first queued event - User = proplists:get_value(rmq_username, Config), - receive_user_in_event(<<"binding.created">>, User), + receive_user_in_event(<<"binding.created">>, Config), #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), @@ -181,26 +230,34 @@ audit_binding(Config) -> amqp_channel:call(Ch, #'queue.bind'{queue = Q, exchange = <<"amq.direct">>, routing_key = <<"test">>}), - receive_user_in_event(<<"binding.created">>, User), + receive_user_in_event(<<"binding.created">>, Config), #'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{queue = Q, exchange = <<"amq.direct">>, routing_key = <<"test">>}), - receive_user_in_event(<<"binding.deleted">>, User), + receive_user_in_event(<<"binding.deleted">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. audit_vhost(Config) -> + Node = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)), Ch = declare_event_queue(Config, <<"vhost.*">>), User = <<"Bugs Bunny">>, rabbit_ct_broker_helpers:add_vhost(Config, 0, <<"test-vhost">>, User), - receive_user_in_event(<<"vhost.created">>, User), + Headers = receive_user_in_event(<<"vhost.created">>, User, Config), + + Key = case group_name(Config) of + amqp_0_9_1 -> <<"cluster_state">>; + amqp_1_0 -> <<"x-opt-cluster-state">> + end, + ?assertEqual({table, [{Node, longstr, <<"running">>}]}, + rabbit_misc:table_lookup(Headers, Key)), rabbit_ct_broker_helpers:delete_vhost(Config, 0, <<"test-vhost">>, User), - receive_user_in_event(<<"vhost.deleted">>, User), + receive_user_in_event(<<"vhost.deleted">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -218,72 +275,81 @@ audit_vhost_deletion(Config) -> %% The user that creates the queue is the connection one, not the vhost creator #'queue.declare_ok'{queue = _Q} = amqp_channel:call(Ch2, #'queue.declare'{}), - receive_user_in_event(<<"queue.created">>, ConnUser), + receive_user_in_event(<<"queue.created">>, ConnUser, Config), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch2), %% Validate that the user deleting the queue is the one used to delete the vhost, %% not the original user that created the queue (the connection one) rabbit_ct_broker_helpers:delete_vhost(Config, 0, Vhost, User), - receive_user_in_event(<<"queue.deleted">>, User), + receive_user_in_event(<<"queue.deleted">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. audit_channel(Config) -> Ch = declare_event_queue(Config, <<"channel.*">>), - User = proplists:get_value(rmq_username, Config), Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), {ok, Ch2} = amqp_connection:open_channel(Conn), - receive_user_in_event(<<"channel.created">>, User), + receive_user_in_event(<<"channel.created">>, Config), rabbit_ct_client_helpers:close_channel(Ch2), - receive_user_in_event(<<"channel.closed">>, User), + receive_user_in_event(<<"channel.closed">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. audit_connection(Config) -> Ch = declare_event_queue(Config, <<"connection.*">>), - User = proplists:get_value(rmq_username, Config), Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), - receive_user_in_event(<<"connection.created">>, User), + receive_user_in_event(<<"connection.created">>, Config), %% Username is not available in connection_close rabbit_ct_client_helpers:close_connection(Conn), - receive_event(<<"connection.closed">>, ?TAG, undefined), + Headers = receive_event(<<"connection.closed">>, user_key(Config), undefined), + case group_name(Config) of + amqp_0_9_1 -> + ?assert(lists:keymember(<<"client_properties">>, 1, Headers)); + amqp_1_0 -> + {table, ClientProps} = rabbit_misc:table_lookup(Headers, <<"x-opt-client-properties">>), + ?assertEqual({longstr, <<"Erlang">>}, + rabbit_misc:table_lookup(ClientProps, <<"platform">>)), + {table, Caps} = rabbit_misc:table_lookup(ClientProps, <<"capabilities">>), + ?assertEqual({bool, true}, + rabbit_misc:table_lookup(Caps, <<"basic.nack">>)), + ?assertEqual({bool, true}, + rabbit_misc:table_lookup(Caps, <<"connection.blocked">>)) + end, rabbit_ct_client_helpers:close_channel(Ch), ok. audit_direct_connection(Config) -> Ch = declare_event_queue(Config, <<"connection.*">>), - User = proplists:get_value(rmq_username, Config), Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config), - receive_user_in_event(<<"connection.created">>, User), + receive_user_in_event(<<"connection.created">>, Config), rabbit_ct_client_helpers:close_connection(Conn), - receive_event(<<"connection.closed">>, ?TAG, undefined), + receive_event(<<"connection.closed">>, user_key(Config), undefined), rabbit_ct_client_helpers:close_channel(Ch), ok. audit_consumer(Config) -> Ch = declare_event_queue(Config, <<"consumer.*">>), - User = proplists:get_value(rmq_username, Config), - receive_user_in_event(<<"consumer.created">>, User), + receive_user_in_event(<<"consumer.created">>, Config), #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, self()), CTag = receive #'basic.consume_ok'{consumer_tag = C} -> C end, - receive_user_in_event(<<"consumer.created">>, User), + receive_user_in_event(<<"consumer.created">>, Config), amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), - receive_user_in_event(<<"consumer.deleted">>, User), + receive_user_in_event(<<"consumer.deleted">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -298,11 +364,10 @@ audit_exchange_internal_parameter(Config) -> #'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = X}), - User = proplists:get_value(rmq_username, Config), %% Exchange deletion sets and clears a runtime parameter which acts as a %% kind of lock: - receive_user_in_event(<<"parameter.set">>, User), - receive_user_in_event(<<"parameter.cleared">>, User), + receive_user_in_event(<<"parameter.set">>, Config), + receive_user_in_event(<<"parameter.cleared">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -315,11 +380,11 @@ audit_parameter(Config) -> ok = rabbit_ct_broker_helpers:set_parameter( Config, 0, VHost, <<"vhost-limits">>, <<"limits">>, [{<<"max-connections">>, 200}], User), - receive_user_in_event(<<"parameter.set">>, User), + receive_user_in_event(<<"parameter.set">>, User, Config), ok = rabbit_ct_broker_helpers:clear_parameter( Config, 0, VHost, <<"vhost-limits">>, <<"limits">>, User), - receive_user_in_event(<<"parameter.cleared">>, User), + receive_user_in_event(<<"parameter.cleared">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -330,10 +395,10 @@ audit_policy(Config) -> rabbit_ct_broker_helpers:set_policy(Config, 0, <<".*">>, <<"all">>, <<"queues">>, [{<<"max-length-bytes">>, 10000}], User), - receive_user_in_event(<<"policy.set">>, User), + receive_user_in_event(<<"policy.set">>, User, Config), ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<".*">>, User), - receive_user_in_event(<<"policy.cleared">>, User), + receive_user_in_event(<<"policy.cleared">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -346,11 +411,11 @@ audit_vhost_limit(Config) -> ok = rabbit_ct_broker_helpers:set_parameter( Config, 0, VHost, <<"vhost-limits">>, <<"limits">>, [{<<"max-connections">>, 200}], User), - receive_user_in_event(<<"vhost.limits.set">>, User), + receive_user_in_event(<<"vhost.limits.set">>, User, Config), ok = rabbit_ct_broker_helpers:clear_parameter( Config, 0, VHost, <<"vhost-limits">>, <<"limits">>, User), - receive_user_in_event(<<"vhost.limits.cleared">>, User), + receive_user_in_event(<<"vhost.limits.cleared">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -361,10 +426,10 @@ audit_user(Config) -> User = <<"Wabbit">>, rabbit_ct_broker_helpers:add_user(Config, 0, User, User, ActingUser), - receive_user_in_event(<<"user.created">>, ActingUser), + receive_user_in_event(<<"user.created">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), - receive_user_in_event(<<"user.deleted">>, ActingUser), + receive_user_in_event(<<"user.deleted">>, ActingUser, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -376,10 +441,10 @@ audit_user_password(Config) -> rabbit_ct_broker_helpers:add_user(Config, 0, User, User, ActingUser), rabbit_ct_broker_helpers:change_password(Config, 0, User, <<"pass">>, ActingUser), - receive_user_in_event(<<"user.password.changed">>, ActingUser), + receive_user_in_event(<<"user.password.changed">>, ActingUser, Config), rabbit_ct_broker_helpers:clear_password(Config, 0, User, ActingUser), - receive_user_in_event(<<"user.password.cleared">>, ActingUser), + receive_user_in_event(<<"user.password.cleared">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), rabbit_ct_client_helpers:close_channel(Ch), @@ -392,7 +457,7 @@ audit_user_tags(Config) -> rabbit_ct_broker_helpers:add_user(Config, 0, User, User, ActingUser), rabbit_ct_broker_helpers:set_user_tags(Config, 0, User, [management], ActingUser), - receive_user_in_event(<<"user.tags.set">>, ActingUser), + receive_user_in_event(<<"user.tags.set">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), @@ -408,10 +473,10 @@ audit_permission(Config) -> rabbit_ct_broker_helpers:add_user(Config, 0, User, User, ActingUser), rabbit_ct_broker_helpers:set_permissions(Config, 0, User, VHost, <<".*">>, <<".*">>, <<".*">>, ActingUser), - receive_user_in_event(<<"permission.created">>, ActingUser), + receive_user_in_event(<<"permission.created">>, ActingUser, Config), rabbit_ct_broker_helpers:clear_permissions(Config, 0, User, VHost, ActingUser), - receive_user_in_event(<<"permission.deleted">>, ActingUser), + receive_user_in_event(<<"permission.deleted">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), rabbit_ct_client_helpers:close_channel(Ch), @@ -427,12 +492,12 @@ audit_topic_permission(Config) -> rabbit_ct_broker_helpers:rpc( Config, 0, rabbit_auth_backend_internal, set_topic_permissions, [User, VHost, <<"amq.topic">>, "^a", "^a", ActingUser]), - receive_user_in_event(<<"topic.permission.created">>, ActingUser), + receive_user_in_event(<<"topic.permission.created">>, ActingUser, Config), rabbit_ct_broker_helpers:rpc( Config, 0, rabbit_auth_backend_internal, clear_topic_permissions, [User, VHost, ActingUser]), - receive_user_in_event(<<"topic.permission.deleted">>, ActingUser), + receive_user_in_event(<<"topic.permission.deleted">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), rabbit_ct_client_helpers:close_channel(Ch), @@ -469,8 +534,8 @@ unregister(Config) -> lookup, [X])), ok. -%% Test that the event exchange works when publising and consuming via AMQP 1.0. -amqp_connection(Config) -> +%% Test the plugin publishing internally with AMQP 0.9.1 while the client uses AMQP 1.0. +amqp_0_9_1_amqp_connection(Config) -> QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), {Connection1, Session, LinkPair} = amqp_init(Config), @@ -498,10 +563,120 @@ amqp_connection(Config) -> ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection1). +%% Test the plugin publishing internally with AMQP 1.0 and the client using AMQP 1.0. +amqp_1_0_amqp_connection(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + {Connection1, Session, LinkPair} = amqp_init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName,#{}), + ok = rabbitmq_amqp_client:bind_queue( + LinkPair, QName, <<"amq.rabbitmq.event">>, <<"connection.*">>, #{}), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, settled), + + OpnConf0 = amqp_connection_config(Config), + OpnConf = maps:update(container_id, <<"2nd container">>, OpnConf0), + {ok, Connection2} = amqp10_client:open_connection(OpnConf), + receive {amqp10_event, {connection, Connection2, opened}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual([<<>>], amqp10_msg:body(Msg)), + MsgAnns = amqp10_msg:message_annotations(Msg), + ?assertMatch(#{<<"x-routing-key">> := <<"connection.created">>, + <<"x-opt-container-id">> := <<"2nd container">>, + <<"x-opt-channel-max">> := ChannelMax} + when is_integer(ChannelMax), + MsgAnns), + %% We expect to receive event properties that have complex types. + ClientProps = maps:get(<<"x-opt-client-properties">>, MsgAnns), + OtpRelease = integer_to_binary(?OTP_RELEASE), + ?assertMatch(#{ + {symbol, <<"version">>} := {utf8, _}, + {symbol, <<"product">>} := {utf8, <<"AMQP 1.0 client">>}, + {symbol, <<"platform">>} := {utf8, <<"Erlang/OTP ", OtpRelease/binary>>} + }, + maps:from_list(ClientProps)), + FormattedPid = maps:get(<<"x-opt-pid">>, MsgAnns), + %% The formatted Pid should include the RabbitMQ node name: + ?assertMatch({match, _}, + re:run(FormattedPid, <<"rmq-ct-system_SUITE">>)), + + ok = amqp10_client:close_connection(Connection2), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection1). + +%% Test that routing on specific event properties works. +headers_exchange(Config) -> + XName = <<"my headers exchange">>, + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + OpnConf = amqp_connection_config(Config), + {Connection, Session, LinkPair} = amqp_init(Config), + + ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{type => <<"headers">>}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue( + LinkPair, QName, XName, <<>>, + #{<<"x-opt-container-id">> => {utf8, <<"client-2">>}, + <<"x-match">> => {utf8, <<"any-with-x">>}}), + ok = rabbitmq_amqp_client:bind_exchange( + LinkPair, XName, <<"amq.rabbitmq.event">>, <<"connection.created">>, #{}), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, settled), + + %% Open two connections. + OpnConf1 = maps:update(container_id, <<"client-1">>, OpnConf), + {ok, Connection1} = amqp10_client:open_connection(OpnConf1), + receive {amqp10_event, {connection, Connection1, opened}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + OpnConf2 = maps:update(container_id, <<"client-2">>, OpnConf), + {ok, Connection2} = amqp10_client:open_connection(OpnConf2), + receive {amqp10_event, {connection, Connection2, opened}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + %% Thanks to routing via headers exchange on event property + %% x-opt-container-id = client-2 + %% we should only receive the second connection.created event. + ok = amqp10_client:flow_link_credit(Receiver, 2, never, true), + receive {amqp10_msg, Receiver, Msg} -> + ?assertMatch(#{<<"x-routing-key">> := <<"connection.created">>, + <<"x-opt-container-id">> := <<"client-2">>}, + amqp10_msg:message_annotations(Msg)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:close_connection(Connection1), + ok = amqp10_client:close_connection(Connection2), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection). + %% ------------------------------------------------------------------- %% Helpers %% ------------------------------------------------------------------- +user_key(Config) -> + case group_name(Config) of + amqp_0_9_1 -> + <<"user_who_performed_action">>; + amqp_1_0 -> + <<"x-opt-user-who-performed-action">> + end. + +group_name(Config) -> + GroupProps = proplists:get_value(tc_group_properties, Config), + proplists:get_value(name, GroupProps). + declare_event_queue(Config, RoutingKey) -> Ch = rabbit_ct_client_helpers:open_channel(Config, 0), #'queue.declare_ok'{queue = Q} = @@ -516,17 +691,24 @@ declare_event_queue(Config, RoutingKey) -> end, Ch. -receive_user_in_event(Event, User) -> - receive_event(Event, ?TAG, {longstr, User}). +receive_user_in_event(Event, Config) -> + User = proplists:get_value(rmq_username, Config), + receive_user_in_event(Event, User, Config). + +receive_user_in_event(Event, User, Config) -> + Key = user_key(Config), + Value = {longstr, User}, + receive_event(Event, Key, Value). receive_event(Event, Key, Value) -> receive {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{props = #'P_basic'{headers = Headers}}} -> - Event = RoutingKey, - Value = rabbit_misc:table_lookup(Headers, Key) + ?assertEqual(Event, RoutingKey), + ?assertEqual(Value, rabbit_misc:table_lookup(Headers, Key)), + Headers after - 60000 -> + 10_000 -> throw({receive_event_timeout, Event, Key, Value}) end. @@ -534,9 +716,9 @@ receive_event(Event) -> receive {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{props = #'P_basic'{}}} -> - Event = RoutingKey + ?assertEqual(Event, RoutingKey) after - 60000 -> + 10_000 -> throw({receive_event_timeout, Event}) end.