From 95a96b42f0ba99b9ba422434f74880d719e935c2 Mon Sep 17 00:00:00 2001 From: Nelson Vides <nelson.vides@erlang-solutions.com> Date: Tue, 17 Dec 2024 19:28:25 +0100 Subject: [PATCH] Implement components as a gen_statem over ranch --- big_tests/tests/component_SUITE.erl | 16 +- .../mongoose_component_connection.erl | 484 ++++++++++++++++++ src/component/mongoose_component_listener.erl | 29 +- .../mongoose_component_packet_handler.erl | 2 +- src/component/mongoose_component_ranch.erl | 62 +++ src/component/mongoose_component_socket.erl | 47 ++ src/ejabberd_sup.erl | 3 - src/listeners/mongoose_listener.erl | 1 + src/mongoose_transport.erl | 21 +- 9 files changed, 629 insertions(+), 36 deletions(-) create mode 100644 src/component/mongoose_component_connection.erl create mode 100644 src/component/mongoose_component_ranch.erl create mode 100644 src/component/mongoose_component_socket.erl diff --git a/big_tests/tests/component_SUITE.erl b/big_tests/tests/component_SUITE.erl index aa1f7ef09e..f1789582c6 100644 --- a/big_tests/tests/component_SUITE.erl +++ b/big_tests/tests/component_SUITE.erl @@ -125,12 +125,14 @@ register_one_component(Config) -> {Component, ComponentAddr, _} = component_helper:connect_component(CompOpts), % start stream reply instrument_helper:assert(component_xmpp_element_size_out, #{}, fun(#{byte_size := S}) -> S > 0 end, - #{expected_count => 1, min_timestamp => TS}), + #{expected_count => 2, min_timestamp => TS}), % 1. start stream, 2. component handshake instrument_helper:assert(component_xmpp_element_size_in, #{}, fun(#{byte_size := S}) -> S > 0 end, #{expected_count => 2, min_timestamp => TS}), instrument_helper:assert(component_tcp_data_in, #{}, fun(#{byte_size := S}) -> S > 0 end, #{expected_count => 2, min_timestamp => TS}), + instrument_helper:assert(component_auth_failed, #{}, fun(#{byte_size := S}) -> S > 0 end, + #{expected_count => 0, min_timestamp => TS}), % 1. start stream reply, 2. handshake reply instrument_helper:assert(component_tcp_data_out, #{}, fun(#{byte_size := S}) -> S > 0 end, #{expected_count => 2, min_timestamp => TS}), @@ -144,6 +146,10 @@ register_one_component(Config) -> % Reply to Alice instrument_helper:assert(component_xmpp_element_size_in, #{}, fun(#{byte_size := S}) -> S > 0 end, #{expected_count => 1, min_timestamp => TS1}), + instrument_helper:assert(component_element_in, #{}, fun(_) -> true end, + #{expected_count => 1, min_timestamp => TS1}), + instrument_helper:assert(component_element_out, #{}, fun(_) -> true end, + #{expected_count => 1, min_timestamp => TS1}), component_helper:disconnect_component(Component, ComponentAddr). @@ -243,9 +249,9 @@ register_two_components(Config) -> try_registering_with_wrong_password(Config) -> %% Given a component with a wrong password + TS = instrument_helper:timestamp(), CompOpts1 = ?config(component1, Config), - CompOpts2 = lists:keyreplace(password, 1, CompOpts1, - {password, <<"wrong_one">>}), + CompOpts2 = lists:keyreplace(password, 1, CompOpts1, {password, <<"wrong_one">>}), try %% When trying to connect it {Comp, Addr, _} = component_helper:connect_component(CompOpts2), @@ -253,6 +259,8 @@ try_registering_with_wrong_password(Config) -> ct:fail("component connected successfully with wrong password") catch {stream_error, _E} -> %% Then it should fail to do so + instrument_helper:assert(component_auth_failed, #{}, fun(_) -> true end, + #{expected_count => 1, min_timestamp => TS}), ok end. @@ -559,7 +567,7 @@ restore_domain(Config) -> Config. events() -> - instrument_helper:declared_events(ejabberd_service, [#{connection_type => component}]). + instrument_helper:declared_events(mongoose_component_listener, []). %%-------------------------------------------------------------------- %% Stanzas diff --git a/src/component/mongoose_component_connection.erl b/src/component/mongoose_component_connection.erl new file mode 100644 index 0000000000..39c5b38259 --- /dev/null +++ b/src/component/mongoose_component_connection.erl @@ -0,0 +1,484 @@ +-module(mongoose_component_connection). + +-xep([{xep, 114}, {version, "1.6"}]). + +-include("mongoose_logger.hrl"). +-include("mongoose.hrl"). +-include("jlib.hrl"). +-include_lib("exml/include/exml_stream.hrl"). +-include("external_component.hrl"). + +-define(RETRIES, 3). + +-behaviour(gen_statem). +-export([callback_mode/0, init/1, handle_event/4, terminate/3]). + +%% utils +-export([start_link/2, exit/2, route/2]). + +-record(component_data, { + lserver = ?MYNAME :: jid:lserver(), + streamid = mongoose_bin:gen_from_crypto() :: binary(), + is_subdomain = false :: boolean(), + components = [] :: [mongoose_component:external_component()], + socket :: undefined | mongoose_component_socket:socket(), + parser :: undefined | exml_stream:parser(), + shaper :: undefined | mongoose_shaper:shaper(), + listener_opts :: undefined | listener_opts() + }). +-type data() :: #component_data{}. +-type maybe_ok() :: ok | {error, atom()}. +-type fsm_res() :: gen_statem:event_handler_result(state(), data()). +-type packet() :: {jid:jid(), jid:jid(), exml:element()}. + +-type retries() :: 0..3. +-type state() :: connect + | wait_for_stream + | wait_for_handshake + | stream_established. + +-type listener_opts() :: #{shaper := atom(), + max_stanza_size := non_neg_integer(), + state_timeout := non_neg_integer(), + port := inet:port_number(), + ip_tuple := inet:ip_address(), + proto := tcp, + term() => term()}. + +-export_type([packet/0, data/0, state/0, fsm_res/0, retries/0, listener_opts/0]). + +%%%---------------------------------------------------------------------- +%%% gen_statem +%%%---------------------------------------------------------------------- + +-spec callback_mode() -> gen_statem:callback_mode_result(). +callback_mode() -> + handle_event_function. + +-spec init({module(), term(), listener_opts()}) -> + gen_statem:init_result(state(), data()). +init({SocketModule, SocketOpts, LOpts}) -> + StateData = #component_data{listener_opts = LOpts}, + ConnectEvent = {next_event, internal, {connect, {SocketModule, SocketOpts}}}, + {ok, connect, StateData, ConnectEvent}. + +-spec handle_event(gen_statem:event_type(), term(), state(), data()) -> fsm_res(). +handle_event(internal, {connect, {SocketModule, SocketOpts}}, connect, + StateData = #component_data{listener_opts = #{shaper := ShaperName, + max_stanza_size := MaxStanzaSize} = LOpts}) -> + {ok, Parser} = exml_stream:new_parser([{max_element_size, MaxStanzaSize}]), + Shaper = mongoose_shaper:new(ShaperName), + C2SSocket = mongoose_component_socket:new(SocketModule, SocketOpts, LOpts), + StateData1 = StateData#component_data{socket = C2SSocket, parser = Parser, shaper = Shaper}, + {next_state, wait_for_stream, StateData1, state_timeout(LOpts)}; +handle_event(internal, #xmlstreamstart{name = Name, attrs = Attrs}, wait_for_stream, StateData) -> + StreamStart = #xmlel{name = Name, attrs = Attrs}, + execute_element_event(component_element_in, StreamStart, StateData), + handle_stream_start(StateData, StreamStart); +handle_event(internal, #xmlel{name = <<"handshake">>} = El, wait_for_handshake, StateData) -> + execute_element_event(component_element_in, El, StateData), + handle_handshake(StateData, El); +handle_event(internal, #xmlel{} = El, stream_established, StateData) -> + execute_element_event(component_element_in, El, StateData), + handle_stream_established(StateData, El); +handle_event(internal, #xmlstreamend{name = Name}, _, StateData) -> + StreamEnd = #xmlel{name = Name}, + execute_element_event(component_element_in, StreamEnd, StateData), + send_trailer(StateData), + {stop, {shutdown, stream_end}}; +handle_event(internal, #xmlstreamstart{}, _, StateData) -> + stream_start_error(StateData, mongoose_xmpp_errors:policy_violation()); +handle_event(cast, {exit, Reason}, _, StateData) when is_binary(Reason) -> + StreamConflict = mongoose_xmpp_errors:stream_conflict(?MYLANG, Reason), + send_xml(StateData, StreamConflict), + send_trailer(StateData), + {stop, {shutdown, Reason}}; +handle_event(cast, {exit, system_shutdown}, _, StateData) -> + Error = mongoose_xmpp_errors:system_shutdown(?MYLANG, <<"System shutting down">>), + send_xml(StateData, Error), + send_trailer(StateData), + {stop, {shutdown, system_shutdown}}; +handle_event(info, {route, Acc}, State, StateData) -> + handle_route(StateData, State, Acc); +handle_event(info, {tcp, _Socket, _Packet} = SocketData, _, StateData) -> + handle_socket_data(StateData, SocketData); +handle_event(info, {ClosedOrError, _Socket}, _, _) + when tcp_closed =:= ClosedOrError; tcp_error =:= ClosedOrError -> + {stop, {shutdown, ClosedOrError}}; +handle_event({timeout, activate_socket}, activate_socket, _State, StateData) -> + activate_socket(StateData), + keep_state_and_data; +handle_event(state_timeout, state_timeout_termination, _State, StateData) -> + StreamConflict = mongoose_xmpp_errors:connection_timeout(), + send_xml(StateData, StreamConflict), + send_trailer(StateData), + {stop, {shutdown, state_timeout}}; +handle_event(_EventType, _EventContent, _State, _StateData) -> + exit('WTF'). + +-spec terminate(term(), undefined | state(), data()) -> any(). +terminate(Reason, stream_established, StateData) -> + unregister_routes(StateData), + terminate(Reason, undefined, StateData); +terminate(Reason, State, StateData) -> + ?LOG_INFO(#{what => component_statem_terminate, + reason => Reason, + component => StateData#component_data.lserver, + component_state => State, + component_data => StateData}), + close_parser(StateData), + close_socket(StateData). + +%%%---------------------------------------------------------------------- +%%% socket helpers +%%%---------------------------------------------------------------------- + +-spec handle_socket_data(data(), {_, _, iodata()}) -> fsm_res(). +handle_socket_data(StateData = #component_data{socket = Socket}, Payload) -> + case mongoose_component_socket:handle_data(Socket, Payload) of + {error, _Reason} -> + {stop, {shutdown, socket_error}, StateData}; + Data -> + handle_socket_packet(StateData, Data) + end. + +-spec handle_socket_packet(data(), iodata()) -> fsm_res(). +handle_socket_packet(StateData = #component_data{parser = Parser}, Packet) -> + ?LOG_DEBUG(#{what => received_xml_on_stream, packet => Packet, component_pid => self()}), + case exml_stream:parse(Parser, Packet) of + {error, Reason} -> + NextEvent = {next_event, internal, #xmlstreamerror{name = iolist_to_binary(Reason)}}, + {keep_state, StateData, NextEvent}; + {ok, NewParser, XmlElements} -> + Size = iolist_size(Packet), + NewStateData = StateData#component_data{parser = NewParser}, + handle_socket_elements(NewStateData, XmlElements, Size) + end. + +-spec handle_socket_elements(data(), [exml_stream:element()], non_neg_integer()) -> fsm_res(). +handle_socket_elements(StateData = #component_data{shaper = Shaper}, Elements, Size) -> + {NewShaper, Pause} = mongoose_shaper:update(Shaper, Size), + [mongoose_instrument:execute( + component_xmpp_element_size_in, #{}, #{byte_size => exml:xml_size(El)}) + || El <- Elements], + NewStateData = StateData#component_data{shaper = NewShaper}, + MaybePauseTimeout = maybe_pause(NewStateData, Pause), + StreamEvents = [ {next_event, internal, XmlEl} || XmlEl <- Elements ], + {keep_state, NewStateData, MaybePauseTimeout ++ StreamEvents}. + +-spec maybe_pause(data(), integer()) -> any(). +maybe_pause(_StateData, Pause) when Pause > 0 -> + [{{timeout, activate_socket}, Pause, activate_socket}]; +maybe_pause(#component_data{socket = Socket}, _) -> + mongoose_component_socket:activate(Socket), + []. + +-spec close_socket(data()) -> ok | {error, term()}. +close_socket(#component_data{socket = undefined}) -> + ok; +close_socket(#component_data{socket = Socket}) -> + mongoose_component_socket:close(Socket). + +-spec activate_socket(data()) -> ok | {error, term()}. +activate_socket(#component_data{socket = Socket}) -> + mongoose_component_socket:activate(Socket). + +%%%---------------------------------------------------------------------- +%%% error handler helpers +%%%---------------------------------------------------------------------- + +%%%---------------------------------------------------------------------- +%%% helpers +%%%---------------------------------------------------------------------- + +-spec handle_stream_start(data(), exml:element()) -> fsm_res(). +handle_stream_start(S0, StreamStart) -> + LServer = jid:nameprep(exml_query:attr(StreamStart, <<"to">>, <<>>)), + XmlNs = exml_query:attr(StreamStart, <<"xmlns">>, <<>>), + case {XmlNs, LServer} of + {?NS_COMPONENT_ACCEPT, LServer} when error =/= LServer -> + IsSubdomain = <<"true">> =:= exml_query:attr(StreamStart, <<"is_subdomain">>, <<>>), + send_header(S0), + S1 = S0#component_data{lserver = LServer, is_subdomain = IsSubdomain}, + {next_state, wait_for_handshake, S1, state_timeout(S1)}; + {?NS_COMPONENT_ACCEPT, error} -> + stream_start_error(S0, mongoose_xmpp_errors:host_unknown()); + {_, _} -> + stream_start_error(S0, mongoose_xmpp_errors:invalid_namespace()) + end. + +-spec handle_handshake(data(), exml:element()) -> fsm_res(). +handle_handshake(StateData, El) -> + #component_data{lserver = LServer, + streamid = StreamId, + listener_opts = #{password := Password}} = StateData, + Proof = create_proof(StreamId, Password), + Digest = exml_query:cdata(El), + case crypto:hash_equals(Digest, Proof) of + true -> + try_register_routes(StateData, ?RETRIES); + false -> + mongoose_instrument:execute(component_auth_failed, #{}, + #{count => 1, lserver => LServer}), + Error = mongoose_xmpp_errors:stream_not_authorized(?MYLANG, <<"Invalid handshake">>), + stream_start_error(StateData, Error) + end. + +% The XML character data of the handshake element is computed according to the following algorithm: +% 1. Concatenate the Stream ID received from the server with the shared secret. +% 2. Hash the concatenated string according to the SHA1 algorithm, i.e., SHA1( concat (sid, password)). +% 3. Ensure that the hash output is in hexadecimal format, not binary or base64. +% 4. Convert the hash output to all lowercase characters. +-spec create_proof(binary(), binary()) -> binary(). +create_proof(StreamId, Password) -> + Concat = [StreamId, Password], + Sha1 = crypto:hash(sha, Concat), + binary:encode_hex(Sha1, lowercase). + +%% Once authenticated, the component can send stanzas through the server and receive stanzas from +%% the server. All stanzas sent to the server MUST possess a 'from' attribute and a 'to' attribute, +%% as in the 'jabber:server' namespace. The domain identifier portion of the JID contained in the +%% 'from' attribute MUST match the hostname of the component. However, this is the only restriction +%% on 'from' addresses, and the component MAY send stanzas from any user at its hostname. +-spec handle_stream_established(data(), exml:element()) -> fsm_res(). +handle_stream_established(StateData, #xmlel{name = Name} = El) -> + #component_data{lserver = LServer, listener_opts = #{check_from := CheckFrom}} = StateData, + NewEl = jlib:remove_attr(<<"xmlns">>, El), + FromJid = jid:from_binary(exml_query:attr(El, <<"from">>, <<>>)), + IsValidFromJid = + case {CheckFrom, FromJid} of + %% The default is the standard behaviour in XEP-0114 + {true, #jid{lserver = FromLServer}} -> + FromLServer =:= LServer; + %% If the admin does not want to check the from field when accept packets from any + %% address. In this case, the component can send packet of behalf of the server users. + _ -> + ok + end, + ToJid = jid:from_binary(exml_query:attr(El, <<"to">>, <<>>)), + IsStanza = (<<"iq">> =:= Name) + orelse (<<"message">> =:= Name) + orelse (<<"presence">> =:= Name), + case IsStanza andalso IsValidFromJid andalso (error =/= ToJid) of + true -> + Acc = element_to_origin_accum(StateData, FromJid, ToJid, NewEl), + ejabberd_router:route(FromJid, ToJid, Acc, NewEl); + false -> + ?LOG_INFO(#{what => comp_bad_request, + text => <<"Not valid Name or error in FromJid or ToJid">>, + stanza_name => Name, from_jid => FromJid, to_jid => ToJid}), + Err = jlib:make_error_reply(NewEl, mongoose_xmpp_errors:bad_request()), + send_xml(StateData, Err), + error + end, + keep_state_and_data. + +-spec register_routes(data()) -> any(). +register_routes(StateData) -> + #component_data{listener_opts = #{hidden_components := AreHidden}} = StateData, + Routes = get_routes(StateData), + Handler = mongoose_packet_handler:new(mongoose_component_packet_handler, #{pid => self()}), + mongoose_component:register_components(Routes, node(), Handler, AreHidden). + +-spec get_routes(data()) -> [jid:lserver()]. +get_routes(#component_data{lserver = Subdomain, is_subdomain=true}) -> + Hosts = mongoose_config:get_opt(hosts), + component_routes(Subdomain, Hosts); +get_routes(#component_data{lserver = Host}) -> + [Host]. + +-spec component_routes(binary(), [jid:lserver()]) -> [jid:lserver()]. +component_routes(Subdomain, Hosts) -> + [<<Subdomain/binary, ".", Host/binary>> || Host <- Hosts]. + +-spec try_register_routes(data(), retries()) -> fsm_res(). +try_register_routes(StateData, Retries) -> + case register_routes(StateData) of + {ok, Components} -> + send_xml(StateData, #xmlel{name = <<"handshake">>}), + {next_state, stream_established, StateData#component_data{components = Components}}; + {error, Reason} -> + #component_data{listener_opts = #{conflict_behaviour := ConflictBehaviour}} = StateData, + RoutesInfo = lookup_routes(StateData), + ?LOG_ERROR(#{what => comp_registration_conflict, + text => <<"Another connection from a component with the same name">>, + component => StateData#component_data.lserver, + reason => Reason, retries => Retries, routes_info => RoutesInfo, + conflict_behaviour => ConflictBehaviour}), + handle_registration_conflict(ConflictBehaviour, RoutesInfo, StateData, Retries) + end. + +handle_registration_conflict(kick_old, RoutesInfo, StateData, Retries) when Retries > 0 -> + %% see lookup_routes + Pids = lists:usort(routes_info_to_pids(RoutesInfo)), + Results = lists:map(fun stop_process/1, Pids), + AllOk = lists:all(fun(Result) -> Result =:= ok end, Results), + case AllOk of + true -> + %% Do recursive call + try_register_routes(StateData, Retries - 1); + false -> + ?LOG_ERROR(#{what => comp_registration_kick_failed, + text => <<"Failed to stop old component connection. Disconnecting next.">>, + component => StateData#component_data.lserver, + component_pids => Pids, results => Results}), + do_disconnect_on_conflict(StateData) + end; +handle_registration_conflict(_Behaviour, _RoutesInfo, StateData, _Retries) -> + do_disconnect_on_conflict(StateData). + +do_disconnect_on_conflict(StateData) -> + send_xml(StateData, mongoose_xmpp_errors:stream_conflict()), + {stop, normal, StateData}. + +-spec lookup_routes(data()) -> [{_, _}]. +lookup_routes(StateData) -> + Routes = get_routes(StateData), + %% Lookup for all pids for the route (both local and global) + [{Route, mongoose_component:lookup_component(Route)} || Route <- Routes]. + +stop_process(Pid) -> + ?MODULE:exit(Pid, <<"Replaced by new connection">>), + MonRef = erlang:monitor(process, Pid), + receive + {'DOWN', MonRef, process, Pid, _Reason} -> + ok + after 5000 -> + erlang:demonitor(MonRef, [flush]), + {error, timeout} + end. + +routes_info_to_pids(RoutesInfo) -> + {_Hosts, ExtComponentsPerHost} = lists:unzip(RoutesInfo), + %% Flatten the list of lists + ExtComponents = lists:append(ExtComponentsPerHost), + %% Ignore handlers from other modules + [maps:get(pid, mongoose_packet_handler:extra(H)) + || #external_component{handler = H} <- ExtComponents, + mongoose_packet_handler:module(H) =:= mongoose_component_packet_handler]. + +-spec handle_route(data(), state(), mongoose_acc:t()) -> fsm_res(). +handle_route(StateData = #component_data{}, _, Acc) -> + ?LOG_DEBUG(#{what => comp_route, + text => <<"Route packet to an external component">>, + component => StateData#component_data.lserver, acc => Acc}), + {From, To, Packet} = mongoose_acc:packet(Acc), + #component_data{listener_opts = #{access := Access}} = StateData, + case acl:match_rule(global, Access, From) of + allow -> + mongoose_hooks:packet_to_component(Acc, From, To), + Attrs2 = jlib:replace_from_to_attrs(jid:to_binary(From), + jid:to_binary(To), + Packet#xmlel.attrs), + send_xml(StateData, Packet#xmlel{attrs = Attrs2}); + deny -> + ejabberd_router:route_error_reply(To, From, Acc, mongoose_xmpp_errors:not_allowed()) + end, + keep_state_and_data. + +-spec element_to_origin_accum(data(), jid:jid(), jid:jid(), exml:element()) -> mongoose_acc:t(). +element_to_origin_accum(StateData, FromJid, ToJid, El) -> + Params = #{host_type => undefined, + lserver => StateData#component_data.lserver, + location => ?LOCATION, + element => El, + from_jid => FromJid, + to_jid => ToJid}, + Acc = mongoose_acc:new(Params), + mongoose_acc:set_permanent(component, [{module, ?MODULE}, {origin_jid, 'TODO'}], Acc). + +-spec stream_start_error(data(), exml:element()) -> fsm_res(). +stream_start_error(StateData, Error) -> + ?LOG_DEBUG(#{what => stream_start_error, xml_error => Error, component_state => StateData}), + send_xml(StateData, Error), + send_xml(StateData, ?XML_STREAM_TRAILER), + {stop, {shutdown, stream_error}, StateData}. + +-spec send_header(StateData :: data()) -> any(). +send_header(StateData) -> + Header = stream_header(StateData), + send_xml(StateData, Header). + +-spec send_trailer(data()) -> maybe_ok(). +send_trailer(StateData) -> + send_xml(StateData, ?XML_STREAM_TRAILER). + +-spec route(pid(), mongoose_acc:t()) -> {route, mongoose_acc:t()}. +route(Pid, Acc) -> + Pid ! {route, Acc}. + +-spec close_parser(data()) -> ok. +close_parser(#component_data{parser = undefined}) -> + ok; +close_parser(#component_data{parser = Parser}) -> + exml_stream:free_parser(Parser). + +-spec send_xml(data(), exml_stream:element() | [exml_stream:element()]) -> maybe_ok(). +send_xml(Data, XmlElement) when is_tuple(XmlElement) -> + send_xml(Data, [XmlElement]); +send_xml(#component_data{socket = Socket} = StateData, XmlElements) when is_list(XmlElements) -> + [ begin + execute_element_event(component_element_out, El, StateData), + mongoose_instrument:execute( + component_xmpp_element_size_out, #{}, #{byte_size => exml:xml_size(El)}) + end || El <- XmlElements], + mongoose_component_socket:send_xml(Socket, XmlElements). + +state_timeout(#component_data{listener_opts = LOpts}) -> + state_timeout(LOpts); +state_timeout(#{state_timeout := Timeout}) -> + {state_timeout, Timeout, state_timeout_termination}. + +%%%---------------------------------------------------------------------- +%%% API +%%%---------------------------------------------------------------------- + +-spec start_link({module(), term(), listener_opts()}, [gen_statem:start_opt()]) -> + gen_statem:start_ret(). +start_link(Params, ProcOpts) -> + gen_statem:start_link(?MODULE, Params, ProcOpts). + +-spec exit(pid(), binary() | atom()) -> ok. +exit(Pid, Reason) -> + gen_statem:cast(Pid, {exit, Reason}). + +-spec stream_header(data()) -> exml_stream:start(). +stream_header(StateData) -> + LServer = StateData#component_data.lserver, + StreamId = StateData#component_data.streamid, + Attrs = [{<<"xmlns:stream">>, <<"http://etherx.jabber.org/streams">>}, + {<<"xmlns">>, ?NS_COMPONENT_ACCEPT}, + {<<"id">>, StreamId}, + {<<"from">>, LServer}], + #xmlstreamstart{name = <<"stream:stream">>, attrs = Attrs}. + +-spec unregister_routes(data()) -> any(). +unregister_routes(#component_data{components = Components}) -> + mongoose_component:unregister_components(Components). + +%%% Instrumentation helpers +-spec execute_element_event(mongoose_instrument:event_name(), exml_stream:element(), data()) -> ok. +execute_element_event(EventName, #xmlel{name = Name} = El, #component_data{lserver = LServer}) -> + Metrics = measure_element(Name, exml_query:attr(El, <<"type">>)), + Measurements = Metrics#{element => El, lserver => LServer}, + mongoose_instrument:execute(EventName, #{}, Measurements); +execute_element_event(_, _, _) -> + ok. + +-spec measure_element(binary(), binary() | undefined) -> mongoose_instrument:measurements(). +measure_element(<<"message">>, <<"error">>) -> + #{count => 1, stanza_count => 1, error_count => 1, message_error_count => 1}; +measure_element(<<"iq">>, <<"error">>) -> + #{count => 1, stanza_count => 1, error_count => 1, iq_error_count => 1}; +measure_element(<<"presence">>, <<"error">>) -> + #{count => 1, stanza_count => 1, error_count => 1, presence_error_count => 1}; +measure_element(<<"message">>, _Type) -> + #{count => 1, stanza_count => 1, message_count => 1}; +measure_element(<<"iq">>, _Type) -> + #{count => 1, stanza_count => 1, iq_count => 1}; +measure_element(<<"presence">>, _Type) -> + #{count => 1, stanza_count => 1, presence_count => 1}; +measure_element(<<"stream:error">>, _Type) -> + #{count => 1, error_count => 1}; +measure_element(_Name, _Type) -> + #{count => 1}. diff --git a/src/component/mongoose_component_listener.erl b/src/component/mongoose_component_listener.erl index 0476d79b90..32e1afe329 100644 --- a/src/component/mongoose_component_listener.erl +++ b/src/component/mongoose_component_listener.erl @@ -1,7 +1,7 @@ -module(mongoose_component_listener). -behaviour(mongoose_listener). --export([start_listener/1, instrumentation/1]). +-export([start_listener/1, instrumentation/0]). -behaviour(ranch_protocol). -export([start_link/3]). @@ -19,14 +19,22 @@ -export_type([conflict_behaviour/0, options/0]). %% mongoose_listener --spec instrumentation(options()) -> [mongoose_instrument:spec()]. -instrumentation(#{connection_type := component} = _Opts) -> - [{component_xmpp_element_size_in, #{}, #{metrics => #{byte_size => histogram}}}, - {component_xmpp_element_size_out, #{}, #{metrics => #{byte_size => histogram}}}, - {component_tcp_data_in, #{}, #{metrics => #{byte_size => spiral}}}, - {component_tls_data_in, #{}, #{metrics => #{byte_size => spiral}}}, + +-spec instrumentation() -> [mongoose_instrument:spec()]. +instrumentation() -> + [{component_tcp_data_in, #{}, #{metrics => #{byte_size => spiral}}}, {component_tcp_data_out, #{}, #{metrics => #{byte_size => spiral}}}, - {component_tls_data_out, #{}, #{metrics => #{byte_size => spiral}}}]. + {component_xmpp_element_size_out, #{}, #{metrics => #{byte_size => histogram}}}, + {component_xmpp_element_size_in, #{}, #{metrics => #{byte_size => histogram}}}, + {component_auth_failed, #{}, #{metrics => #{count => spiral}}}, + {component_element_in, #{}, + #{metrics => maps:from_list([{Metric, spiral} || Metric <- element_spirals()])}}, + {component_element_out, #{}, + #{metrics => maps:from_list([{Metric, spiral} || Metric <- element_spirals()])}}]. + +element_spirals() -> + [count, stanza_count, message_count, iq_count, presence_count, + error_count, message_error_count, iq_error_count, presence_error_count]. -spec start_listener(options()) -> ok. start_listener(#{module := Module} = Opts) when is_atom(Module) -> @@ -36,5 +44,6 @@ start_listener(#{module := Module} = Opts) when is_atom(Module) -> mongoose_listener_sup:start_child(ChildSpec). %% ranch_protocol -start_link(Ref, Transport, Opts) -> - ejabberd_service:start_link({Transport, Ref, Opts}, Opts). +start_link(Ref, Transport, Opts = #{hibernate_after := HibernateAfterTimeout}) -> + ProcessOpts = [{hibernate_after, HibernateAfterTimeout}], + mongoose_component_connection:start_link({mongoose_component_ranch, {Transport, Ref}, Opts}, ProcessOpts). diff --git a/src/component/mongoose_component_packet_handler.erl b/src/component/mongoose_component_packet_handler.erl index 9e470d0024..3139283c64 100644 --- a/src/component/mongoose_component_packet_handler.erl +++ b/src/component/mongoose_component_packet_handler.erl @@ -8,5 +8,5 @@ mongoose_acc:t(), jid:jid(), jid:jid(), exml:element(), #{pid := pid()}) -> mongoose_acc:t(). process_packet(Acc, _From, _To, _El, #{pid := Pid}) -> - Pid ! {route, Acc}, + mongoose_component_connection:route(Pid, Acc), Acc. diff --git a/src/component/mongoose_component_ranch.erl b/src/component/mongoose_component_ranch.erl new file mode 100644 index 0000000000..82da72f679 --- /dev/null +++ b/src/component/mongoose_component_ranch.erl @@ -0,0 +1,62 @@ +-module(mongoose_component_ranch). + +-behaviour(mongoose_component_socket). + +-export([socket_new/2, + socket_peername/1, + socket_handle_data/2, + socket_activate/1, + socket_close/1, + socket_send_xml/2 + ]). + +-record(state, { + ranch_ref :: ranch:ref(), + socket :: ranch_transport:socket(), + ip :: {inet:ip_address(), inet:port_number()} + }). + +-type state() :: #state{}. + +-spec socket_new(term(), mongoose_listener:options()) -> state(). +socket_new({ranch_tcp, RanchRef}, _Opts) -> + {ok, TcpSocket} = ranch:handshake(RanchRef), + {ok, Ip} = ranch_tcp:peername(TcpSocket), + #state{ + ranch_ref = RanchRef, + socket = TcpSocket, + ip = Ip}. + +-spec socket_peername(state()) -> {inet:ip_address(), inet:port_number()}. +socket_peername(#state{ip = Ip}) -> + Ip. + +-spec socket_handle_data(state(), {tcp, term(), iodata()}) -> + iodata() | {raw, [exml:element()]} | {error, term()}. +socket_handle_data(#state{socket = Socket}, {tcp, Socket, Data}) -> + mongoose_instrument:execute(component_tcp_data_in, #{}, #{byte_size => byte_size(Data)}), + Data. + +-spec socket_activate(state()) -> ok. +socket_activate(#state{socket = Socket}) -> + ranch_tcp:setopts(Socket, [{active, once}]). + +-spec socket_close(state()) -> ok. +socket_close(#state{socket = Socket}) -> + ranch_tcp:close(Socket). + +-spec socket_send_xml(state(), iodata() | exml_stream:element() | [exml_stream:element()]) -> + ok | {error, term()}. +socket_send_xml(#state{socket = Socket}, XML) -> + Text = exml:to_iolist(XML), + case send(Socket, Text) of + ok -> + ok; + Error -> + Error + end. + +-spec send(ranch_transport:socket(), iodata()) -> ok | {error, term()}. +send(Socket, Data) -> + mongoose_instrument:execute(component_tcp_data_out, #{}, #{byte_size => iolist_size(Data)}), + ranch_tcp:send(Socket, Data). diff --git a/src/component/mongoose_component_socket.erl b/src/component/mongoose_component_socket.erl new file mode 100644 index 0000000000..9ae72eeee6 --- /dev/null +++ b/src/component/mongoose_component_socket.erl @@ -0,0 +1,47 @@ +-module(mongoose_component_socket). + +-export([new/3, handle_data/2, activate/1, close/1, send_xml/2]). + +-callback socket_new(term(), mongoose_c2s:listener_opts()) -> state(). +-callback socket_peername(state()) -> {inet:ip_address(), inet:port_number()}. +-callback socket_handle_data(state(), {tcp, term(), iodata()}) -> + iodata() | {raw, [exml:element()]} | {error, term()}. +-callback socket_activate(state()) -> ok. +-callback socket_close(state()) -> ok. +-callback socket_send_xml(state(), iodata() | exml_stream:element() | [exml_stream:element()]) -> + ok | {error, term()}. + +-record(component_socket, {module :: module(), + state :: state()}). +-type socket() :: #component_socket{}. +-type state() :: term(). +-type conn_type() :: component. +-export_type([socket/0, state/0, conn_type/0]). + +-spec new(module(), term(), mongoose_listener:options()) -> socket(). +new(Module, SocketOpts, LOpts) -> + State = Module:socket_new(SocketOpts, LOpts), + C2SSocket = #component_socket{ + module = Module, + state = State}, + activate(C2SSocket), + C2SSocket. + +-spec handle_data(socket(), {tcp, term(), iodata()}) -> + iodata() | {raw, [term()]} | {error, term()}. +handle_data(#component_socket{module = Module, state = State}, Payload) -> + Module:socket_handle_data(State, Payload); +handle_data(_, _) -> + {error, bad_packet}. + +-spec activate(socket()) -> ok | {error, term()}. +activate(#component_socket{module = Module, state = State}) -> + Module:socket_activate(State). + +-spec close(socket()) -> ok. +close(#component_socket{module = Module, state = State}) -> + Module:socket_close(State). + +-spec send_xml(socket(), exml_stream:element() | [exml_stream:element()]) -> ok | {error, term()}. +send_xml(#component_socket{module = Module, state = State}, XML) -> + Module:socket_send_xml(State, XML). diff --git a/src/ejabberd_sup.erl b/src/ejabberd_sup.erl index 82f365ee44..f75ae75d68 100644 --- a/src/ejabberd_sup.erl +++ b/src/ejabberd_sup.erl @@ -63,8 +63,6 @@ init([]) -> template_supervisor_spec(ejabberd_s2s_in_sup, ejabberd_s2s_in), S2SOutSupervisor = template_supervisor_spec(ejabberd_s2s_out_sup, ejabberd_s2s_out), - ServiceSupervisor = - template_supervisor_spec(ejabberd_service_sup, ejabberd_service), IQSupervisor = template_supervisor_spec(ejabberd_iq_sup, mongoose_iq_worker), {ok, {{one_for_one, 10, 1}, @@ -83,7 +81,6 @@ init([]) -> C2SSupervisor, S2SInSupervisor, S2SOutSupervisor, - ServiceSupervisor, IQSupervisor, Listener, MucIQ, diff --git a/src/listeners/mongoose_listener.erl b/src/listeners/mongoose_listener.erl index 79c7fab0d6..e298f9cdec 100644 --- a/src/listeners/mongoose_listener.erl +++ b/src/listeners/mongoose_listener.erl @@ -68,6 +68,7 @@ stop_listener(Opts) -> instrumentation(Listeners) -> %% c2s instrumentation is shared between Bosh, Websockets and TCP listeners lists:usort([Spec || Listener <- Listeners, Spec <- listener_instrumentation(Listener)]) + ++ mongoose_component_listener:instrumentation() ++ mongoose_c2s:instrumentation(). -spec listener_instrumentation(options()) -> [mongoose_instrument:spec()]. diff --git a/src/mongoose_transport.erl b/src/mongoose_transport.erl index 31f35b7092..beaab8f653 100644 --- a/src/mongoose_transport.erl +++ b/src/mongoose_transport.erl @@ -1,7 +1,7 @@ %%%---------------------------------------------------------------------- %%% File : mongoose_transport.erl %%% Author : Piotr Nosek <piotr.nosek@erlang-solutions.com> -%%% Purpose : transport module for s2s and components connection +%%% Purpose : transport module for s2s connection %%% Created : 18 Jan 2017 %%%---------------------------------------------------------------------- @@ -26,7 +26,7 @@ -type peercert_return() :: no_peer_cert | {ok, #'Certificate'{}}. -type stanza_size() :: pos_integer() | infinity. --type connection_type() :: s2s | component | undefined. +-type connection_type() :: s2s | undefined. -type options() :: #{max_stanza_size := stanza_size(), hibernate_after := non_neg_integer(), @@ -182,10 +182,6 @@ send_text(SocketData, Data) -> send_element(#socket_data{connection_type = s2s} = SocketData, El) -> mongoose_instrument:execute(s2s_xmpp_element_size_out, #{}, #{byte_size => exml:xml_size(El)}), BinEl = exml:to_binary(El), - send_text(SocketData, BinEl); -send_element(#socket_data{connection_type = component} = SocketData, El) -> - mongoose_instrument:execute(component_xmpp_element_size_out, #{}, #{byte_size => exml:xml_size(El)}), - BinEl = exml:to_binary(El), send_text(SocketData, BinEl). -spec get_peer_certificate(socket_data()) -> mongoose_tls:cert(). @@ -413,9 +409,6 @@ process_data(Data, #state{parser = Parser, wrap_xml_elements_and_update_metrics(E, s2s) -> mongoose_instrument:execute(s2s_xmpp_element_size_in, #{}, #{byte_size => exml:xml_size(E)}), - wrap_xml(E); -wrap_xml_elements_and_update_metrics(E, component) -> - mongoose_instrument:execute(component_xmpp_element_size_in, #{}, #{byte_size => exml:xml_size(E)}), wrap_xml(E). wrap_xml(#xmlel{} = E) -> @@ -434,15 +427,7 @@ update_transport_metrics(Data, #{connection_type := s2s, direction := in, sockmo update_transport_metrics(Data, #{connection_type := s2s, direction := out, sockmod := gen_tcp}) -> mongoose_instrument:execute(s2s_tcp_data_out, #{}, #{byte_size => byte_size(Data)}); update_transport_metrics(Data, #{connection_type := s2s, direction := out, sockmod := mongoose_tls}) -> - mongoose_instrument:execute(s2s_tls_data_out, #{}, #{byte_size => byte_size(Data)}); -update_transport_metrics(Data, #{connection_type := component, direction := in, sockmod := gen_tcp}) -> - mongoose_instrument:execute(component_tcp_data_in, #{}, #{byte_size => byte_size(Data)}); -update_transport_metrics(Data, #{connection_type := component, direction := in, sockmod := mongoose_tls}) -> - mongoose_instrument:execute(component_tls_data_in, #{}, #{byte_size => byte_size(Data)}); -update_transport_metrics(Data, #{connection_type := component, direction := out, sockmod := gen_tcp}) -> - mongoose_instrument:execute(component_tcp_data_out, #{}, #{byte_size => byte_size(Data)}); -update_transport_metrics(Data, #{connection_type := component, direction := out, sockmod := mongoose_tls}) -> - mongoose_instrument:execute(component_tls_data_out, #{}, #{byte_size => byte_size(Data)}). + mongoose_instrument:execute(s2s_tls_data_out, #{}, #{byte_size => byte_size(Data)}). -spec maybe_pause(Delay :: non_neg_integer(), state()) -> any(). maybe_pause(_, #state{dest_pid = undefined}) ->