Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the JSON-RPC connection #52

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,24 @@ and this project adheres to

## [Unreleased]

### Changed

- The name of the grisp_connect configuration key to control the timout of
individual JSON-RPC requests changed from ws_requests_timeout ot
ws_request_timeout.
- Le default log filter changed to trying to filter out only some messages to
filtering out all progress messages, as it wasn't working reliably.
- The connection is not a persistent process anymore, it is now a transiant
process handling a connection and dying when the connection is closed.
- Internally, the JSON-RPC is parsed into a list of atom or binaries to pave the
road for namespaces. foo.bar.Buz is parsed into [foo, bar, <<"Buz">>] (if foo
and bar are already existing atoms, but 'Buz' is not).

## Fixed

- The client is now waiting 1 second before trying to reconnect when it gets
disconnected fomr the server.

## [1.1.0] - 2024-10-12

### Added
Expand Down
10 changes: 4 additions & 6 deletions config/dev.config
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,16 @@
]},

{kernel, [
{logger_level, info},
{logger_level, debug},
{logger, [
{handler, default, logger_std_h, #{
config => #{type => standard_io},
filter_default => log,
filters => [
% Filter out supervisor progress reports so
% TLS certificates are not swamping the console...
{filter_out_progress, {
fun grisp_connect_logger_bin:filter_out/2,
{supervisor, report_progress}}}
{disable_progress, {fun logger_filters:progress/2, stop}}
]
}}
]}
]}

].
13 changes: 12 additions & 1 deletion config/local.config
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,16 @@
{port, 8443}
]},

{kernel, [{logger_level, debug}]}
{kernel, [
{logger_level, debug},
{logger, [
{handler, default, logger_std_h, #{
config => #{type => standard_io},
filter_default => log,
filters => [
{disable_progress, {fun logger_filters:progress/2, stop}}
]
}}
]}
]}
].
13 changes: 13 additions & 0 deletions config/test.config
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,18 @@
{grisp_connect, [
{domain, localhost},
{port, 3030}
]},

{kernel, [
{logger_level, debug},
{logger, [
{handler, default, logger_std_h, #{
config => #{type => standard_io},
filter_default => log,
filters => [
{disable_progress, {fun logger_filters:progress/2, stop}}
]
}}
]}
]}
].
30 changes: 14 additions & 16 deletions src/grisp_connect.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,24 @@
{port, 443},
{connect, true}, % keeps a constant connection with grisp.io
{ntp, false}, % if set to true, starts the NTP client
{ws_requests_timeout, 5_000},
{ws_transport, tls},
{ws_path, "/grisp-connect/ws"},
{ws_request_timeout, 5_000},
{ws_ping_timeout, 60_000},
{logs_interval, 2_000},
{logs_batch_size, 100},
{logger, [
% Enable our own default handler,
% which will receive all events from boot
{handler,
grisp_connect_log_handler,
grisp_connect_logger_bin,
#{formatter => {grisp_connect_logger_bin, #{}},
% Filter out supervisor progress reports to prevent the ones
% from tls_dyn_connection_sup that logs all the certificates
% to crash the connection...
filters => [
{filter_out_progress,
{fun grisp_connect_logger_bin:filter_out/2,
{supervisor, report_progress}}}
]
}}
% Enable our own default handler,
% which will receive all events from boot
{handler,
grisp_connect_log_handler,
grisp_connect_logger_bin,
#{formatter => {grisp_connect_logger_bin, #{}},
filter_default => log,
filters => [
{disable_progress, {fun logger_filters:progress/2, stop}}
]
}}
]}
]},
{modules, []},
Expand Down
162 changes: 45 additions & 117 deletions src/grisp_connect_api.erl
Original file line number Diff line number Diff line change
@@ -1,152 +1,80 @@
%% @doc Library module containing the jsonrpc API logic
-module(grisp_connect_api).

-export([request/3]).
-export([notify/3]).
-export([handle_msg/1]).

-include_lib("kernel/include/logger.hrl").

%--- Macros --------------------------------------------------------------------
-define(method_get, <<"get">>).
-define(method_post, <<"post">>).
-define(method_patch, <<"patch">>).
-define(method_delete, <<"delete">>).
-define(method_get, get).
-define(method_post, post).

%--- API -----------------------------------------------------------------------

