Skip to content

Commit

Permalink
Refactor the JSON-RPC connection
Browse files Browse the repository at this point in the history
  • Loading branch information
sylane committed Nov 26, 2024
1 parent 2422470 commit 755c7f5
Show file tree
Hide file tree
Showing 23 changed files with 1,735 additions and 434 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,24 @@ and this project adheres to

## [Unreleased]

### Changed

- The name of the grisp_connect configuration key to control the timout of
individual JSON-RPC requests changed from ws_requests_timeout ot
ws_request_timeout.
- Le default log filter changed to trying to filter out only some messages to
filtering out all progress messages, as it wasn't working reliably.
- The connection is not a persistent process anymore, it is now a transiant
process handling a connection and dying when the connection is closed.
- Internally, the JSON-RPC is parsed into a list of atom or binaries to pave the
road for namespaces. foo.bar.Buz is parsed into [foo, bar, <<"Buz">>] (if foo
and bar are already existing atoms, but 'Buz' is not).

## Fixed

- The client is now waiting 1 second before trying to reconnect when it gets
disconnected fomr the server.

## [1.1.0] - 2024-10-12

### Added
Expand Down
10 changes: 4 additions & 6 deletions config/dev.config
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@
]},

{kernel, [
{logger_level, info},
{logger_level, debug},
{logger, [
{handler, default, logger_std_h, #{
config => #{type => standard_io},
filter_default => log,
filters => [
% Filter out supervisor progress reports so
% TLS certificates are not swamping the console...
{filter_out_progress, {
fun grisp_connect_logger_bin:filter_out/2,
{supervisor, report_progress}}}
{disable_progress, {fun logger_filters:progress/2, stop}}
]
}}
]}
]}

].
13 changes: 12 additions & 1 deletion config/local.config
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,16 @@
{port, 8443}
]},

{kernel, [{logger_level, debug}]}
{kernel, [
{logger_level, debug},
{logger, [
{handler, default, logger_std_h, #{
config => #{type => standard_io},
filter_default => log,
filters => [
{disable_progress, {fun logger_filters:progress/2, stop}}
]
}}
]}
]}
].
13 changes: 13 additions & 0 deletions config/test.config
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,18 @@
{grisp_connect, [
{domain, localhost},
{port, 3030}
]},

{kernel, [
{logger_level, debug},
{logger, [
{handler, default, logger_std_h, #{
config => #{type => standard_io},
filter_default => log,
filters => [
{disable_progress, {fun logger_filters:progress/2, stop}}
]
}}
]}
]}
].
30 changes: 14 additions & 16 deletions src/grisp_connect.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,24 @@
{port, 443},
{connect, true}, % keeps a constant connection with grisp.io
{ntp, false}, % if set to true, starts the NTP client
{ws_requests_timeout, 5_000},
{ws_transport, tls},
{ws_path, "/grisp-connect/ws"},
{ws_request_timeout, 5_000},
{ws_ping_timeout, 60_000},
{logs_interval, 2_000},
{logs_batch_size, 100},
{logger, [
% Enable our own default handler,
% which will receive all events from boot
{handler,
grisp_connect_log_handler,
grisp_connect_logger_bin,
#{formatter => {grisp_connect_logger_bin, #{}},
% Filter out supervisor progress reports to prevent the ones
% from tls_dyn_connection_sup that logs all the certificates
% to crash the connection...
filters => [
{filter_out_progress,
{fun grisp_connect_logger_bin:filter_out/2,
{supervisor, report_progress}}}
]
}}
% Enable our own default handler,
% which will receive all events from boot
{handler,
grisp_connect_log_handler,
grisp_connect_logger_bin,
#{formatter => {grisp_connect_logger_bin, #{}},
filter_default => log,
filters => [
{disable_progress, {fun logger_filters:progress/2, stop}}
]
}}
]}
]},
{modules, []},
Expand Down
162 changes: 45 additions & 117 deletions src/grisp_connect_api.erl
Original file line number Diff line number Diff line change
@@ -1,152 +1,80 @@
%% @doc Library module containing the jsonrpc API logic
-module(grisp_connect_api).

-export([request/3]).
-export([notify/3]).
-export([handle_msg/1]).

-include_lib("kernel/include/logger.hrl").

%--- Macros --------------------------------------------------------------------
-define(method_get, <<"get">>).
-define(method_post, <<"post">>).
-define(method_patch, <<"patch">>).
-define(method_delete, <<"delete">>).
-define(method_get, get).
-define(method_post, post).

