Skip to content

Commit

Permalink
feat: add more close counters
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Nov 11, 2024
1 parent ceb4ef9 commit 78a8022
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 31 deletions.
73 changes: 51 additions & 22 deletions src/esockd_acceptor.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,7 +59,7 @@
tune_fun :: esockd:sock_fun(),
upgrade_funs :: [esockd:sock_fun()],
conn_limiter :: undefined | esockd_generic_limiter:limiter(),
conn_sup :: pid(),
conn_sup :: pid() | {function(), list()},
accept_ref = no_ref :: term()
}).

Expand Down Expand Up @@ -125,31 +125,36 @@ handle_event(internal, begin_waiting, waiting, #state{accept_ref = Ref}) when Re
%% started waiting in suspending state
keep_state_and_data;
handle_event(internal, begin_waiting, waiting, State = #state{lsock = LSock, accept_ref = no_ref}) ->
case prim_inet:async_accept(LSock, -1) of
case async_accept(LSock) of
{ok, Ref} ->
{keep_state, State#state{accept_ref = Ref}};
{error, Reason} when
Reason =:= emfile;
Reason =:= enfile
->
inc_stats(State, Reason),
log_system_limit(State, Reason),
start_suspending(State, 1000);
{error, econnaborted} ->
inc_stats(State, econnaborted),
{next_state, waiting, State, {next_event, internal, begin_waiting}};
{error, closed} ->
{stop, normal, State};
{error, Reason} ->
{stop, Reason, State}
end;
handle_event(internal, accept_and_close, suspending, State = #state{lsock = LSock}) ->
case prim_inet:async_accept(LSock, -1) of
case async_accept(LSock) of
{ok, Ref} ->
{keep_state, State#state{accept_ref = Ref}};
{error, Reason} when
Reason =:= emfile;
Reason =:= enfile
->
inc_stats(State, Reason),
{keep_state_and_data, {next_event, internal, accept_and_close}};
{error, econnaborted} ->
inc_stats(State, econnaborted),
{keep_state_and_data, {next_event, internal, accept_and_close}};
{error, closed} ->
{stop, normal, State};
Expand All @@ -171,6 +176,7 @@ handle_event(
State = #state{lsock = LSock, accept_ref = Ref}
) ->
_ = close(Sock),
inc_stats(State, rate_limitted),
NextEvent = {next_event, internal, accept_and_close},
{keep_state, State#state{accept_ref = no_ref}, NextEvent};
handle_event(
Expand All @@ -182,15 +188,14 @@ handle_event(
{next_event, internal, {accept, Sock}}};
{pause, PauseTime, Limiter2} ->
_ = close(Sock),
inc_stats(State, rate_limitted),
start_suspending(State#state{conn_limiter = Limiter2}, PauseTime)
end;
handle_event(
internal,
{accept, Sock},
accepting,
State = #state{
proto = Proto,
listen_on = ListenOn,
sockmod = SockMod,
tune_fun = TuneFun,
upgrade_funs = UpgradeFuns,
Expand All @@ -199,22 +204,23 @@ handle_event(
) ->
%% make it look like gen_tcp:accept
inet_db:register_socket(Sock, SockMod),

%% Inc accepted stats.
_ = esockd_server:inc_stats({Proto, ListenOn}, accepted, 1),

case eval_tune_socket_fun(TuneFun, Sock) of
{ok, Sock} ->
case esockd_connection_sup:start_connection(ConnSup, Sock, UpgradeFuns) of
{ok, NewSock} ->
case start_connection(ConnSup, NewSock, UpgradeFuns) of
{ok, _Pid} ->
%% Inc accepted stats.
inc_stats(State, accepted),
ok;
{error, Reason} ->
handle_accept_error(Reason, "failed_to_start_connection_process", State),
close(Sock)
close(NewSock),
inc_stats(State, Reason)
end;
{error, Reason} ->
handle_accept_error(Reason, "failed_to_apply_tune_funcs", State),
close(Sock)
{error, _Reason} ->
%% the socket became invalid before
%% starting the owner process
close(Sock),
inc_stats(State, closed_nostart)
end,
{next_state, waiting, State, {next_event, internal, begin_waiting}};
handle_event(state_timeout, begin_waiting, suspending, State) ->
Expand All @@ -225,6 +231,7 @@ handle_event(
StateName,
State = #state{lsock = LSock, accept_ref = Ref}
) ->
inc_stats(State, Reason),
handle_socket_error(Reason, State#state{accept_ref = no_ref}, StateName);
handle_event(Type, Content, StateName, _) ->
logger:log(warning, #{msg => "esockd_acceptor_unhandled_event",
Expand Down Expand Up @@ -252,7 +259,7 @@ code_change(_OldVsn, StateName, State, _Extra) ->
close(Sock) ->
try
%% port-close leads to a TPC reset which cuts out the tcp graceful close overheads
true = port_close(Sock),
_ = port_close(Sock),
receive {'EXIT', Sock, _} -> ok after 1 -> ok end
catch
error:_ -> ok
Expand All @@ -265,8 +272,7 @@ handle_accept_error(enotconn, _, _) ->
ok;
handle_accept_error(einval, _, _) ->
ok;
handle_accept_error(overloaded, _, #state{proto = Proto, listen_on = ListenOn}) ->
_ = esockd_server:inc_stats({Proto, ListenOn}, closed_overloaded, 1),
handle_accept_error(overloaded, _, _) ->
ok;
handle_accept_error(Reason, Msg, #state{sockname = Sockname}) ->
logger:log(error, #{msg => Msg,
Expand Down Expand Up @@ -300,11 +306,34 @@ explain_posix(enfile) ->
"ENFILE (File table overflow)".

log_system_limit(State, Reason) ->
logger:log(error, #{msg => "cannot_accept_more_connections",
listener => esockd:format(State#state.sockname),
cause => explain_posix(Reason)}).
logger:log(critical,
#{msg => "cannot_accept_more_connections",
listener => esockd:format(State#state.sockname),
cause => explain_posix(Reason)}).

start_suspending(State, Timeout) ->
Actions = [{next_event, internal, accept_and_close},
{state_timeout, Timeout, begin_waiting}],
{next_state, suspending, State, Actions}.

inc_stats(#state{proto = Proto, listen_on = ListenOn}, Tag) ->
Counter = counter(Tag),
_ = esockd_server:inc_stats({Proto, ListenOn}, Counter, 1),
ok.

counter(accepted) -> accepted;
counter(closed_nostart) -> closed_nostart;
counter(emfile) -> closed_overloaded;
counter(enfile) -> closed_overloaded;
counter(overloaded) -> closed_overloaded;
counter(rate_limitted) -> closed_rate_limitted;
counter(_) -> closed_other_reasons.

start_connection(ConnSup, Sock, UpgradeFuns) when is_pid(ConnSup) ->
esockd_connection_sup:start_connection(ConnSup, Sock, UpgradeFuns);
start_connection({F, A}, Sock, UpgradeFuns) when is_function(F) ->
%% only in tests so far
apply(F, A ++ [Sock, UpgradeFuns]).

async_accept(LSock) ->
prim_inet:async_accept(LSock, -1).
23 changes: 15 additions & 8 deletions src/esockd_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ stop() -> gen_server:stop(?SERVER).

-spec(stats_fun({atom(), esockd:listen_on()}, atom()) -> fun()).
stats_fun({Protocol, ListenOn}, Metric) ->
init_stats({Protocol, ListenOn}, Metric),
init_stats({Protocol, ListenOn}, [Metric]),
fun({inc, Num}) -> esockd_server:inc_stats({Protocol, ListenOn}, Metric, Num);
({dec, Num}) -> esockd_server:dec_stats({Protocol, ListenOn}, Metric, Num)
end.

-spec(init_stats({atom(), esockd:listen_on()}, atom()) -> ok).
init_stats({Protocol, ListenOn}, Metric) ->
gen_server:call(?SERVER, {init, {Protocol, ListenOn}, Metric}).
-spec(init_stats({atom(), esockd:listen_on()}, [atom()]) -> ok).
init_stats({Protocol, ListenOn}, Metrics) ->
gen_server:call(?SERVER, {init, {Protocol, ListenOn}, Metrics}).

-spec(get_stats({atom(), esockd:listen_on()}) -> [{atom(), non_neg_integer()}]).
get_stats({Protocol, ListenOn}) ->
Expand All @@ -97,8 +97,13 @@ del_stats({Protocol, ListenOn}) ->

-spec ensure_stats({atom(), esockd:listen_on()}) -> ok.
ensure_stats(StatsKey) ->
ok = ?MODULE:init_stats(StatsKey, accepted),
ok = ?MODULE:init_stats(StatsKey, closed_overloaded).
Stats = [accepted,
closed_nostart,
closed_overloaded,
closed_rate_limitted,
closed_other_reasons],
ok = ?MODULE:init_stats(StatsKey, Stats),
ok.

-spec get_listener_prop(esockd:listener_ref(), _Name) -> _Value | undefined.
get_listener_prop(ListenerRef = {_Proto, _ListenOn}, Name) ->
Expand All @@ -125,8 +130,10 @@ init([]) ->
{write_concurrency, true}]),
{ok, #state{listener_props = #{}}}.

handle_call({init, {Protocol, ListenOn}, Metric}, _From, State) ->
true = ets:insert(?STATS_TAB, {{{Protocol, ListenOn}, Metric}, 0}),
handle_call({init, {Protocol, ListenOn}, Metrics}, _From, State) ->
lists:foreach(fun(Metric) ->
true = ets:insert(?STATS_TAB, {{{Protocol, ListenOn}, Metric}, 0})
end, Metrics),
{reply, ok, State, hibernate};

handle_call({get_listener_prop, ListenerRef, Name}, _From,
Expand Down
2 changes: 1 addition & 1 deletion test/esockd_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ all() -> esockd_ct:all(?MODULE).
t_inc_dec_stats(_) ->
{ok, _} = esockd_server:start_link(),
Name = {echo, 3000},
esockd_server:init_stats(Name, accepting),
esockd_server:init_stats(Name, [accepting]),
esockd_server:inc_stats(Name, accepting, 2),
esockd_server:inc_stats(Name, accepting, 2),
esockd_server:dec_stats(Name, accepting, 1),
Expand Down

0 comments on commit 78a8022

Please sign in to comment.