% #doc Assembles a jsonrpc request and its uuid
-spec request(Method :: atom() | binary(),
Type :: atom() | binary(),
Params :: map()) -> {ID :: binary(), Encoded :: binary()}.
request(Method, Type, Params) ->
ID = id(),
Rpc = {request, Method, maps:put(type, Type, Params), ID},
Encoded = grisp_connect_jsonrpc:encode(Rpc),
{ID, Encoded}.
%--- API -----------------------------------------------------------------------

% #doc Assembles a jsonrpc notification
-spec notify(Method :: atom() | binary(),
Type :: atom() | binary(),
Params :: map()) -> Encoded :: binary().
notify(Method, Type, Params) ->
Rpc = {notification, Method, maps:put(type, Type, Params)},
grisp_connect_jsonrpc:encode(Rpc).
% @doc Handles requests, notifications and errors from grisp.io.
-spec handle_msg(Msg) ->
ok | {reply, Result :: term(), ReqRef :: binary() | integer()}
when Msg :: {request, Method :: grisp_connect_connection:method(), Params :: map() | list(), ReqRef :: binary() | integer()}
| {notification, grisp_connect_connection:method(), Params :: map() | list()}
| {remote_error, Code :: integer() | atom(), Message :: undefined | binary(), Data :: term()}.
handle_msg({notification, M, Params}) ->
?LOG_ERROR("Received unexpected notification ~p: ~p", [M, Params]),
ok;
handle_msg({remote_error, Code, Message, _Data}) ->
?LOG_ERROR("Received JSON-RPC error ~p: ~s", [Code, Message]),
ok;
handle_msg({request, M, Params, ID})
when M == [?method_post]; M == [?method_get] ->
handle_request(M, Params, ID).

% @doc Indentifies if the message is a request or a reply to a previous request.
% In case it was a request, returns the reply to be sent to the peer.
% In case it was a response, returns the parsed ID and content to be handled by
% the caller.
-spec handle_msg(JSON :: binary()) ->
{send_response, Response :: binary()} |
{handle_response, ID :: binary(), {ok, Result :: map()} | {error, atom()}}.
handle_msg(JSON) ->
JSON_RPC = grisp_connect_jsonrpc:decode(JSON),
handle_jsonrpc(JSON_RPC).

%--- Internal Funcitons --------------------------------------------------------

format_error({internal_error, parse_error, ID}) ->
{error, -32700, <<"Parse error">>, undefined, ID};
format_error({internal_error, invalid_request, ID}) ->
{error, -32600, <<"Invalid request">>, undefined, ID};
format_error({internal_error, method_not_found, ID}) ->
{error, -32601, <<"Method not found">>, undefined, ID};
format_error({internal_error, invalid_params, ID}) ->
{error, -32602, <<"Invalid params">>, undefined, ID};
format_error({internal_error, Reason, ID}) ->
{error, -32603, <<"Internal error">>, Reason, ID}.

%FIXME: Batch are not supported yet. When receiving a batch of messages, as per
% the JSON-RPC standard, all the responses should goes in a single batch
% of responses.
handle_jsonrpc(Messages) when is_list(Messages) ->
handle_rpc_messages(Messages, []);
handle_jsonrpc(Message) ->
handle_rpc_messages([Message], []).

handle_rpc_messages([], Replies) -> lists:reverse(Replies);
handle_rpc_messages([{request, M, Params, ID} | Batch], Replies)
when M == ?method_post;
M == ?method_get ->
handle_rpc_messages(Batch, [handle_request(M, Params, ID) | Replies]);
handle_rpc_messages([{result, _, _} = Res| Batch], Replies) ->
handle_rpc_messages(Batch, [handle_response(Res)| Replies]);
handle_rpc_messages([{error, _Code, _Msg, _Data, _ID} = E | Batch], Replies) ->
?LOG_INFO("Received JsonRPC error: ~p",[E]),
handle_rpc_messages(Batch, [handle_response(E)| Replies]);
handle_rpc_messages([{decoding_error, _, _, _, _} = E | Batch], Replies) ->
?LOG_ERROR("JsonRPC decoding error: ~p",[E]),
handle_rpc_messages(Batch, Replies).

