diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 936a1b130d89..cac190e2cb5e 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -43,7 +43,6 @@ -define(AMQP10_FOOTER, <<"x-amqp-1.0-footer">>). -define(PROTOMOD, rabbit_framing_amqp_0_9_1). -define(CLASS_ID, 60). --define(LONGSTR_UTF8_LIMIT, 4096). -opaque state() :: #content{}. @@ -682,19 +681,13 @@ wrap(_Type, undefined) -> wrap(Type, Val) -> {Type, Val}. -from_091(longstr, V) - when is_binary(V) andalso - byte_size(V) =< ?LONGSTR_UTF8_LIMIT -> - %% if a longstr is longer than 4096 bytes we just assume it is binary - %% it _may_ still be valid utf8 but checking this for every longstr header - %% value is going to be excessively slow - case mc_util:is_utf8_no_null(V) of +from_091(longstr, V) -> + case mc_util:is_utf8_no_null_limited(V) of true -> {utf8, V}; false -> {binary, V} end; -from_091(longstr, V) -> {binary, V}; from_091(long, V) -> {long, V}; from_091(unsignedbyte, V) -> {ubyte, V}; from_091(short, V) -> {short, V}; diff --git a/deps/rabbit/src/mc_util.erl b/deps/rabbit/src/mc_util.erl index 9ec7928de9b7..d19f17e7d92b 100644 --- a/deps/rabbit/src/mc_util.erl +++ b/deps/rabbit/src/mc_util.erl @@ -3,6 +3,7 @@ -include("mc.hrl"). -export([is_valid_shortstr/1, + is_utf8_no_null_limited/1, is_utf8_no_null/1, uuid_to_urn_string/1, urn_string_to_uuid/1, @@ -12,12 +13,24 @@ is_x_header/1 ]). +-define(UTF8_SCAN_LIMIT, 4096). + -spec is_valid_shortstr(term()) -> boolean(). is_valid_shortstr(Bin) when ?IS_SHORTSTR_LEN(Bin) -> is_utf8_no_null(Bin); is_valid_shortstr(_) -> false. +-spec is_utf8_no_null_limited(term()) -> boolean(). +is_utf8_no_null_limited(Bin) + when byte_size(Bin) =< ?UTF8_SCAN_LIMIT -> + is_utf8_no_null(Bin); +is_utf8_no_null_limited(_Term) -> + %% If longer than 4096 bytes, just assume it's not UTF-8. + %% It _may_ still be valid UTF-8 but checking this + %% on the hot path is going to be excessively slow. + false. + -spec is_utf8_no_null(term()) -> boolean(). is_utf8_no_null(Term) -> utf8_scan(Term, fun (C) -> C > 0 end). diff --git a/deps/rabbit/test/mc_unit_SUITE.erl b/deps/rabbit/test/mc_unit_SUITE.erl index f8d10462e629..1949763c5c76 100644 --- a/deps/rabbit/test/mc_unit_SUITE.erl +++ b/deps/rabbit/test/mc_unit_SUITE.erl @@ -313,34 +313,37 @@ amqpl_amqp_bin_amqpl(_Config) -> %% incoming amqpl converted to amqp, serialized / deserialized then converted %% back to amqpl. %% simulates a legacy message published then consumed to a stream - Props = #'P_basic'{content_type = <<"text/plain">>, - content_encoding = <<"gzip">>, - headers = [{<<"a-stream-offset">>, long, 99}, - {<<"a-string">>, longstr, <<"a string">>}, - {<<"a-bool">>, bool, false}, - {<<"a-unsignedbyte">>, unsignedbyte, 1}, - {<<"a-unsignedshort">>, unsignedshort, 1}, - {<<"a-unsignedint">>, unsignedint, 1}, - {<<"a-signedint">>, signedint, 1}, - {<<"a-timestamp">>, timestamp, 1}, - {<<"a-double">>, double, 1.0}, - {<<"a-float">>, float, 1.0}, - {<<"a-void">>, void, undefined}, - {<<"a-binary">>, binary, <<"data">>}, - {<<"a-array">>, array, [{long, 1}, {long, 2}]}, - {<<"x-stream-filter">>, longstr, <<"apple">>} - ], - delivery_mode = 2, - priority = 98, - correlation_id = <<"corr">> , - reply_to = <<"reply-to">>, - expiration = <<"1">>, - message_id = <<"msg-id">>, - timestamp = 99, - type = <<"45">>, - user_id = <<"banana">>, - app_id = <<"rmq">> - }, + String5k = binary:copy(<<"x">>, 5000), + Props = #'P_basic'{ + content_type = <<"text/plain">>, + content_encoding = <<"gzip">>, + headers = [{<<"a-stream-offset">>, long, 99}, + {<<"a-string">>, longstr, <<"a string">>}, + {<<"a-very-long-string">>, longstr, String5k}, + {<<"a-bool">>, bool, false}, + {<<"a-unsignedbyte">>, unsignedbyte, 1}, + {<<"a-unsignedshort">>, unsignedshort, 1}, + {<<"a-unsignedint">>, unsignedint, 1}, + {<<"a-signedint">>, signedint, 1}, + {<<"a-timestamp">>, timestamp, 1}, + {<<"a-double">>, double, 1.0}, + {<<"a-float">>, float, 1.0}, + {<<"a-void">>, void, undefined}, + {<<"a-binary">>, binary, <<"data">>}, + {<<"a-array">>, array, [{long, 1}, {long, 2}]}, + {<<"x-stream-filter">>, longstr, <<"apple">>} + ], + delivery_mode = 2, + priority = 98, + correlation_id = <<"corr">> , + reply_to = <<"reply-to">>, + expiration = <<"1">>, + message_id = <<"msg-id">>, + timestamp = 99, + type = <<"45">>, + user_id = <<"banana">>, + app_id = <<"rmq">> + }, Content = #content{properties = Props, payload_fragments_rev = [<<"data">>]}, Msg = mc:init(mc_amqpl, Content, annotations()), @@ -404,6 +407,9 @@ amqpl_amqp_bin_amqpl(_Config) -> ?assertEqual({long, 99}, Get(<<"a-stream-offset">>, AP10)), ?assertEqual({utf8, <<"a string">>}, Get(<<"a-string">>, AP10)), + %% We expect that a very long string is not scanned for valid UTF-8 + %% and instead directly turned into a binary. + ?assertEqual({binary, String5k}, Get(<<"a-very-long-string">>, AP10)), ?assertEqual(false, Get(<<"a-bool">>, AP10)), ?assertEqual({ubyte, 1}, Get(<<"a-unsignedbyte">>, AP10)), ?assertEqual({ushort, 1}, Get(<<"a-unsignedshort">>, AP10)), diff --git a/deps/rabbitmq_event_exchange/Makefile b/deps/rabbitmq_event_exchange/Makefile index fdac1be67e6e..72d6367dd744 100644 --- a/deps/rabbitmq_event_exchange/Makefile +++ b/deps/rabbitmq_event_exchange/Makefile @@ -1,6 +1,12 @@ PROJECT = rabbitmq_event_exchange PROJECT_DESCRIPTION = Event Exchange Type +define PROJECT_ENV + [ + {protocol, amqp_0_9_1} + ] +endef + define PROJECT_APP_EXTRA_KEYS {broker_version_requirements, []} endef diff --git a/deps/rabbitmq_event_exchange/README.md b/deps/rabbitmq_event_exchange/README.md index 1380a4d30f72..4f2aab35e699 100644 --- a/deps/rabbitmq_event_exchange/README.md +++ b/deps/rabbitmq_event_exchange/README.md @@ -1,154 +1,7 @@ # RabbitMQ Event Exchange -## Overview - -This plugin exposes the internal RabbitMQ event mechanism as messages that clients -can consume. It's useful -if you want to keep track of certain events, e.g. when queues, exchanges, bindings, users, -connections, channels are created and deleted. This plugin filters out stats -events, so you are almost certainly going to get better results using -the management plugin for stats. - -## How it Works - -It declares a topic exchange called `amq.rabbitmq.event` **in the default -virtual host**. All events are published to this exchange with routing -keys like 'exchange.created', 'binding.deleted' etc, so you can -subscribe to only the events you're interested in. - -The exchange behaves similarly to 'amq.rabbitmq.log': everything gets -published there; if you don't trust a user with the information that -gets published, don't allow them access. - - -## Installation - -This plugin ships with RabbitMQ. Like with all other plugins, it must be -enabled before it can be used: - -```bash -[sudo] rabbitmq-plugins enable rabbitmq_event_exchange -``` - -## Event format - -Each event has various properties associated with it. These are -translated into AMQP 0-9-1 data encoding and inserted in the message headers. The -**message body is always blank**. - -## Events - -So far RabbitMQ and related plugins emit events with the following routing keys: - -### RabbitMQ Broker - -Queue, Exchange and Binding events: - - * `queue.deleted` - * `queue.created` - * `exchange.created` - * `exchange.deleted` - * `binding.created` - * `binding.deleted` - -Connection and Channel events: - - * `connection.created` - * `connection.closed` - * `channel.created` - * `channel.closed` - -Consumer events: - - * `consumer.created` - * `consumer.deleted` - -Policy and Parameter events: - - * `policy.set` - * `policy.cleared` - * `parameter.set` - * `parameter.cleared` - -Virtual host events: - - * `vhost.created` - * `vhost.deleted` - * `vhost.limits.set` - * `vhost.limits.cleared` - -User related events: - - * `user.authentication.success` - * `user.authentication.failure` - * `user.created` - * `user.deleted` - * `user.password.changed` - * `user.password.cleared` - * `user.tags.set` - -Permission events: - - * `permission.created` - * `permission.deleted` - * `topic.permission.created` - * `topic.permission.deleted` - -Alarm events: - - * `alarm.set` - * `alarm.cleared` - -### Shovel Plugin - -Worker events: - - * `shovel.worker.status` - * `shovel.worker.removed` - -### Federation Plugin - -Link events: - - * `federation.link.status` - * `federation.link.removed` - -## Example - -There is a usage example using the Java client in `examples/java`. - - -## Configuration - - * `rabbitmq_event_exchange.vhost`: what vhost should the `amq.rabbitmq.event` exchange be declared in. Default: `rabbit.default_vhost` (`<<"/">>`). - - -## Uninstalling - -If you want to remove the exchange which this plugin creates, first -disable the plugin and restart the broker. Then you can delete the exchange, -e.g. with : - - rabbitmqctl eval 'rabbit_exchange:delete(rabbit_misc:r(<<"/">>, exchange, <<"amq.rabbitmq.event">>), false, <<"username">>).' - - -## Building from Source - -Building is no different from [building other RabbitMQ plugins](https://www.rabbitmq.com/plugin-development.html). - -TL;DR: - - git clone https://github.com.com/rabbitmq/rabbitmq-public-umbrella.git umbrella - cd umbrella - make co - make up BRANCH=stable - cd deps - git clone https://github.com/rabbitmq/rabbitmq-event-exchange.git rabbitmq_event_exchange - cd rabbitmq_event_exchange - make dist - +See the [website](https://www.rabbitmq.com/docs/event-exchange) for documentation. ## License -Released under the Mozilla Public License 2.0, -the same as RabbitMQ. +Released under the Mozilla Public License 2.0, the same as RabbitMQ. diff --git a/deps/rabbitmq_event_exchange/priv/schema/rabbitmq_event_exchange.schema b/deps/rabbitmq_event_exchange/priv/schema/rabbitmq_event_exchange.schema index c8b2efe5acdd..62de27e820c7 100644 --- a/deps/rabbitmq_event_exchange/priv/schema/rabbitmq_event_exchange.schema +++ b/deps/rabbitmq_event_exchange/priv/schema/rabbitmq_event_exchange.schema @@ -5,3 +5,7 @@ fun(Conf) -> list_to_binary(cuttlefish:conf_get("event_exchange.vhost", Conf)) end}. + +{mapping, "event_exchange.protocol", "rabbitmq_event_exchange.protocol", [ + {datatype, {enum, [amqp_0_9_1, amqp_1_0]}} +]}. diff --git a/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl b/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl index 70251406b20c..b79508b8b8d0 100644 --- a/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl +++ b/deps/rabbitmq_event_exchange/src/rabbit_exchange_type_event.erl @@ -11,6 +11,8 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit_framing.hrl"). +-include_lib("amqp10_common/include/amqp10_framing.hrl"). +-include_lib("rabbit/include/mc.hrl"). -include("rabbit_event_exchange.hrl"). -export([register/0, unregister/0]). @@ -20,8 +22,11 @@ -export([fmt_proplist/1]). %% testing --record(state, {vhost, - has_any_bindings +-define(APP_NAME, rabbitmq_event_exchange). + +-record(state, {protocol :: amqp_0_9_1 | amqp_1_0, + vhost :: rabbit_types:vhost(), + has_any_bindings :: boolean() }). -rabbit_boot_step({rabbit_event_exchange, @@ -65,41 +70,35 @@ exchange(VHost) -> %%---------------------------------------------------------------------------- init([]) -> + {ok, Protocol} = application:get_env(?APP_NAME, protocol), VHost = get_vhost(), X = rabbit_misc:r(VHost, exchange, ?EXCH_NAME), HasBindings = case rabbit_binding:list_for_source(X) of - [] -> false; - _ -> true - end, - {ok, #state{vhost = VHost, + [] -> false; + _ -> true + end, + {ok, #state{protocol = Protocol, + vhost = VHost, has_any_bindings = HasBindings}}. handle_call(_Request, State) -> {ok, not_understood, State}. -handle_event(_, #state{has_any_bindings = false} = State) -> - {ok, State}; -handle_event(#event{type = Type, - props = Props, - timestamp = TS, - reference = none}, #state{vhost = VHost} = State) -> - _ = case key(Type) of - ignore -> ok; - Key -> - Props2 = [{<<"timestamp_in_ms">>, TS} | Props], - PBasic = #'P_basic'{delivery_mode = 2, - headers = fmt_proplist(Props2), - %% 0-9-1 says the timestamp is a - %% "64 bit POSIX - %% timestamp". That's second - %% resolution, not millisecond. - timestamp = erlang:convert_time_unit( - TS, milli_seconds, seconds)}, - Content = rabbit_basic:build_content(PBasic, <<>>), - XName = exchange(VHost), - {ok, Msg} = mc_amqpl:message(XName, Key, Content), - rabbit_queue_type:publish_at_most_once(XName, Msg) - end, - {ok, State}; +handle_event(#event{type = Type, + props = Props, + reference = none, + timestamp = Timestamp}, + #state{protocol = Protocol, + vhost = VHost, + has_any_bindings = true} = State) -> + case key(Type) of + ignore -> + {ok, State}; + Key -> + XName = exchange(VHost), + Mc = mc_init(Protocol, XName, Key, Props, Timestamp), + _ = rabbit_queue_type:publish_at_most_once(XName, Mc), + {ok, State} + end; handle_event(_Event, State) -> {ok, State}. @@ -207,9 +206,109 @@ key(S) -> Tokens -> list_to_binary(string:join(Tokens, ".")) end. +get_vhost() -> + case application:get_env(?APP_NAME, vhost) of + undefined -> + {ok, V} = application:get_env(rabbit, default_vhost), + V; + {ok, V} -> + V + end. + +mc_init(amqp_1_0, #resource{name = XNameBin}, Key, Props, Timestamp) -> + Sections = [#'v1_0.message_annotations'{content = props_to_message_annotations(Props)}, + #'v1_0.properties'{creation_time = {timestamp, Timestamp}}, + #'v1_0.data'{content = <<>>}], + Payload = iolist_to_binary([amqp10_framing:encode_bin(S) || S <- Sections]), + Anns = #{?ANN_EXCHANGE => XNameBin, + ?ANN_ROUTING_KEYS => [Key]}, + mc:init(mc_amqp, Payload, Anns); +mc_init(amqp_0_9_1, XName, Key, Props0, TimestampMillis) -> + Props = [{<<"timestamp_in_ms">>, TimestampMillis} | Props0], + Headers = fmt_proplist(Props), + TimestampSecs = erlang:convert_time_unit(TimestampMillis, millisecond, second), + PBasic = #'P_basic'{delivery_mode = 2, + headers = Headers, + timestamp = TimestampSecs}, + Content = rabbit_basic:build_content(PBasic, <<>>), + {ok, Mc} = mc_amqpl:message(XName, Key, Content), + Mc. + +props_to_message_annotations(Props) -> + KVList = lists:foldl( + fun({K, #resource{virtual_host = Vhost, name = Name}}, Acc) -> + Ann0 = {to_message_annotation_key(K), {utf8, Name}}, + Ann1 = {{symbol, <<"x-opt-vhost">>}, {utf8, Vhost}}, + [Ann0, Ann1 | Acc]; + ({K, V}, Acc) -> + Ann = {to_message_annotation_key(K), + to_message_annotation_val(V)}, + [Ann | Acc] + end, [], Props), + lists:reverse(KVList). + +to_message_annotation_key(Key) -> + Key1 = to_binary(Key), + Pattern = try persistent_term:get(cp_underscore) + catch error:badarg -> + Cp = binary:compile_pattern(<<"_">>), + ok = persistent_term:put(cp_underscore, Cp), + Cp + end, + Key2 = binary:replace(Key1, Pattern, <<"-">>, [global]), + Key3 = case Key2 of + <<"x-", _/binary>> -> + Key2; + _ -> + <<"x-opt-", Key2/binary>> + end, + {symbol, Key3}. + +to_message_annotation_val(V) + when is_boolean(V) -> + {boolean, V}; +to_message_annotation_val(V) + when is_atom(V) -> + {utf8, atom_to_binary(V, utf8)}; +to_message_annotation_val(V) + when is_binary(V) -> + case mc_util:is_utf8_no_null_limited(V) of + true -> + {utf8, V}; + false -> + {binary, V} + end; +to_message_annotation_val(V) + when is_integer(V) -> + {long, V}; +to_message_annotation_val(V) + when is_number(V) -> + %% AMQP double and Erlang float are both 64-bit. + {double, V}; +to_message_annotation_val(V) + when is_pid(V) -> + {utf8, to_pid(V)}; +to_message_annotation_val([{Key, _} | _] = Proplist) + when is_atom(Key) orelse + is_binary(Key) -> + {map, lists:map(fun({K, V}) -> + {{utf8, to_binary(K)}, + to_message_annotation_val(V)} + end, Proplist)}; +to_message_annotation_val([{Key, Type, _Value} | _] = Table) + when is_binary(Key) andalso + is_atom(Type) -> + %% Looks like an AMQP 0.9.1 table + mc_amqpl:from_091(table, Table); +to_message_annotation_val(V) + when is_list(V) -> + {list, [to_message_annotation_val(Val) || Val <- V]}; +to_message_annotation_val(V) -> + {utf8, fmt_other(V)}. + fmt_proplist(Props) -> lists:foldl(fun({K, V}, Acc) -> - case fmt(a2b(K), V) of + case fmt(to_binary(K), V) of L when is_list(L) -> lists:append(L, Acc); T -> [T | Acc] end @@ -226,11 +325,8 @@ fmt(K, V) when is_number(V) -> {K, float, V}; fmt(K, V) when is_binary(V) -> {K, longstr, V}; fmt(K, [{_, _}|_] = Vs) -> {K, table, fmt_proplist(Vs)}; fmt(K, Vs) when is_list(Vs) -> {K, array, [fmt(V) || V <- Vs]}; -fmt(K, V) when is_pid(V) -> {K, longstr, - list_to_binary(rabbit_misc:pid_to_string(V))}; -fmt(K, V) -> {K, longstr, - list_to_binary( - rabbit_misc:format("~1000000000p", [V]))}. +fmt(K, V) when is_pid(V) -> {K, longstr, to_pid(V)}; +fmt(K, V) -> {K, longstr, fmt_other(V)}. %% Exactly the same as fmt/2, duplicated only for performance issues fmt(true) -> {bool, true}; @@ -241,20 +337,16 @@ fmt(V) when is_number(V) -> {float, V}; fmt(V) when is_binary(V) -> {longstr, V}; fmt([{_, _}|_] = Vs) -> {table, fmt_proplist(Vs)}; fmt(Vs) when is_list(Vs) -> {array, [fmt(V) || V <- Vs]}; -fmt(V) when is_pid(V) -> {longstr, - list_to_binary(rabbit_misc:pid_to_string(V))}; -fmt(V) -> {longstr, - list_to_binary( - rabbit_misc:format("~1000000000p", [V]))}. +fmt(V) when is_pid(V) -> {longstr, to_pid(V)}; +fmt(V) -> {longstr, fmt_other(V)}. -a2b(A) when is_atom(A) -> atom_to_binary(A, utf8); -a2b(B) when is_binary(B) -> B. +fmt_other(V) -> + list_to_binary(rabbit_misc:format("~1000000000p", [V])). -get_vhost() -> - case application:get_env(rabbitmq_event_exchange, vhost) of - undefined -> - {ok, V} = application:get_env(rabbit, default_vhost), - V; - {ok, V} -> - V - end. +to_binary(Val) when is_atom(Val) -> + atom_to_binary(Val); +to_binary(Val) when is_binary(Val) -> + Val. + +to_pid(Val) -> + list_to_binary(rabbit_misc:pid_to_string(Val)). diff --git a/deps/rabbitmq_event_exchange/test/config_schema_SUITE_data/rabbitmq_event_exchange.snippets b/deps/rabbitmq_event_exchange/test/config_schema_SUITE_data/rabbitmq_event_exchange.snippets index 2fceed017a96..70eb722731b9 100644 --- a/deps/rabbitmq_event_exchange/test/config_schema_SUITE_data/rabbitmq_event_exchange.snippets +++ b/deps/rabbitmq_event_exchange/test/config_schema_SUITE_data/rabbitmq_event_exchange.snippets @@ -1,19 +1,34 @@ [ - {virtual_host1, - "event_exchange.vhost = /", - [ - {rabbitmq_event_exchange, [ - {vhost, <<"/">>} - ]} - ], [rabbitmq_event_exchange] - }, +{virtual_host1, + "event_exchange.vhost = /", + [{rabbitmq_event_exchange, [ + {vhost, <<"/">>} + ]}], + [rabbitmq_event_exchange] +}, - {virtual_host2, - "event_exchange.vhost = dev", - [ - {rabbitmq_event_exchange, [ - {vhost, <<"dev">>} - ]} - ], [rabbitmq_event_exchange] - } +{virtual_host2, + "event_exchange.vhost = dev", + [{rabbitmq_event_exchange, [ + {vhost, <<"dev">>} + ]} + ], + [rabbitmq_event_exchange] +}, + +{protocol_amqp, + "event_exchange.protocol = amqp_1_0", + [{rabbitmq_event_exchange, [ + {protocol, amqp_1_0} + ]}], + [rabbitmq_event_exchange] +}, + +{protocol_amqpl, + "event_exchange.protocol = amqp_0_9_1", + [{rabbitmq_event_exchange, [ + {protocol, amqp_0_9_1} + ]}], + [rabbitmq_event_exchange] +} ]. diff --git a/deps/rabbitmq_event_exchange/test/system_SUITE.erl b/deps/rabbitmq_event_exchange/test/system_SUITE.erl index 4610378131ea..07002efab805 100644 --- a/deps/rabbitmq_event_exchange/test/system_SUITE.erl +++ b/deps/rabbitmq_event_exchange/test/system_SUITE.erl @@ -13,74 +13,83 @@ -compile(export_all). --define(TAG, <<"user_who_performed_action">>). - all() -> [ - {group, amqp}, - {group, amqpl} + {group, amqp_1_0}, + {group, amqp_0_9_1} ]. groups() -> [ - {amqp, [shuffle], + {amqp_1_0, [shuffle], + shared_tests() ++ [ - amqp_connection + amqp_1_0_amqp_connection, + amqp_1_0_queue_created, + headers_exchange ]}, - {amqpl, [], + {amqp_0_9_1, [], + shared_tests() ++ [ - queue_created, - authentication, - audit_queue, - audit_exchange, - audit_exchange_internal_parameter, - audit_binding, - audit_vhost, - audit_vhost_deletion, - audit_channel, - audit_connection, - audit_direct_connection, - audit_consumer, - audit_parameter, - audit_policy, - audit_vhost_limit, - audit_user, - audit_user_password, - audit_user_tags, - audit_permission, - audit_topic_permission, - resource_alarm, + amqp_0_9_1_amqp_connection, + amqp_0_9_1_queue_created, unregister ]} ]. +shared_tests() -> + [ + authentication_success, + authentication_failure, + audit_queue, + audit_exchange, + audit_exchange_internal_parameter, + audit_binding, + audit_vhost, + audit_vhost_deletion, + audit_channel, + audit_connection, + audit_direct_connection, + audit_consumer, + audit_parameter, + audit_policy, + audit_vhost_limit, + audit_user, + audit_user_password, + audit_user_tags, + audit_permission, + audit_topic_permission, + resource_alarm + ]. + %% ------------------------------------------------------------------- %% Testsuite setup/teardown. %% ------------------------------------------------------------------- init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), rabbit_ct_helpers:log_environment(), - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, ?MODULE} - ]), - Config2 = rabbit_ct_helpers:run_setup_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()), - Config2. + Config. end_per_suite(Config) -> - rabbit_ct_helpers:run_teardown_steps(Config, - rabbit_ct_client_helpers:teardown_steps() ++ - rabbit_ct_broker_helpers:teardown_steps()). - -init_per_group(amqp, Config) -> - {ok, _} = application:ensure_all_started(rabbitmq_amqp_client), - Config; -init_per_group(_, Config) -> Config. -end_per_group(_, Config) -> - Config. +init_per_group(Group, Config) -> + Config1 = rabbit_ct_helpers:merge_app_env( + Config, + {rabbitmq_event_exchange, [{protocol, Group}]}), + Config2 = rabbit_ct_helpers:set_config( + Config1, [{rmq_nodename_suffix, ?MODULE}]), + rabbit_ct_helpers:run_setup_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()). + +end_per_group(_Group, Config) -> + rabbit_ct_helpers:run_teardown_steps( + Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()). init_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_started(Config, Testcase). @@ -88,34 +97,52 @@ init_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). - %% ------------------------------------------------------------------- %% Testsuite cases %% ------------------------------------------------------------------- -%% Only really tests that we're not completely broken. -queue_created(Config) -> - Now = os:system_time(seconds), - - Ch = declare_event_queue(Config, <<"queue.*">>), +amqp_1_0_queue_created(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Headers = queue_created(QName, Config), + ?assertEqual({longstr, QName}, + rabbit_misc:table_lookup(Headers, <<"x-opt-name">>)), + ?assertEqual({table, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + rabbit_misc:table_lookup(Headers, <<"x-opt-arguments">>)). - #'queue.declare_ok'{queue = Q2} = - amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), +amqp_0_9_1_queue_created(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Headers = queue_created(QName,Config), + ?assertEqual({longstr, QName}, + rabbit_misc:table_lookup(Headers, <<"name">>)), + {array, QArgs} = rabbit_misc:table_lookup(Headers, <<"arguments">>), + %% Ideally, instead of a longstr containing the formatted Erlang term, + %% we should expect a table. + ?assertEqual(<<"{<<\"x-queue-type\">>,longstr,<<\"classic\">>}">>, + proplists:get_value(longstr, QArgs)). + +queue_created(QName, Config) -> + Ch = declare_event_queue(Config, <<"queue.created">>), + + Now = os:system_time(second), + #'queue.declare_ok'{} = amqp_channel:call( + Ch, #'queue.declare'{ + queue = QName, + exclusive = true, + arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}] + }), receive {#'basic.deliver'{routing_key = Key}, - #amqp_msg{props = #'P_basic'{headers = Headers, timestamp = TS}}} -> + #amqp_msg{props = #'P_basic'{headers = Headers, + timestamp = TS}}} -> %% timestamp is within the last 5 seconds - true = ((TS - Now) =< 5), - <<"queue.created">> = Key, - {longstr, Q2} = rabbit_misc:table_lookup(Headers, <<"name">>) - end, - - rabbit_ct_client_helpers:close_channel(Ch), - ok. - + ?assert(((TS - Now) =< 5)), + ?assertEqual(<<"queue.created">>, Key), + rabbit_ct_client_helpers:close_channel(Ch), + Headers + end. -authentication(Config) -> +authentication_success(Config) -> Ch = declare_event_queue(Config, <<"user.#">>), Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0), @@ -123,14 +150,41 @@ authentication(Config) -> {#'basic.deliver'{routing_key = Key}, #amqp_msg{props = #'P_basic'{headers = Headers}}} -> <<"user.authentication.success">> = Key, - undefined = rabbit_misc:table_lookup(Headers, <<"vhost">>), - {longstr, _PeerHost} = rabbit_misc:table_lookup(Headers, <<"peer_host">>), - {bool, false} = rabbit_misc:table_lookup(Headers, <<"ssl">>) + {Vhost, PeerHost, Ssl} = + case group_name(Config) of + amqp_0_9_1 -> + {<<"vhost">>, <<"peer_host">>, <<"ssl">>}; + amqp_1_0 -> + {<<"x-opt-vhost">>, <<"x-opt-peer-host">>, <<"x-opt-ssl">>} + end, + undefined = rabbit_misc:table_lookup(Headers, Vhost), + {longstr, _PeerHost} = rabbit_misc:table_lookup(Headers, PeerHost), + {bool, false} = rabbit_misc:table_lookup(Headers, Ssl) + after 5000 -> missing_deliver end, - amqp_connection:close(Conn2), - rabbit_ct_client_helpers:close_channel(Ch), - ok. + ok = amqp_connection:close(Conn2), + ok = rabbit_ct_client_helpers:close_channel(Ch). + +authentication_failure(Config) -> + Ch = declare_event_queue(Config, <<"user.authentication.*">>), + {error, _} = rabbit_ct_client_helpers:open_unmanaged_connection( + Config, 0, <<"fake user">>, <<"fake password">>), + + receive + {#'basic.deliver'{routing_key = Key}, + #amqp_msg{props = #'P_basic'{headers = Headers}}} -> + ?assertEqual(<<"user.authentication.failure">>, Key), + User = case group_name(Config) of + amqp_0_9_1 -> <<"name">>; + amqp_1_0 -> <<"x-opt-name">> + end, + ?assertEqual({longstr, <<"fake user">>}, + rabbit_misc:table_lookup(Headers, User)) + after 5000 -> missing_deliver + end, + + ok = rabbit_ct_client_helpers:close_channel(Ch). audit_queue(Config) -> Ch = declare_event_queue(Config, <<"queue.*">>), @@ -138,13 +192,12 @@ audit_queue(Config) -> #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), - User = proplists:get_value(rmq_username, Config), - receive_user_in_event(<<"queue.created">>, User), + receive_user_in_event(<<"queue.created">>, Config), #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = Q}), - receive_user_in_event(<<"queue.deleted">>, User), + receive_user_in_event(<<"queue.deleted">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -157,13 +210,12 @@ audit_exchange(Config) -> amqp_channel:call(Ch, #'exchange.declare'{exchange = X, type = <<"topic">>}), - User = proplists:get_value(rmq_username, Config), - receive_user_in_event(<<"exchange.created">>, User), + receive_user_in_event(<<"exchange.created">>, Config), #'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = X}), - receive_user_in_event(<<"exchange.deleted">>, User), + receive_user_in_event(<<"exchange.deleted">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -171,8 +223,7 @@ audit_exchange(Config) -> audit_binding(Config) -> Ch = declare_event_queue(Config, <<"binding.*">>), %% The binding to the event exchange itself is the first queued event - User = proplists:get_value(rmq_username, Config), - receive_user_in_event(<<"binding.created">>, User), + receive_user_in_event(<<"binding.created">>, Config), #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), @@ -181,26 +232,34 @@ audit_binding(Config) -> amqp_channel:call(Ch, #'queue.bind'{queue = Q, exchange = <<"amq.direct">>, routing_key = <<"test">>}), - receive_user_in_event(<<"binding.created">>, User), + receive_user_in_event(<<"binding.created">>, Config), #'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{queue = Q, exchange = <<"amq.direct">>, routing_key = <<"test">>}), - receive_user_in_event(<<"binding.deleted">>, User), + receive_user_in_event(<<"binding.deleted">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. audit_vhost(Config) -> + Node = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)), Ch = declare_event_queue(Config, <<"vhost.*">>), User = <<"Bugs Bunny">>, rabbit_ct_broker_helpers:add_vhost(Config, 0, <<"test-vhost">>, User), - receive_user_in_event(<<"vhost.created">>, User), + Headers = receive_user_in_event(<<"vhost.created">>, User, Config), + + Key = case group_name(Config) of + amqp_0_9_1 -> <<"cluster_state">>; + amqp_1_0 -> <<"x-opt-cluster-state">> + end, + ?assertEqual({table, [{Node, longstr, <<"running">>}]}, + rabbit_misc:table_lookup(Headers, Key)), rabbit_ct_broker_helpers:delete_vhost(Config, 0, <<"test-vhost">>, User), - receive_user_in_event(<<"vhost.deleted">>, User), + receive_user_in_event(<<"vhost.deleted">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -218,72 +277,81 @@ audit_vhost_deletion(Config) -> %% The user that creates the queue is the connection one, not the vhost creator #'queue.declare_ok'{queue = _Q} = amqp_channel:call(Ch2, #'queue.declare'{}), - receive_user_in_event(<<"queue.created">>, ConnUser), + receive_user_in_event(<<"queue.created">>, ConnUser, Config), ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Ch2), %% Validate that the user deleting the queue is the one used to delete the vhost, %% not the original user that created the queue (the connection one) rabbit_ct_broker_helpers:delete_vhost(Config, 0, Vhost, User), - receive_user_in_event(<<"queue.deleted">>, User), + receive_user_in_event(<<"queue.deleted">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. audit_channel(Config) -> Ch = declare_event_queue(Config, <<"channel.*">>), - User = proplists:get_value(rmq_username, Config), Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), {ok, Ch2} = amqp_connection:open_channel(Conn), - receive_user_in_event(<<"channel.created">>, User), + receive_user_in_event(<<"channel.created">>, Config), rabbit_ct_client_helpers:close_channel(Ch2), - receive_user_in_event(<<"channel.closed">>, User), + receive_user_in_event(<<"channel.closed">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. audit_connection(Config) -> Ch = declare_event_queue(Config, <<"connection.*">>), - User = proplists:get_value(rmq_username, Config), Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), - receive_user_in_event(<<"connection.created">>, User), + receive_user_in_event(<<"connection.created">>, Config), %% Username is not available in connection_close rabbit_ct_client_helpers:close_connection(Conn), - receive_event(<<"connection.closed">>, ?TAG, undefined), + Headers = receive_event(<<"connection.closed">>, user_key(Config), undefined), + case group_name(Config) of + amqp_0_9_1 -> + ?assert(lists:keymember(<<"client_properties">>, 1, Headers)); + amqp_1_0 -> + {table, ClientProps} = rabbit_misc:table_lookup(Headers, <<"x-opt-client-properties">>), + ?assertEqual({longstr, <<"Erlang">>}, + rabbit_misc:table_lookup(ClientProps, <<"platform">>)), + {table, Caps} = rabbit_misc:table_lookup(ClientProps, <<"capabilities">>), + ?assertEqual({bool, true}, + rabbit_misc:table_lookup(Caps, <<"basic.nack">>)), + ?assertEqual({bool, true}, + rabbit_misc:table_lookup(Caps, <<"connection.blocked">>)) + end, rabbit_ct_client_helpers:close_channel(Ch), ok. audit_direct_connection(Config) -> Ch = declare_event_queue(Config, <<"connection.*">>), - User = proplists:get_value(rmq_username, Config), Conn = rabbit_ct_client_helpers:open_unmanaged_connection_direct(Config), - receive_user_in_event(<<"connection.created">>, User), + receive_user_in_event(<<"connection.created">>, Config), rabbit_ct_client_helpers:close_connection(Conn), - receive_event(<<"connection.closed">>, ?TAG, undefined), + receive_event(<<"connection.closed">>, user_key(Config), undefined), rabbit_ct_client_helpers:close_channel(Ch), ok. audit_consumer(Config) -> Ch = declare_event_queue(Config, <<"consumer.*">>), - User = proplists:get_value(rmq_username, Config), - receive_user_in_event(<<"consumer.created">>, User), + receive_user_in_event(<<"consumer.created">>, Config), #'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{exclusive = true}), amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = true}, self()), CTag = receive #'basic.consume_ok'{consumer_tag = C} -> C end, - receive_user_in_event(<<"consumer.created">>, User), + receive_user_in_event(<<"consumer.created">>, Config), amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), - receive_user_in_event(<<"consumer.deleted">>, User), + receive_user_in_event(<<"consumer.deleted">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -298,11 +366,10 @@ audit_exchange_internal_parameter(Config) -> #'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = X}), - User = proplists:get_value(rmq_username, Config), %% Exchange deletion sets and clears a runtime parameter which acts as a %% kind of lock: - receive_user_in_event(<<"parameter.set">>, User), - receive_user_in_event(<<"parameter.cleared">>, User), + receive_user_in_event(<<"parameter.set">>, Config), + receive_user_in_event(<<"parameter.cleared">>, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -315,11 +382,11 @@ audit_parameter(Config) -> ok = rabbit_ct_broker_helpers:set_parameter( Config, 0, VHost, <<"vhost-limits">>, <<"limits">>, [{<<"max-connections">>, 200}], User), - receive_user_in_event(<<"parameter.set">>, User), + receive_user_in_event(<<"parameter.set">>, User, Config), ok = rabbit_ct_broker_helpers:clear_parameter( Config, 0, VHost, <<"vhost-limits">>, <<"limits">>, User), - receive_user_in_event(<<"parameter.cleared">>, User), + receive_user_in_event(<<"parameter.cleared">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -330,10 +397,10 @@ audit_policy(Config) -> rabbit_ct_broker_helpers:set_policy(Config, 0, <<".*">>, <<"all">>, <<"queues">>, [{<<"max-length-bytes">>, 10000}], User), - receive_user_in_event(<<"policy.set">>, User), + receive_user_in_event(<<"policy.set">>, User, Config), ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<".*">>, User), - receive_user_in_event(<<"policy.cleared">>, User), + receive_user_in_event(<<"policy.cleared">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -346,11 +413,11 @@ audit_vhost_limit(Config) -> ok = rabbit_ct_broker_helpers:set_parameter( Config, 0, VHost, <<"vhost-limits">>, <<"limits">>, [{<<"max-connections">>, 200}], User), - receive_user_in_event(<<"vhost.limits.set">>, User), + receive_user_in_event(<<"vhost.limits.set">>, User, Config), ok = rabbit_ct_broker_helpers:clear_parameter( Config, 0, VHost, <<"vhost-limits">>, <<"limits">>, User), - receive_user_in_event(<<"vhost.limits.cleared">>, User), + receive_user_in_event(<<"vhost.limits.cleared">>, User, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -361,10 +428,10 @@ audit_user(Config) -> User = <<"Wabbit">>, rabbit_ct_broker_helpers:add_user(Config, 0, User, User, ActingUser), - receive_user_in_event(<<"user.created">>, ActingUser), + receive_user_in_event(<<"user.created">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), - receive_user_in_event(<<"user.deleted">>, ActingUser), + receive_user_in_event(<<"user.deleted">>, ActingUser, Config), rabbit_ct_client_helpers:close_channel(Ch), ok. @@ -376,10 +443,10 @@ audit_user_password(Config) -> rabbit_ct_broker_helpers:add_user(Config, 0, User, User, ActingUser), rabbit_ct_broker_helpers:change_password(Config, 0, User, <<"pass">>, ActingUser), - receive_user_in_event(<<"user.password.changed">>, ActingUser), + receive_user_in_event(<<"user.password.changed">>, ActingUser, Config), rabbit_ct_broker_helpers:clear_password(Config, 0, User, ActingUser), - receive_user_in_event(<<"user.password.cleared">>, ActingUser), + receive_user_in_event(<<"user.password.cleared">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), rabbit_ct_client_helpers:close_channel(Ch), @@ -392,7 +459,7 @@ audit_user_tags(Config) -> rabbit_ct_broker_helpers:add_user(Config, 0, User, User, ActingUser), rabbit_ct_broker_helpers:set_user_tags(Config, 0, User, [management], ActingUser), - receive_user_in_event(<<"user.tags.set">>, ActingUser), + receive_user_in_event(<<"user.tags.set">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), @@ -408,10 +475,10 @@ audit_permission(Config) -> rabbit_ct_broker_helpers:add_user(Config, 0, User, User, ActingUser), rabbit_ct_broker_helpers:set_permissions(Config, 0, User, VHost, <<".*">>, <<".*">>, <<".*">>, ActingUser), - receive_user_in_event(<<"permission.created">>, ActingUser), + receive_user_in_event(<<"permission.created">>, ActingUser, Config), rabbit_ct_broker_helpers:clear_permissions(Config, 0, User, VHost, ActingUser), - receive_user_in_event(<<"permission.deleted">>, ActingUser), + receive_user_in_event(<<"permission.deleted">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), rabbit_ct_client_helpers:close_channel(Ch), @@ -427,12 +494,12 @@ audit_topic_permission(Config) -> rabbit_ct_broker_helpers:rpc( Config, 0, rabbit_auth_backend_internal, set_topic_permissions, [User, VHost, <<"amq.topic">>, "^a", "^a", ActingUser]), - receive_user_in_event(<<"topic.permission.created">>, ActingUser), + receive_user_in_event(<<"topic.permission.created">>, ActingUser, Config), rabbit_ct_broker_helpers:rpc( Config, 0, rabbit_auth_backend_internal, clear_topic_permissions, [User, VHost, ActingUser]), - receive_user_in_event(<<"topic.permission.deleted">>, ActingUser), + receive_user_in_event(<<"topic.permission.deleted">>, ActingUser, Config), rabbit_ct_broker_helpers:delete_user(Config, 0, User, ActingUser), rabbit_ct_client_helpers:close_channel(Ch), @@ -469,8 +536,8 @@ unregister(Config) -> lookup, [X])), ok. -%% Test that the event exchange works when publising and consuming via AMQP 1.0. -amqp_connection(Config) -> +%% Test the plugin publishing internally with AMQP 0.9.1 while the client uses AMQP 1.0. +amqp_0_9_1_amqp_connection(Config) -> QName = atom_to_binary(?FUNCTION_NAME), Address = rabbitmq_amqp_address:queue(QName), {Connection1, Session, LinkPair} = amqp_init(Config), @@ -498,6 +565,111 @@ amqp_connection(Config) -> ok = amqp10_client:end_session(Session), ok = amqp10_client:close_connection(Connection1). +%% Test the plugin publishing internally with AMQP 1.0 and the client using AMQP 1.0. +amqp_1_0_amqp_connection(Config) -> + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + {Connection1, Session, LinkPair} = amqp_init(Config), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName,#{}), + ok = rabbitmq_amqp_client:bind_queue( + LinkPair, QName, <<"amq.rabbitmq.event">>, <<"connection.*">>, #{}), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, settled), + + Now = os:system_time(millisecond), + OpnConf0 = amqp_connection_config(Config), + OpnConf = maps:update(container_id, <<"2nd container">>, OpnConf0), + {ok, Connection2} = amqp10_client:open_connection(OpnConf), + receive {amqp10_event, {connection, Connection2, opened}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + {ok, Msg} = amqp10_client:get_msg(Receiver), + ?assertEqual(<<>>, iolist_to_binary(amqp10_msg:body(Msg))), + MsgAnns = amqp10_msg:message_annotations(Msg), + ?assertMatch(#{<<"x-routing-key">> := <<"connection.created">>, + <<"x-opt-container-id">> := <<"2nd container">>, + <<"x-opt-channel-max">> := ChannelMax} + when is_integer(ChannelMax), + MsgAnns), + %% We expect to receive event properties that have complex types. + ClientProps = maps:get(<<"x-opt-client-properties">>, MsgAnns), + OtpRelease = integer_to_binary(?OTP_RELEASE), + ?assertMatch(#{ + {symbol, <<"version">>} := {utf8, _Version}, + {symbol, <<"product">>} := {utf8, <<"AMQP 1.0 client">>}, + {symbol, <<"platform">>} := {utf8, <<"Erlang/OTP ", OtpRelease/binary>>} + }, + maps:from_list(ClientProps)), + FormattedPid = maps:get(<<"x-opt-pid">>, MsgAnns), + + %% The formatted Pid should include the RabbitMQ node name: + ?assertMatch({match, _}, + re:run(FormattedPid, <<"rmq-ct-system_SUITE">>)), + + #{creation_time := CreationTime} = amqp10_msg:properties(Msg), + ?assert(is_integer(CreationTime)), + ?assert(CreationTime > Now - 5000), + ?assert(CreationTime < Now + 5000), + + ok = amqp10_client:close_connection(Connection2), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection1). + +%% Test that routing on specific event properties works. +headers_exchange(Config) -> + XName = <<"my headers exchange">>, + QName = atom_to_binary(?FUNCTION_NAME), + Address = rabbitmq_amqp_address:queue(QName), + OpnConf = amqp_connection_config(Config), + {Connection, Session, LinkPair} = amqp_init(Config), + + ok = rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{type => <<"headers">>}), + {ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, #{}), + ok = rabbitmq_amqp_client:bind_queue( + LinkPair, QName, XName, <<>>, + #{<<"x-opt-container-id">> => {utf8, <<"client-2">>}, + <<"x-match">> => {utf8, <<"any-with-x">>}}), + ok = rabbitmq_amqp_client:bind_exchange( + LinkPair, XName, <<"amq.rabbitmq.event">>, <<"connection.created">>, #{}), + {ok, Receiver} = amqp10_client:attach_receiver_link( + Session, <<"receiver">>, Address, settled), + + %% Open two connections. + OpnConf1 = maps:update(container_id, <<"client-1">>, OpnConf), + {ok, Connection1} = amqp10_client:open_connection(OpnConf1), + receive {amqp10_event, {connection, Connection1, opened}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + OpnConf2 = maps:update(container_id, <<"client-2">>, OpnConf), + {ok, Connection2} = amqp10_client:open_connection(OpnConf2), + receive {amqp10_event, {connection, Connection2, opened}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + %% Thanks to routing via headers exchange on event property + %% x-opt-container-id = client-2 + %% we should only receive the second connection.created event. + ok = amqp10_client:flow_link_credit(Receiver, 2, never, true), + receive {amqp10_msg, Receiver, Msg} -> + ?assertMatch(#{<<"x-routing-key">> := <<"connection.created">>, + <<"x-opt-container-id">> := <<"client-2">>}, + amqp10_msg:message_annotations(Msg)) + after 5000 -> ct:fail({missing_msg, ?LINE}) + end, + receive {amqp10_event, {link, Receiver, credit_exhausted}} -> ok + after 5000 -> ct:fail({missing_event, ?LINE}) + end, + + ok = amqp10_client:close_connection(Connection1), + ok = amqp10_client:close_connection(Connection2), + {ok, _} = rabbitmq_amqp_client:delete_queue(LinkPair, QName), + ok = rabbitmq_amqp_client:delete_exchange(LinkPair, XName), + ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair), + ok = amqp10_client:end_session(Session), + ok = amqp10_client:close_connection(Connection). + %% ------------------------------------------------------------------- %% Helpers %% ------------------------------------------------------------------- @@ -516,17 +688,36 @@ declare_event_queue(Config, RoutingKey) -> end, Ch. -receive_user_in_event(Event, User) -> - receive_event(Event, ?TAG, {longstr, User}). +user_key(Config) -> + case group_name(Config) of + amqp_0_9_1 -> + <<"user_who_performed_action">>; + amqp_1_0 -> + <<"x-opt-user-who-performed-action">> + end. + +group_name(Config) -> + GroupProps = proplists:get_value(tc_group_properties, Config), + proplists:get_value(name, GroupProps). + +receive_user_in_event(Event, Config) -> + User = proplists:get_value(rmq_username, Config), + receive_user_in_event(Event, User, Config). + +receive_user_in_event(Event, User, Config) -> + Key = user_key(Config), + Value = {longstr, User}, + receive_event(Event, Key, Value). receive_event(Event, Key, Value) -> receive {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{props = #'P_basic'{headers = Headers}}} -> - Event = RoutingKey, - Value = rabbit_misc:table_lookup(Headers, Key) + ?assertEqual(Event, RoutingKey), + ?assertEqual(Value, rabbit_misc:table_lookup(Headers, Key)), + Headers after - 60000 -> + 10_000 -> throw({receive_event_timeout, Event, Key, Value}) end. @@ -534,9 +725,9 @@ receive_event(Event) -> receive {#'basic.deliver'{routing_key = RoutingKey}, #amqp_msg{props = #'P_basic'{}}} -> - Event = RoutingKey + ?assertEqual(Event, RoutingKey) after - 60000 -> + 10_000 -> throw({receive_event_timeout, Event}) end. diff --git a/release-notes/4.1.0.md b/release-notes/4.1.0.md index 32ae19d73e1c..6ffd23bc853c 100644 --- a/release-notes/4.1.0.md +++ b/release-notes/4.1.0.md @@ -37,6 +37,11 @@ These metrics have already been emitted for AMQP 0.9.1 connections prior to Rabb * Session flow control state * Number of unconfirmed and unacknowledged messages +### Support publishing AMQP 1.0 messages to the Event Exchange +[PR #12714](https://github.com/rabbitmq/rabbitmq-server/pull/12714) allows the `rabbitmq_event_exchange` plugin to be configured to internally publish AMQP 1.0 instead of AMQP 0.9.1 messages to the `amq.rabbitmq.event` topic exchange. + +This feature allows AMQP 1.0 consumers to receive event properties containing complex types such as [lists](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-list) or [maps](https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-types-v1.0-os.html#type-map), for example queue arguments for the `queue.created` event or client provided properties for the `connection.created` event. + ### Prometheus histogram for message sizes [PR #12342](https://github.com/rabbitmq/rabbitmq-server/pull/12342) exposes a Prometheus histogram for message sizes received by RabbitMQ.