%--- API -----------------------------------------------------------------------

% #doc Assembles a jsonrpc request and its uuid
-spec request(Method :: atom() | binary(),
Type :: atom() | binary(),
Params :: map()) -> {ID :: binary(), Encoded :: binary()}.
request(Method, Type, Params) ->
ID = id(),
Rpc = {request, Method, maps:put(type, Type, Params), ID},
Encoded = grisp_connect_jsonrpc:encode(Rpc),
{ID, Encoded}.
%--- API -----------------------------------------------------------------------

% #doc Assembles a jsonrpc notification
-spec notify(Method :: atom() | binary(),
Type :: atom() | binary(),
Params :: map()) -> Encoded :: binary().
notify(Method, Type, Params) ->
Rpc = {notification, Method, maps:put(type, Type, Params)},
grisp_connect_jsonrpc:encode(Rpc).
% @doc Handles requests, notifications and errors from grisp.io.
-spec handle_msg(Msg) ->
ok | {reply, Result :: term(), ReqRef :: binary() | integer()}
when Msg :: {request, Method :: grisp_connect_connection:method(), Params :: map() | list(), ReqRef :: binary() | integer()}
| {notification, grisp_connect_connection:method(), Params :: map() | list()}
| {remote_error, Code :: integer() | atom(), Message :: undefined | binary(), Data :: term()}.
handle_msg({notification, M, Params}) ->
?LOG_ERROR("Received unexpected notification ~p: ~p", [M, Params]),
ok;
handle_msg({remote_error, Code, Message, _Data}) ->
?LOG_ERROR("Received JSON-RPC error ~p: ~s", [Code, Message]),
ok;
handle_msg({request, M, Params, ID})
when M == [?method_post]; M == [?method_get] ->
handle_request(M, Params, ID).

% @doc Indentifies if the message is a request or a reply to a previous request.
% In case it was a request, returns the reply to be sent to the peer.
% In case it was a response, returns the parsed ID and content to be handled by
% the caller.
-spec handle_msg(JSON :: binary()) ->
{send_response, Response :: binary()} |
{handle_response, ID :: binary(), {ok, Result :: map()} | {error, atom()}}.
handle_msg(JSON) ->
JSON_RPC = grisp_connect_jsonrpc:decode(JSON),
handle_jsonrpc(JSON_RPC).

%--- Internal Funcitons --------------------------------------------------------

format_error({internal_error, parse_error, ID}) ->
{error, -32700, <<"Parse error">>, undefined, ID};
format_error({internal_error, invalid_request, ID}) ->
{error, -32600, <<"Invalid request">>, undefined, ID};
format_error({internal_error, method_not_found, ID}) ->
{error, -32601, <<"Method not found">>, undefined, ID};
format_error({internal_error, invalid_params, ID}) ->
{error, -32602, <<"Invalid params">>, undefined, ID};
format_error({internal_error, Reason, ID}) ->
{error, -32603, <<"Internal error">>, Reason, ID}.

%FIXME: Batch are not supported yet. When receiving a batch of messages, as per
% the JSON-RPC standard, all the responses should goes in a single batch
% of responses.
handle_jsonrpc(Messages) when is_list(Messages) ->
handle_rpc_messages(Messages, []);
handle_jsonrpc(Message) ->
handle_rpc_messages([Message], []).

handle_rpc_messages([], Replies) -> lists:reverse(Replies);
handle_rpc_messages([{request, M, Params, ID} | Batch], Replies)
when M == ?method_post;
M == ?method_get ->
handle_rpc_messages(Batch, [handle_request(M, Params, ID) | Replies]);
handle_rpc_messages([{result, _, _} = Res| Batch], Replies) ->
handle_rpc_messages(Batch, [handle_response(Res)| Replies]);
handle_rpc_messages([{error, _Code, _Msg, _Data, _ID} = E | Batch], Replies) ->
?LOG_INFO("Received JsonRPC error: ~p",[E]),
handle_rpc_messages(Batch, [handle_response(E)| Replies]);
handle_rpc_messages([{decoding_error, _, _, _, _} = E | Batch], Replies) ->
?LOG_ERROR("JsonRPC decoding error: ~p",[E]),
handle_rpc_messages(Batch, Replies).