handle_request(?method_get, #{type := <<"system_info">>} = _Params, ID) ->
handle_request([?method_get], #{type := <<"system_info">>} = _Params, ID) ->
Info = grisp_connect_updater:system_info(),
{send_response, grisp_connect_jsonrpc:encode({result, Info, ID})};
handle_request(?method_post, #{type := <<"start_update">>} = Params, ID) ->
{reply, Info, ID};
handle_request([?method_post], #{type := <<"start_update">>} = Params, ID) ->
try
URL = maps:get(url, Params),
Reply = case grisp_connect_updater:start_update(URL) of
case grisp_connect_updater:start_update(URL) of
{error, grisp_updater_unavailable} ->
{error, -10, grisp_updater_unavailable, undefined, ID};
{error, grisp_updater_unavailable, undefined, undefined, ID};
{error, already_updating} ->
{error, -11, already_updating, undefined, ID};
{error, already_updating, undefined, undefined, ID};
{error, boot_system_not_validated} ->
{error, -12, boot_system_not_validated, undefined, ID};
{error, boot_system_not_validated, undefined, undefined, ID};
{error, Reason} ->
ReasonBinary = iolist_to_binary(io_lib:format("~p", [Reason])),
format_error({internal_error, ReasonBinary, ID});
{error, internal_error, ReasonBinary, undefined, ID};
ok ->
{result, ok, ID}
end,
{send_response, grisp_connect_jsonrpc:encode(Reply)}
{reply, ok, ID}
end
catch
throw:bad_key ->
{send_response,
format_error({internal_error, invalid_params, ID})}
end;
handle_request(?method_post, #{type := <<"validate">>}, ID) ->
Reply = case grisp_connect_updater:validate() of
{error, internal_error, <<"Invalid params">>, ID}
end;
handle_request([?method_post], #{type := <<"validate">>}, ID) ->
case grisp_connect_updater:validate() of
{error, grisp_updater_unavailable} ->
{error, -10, grisp_updater_unavailable, undefined, ID};
{error, grisp_updater_unavailable, undefined, undefined, ID};
{error, {validate_from_unbooted, PartitionIndex}} ->
{error, -13, validate_from_unbooted, PartitionIndex, ID};
{error, validate_from_unbooted, undefined, PartitionIndex, ID};
{error, Reason} ->
ReasonBinary = iolist_to_binary(io_lib:format("~p", [Reason])),
format_error({internal_error, ReasonBinary, ID});
{error, internal_error, ReasonBinary, undefined, ID};
ok ->
{result, ok, ID}
end,
{send_response, grisp_connect_jsonrpc:encode(Reply)};
handle_request(?method_post, #{type := <<"reboot">>}, ID) ->
{reply, ok, ID}
end;
handle_request([?method_post], #{type := <<"reboot">>}, ID) ->
grisp_connect_client:reboot(),
{send_response, grisp_connect_jsonrpc:encode({result, ok, ID})};
handle_request(?method_post, #{type := <<"cancel">>}, ID) ->
Reply = case grisp_connect_updater:cancel() of
{reply, ok, ID};
handle_request([?method_post], #{type := <<"cancel">>}, ID) ->
case grisp_connect_updater:cancel() of
{error, grisp_updater_unavailable} ->
{error, -10, grisp_updater_unavailable, undefined, ID};
{error, grisp_updater_unavailable, undefined, undefined, ID};
ok ->
{result, ok, ID}
end,
{send_response, grisp_connect_jsonrpc:encode(Reply)};
{reply, ok, ID}
end;
handle_request(_T, _P, ID) ->
Error = {internal_error, method_not_found, ID},
FormattedError = format_error(Error),
{send_response, grisp_connect_jsonrpc:encode(FormattedError)}.

handle_response(Response) ->
{ID, Reply} = case Response of
{result, Result, ID0} ->
{ID0, {ok, Result}};
{error, Code, _Message, _Data, ID0} ->
{ID0, {error, error_atom(Code)}}
end,
{handle_response, ID, Reply}.

error_atom(-1) -> device_not_linked;
error_atom(-2) -> token_expired;
error_atom(-3) -> device_already_linked;
error_atom(-4) -> invalid_token;
error_atom(_) -> jsonrpc_error.

id() ->
list_to_binary(integer_to_list(erlang:unique_integer())).
{error, method_not_found, undefined, undefined, ID}.
Loading
Loading