Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix WS disconnection handling #30

Merged
merged 7 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ Such client is disabled by default (`{ntp, false}`), and is not required to auth
Accepts an integer that represents time in milliseconds, default value is `5_000`.
Allows to tweak the timeout of each API request going through the websocket.

### ws_ping_timeout
Accepts an integer that represents time in milliseconds, default value is `60_000`.
Allows to tweak the timeout between expected ping frames from the server.
If the timeout is exceeded, the socket is closed and a new connection is attempted.

### logs_interval

Accepts an integer that represents time in milliseconds, default value is `2_000`.
Expand Down
1 change: 1 addition & 0 deletions src/grisp_connect.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
{connect, true}, % keeps a constant connection with grisp.io
{ntp, false}, % if set to true, starts the NTP client
{ws_requests_timeout, 5_000},
{ws_ping_timeout, 60_000},
{logs_interval, 2_000},
{logs_batch_size, 100},
{logger, [
Expand Down
8 changes: 7 additions & 1 deletion src/grisp_connect_log_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ jsonify(Event) ->
jsonify_msg(#{msg := {string, String}} = Event) ->
maps:put(msg, unicode:characters_to_binary(String), Event);
jsonify_msg(#{msg := {report, Report}} = Event) ->
case jsx:is_term(Report) of
case is_json_compatible(Report) of
true ->
maps:put(msg, Report, Event);
false ->
Expand Down Expand Up @@ -108,3 +108,9 @@ jsonify_meta(#{meta := Meta} = Event) ->
Optional = maps:without(maps:keys(Default), Meta),
FilterFun = fun(Key, Value) -> jsx:is_term(#{Key => Value}) end,
maps:put(meta, maps:merge(maps:filter(FilterFun, Optional), Default), Event).

is_json_compatible(Term) ->
try jsx:is_term(Term)
catch error:_:_ ->
maehjam marked this conversation as resolved.
Show resolved Hide resolved
false
end.
45 changes: 37 additions & 8 deletions src/grisp_connect_ws.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
gun_pid,
gun_ref,
ws_stream,
ws_up = false
ws_up = false,
ping_timer
}).

-define(disconnected_state,
#state{gun_pid = undefined, gun_ref = undefine, ws_up = false}).
#state{gun_pid = undefined, gun_ref = undefine,
ws_up = false, ping_timer = undefined}).

-include_lib("kernel/include/logger.hrl").

Expand Down Expand Up @@ -72,31 +74,58 @@ handle_cast({send, Payload}, #state{gun_pid = Pid, ws_stream = Stream} = S) ->
handle_info({gun_up, GunPid, _}, #state{gun_pid = GunPid} = S) ->
?LOG_INFO(#{event => connection_enstablished}),
GunRef = monitor(process, GunPid),
WsStream = gun:ws_upgrade(GunPid, "/grisp-connect/ws"),
WsStream = gun:ws_upgrade(GunPid, "/grisp-connect/ws",[],
#{silence_pings => false}),
NewState = S#state{gun_pid = GunPid, gun_ref = GunRef, ws_stream = WsStream},
{noreply, NewState};
handle_info({gun_up, _OldPid, http}, #state{gun_pid = _GunPid} = S) ->
% Ignoring outdated gun_up messages
maehjam marked this conversation as resolved.
Show resolved Hide resolved
{noreply, S};
handle_info({gun_upgrade, Pid, Stream, [<<"websocket">>], _},
#state{gun_pid = Pid, ws_stream = Stream} = S) ->
?LOG_INFO(#{event => ws_upgrade}),
{noreply, S#state{ws_up = true}};
{noreply, S#state{ws_up = true, ping_timer = start_ping_timer()}};
handle_info({gun_response, Pid, Stream, _, Status, _Headers},
#state{gun_pid = Pid, ws_stream = Stream} = S) ->
?LOG_ERROR(#{event => ws_upgrade_failure, status => Status}),
{noreply, shutdown_gun(S)};
handle_info({gun_ws, Conn, Stream, {text, Text}},
#state{gun_pid = Conn, ws_stream = Stream} = S) ->
handle_info({gun_ws, Pid, Stream, ping},
#state{gun_pid = Pid, ws_stream = Stream,
ping_timer = PingTimer} = S) ->
timer:cancel(PingTimer),
{noreply, S#state{ping_timer = start_ping_timer()}};
handle_info({gun_ws, Pid, Stream, {text, Text}},
#state{gun_pid = Pid, ws_stream = Stream} = S) ->
grisp_connect_client:handle_message(Text),
{noreply, S};
handle_info({gun_down, Pid, ws, closed, [Stream]}, #state{gun_pid = Pid, ws_stream = Stream} = S) ->
?LOG_WARNING(#{event => ws_closed}),
grisp_connect_client:disconnected(),
{noreply, shutdown_gun(S)};
handle_info({'DOWN', _, process, Pid, Reason}, #state{gun_pid = Pid,
ping_timer = Tref} = S) ->
?LOG_WARNING(#{event => gun_crash, reason => Reason}),
timer:cancel(Tref),
grisp_connect_client:disconnected(),
{noreply, S?disconnected_state};
handle_info(ping_timeout, S) ->
?LOG_WARNING(#{event => ping_timeout}),
grisp_connect_client:disconnected(),
{noreply, shutdown_gun(S)};
handle_info(M, S) ->
?LOG_WARNING(#{event => unhandled_info, info => M}),
?LOG_WARNING(#{event => unhandled_info, info => M, state => S}),
{noreply, S}.

% internal functions -----------------------------------------------------------

shutdown_gun(#state{gun_pid = Pid} = State) ->
shutdown_gun(#state{gun_pid = Pid, gun_ref = GunRef,
ping_timer = PingTimer} = State) ->
timer:cancel(PingTimer),
demonitor(GunRef),
gun:shutdown(Pid),
State?disconnected_state.

start_ping_timer() ->
{ok, Timeout} = application:get_env(grisp_connect, ws_ping_timeout),
{ok, Tref} = timer:send_after(Timeout, ping_timeout),
Tref.
6 changes: 5 additions & 1 deletion test/grisp_connect_api_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
-compile([export_all, nowarn_export_all]).

-import(grisp_connect_test_client, [wait_connection/0]).
-import(grisp_connect_test_client, [wait_connection/1]).
-import(grisp_connect_test_client, [wait_disconnection/0]).
-import(grisp_connect_test_client, [wait_disconnection/1]).
-import(grisp_connect_test_client, [serial_number/0]).
-import(grisp_connect_test_client, [cert_dir/0]).

Expand All @@ -28,7 +31,8 @@ init_per_suite(Config) ->
?assertEqual(ok, file:write_file(PolicyFile, <<>>)),
application:set_env(seabac, policy_file, PolicyFile),

Config2 = grisp_connect_manager:start(CertDir, Config),
Config2 = grisp_connect_manager:start(Config),
grisp_connect_manager:kraft_start(CertDir),
[{cert_dir, CertDir} | Config2].

end_per_suite(Config) ->
Expand Down
3 changes: 2 additions & 1 deletion test/grisp_connect_log_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ init_per_suite(Config) ->
?assertEqual(ok, file:write_file(PolicyFile, <<>>)),
application:set_env(seabac, policy_file, PolicyFile),

Config2 = grisp_connect_manager:start(CertDir, Config),
Config2 = grisp_connect_manager:start(Config),
grisp_connect_manager:kraft_start(CertDir),
grisp_connect_manager:link_device(),
[{cert_dir, CertDir} | Config2].

Expand Down
19 changes: 13 additions & 6 deletions test/grisp_connect_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

-include_lib("common_test/include/ct.hrl").

start(CertDir, Config) ->
start(Config) ->
PrivDir = ?config(priv_dir, Config),
application:set_env(mnesia, dir, PrivDir),

Expand All @@ -18,25 +18,32 @@ start(CertDir, Config) ->
application:start(mnesia),

{ok, Started2} = application:ensure_all_started(kraft),

{ok, Started3} = application:ensure_all_started(grisp_manager),
Apps = Started1 ++ Started2 ++ Started3,
[{apps, Apps} | Config].

kraft_start(CertDir) ->
kraft_start(CertDir, #{}).

kraft_start(CertDir, OverrideOpts) ->
SslOpts = [
{verify, verify_peer},
{keyfile, filename:join(CertDir, "server.key")},
{certfile, filename:join(CertDir, "server.crt")},
{cacertfile, filename:join(CertDir, "CA.crt")}
],
KraftOpts = #{
Opts = #{
port => 3030,
ssl_opts => SslOpts,
app => grisp_manager
},
KraftOpts = mapz:deep_merge(Opts, OverrideOpts),
KraftRoutes = [
{"/grisp-connect/ws",
{ws, grisp_manager_device_api}, #{}, #{type => json_rpc}}
],
kraft:start(KraftOpts, KraftRoutes),

{ok, Started3} = application:ensure_all_started(grisp_manager),
[{apps, Started1 ++ Started2 ++ Started3} | Config].
kraft:start(KraftOpts, KraftRoutes).

cleanup_apps(Apps) ->
mnesia:delete_table(eresu_user),
Expand Down
94 changes: 94 additions & 0 deletions test/grisp_connect_reconnect_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
-module(grisp_connect_reconnect_SUITE).

-behaviour(ct_suite).
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").

-compile([export_all, nowarn_export_all]).

-import(grisp_connect_test_client, [wait_connection/0]).
-import(grisp_connect_test_client, [wait_connection/1]).
-import(grisp_connect_test_client, [wait_disconnection/0]).
-import(grisp_connect_test_client, [wait_disconnection/1]).
-import(grisp_connect_test_client, [serial_number/0]).
-import(grisp_connect_test_client, [cert_dir/0]).

%--- API -----------------------------------------------------------------------

all() ->
[
F
||
{F, 1} <- ?MODULE:module_info(exports),
lists:suffix("_test", atom_to_list(F))
].

init_per_suite(Config) ->
PrivDir = ?config(priv_dir, Config),
CertDir = cert_dir(),

PolicyFile = filename:join(PrivDir, "policies.term"),
?assertEqual(ok, file:write_file(PolicyFile, <<>>)),
application:set_env(seabac, policy_file, PolicyFile),

Config2 = grisp_connect_manager:start(Config),
[{cert_dir, CertDir} | Config2].

end_per_suite(Config) ->
grisp_connect_manager:cleanup_apps(?config(apps, Config)).

init_per_testcase(_, Config) ->
% the kraf instance links to this process
process_flag(trap_exit, true),
{ok, _} = application:ensure_all_started(kraft),
KraftRef = grisp_connect_manager:kraft_start(?config(cert_dir, Config)),
{ok, _} = application:ensure_all_started(grisp_emulation),
application:set_env(grisp_connect, test_cert_dir, ?config(cert_dir, Config)),
{ok, _} = application:ensure_all_started(grisp_connect),
[{kraft_instance, KraftRef} | Config].

end_per_testcase(_, Config) ->
ok = application:stop(grisp_connect),
kraft:stop(?config(kraft_instance, Config)),
ok = application:stop(kraft),
mnesia:activity(transaction, fun() ->
mnesia:delete({grisp_device, serial_number()})
end),
flush(),
Config.

%--- Tests ---------------------------------------------------------------------

reconnect_on_gun_crash_test(_) ->
?assertMatch(ok, wait_connection(100)),
{state, GunPid, _, _, _, _} = sys:get_state(grisp_connect_ws),
proc_lib:stop(GunPid),
?assertMatch(ok, wait_disconnection()),
?assertMatch(ok, wait_connection()).

reconnect_on_disconnection_test(Config) ->
?assertMatch(ok, wait_connection()),
ok = kraft:stop(?config(kraft_instance, Config)),
?assertMatch(ok, wait_disconnection()),
KraftRef2 = grisp_connect_manager:kraft_start(cert_dir()),
?assertMatch(ok, wait_connection(100)),
[{kraft_instance, KraftRef2} | proplists:delete(kraft_instance, Config)].

reconnect_on_ping_timeout_test(_) ->
?assertMatch(ok, wait_connection()),
{state, GunPid, _, _, _, _} = sys:get_state(grisp_connect_ws),
proc_lib:stop(GunPid),
% Now decrease ping timeout so that the WS closes after just 1 second
application:set_env(grisp_connect, ws_ping_timeout, 1000),
?assertMatch(ok, wait_disconnection()),
?assertMatch(ok, wait_connection(100)),
?assertMatch(ok, wait_disconnection()),
?assertMatch(ok, wait_connection(100)),
?assertMatch(ok, wait_disconnection()).

%--- Internal ------------------------------------------------------------------

flush() ->
receive Any -> ct:pal("Flushed: ~p", [Any]), flush()
after 0 -> ok
end.
21 changes: 19 additions & 2 deletions test/grisp_connect_test_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,21 @@
-export([cert_dir/0]).
-export([serial_number/0]).
-export([wait_connection/0]).
-export([wait_connection/1]).
-export([wait_disconnection/0]).
-export([wait_disconnection/1]).

%--- API -----------------------------------------------------------------------

cert_dir() -> filename:join(code:lib_dir(grisp_connect, test), "certs").
cert_dir() -> filename:join(code:lib_dir(grisp_connect, test), "certs").

serial_number() -> <<"0000">>.

wait_connection() ->
wait_connection(20).

wait_connection(0) ->
ct:pal("grisp_connect state:~n~p~n", [sys:get_state(grisp_connect_client)]),
ct:pal("grisp_connect_ws state:~n~p~n", [sys:get_state(grisp_connect_ws)]),
{error, timeout};
wait_connection(N) ->
case grisp_connect:is_connected() of
Expand All @@ -26,3 +29,17 @@ wait_connection(N) ->
ct:sleep(100),
wait_connection(N - 1)
end.

wait_disconnection() ->
wait_disconnection(20).

wait_disconnection(0) ->
ct:pal("grisp_connect_ws state:~n~p~n", [sys:get_state(grisp_connect_ws)]),
{error, timeout};
wait_disconnection(N) ->
case grisp_connect:is_connected() of
true ->
ct:sleep(100),
wait_disconnection(N - 1);
false -> ok
end.
Loading