handle_request(?method_get, #{type := <<"system_info">>} = _Params, ID) ->
handle_request([?method_get], #{type := <<"system_info">>} = _Params, ID) ->
Info = grisp_connect_updater:system_info(),
{send_response, grisp_connect_jsonrpc:encode({result, Info, ID})};
handle_request(?method_post, #{type := <<"start_update">>} = Params, ID) ->
{reply, Info, ID};
handle_request([?method_post], #{type := <<"start_update">>} = Params, ID) ->
try
URL = maps:get(url, Params),
Reply = case grisp_connect_updater:start_update(URL) of
case grisp_connect_updater:start_update(URL) of
{error, grisp_updater_unavailable} ->
{error, -10, grisp_updater_unavailable, undefined, ID};
{error, grisp_updater_unavailable, undefined, undefined, ID};
{error, already_updating} ->
{error, -11, already_updating, undefined, ID};
{error, already_updating, undefined, undefined, ID};
{error, boot_system_not_validated} ->
{error, -12, boot_system_not_validated, undefined, ID};
{error, boot_system_not_validated, undefined, undefined, ID};
{error, Reason} ->
ReasonBinary = iolist_to_binary(io_lib:format("~p", [Reason])),
format_error({internal_error, ReasonBinary, ID});
{error, internal_error, ReasonBinary, undefined, ID};
ok ->
{result, ok, ID}
end,
{send_response, grisp_connect_jsonrpc:encode(Reply)}
{reply, ok, ID}
end
catch
throw:bad_key ->
{send_response,
format_error({internal_error, invalid_params, ID})}
end;
handle_request(?method_post, #{type := <<"validate">>}, ID) ->
Reply = case grisp_connect_updater:validate() of
{error, internal_error, <<"Invalid params">>, ID}
end;
handle_request([?method_post], #{type := <<"validate">>}, ID) ->
case grisp_connect_updater:validate() of
{error, grisp_updater_unavailable} ->
{error, -10, grisp_updater_unavailable, undefined, ID};
{error, grisp_updater_unavailable, undefined, undefined, ID};
{error, {validate_from_unbooted, PartitionIndex}} ->
{error, -13, validate_from_unbooted, PartitionIndex, ID};
{error, validate_from_unbooted, undefined, PartitionIndex, ID};
{error, Reason} ->
ReasonBinary = iolist_to_binary(io_lib:format("~p", [Reason])),
format_error({internal_error, ReasonBinary, ID});
{error, internal_error, ReasonBinary, undefined, ID};
ok ->
{result, ok, ID}
end,
{send_response, grisp_connect_jsonrpc:encode(Reply)};
handle_request(?method_post, #{type := <<"reboot">>}, ID) ->
{reply, ok, ID}
end;
handle_request([?method_post], #{type := <<"reboot">>}, ID) ->
grisp_connect_client:reboot(),
{send_response, grisp_connect_jsonrpc:encode({result, ok, ID})};
handle_request(?method_post, #{type := <<"cancel">>}, ID) ->
Reply = case grisp_connect_updater:cancel() of
{reply, ok, ID};
handle_request([?method_post], #{type := <<"cancel">>}, ID) ->
case grisp_connect_updater:cancel() of
{error, grisp_updater_unavailable} ->
{error, -10, grisp_updater_unavailable, undefined, ID};
{error, grisp_updater_unavailable, undefined, undefined, ID};
ok ->
{result, ok, ID}
end,
{send_response, grisp_connect_jsonrpc:encode(Reply)};
{reply, ok, ID}
end;
handle_request(_T, _P, ID) ->
Error = {internal_error, method_not_found, ID},
FormattedError = format_error(Error),
{send_response, grisp_connect_jsonrpc:encode(FormattedError)}.

handle_response(Response) ->
{ID, Reply} = case Response of
{result, Result, ID0} ->
{ID0, {ok, Result}};
{error, Code, _Message, _Data, ID0} ->
{ID0, {error, error_atom(Code)}}
end,
{handle_response, ID, Reply}.

error_atom(-1) -> device_not_linked;
error_atom(-2) -> token_expired;
error_atom(-3) -> device_already_linked;
error_atom(-4) -> invalid_token;
error_atom(_) -> jsonrpc_error.

id() ->
list_to_binary(integer_to_list(erlang:unique_integer())).
{error, method_not_found, undefined, undefined, ID}.
Loading

0 comments on commit 755c7f5

Please sign in to comment.