Skip to content

Commit

Permalink
Implement components as a gen_statem over ranch
Browse files Browse the repository at this point in the history
  • Loading branch information
NelsonVides committed Dec 18, 2024
1 parent fd3ebe1 commit 95a96b4
Show file tree
Hide file tree
Showing 9 changed files with 629 additions and 36 deletions.
16 changes: 12 additions & 4 deletions big_tests/tests/component_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,14 @@ register_one_component(Config) ->
{Component, ComponentAddr, _} = component_helper:connect_component(CompOpts),
% start stream reply
instrument_helper:assert(component_xmpp_element_size_out, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 1, min_timestamp => TS}),
#{expected_count => 2, min_timestamp => TS}),
% 1. start stream, 2. component handshake
instrument_helper:assert(component_xmpp_element_size_in, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 2, min_timestamp => TS}),
instrument_helper:assert(component_tcp_data_in, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 2, min_timestamp => TS}),
instrument_helper:assert(component_auth_failed, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 0, min_timestamp => TS}),
% 1. start stream reply, 2. handshake reply
instrument_helper:assert(component_tcp_data_out, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 2, min_timestamp => TS}),
Expand All @@ -144,6 +146,10 @@ register_one_component(Config) ->
% Reply to Alice
instrument_helper:assert(component_xmpp_element_size_in, #{}, fun(#{byte_size := S}) -> S > 0 end,
#{expected_count => 1, min_timestamp => TS1}),
instrument_helper:assert(component_element_in, #{}, fun(_) -> true end,
#{expected_count => 1, min_timestamp => TS1}),
instrument_helper:assert(component_element_out, #{}, fun(_) -> true end,
#{expected_count => 1, min_timestamp => TS1}),

component_helper:disconnect_component(Component, ComponentAddr).

Expand Down Expand Up @@ -243,16 +249,18 @@ register_two_components(Config) ->

try_registering_with_wrong_password(Config) ->
%% Given a component with a wrong password
TS = instrument_helper:timestamp(),
CompOpts1 = ?config(component1, Config),
CompOpts2 = lists:keyreplace(password, 1, CompOpts1,
{password, <<"wrong_one">>}),
CompOpts2 = lists:keyreplace(password, 1, CompOpts1, {password, <<"wrong_one">>}),
try
%% When trying to connect it
{Comp, Addr, _} = component_helper:connect_component(CompOpts2),
component_helper:disconnect_component(Comp, Addr),
ct:fail("component connected successfully with wrong password")
catch {stream_error, _E} ->
%% Then it should fail to do so
instrument_helper:assert(component_auth_failed, #{}, fun(_) -> true end,
#{expected_count => 1, min_timestamp => TS}),
ok
end.

Expand Down Expand Up @@ -559,7 +567,7 @@ restore_domain(Config) ->
Config.

events() ->
instrument_helper:declared_events(ejabberd_service, [#{connection_type => component}]).
instrument_helper:declared_events(mongoose_component_listener, []).

%%--------------------------------------------------------------------
%% Stanzas
Expand Down
484 changes: 484 additions & 0 deletions src/component/mongoose_component_connection.erl

Large diffs are not rendered by default.

29 changes: 19 additions & 10 deletions src/component/mongoose_component_listener.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-module(mongoose_component_listener).

-behaviour(mongoose_listener).
-export([start_listener/1, instrumentation/1]).
-export([start_listener/1, instrumentation/0]).

-behaviour(ranch_protocol).
-export([start_link/3]).
Expand All @@ -19,14 +19,22 @@
-export_type([conflict_behaviour/0, options/0]).

%% mongoose_listener
-spec instrumentation(options()) -> [mongoose_instrument:spec()].
instrumentation(#{connection_type := component} = _Opts) ->
[{component_xmpp_element_size_in, #{}, #{metrics => #{byte_size => histogram}}},
{component_xmpp_element_size_out, #{}, #{metrics => #{byte_size => histogram}}},
{component_tcp_data_in, #{}, #{metrics => #{byte_size => spiral}}},
{component_tls_data_in, #{}, #{metrics => #{byte_size => spiral}}},

-spec instrumentation() -> [mongoose_instrument:spec()].
instrumentation() ->
[{component_tcp_data_in, #{}, #{metrics => #{byte_size => spiral}}},
{component_tcp_data_out, #{}, #{metrics => #{byte_size => spiral}}},
{component_tls_data_out, #{}, #{metrics => #{byte_size => spiral}}}].
{component_xmpp_element_size_out, #{}, #{metrics => #{byte_size => histogram}}},
{component_xmpp_element_size_in, #{}, #{metrics => #{byte_size => histogram}}},
{component_auth_failed, #{}, #{metrics => #{count => spiral}}},
{component_element_in, #{},
#{metrics => maps:from_list([{Metric, spiral} || Metric <- element_spirals()])}},
{component_element_out, #{},
#{metrics => maps:from_list([{Metric, spiral} || Metric <- element_spirals()])}}].

element_spirals() ->
[count, stanza_count, message_count, iq_count, presence_count,
error_count, message_error_count, iq_error_count, presence_error_count].

-spec start_listener(options()) -> ok.
start_listener(#{module := Module} = Opts) when is_atom(Module) ->
Expand All @@ -36,5 +44,6 @@ start_listener(#{module := Module} = Opts) when is_atom(Module) ->
mongoose_listener_sup:start_child(ChildSpec).

%% ranch_protocol
start_link(Ref, Transport, Opts) ->
ejabberd_service:start_link({Transport, Ref, Opts}, Opts).
start_link(Ref, Transport, Opts = #{hibernate_after := HibernateAfterTimeout}) ->
ProcessOpts = [{hibernate_after, HibernateAfterTimeout}],
mongoose_component_connection:start_link({mongoose_component_ranch, {Transport, Ref}, Opts}, ProcessOpts).
2 changes: 1 addition & 1 deletion src/component/mongoose_component_packet_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
mongoose_acc:t(), jid:jid(), jid:jid(), exml:element(), #{pid := pid()}) ->
mongoose_acc:t().
process_packet(Acc, _From, _To, _El, #{pid := Pid}) ->
Pid ! {route, Acc},
mongoose_component_connection:route(Pid, Acc),
Acc.
62 changes: 62 additions & 0 deletions src/component/mongoose_component_ranch.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
-module(mongoose_component_ranch).

-behaviour(mongoose_component_socket).

-export([socket_new/2,
socket_peername/1,
socket_handle_data/2,
socket_activate/1,
socket_close/1,
socket_send_xml/2
]).

-record(state, {
ranch_ref :: ranch:ref(),
socket :: ranch_transport:socket(),
ip :: {inet:ip_address(), inet:port_number()}
}).

-type state() :: #state{}.

-spec socket_new(term(), mongoose_listener:options()) -> state().
socket_new({ranch_tcp, RanchRef}, _Opts) ->
{ok, TcpSocket} = ranch:handshake(RanchRef),
{ok, Ip} = ranch_tcp:peername(TcpSocket),
#state{
ranch_ref = RanchRef,
socket = TcpSocket,
ip = Ip}.

-spec socket_peername(state()) -> {inet:ip_address(), inet:port_number()}.
socket_peername(#state{ip = Ip}) ->
Ip.

Check warning on line 32 in src/component/mongoose_component_ranch.erl

View check run for this annotation

Codecov / codecov/patch

src/component/mongoose_component_ranch.erl#L32

Added line #L32 was not covered by tests

-spec socket_handle_data(state(), {tcp, term(), iodata()}) ->
iodata() | {raw, [exml:element()]} | {error, term()}.
socket_handle_data(#state{socket = Socket}, {tcp, Socket, Data}) ->
mongoose_instrument:execute(component_tcp_data_in, #{}, #{byte_size => byte_size(Data)}),
Data.

-spec socket_activate(state()) -> ok.
socket_activate(#state{socket = Socket}) ->
ranch_tcp:setopts(Socket, [{active, once}]).

-spec socket_close(state()) -> ok.
socket_close(#state{socket = Socket}) ->
ranch_tcp:close(Socket).

-spec socket_send_xml(state(), iodata() | exml_stream:element() | [exml_stream:element()]) ->
ok | {error, term()}.
socket_send_xml(#state{socket = Socket}, XML) ->
Text = exml:to_iolist(XML),
case send(Socket, Text) of
ok ->
ok;
Error ->
Error

Check warning on line 56 in src/component/mongoose_component_ranch.erl

View check run for this annotation

Codecov / codecov/patch

src/component/mongoose_component_ranch.erl#L56

Added line #L56 was not covered by tests
end.

-spec send(ranch_transport:socket(), iodata()) -> ok | {error, term()}.
send(Socket, Data) ->
mongoose_instrument:execute(component_tcp_data_out, #{}, #{byte_size => iolist_size(Data)}),
ranch_tcp:send(Socket, Data).
47 changes: 47 additions & 0 deletions src/component/mongoose_component_socket.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
-module(mongoose_component_socket).

-export([new/3, handle_data/2, activate/1, close/1, send_xml/2]).

-callback socket_new(term(), mongoose_c2s:listener_opts()) -> state().
-callback socket_peername(state()) -> {inet:ip_address(), inet:port_number()}.
-callback socket_handle_data(state(), {tcp, term(), iodata()}) ->
iodata() | {raw, [exml:element()]} | {error, term()}.
-callback socket_activate(state()) -> ok.
-callback socket_close(state()) -> ok.
-callback socket_send_xml(state(), iodata() | exml_stream:element() | [exml_stream:element()]) ->
ok | {error, term()}.

-record(component_socket, {module :: module(),
state :: state()}).
-type socket() :: #component_socket{}.
-type state() :: term().
-type conn_type() :: component.
-export_type([socket/0, state/0, conn_type/0]).

-spec new(module(), term(), mongoose_listener:options()) -> socket().
new(Module, SocketOpts, LOpts) ->
State = Module:socket_new(SocketOpts, LOpts),
C2SSocket = #component_socket{
module = Module,
state = State},
activate(C2SSocket),
C2SSocket.

-spec handle_data(socket(), {tcp, term(), iodata()}) ->
iodata() | {raw, [term()]} | {error, term()}.
handle_data(#component_socket{module = Module, state = State}, Payload) ->
Module:socket_handle_data(State, Payload);
handle_data(_, _) ->
{error, bad_packet}.

Check warning on line 35 in src/component/mongoose_component_socket.erl

View check run for this annotation

Codecov / codecov/patch

src/component/mongoose_component_socket.erl#L35

Added line #L35 was not covered by tests

-spec activate(socket()) -> ok | {error, term()}.
activate(#component_socket{module = Module, state = State}) ->
Module:socket_activate(State).

-spec close(socket()) -> ok.
close(#component_socket{module = Module, state = State}) ->
Module:socket_close(State).

-spec send_xml(socket(), exml_stream:element() | [exml_stream:element()]) -> ok | {error, term()}.
send_xml(#component_socket{module = Module, state = State}, XML) ->
Module:socket_send_xml(State, XML).
3 changes: 0 additions & 3 deletions src/ejabberd_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ init([]) ->
template_supervisor_spec(ejabberd_s2s_in_sup, ejabberd_s2s_in),
S2SOutSupervisor =
template_supervisor_spec(ejabberd_s2s_out_sup, ejabberd_s2s_out),
ServiceSupervisor =
template_supervisor_spec(ejabberd_service_sup, ejabberd_service),
IQSupervisor =
template_supervisor_spec(ejabberd_iq_sup, mongoose_iq_worker),
{ok, {{one_for_one, 10, 1},
Expand All @@ -83,7 +81,6 @@ init([]) ->
C2SSupervisor,
S2SInSupervisor,
S2SOutSupervisor,
ServiceSupervisor,
IQSupervisor,
Listener,
MucIQ,
Expand Down
1 change: 1 addition & 0 deletions src/listeners/mongoose_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ stop_listener(Opts) ->
instrumentation(Listeners) ->
%% c2s instrumentation is shared between Bosh, Websockets and TCP listeners
lists:usort([Spec || Listener <- Listeners, Spec <- listener_instrumentation(Listener)])
++ mongoose_component_listener:instrumentation()
++ mongoose_c2s:instrumentation().

-spec listener_instrumentation(options()) -> [mongoose_instrument:spec()].
Expand Down
21 changes: 3 additions & 18 deletions src/mongoose_transport.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%%%----------------------------------------------------------------------
%%% File : mongoose_transport.erl
%%% Author : Piotr Nosek <[email protected]>
%%% Purpose : transport module for s2s and components connection
%%% Purpose : transport module for s2s connection
%%% Created : 18 Jan 2017
%%%----------------------------------------------------------------------

Expand All @@ -26,7 +26,7 @@
-type peercert_return() :: no_peer_cert | {ok, #'Certificate'{}}.

-type stanza_size() :: pos_integer() | infinity.
-type connection_type() :: s2s | component | undefined.
-type connection_type() :: s2s | undefined.

-type options() :: #{max_stanza_size := stanza_size(),
hibernate_after := non_neg_integer(),
Expand Down Expand Up @@ -182,10 +182,6 @@ send_text(SocketData, Data) ->
send_element(#socket_data{connection_type = s2s} = SocketData, El) ->
mongoose_instrument:execute(s2s_xmpp_element_size_out, #{}, #{byte_size => exml:xml_size(El)}),
BinEl = exml:to_binary(El),
send_text(SocketData, BinEl);
send_element(#socket_data{connection_type = component} = SocketData, El) ->
mongoose_instrument:execute(component_xmpp_element_size_out, #{}, #{byte_size => exml:xml_size(El)}),
BinEl = exml:to_binary(El),
send_text(SocketData, BinEl).

-spec get_peer_certificate(socket_data()) -> mongoose_tls:cert().
Expand Down Expand Up @@ -413,9 +409,6 @@ process_data(Data, #state{parser = Parser,

wrap_xml_elements_and_update_metrics(E, s2s) ->
mongoose_instrument:execute(s2s_xmpp_element_size_in, #{}, #{byte_size => exml:xml_size(E)}),
wrap_xml(E);
wrap_xml_elements_and_update_metrics(E, component) ->
mongoose_instrument:execute(component_xmpp_element_size_in, #{}, #{byte_size => exml:xml_size(E)}),
wrap_xml(E).

wrap_xml(#xmlel{} = E) ->
Expand All @@ -434,15 +427,7 @@ update_transport_metrics(Data, #{connection_type := s2s, direction := in, sockmo
update_transport_metrics(Data, #{connection_type := s2s, direction := out, sockmod := gen_tcp}) ->
mongoose_instrument:execute(s2s_tcp_data_out, #{}, #{byte_size => byte_size(Data)});
update_transport_metrics(Data, #{connection_type := s2s, direction := out, sockmod := mongoose_tls}) ->
mongoose_instrument:execute(s2s_tls_data_out, #{}, #{byte_size => byte_size(Data)});
update_transport_metrics(Data, #{connection_type := component, direction := in, sockmod := gen_tcp}) ->
mongoose_instrument:execute(component_tcp_data_in, #{}, #{byte_size => byte_size(Data)});
update_transport_metrics(Data, #{connection_type := component, direction := in, sockmod := mongoose_tls}) ->
mongoose_instrument:execute(component_tls_data_in, #{}, #{byte_size => byte_size(Data)});
update_transport_metrics(Data, #{connection_type := component, direction := out, sockmod := gen_tcp}) ->
mongoose_instrument:execute(component_tcp_data_out, #{}, #{byte_size => byte_size(Data)});
update_transport_metrics(Data, #{connection_type := component, direction := out, sockmod := mongoose_tls}) ->
mongoose_instrument:execute(component_tls_data_out, #{}, #{byte_size => byte_size(Data)}).
mongoose_instrument:execute(s2s_tls_data_out, #{}, #{byte_size => byte_size(Data)}).

-spec maybe_pause(Delay :: non_neg_integer(), state()) -> any().
maybe_pause(_, #state{dest_pid = undefined}) ->
Expand Down

0 comments on commit 95a96b4

Please sign in to comment.