Skip to content

Commit

Permalink
Merge pull request #198 from emqx/241114-check-max_connections-config…
Browse files Browse the repository at this point in the history
…-against-ulimit-and-processes_limit

Check max connections config against ulimit and processes limit
  • Loading branch information
zmstone authored Nov 14, 2024
2 parents 55ff2e7 + 51ef1e0 commit d0c56a9
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 28 deletions.
20 changes: 20 additions & 0 deletions include/esockd.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,24 @@
-type ssl_option() :: ssl:ssl_option().
-endif. % OTP_RELEASE


-define(ERROR_MAXLIMIT, maxlimit).

-define(ARG_ACCEPTED, accepted).
-define(ARG_CLOSED_SYS_LIMIT, closed_sys_limit).
-define(ARG_CLOSED_MAX_LIMIT, closed_max_limit).
-define(ARG_CLOSED_OVERLOADED, closed_overloaded).
-define(ARG_CLOSED_RATE_LIMITED, closed_rate_limited).
-define(ARG_CLOSED_OTHER_REASONS, closed_other_reasons).

-define(ACCEPT_RESULT_GROUPS,
[
?ARG_ACCEPTED,
?ARG_CLOSED_SYS_LIMIT,
?ARG_CLOSED_MAX_LIMIT,
?ARG_CLOSED_OVERLOADED,
?ARG_CLOSED_RATE_LIMITED,
?ARG_CLOSED_OTHER_REASONS
]).

-endif. % ESOCKD_HRL
18 changes: 17 additions & 1 deletion src/esockd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,23 @@ parse_opt([_|Opts], Acc) ->
%% @doc System 'ulimit -n'
-spec(ulimit() -> pos_integer()).
ulimit() ->
proplists:get_value(max_fds, hd(erlang:system_info(check_io))).
find_max_fd(erlang:system_info(check_io)).

find_max_fd([]) ->
%% Magic!
%% According to Erlang/OTP doc, erlang:system_info(check_io)):
%% Returns a list containing miscellaneous information about the emulators
%% internal I/O checking. Notice that the content of the returned list can
%% vary between platforms and over time. It is only guaranteed that a list
%% is returned.
1023;
find_max_fd([CheckIoResult | More]) ->
case lists:keyfind(max_fds, 1, CheckIoResult) of
{max_fds, N} when is_integer(N) andalso N > 0 ->
N;
_ ->
find_max_fd(More)
end.

-spec(to_string(listen_on()) -> string()).
to_string(Port) when is_integer(Port) ->
Expand Down
13 changes: 7 additions & 6 deletions src/esockd_acceptor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,13 @@ inc_stats(#state{proto = Proto, listen_on = ListenOn}, Tag) ->
_ = esockd_server:inc_stats({Proto, ListenOn}, Counter, 1),
ok.

counter(accepted) -> accepted;
counter(emfile) -> closed_sys_limit;
counter(enfile) -> closed_sys_limit;
counter(overloaded) -> closed_overloaded;
counter(rate_limited) -> closed_rate_limited;
counter(_) -> closed_other_reasons.
counter(accepted) -> ?ARG_ACCEPTED;
counter(emfile) -> ?ARG_CLOSED_SYS_LIMIT;
counter(enfile) -> ?ARG_CLOSED_SYS_LIMIT;
counter(?ERROR_MAXLIMIT) -> ?ARG_CLOSED_MAX_LIMIT;
counter(overloaded) -> ?ARG_CLOSED_OVERLOADED;
counter(rate_limited) -> ?ARG_CLOSED_RATE_LIMITED;
counter(_) -> ?ARG_CLOSED_OTHER_REASONS.

start_connection(ConnSup, Sock, UpgradeFuns) when is_pid(ConnSup) ->
esockd_connection_sup:start_connection(ConnSup, Sock, UpgradeFuns);
Expand Down
43 changes: 38 additions & 5 deletions src/esockd_connection_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
, code_change/3
]).

-include("esockd.hrl").

-type(shutdown() :: brutal_kill | infinity | pos_integer()).

-type option() :: {shutdown, shutdown()}
Expand All @@ -61,7 +63,6 @@
mfargs :: esockd:mfargs()
}).

-define(DEFAULT_MAX_CONNS, 1024).
-define(TRANSPORT, esockd_transport).
-define(ERROR_MSG(Format, Args),
error_logger:error_msg("[~s] " ++ Format, [?MODULE | Args])).
Expand Down Expand Up @@ -149,7 +150,7 @@ call(Sup, Req) ->
init(Opts) ->
process_flag(trap_exit, true),
Shutdown = get_value(shutdown, Opts, brutal_kill),
MaxConns = get_value(max_connections, Opts, ?DEFAULT_MAX_CONNS),
MaxConns = resolve_max_connections(get_value(max_connections, Opts)),
RawRules = get_value(access_rules, Opts, [{allow, all}]),
AccessRules = [esockd_access:compile(Rule) || Rule <- RawRules],
MFA = get_value(connection_mfargs, Opts),
Expand All @@ -162,7 +163,7 @@ init(Opts) ->
handle_call({start_connection, _Sock}, _From,
State = #state{curr_connections = Conns, max_connections = MaxConns})
when map_size(Conns) >= MaxConns ->
{reply, {error, maxlimit}, State};
{reply, {error, ?ERROR_MAXLIMIT}, State};

handle_call({start_connection, Sock}, _From,
State = #state{curr_connections = Conns, access_rules = Rules, mfargs = MFA}) ->
Expand Down Expand Up @@ -207,7 +208,10 @@ handle_call({add_rule, RawRule}, _From, State = #state{access_rules = Rules}) ->
end
catch
error:Reason ->
error_logger:error_msg("Bad access rule: ~p, compile errro: ~p", [RawRule, Reason]),
logger:log(error, #{msg => "bad_access_rule",
rule => RawRule,
compile_error => Reason
}),
{reply, {error, bad_access_rule}, State}
end;

Expand Down Expand Up @@ -283,7 +287,12 @@ get_state_option(connection_mfargs, #state{mfargs = MFA}) ->
MFA.

set_state_option({max_connections, MaxConns}, State) ->
State#state{max_connections = MaxConns};
case resolve_max_connections(MaxConns) of
MaxConns ->
State#state{max_connections = MaxConns};
_ ->
{error, bad_max_connections}
end;
set_state_option({shutdown, Shutdown}, State) ->
State#state{shutdown = Shutdown};
set_state_option({access_rules, Rules}, State) ->
Expand Down Expand Up @@ -453,3 +462,27 @@ log(Level, Error, Reason, Pid, #state{mfargs = MFA}) ->
get_module({M, _F, _A}) -> M;
get_module({M, _F}) -> M;
get_module(M) -> M.

resolve_max_connections(Desired) ->
MaxFds = esockd:ulimit(),
MaxProcs = erlang:system_info(process_limit),
resolve_max_connections(Desired, MaxFds, MaxProcs).

resolve_max_connections(undefined, MaxFds, MaxProcs) ->
%% not configured
min(MaxFds, MaxProcs);
resolve_max_connections(Desired, MaxFds, MaxProcs) when is_integer(Desired) ->
Res = lists:min([Desired, MaxFds, MaxProcs]),
case Res < Desired of
true ->
logger:log(error,
#{msg => "max_connections_config_ignored",
max_fds => MaxFds,
max_processes => MaxProcs,
desired => Desired
}
);
false ->
ok
end,
Res.
9 changes: 3 additions & 6 deletions src/esockd_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
, code_change/3
]).

-include("esockd.hrl").

-record(state, {
listener_props :: #{esockd:listener_ref() => #{_Name => _Value}}
}).
Expand Down Expand Up @@ -97,12 +99,7 @@ del_stats({Protocol, ListenOn}) ->

-spec ensure_stats({atom(), esockd:listen_on()}) -> ok.
ensure_stats(StatsKey) ->
Stats = [accepted,
closed_sys_limit,
closed_overloaded,
closed_rate_limited,
closed_other_reasons],
ok = ?MODULE:init_stats(StatsKey, Stats),
ok = ?MODULE:init_stats(StatsKey, ?ACCEPT_RESULT_GROUPS),
ok.

-spec get_listener_prop(esockd:listener_ref(), _Name) -> _Value | undefined.
Expand Down
15 changes: 14 additions & 1 deletion test/esockd_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,22 @@ t_get_set_max_connections(_) ->
?assertEqual(16, esockd:get_max_connections({udp_echo, 7000})),
ok = esockd:close(udp_echo, 7000).

t_get_set_invalid_max_connections(_) ->
MaxFd = esockd:ulimit(),
MaxProcs = erlang:system_info(process_limit),
Invalid = max(MaxFd, MaxProcs) + 1,
{ok, _LSup} = esockd:open(echo, 7000, [{connection_mfargs, echo_server}]),
Expected = min(MaxFd, MaxProcs),
?assertEqual(Expected, esockd:get_max_connections({echo, 7000})),
esockd:set_max_connections({echo, 7000}, 2),
?assertEqual(2, esockd:get_max_connections({echo, 7000})),
esockd:set_max_connections({echo, 7000}, Invalid),
?assertEqual(2, esockd:get_max_connections({echo, 7000})),
ok = esockd:close(echo, 7000).

t_get_set_max_conn_rate(_) ->
LimiterOpt = #{module => esockd_limiter, capacity => 100, interval => 1},
{ok, _LSup} = esockd:open(echo, 7000,
{ok, _LSup} = esockd:open(echo, 7000,
[{limiter, LimiterOpt}, {connection_mfargs, echo_server}]),
?assertEqual({100, 1}, esockd:get_max_conn_rate({echo, 7000})),
esockd:set_max_conn_rate({echo, 7000}, LimiterOpt#{capacity := 50, interval := 2}),
Expand Down
32 changes: 23 additions & 9 deletions test/esockd_acceptor_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
-define(COUNTER_OVERLOADED, 2).
-define(COUNTER_RATE_LIMITED, 3).
-define(COUNTER_SYS_LIMIT, 4).
-define(COUNTER_OTHER_REASONS, 5).
-define(COUNTER_MAX_LIMIT, 5).
-define(COUNTER_OTHER_REASONS, 6).
-define(COUNTER_LAST, 10).

counter_tag_to_index(accepted) -> ?COUNTER_ACCPETED;
counter_tag_to_index(closed_sys_limit) -> ?COUNTER_SYS_LIMIT;
counter_tag_to_index(closed_max_limit) -> ?COUNTER_MAX_LIMIT;
counter_tag_to_index(closed_overloaded) -> ?COUNTER_OVERLOADED;
counter_tag_to_index(closed_rate_limited) -> ?COUNTER_RATE_LIMITED;
counter_tag_to_index(closed_other_reasons) -> ?COUNTER_OTHER_REASONS.
Expand Down Expand Up @@ -90,7 +92,7 @@ connect(Port, Timeout, Opts0) ->
%% This is the very basic test, if this fails, nothing elese matters.
t_normal(Config) ->
Port = ?PORT,
Server = start(Port, no_limit()),
Server = start(Port, no_rate_limit()),
{ok, ClientSock} = connect(Port),
try
ok = wait_for_counter(Config, ?COUNTER_ACCPETED, 1, 2000)
Expand Down Expand Up @@ -129,7 +131,7 @@ t_rate_limitted(Config) ->
%% Failed to spawn new connection process
t_error_when_spawn(Config) ->
Port = ?PORT,
Server = start(Port, no_limit(), #{start_connection_result => {error, overloaded}}),
Server = start(Port, no_rate_limit(), #{start_connection_result => {error, overloaded}}),
{ok, Sock1} = connect(Port),
try
ok = wait_for_counter(Config, ?COUNTER_OVERLOADED, 1, 2000),
Expand All @@ -141,7 +143,7 @@ t_error_when_spawn(Config) ->
%% Failed to tune the socket opts
t_einval(Config) ->
Port = ?PORT,
Server = start(Port, no_limit(), #{tune_fun => {fun(_) -> {error, einval} end, []}}),
Server = start(Port, no_rate_limit(), #{tune_fun => {fun(_) -> {error, einval} end, []}}),
{ok, Sock1} = connect(Port),
try
ok = wait_for_counter(Config, ?COUNTER_OTHER_REASONS, 1, 2000),
Expand All @@ -157,7 +159,7 @@ t_sys_limit(Config) ->
meck:new(prim_inet, [passthrough, no_history, unstick]),
meck:expect(prim_inet, async_accept, fun(_, _) -> {error, emfile} end),
Port = ?PORT,
Server = start(Port, no_limit()),
Server = start(Port, no_rate_limit()),
try
%% acceptor to enter suspending state after started
%% because async_accept always returns {error, emfile}
Expand All @@ -178,9 +180,21 @@ t_sys_limit(Config) ->
stop(Server)
end.

%% Failed to spawn new connection process
t_max_limit(Config) ->
Port = ?PORT,
Server = start(Port, no_rate_limit(), #{start_connection_result => {error, ?ERROR_MAXLIMIT}}),
{ok, Sock1} = connect(Port),
try
ok = wait_for_counter(Config, ?COUNTER_MAX_LIMIT, 1, 2000),
disconnect(Sock1)
after
stop(Server)
end.

t_close_listener_socket_cause_acceptor_stop(_Config) ->
Port = ?PORT,
#{acceptor := Acceptor, lsock := LSock} = start(Port, no_limit()),
#{acceptor := Acceptor, lsock := LSock} = start(Port, no_rate_limit()),
Mref = monitor(process, Acceptor),
unlink(Acceptor),
unlink(LSock),
Expand Down Expand Up @@ -239,8 +253,8 @@ pause_then_allow(Pause) ->
}.

%% make a no-limit limiter
no_limit() ->
#{module => ?MODULE, name => no_limit}.
no_rate_limit() ->
#{module => ?MODULE, name => no_rate_limit}.

%% limiter callback
consume(_Token, #{name := pause_then_allow} = Limiter) ->
Expand All @@ -250,7 +264,7 @@ consume(_Token, #{name := pause_then_allow} = Limiter) ->
#{current := allow} ->
{ok, Limiter}
end;
consume(_Token, #{name := no_limit} = Limiter) ->
consume(_Token, #{name := no_rate_limit} = Limiter) ->
{ok, Limiter}.

now_ts() -> erlang:system_time(millisecond).
Expand Down

0 comments on commit d0c56a9

Please sign in to comment.