diff --git a/config/ct.config b/config/ct.config index 756bff94..1a33ce0a 100644 --- a/config/ct.config +++ b/config/ct.config @@ -24,7 +24,8 @@ }}, {multi_buy_enabled, true}, {routing_cache_window_secs, 5}, - {routing_cache_timeout_secs, 10} + {routing_cache_timeout_secs, 10}, + {ics_stream_worker_checkpoint_secs, 3} ]}, {aws_credentials, [ {credential_providers, [aws_credentials_env]}, diff --git a/config/sys.config.src b/config/sys.config.src index 3e01fa60..94731d78 100644 --- a/config/sys.config.src +++ b/config/sys.config.src @@ -25,7 +25,8 @@ port => "${HPR_MULTI_BUY_SERVICE_PORT}" }}, {routing_cache_timeout_secs, 15}, - {routing_cache_window_secs, 120} + {routing_cache_window_secs, 120}, + {ics_stream_worker_checkpoint_secs, 300} ]}, {aws_credentials, [ {credential_providers, [aws_credentials_env, aws_credentials_ec2]} diff --git a/rebar.lock b/rebar.lock index 581cd3f0..6060d0fc 100644 --- a/rebar.lock +++ b/rebar.lock @@ -63,7 +63,7 @@ {<<"hackney">>,{pkg,<<"hackney">>,<<"1.17.0">>},0}, {<<"helium_proto">>, {git,"https://github.com/helium/proto.git", - {ref,"8d81732f55aa34442ada818c93f504f1a16ca659"}}, + {ref,"585704c871d32846a6e35d186a08443883545687"}}, 0}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},2}, {<<"http2_client">>, diff --git a/src/cli/hpr_cli_config.erl b/src/cli/hpr_cli_config.erl index 87d7f024..cc6be22c 100644 --- a/src/cli/hpr_cli_config.erl +++ b/src/cli/hpr_cli_config.erl @@ -45,6 +45,12 @@ config_usage() -> "config route deactivate - Deactivate route\n", "config skf - List all Session Key Filters for Devaddr or Session Key\n", "config eui --app --dev - List all Routes with EUI pair\n" + "\n\n", + "config counts - Simple Counts of Configuration\n", + "config checkpoint next - Time until next writing of configuration to disk\n" + "config checkpoint write - Write current configuration to disk\n", + "config checkpoint reset - Set checkpoint timestamp to beginning of time (0)\n" + "config reconnect - Reset connection to Configuration Service\n" ] ]. @@ -102,7 +108,17 @@ config_cmd() -> ]} ], fun config_eui/3 - ] + ], + [["config", "counts"], [], [], fun config_counts/3], + [["config", "checkpoint", "next"], [], [], fun config_checkpoint_next/3], + [["config", "checkpoint", "write"], [], [], fun config_checkpoint_write/3], + [ + ["config", "checkpoint", "reset"], + [], + [{commit, [{longname, "commit"}, {datatype, boolean}]}], + fun config_checkpoint_reset/3 + ], + [["config", "reconnect"], [], [], fun config_reconnect/3] ]. config_list(["config", "ls"], [], []) -> @@ -342,6 +358,61 @@ config_eui(["config", "eui"], [], Flags) -> config_eui(_, _, _) -> usage. +config_counts(["config", "counts"], [], []) -> + c_table([ + [ + {" Routes ", hpr_route_storage:count()}, + {" EUI Pairs ", hpr_eui_pair_storage:count()}, + {" SKF ", hpr_skf_storage:count()}, + {" DevAddr Ranges ", hpr_devaddr_range_storage:count()} + ] + ]); +config_counts(_, _, _) -> + usage. + +config_checkpoint_next(["config", "checkpoint", "next"], [], []) -> + Msg = hpr_route_stream_worker:print_next_checkpoint(), + c_text(Msg); +config_checkpoint_next(_, _, _) -> + usage. + +config_checkpoint_write(["config", "checkpoint", "write"], [], []) -> + case timer:tc(fun() -> hpr_route_stream_worker:do_checkpoint(erlang:system_time(second)) end) of + {Time0, ok} -> + Time = erlang:convert_time_unit(Time0, microsecond, millisecond), + c_text("Wrote checkpoint in ~wms", [Time]); + Other -> + c_text("Something went wrong: ~p", [Other]) + end; +config_checkpoint_write(_, _, _) -> + usage. + +config_checkpoint_reset(["config", "checkpoint", "reset"], [], Flags) -> + Options = maps:from_list(Flags), + case maps:is_key(commit, Options) of + true -> + case hpr_route_stream_worker:reset_timestamp() of + ok -> + c_text("Checkpoint reset"); + Err -> + c_text("Something went wrong:~n~p", [Err]) + end; + false -> + c_text("Must specify --commit to reset checkpoint") + end; +config_checkpoint_reset(_, _, _) -> + usage. + +config_reconnect(["config", "reconnect"], [], []) -> + case hpr_route_stream_worker:reset_connection() of + ok -> + c_text("Reconnected"); + Err -> + c_text("Something went wrong:~n~p", [Err]) + end; +config_reconnect(_, _, _) -> + usage. + do_config_eui(AppEUI, DevEUI) -> AppEUINum = erlang:list_to_integer(AppEUI, 16), DevEUINum = erlang:list_to_integer(DevEUI, 16), diff --git a/src/grpc/iot_config/hpr_devaddr_range_storage.erl b/src/grpc/iot_config/hpr_devaddr_range_storage.erl index 76044362..1cd2d54f 100644 --- a/src/grpc/iot_config/hpr_devaddr_range_storage.erl +++ b/src/grpc/iot_config/hpr_devaddr_range_storage.erl @@ -2,6 +2,7 @@ -export([ init_ets/0, + checkpoint/0, lookup/1, insert/1, @@ -20,15 +21,23 @@ -export([test_delete_ets/0, test_size/0, test_tab_name/0]). -endif. --define(ETS_DEVADDR_RANGES, hpr_route_devaddr_ranges_ets). +-define(ETS, hpr_route_devaddr_ranges_ets). +-define(DETS, hpr_route_devaddr_ranges_dets). -spec init_ets() -> ok. init_ets() -> - ?ETS_DEVADDR_RANGES = ets:new(?ETS_DEVADDR_RANGES, [ + ?ETS = ets:new(?ETS, [ public, named_table, bag, {read_concurrency, true} ]), + ok = rehydrate_from_dets(), ok. +-spec checkpoint() -> ok. +checkpoint() -> + with_open_dets(fun() -> + ok = dets:from_ets(?DETS, ?ETS) + end). + -spec lookup(DevAddr :: non_neg_integer()) -> [hpr_route_ets:route()]. lookup(DevAddr) -> MS = [ @@ -40,7 +49,7 @@ lookup(DevAddr) -> ['$3'] } ], - RouteIDs = ets:select(?ETS_DEVADDR_RANGES, MS), + RouteIDs = ets:select(?ETS, MS), lists:usort( lists:flatten([ @@ -52,7 +61,7 @@ lookup(DevAddr) -> -spec insert(DevAddrRange :: hpr_devaddr_range:devaddr_range()) -> ok. insert(DevAddrRange) -> - true = ets:insert(?ETS_DEVADDR_RANGES, [ + true = ets:insert(?ETS, [ { {hpr_devaddr_range:start_addr(DevAddrRange), hpr_devaddr_range:end_addr(DevAddrRange)}, hpr_devaddr_range:route_id(DevAddrRange) @@ -70,7 +79,7 @@ insert(DevAddrRange) -> -spec delete(DevAddrRange :: hpr_devaddr_range:devaddr_range()) -> ok. delete(DevAddrRange) -> - true = ets:delete_object(?ETS_DEVADDR_RANGES, { + true = ets:delete_object(?ETS, { {hpr_devaddr_range:start_addr(DevAddrRange), hpr_devaddr_range:end_addr(DevAddrRange)}, hpr_devaddr_range:route_id(DevAddrRange) }), @@ -86,23 +95,23 @@ delete(DevAddrRange) -> -spec delete_all() -> ok. delete_all() -> - ets:delete_all_objects(?ETS_DEVADDR_RANGES), + ets:delete_all_objects(?ETS), ok. -ifdef(TEST). -spec test_delete_ets() -> ok. test_delete_ets() -> - ets:delete(?ETS_DEVADDR_RANGES), + ets:delete(?ETS), ok. -spec test_size() -> non_neg_integer(). test_size() -> - ets:info(?ETS_DEVADDR_RANGES, size). + ets:info(?ETS, size). -spec test_tab_name() -> atom(). test_tab_name() -> - ?ETS_DEVADDR_RANGES. + ?ETS. -endif. @@ -114,12 +123,12 @@ test_tab_name() -> list({non_neg_integer(), non_neg_integer()}). lookup_for_route(RouteID) -> MS = [{{{'$1', '$2'}, RouteID}, [], [{{'$1', '$2'}}]}], - ets:select(?ETS_DEVADDR_RANGES, MS). + ets:select(?ETS, MS). -spec count_for_route(RouteID :: hpr_route:id()) -> non_neg_integer(). count_for_route(RouteID) -> MS = [{{'_', RouteID}, [], [true]}], - ets:select_count(?ETS_DEVADDR_RANGES, MS). + ets:select_count(?ETS, MS). %% ------------------------------------------------------------------- %% Route Stream Helpers @@ -128,7 +137,7 @@ count_for_route(RouteID) -> -spec delete_route(hpr_route:id()) -> non_neg_integer(). delete_route(RouteID) -> MS1 = [{{'_', RouteID}, [], [true]}], - ets:select_delete(?ETS_DEVADDR_RANGES, MS1). + ets:select_delete(?ETS, MS1). -spec replace_route( RouteID :: hpr_route:id(), @@ -138,3 +147,30 @@ replace_route(RouteID, DevAddrRanges) -> Removed = hpr_devaddr_range_storage:delete_route(RouteID), lists:foreach(fun ?MODULE:insert/1, DevAddrRanges), Removed. + +-spec rehydrate_from_dets() -> ok. +rehydrate_from_dets() -> + with_open_dets(fun() -> + case dets:to_ets(?DETS, ?ETS) of + {error, _Reason} -> + lager:error("failed ot hydrate ets: ~p", [_Reason]); + _ -> + lager:info("ets hydrated") + end + end). + +-spec with_open_dets(FN :: fun()) -> ok. +with_open_dets(FN) -> + DataDir = hpr_utils:base_data_dir(), + DETSFile = filename:join([DataDir, "hpr_devaddr_range_storage.dets"]), + ok = filelib:ensure_dir(DETSFile), + + case dets:open_file(?DETS, [{file, DETSFile}, {type, bag}]) of + {ok, _Dets} -> + FN(), + dets:close(?DETS); + {error, Reason} -> + Deleted = file:delete(DETSFile), + lager:warning("failed to open dets file ~p: ~p, deleted: ~p", [?MODULE, Reason, Deleted]), + with_open_dets(FN) + end. diff --git a/src/grpc/iot_config/hpr_eui_pair_storage.erl b/src/grpc/iot_config/hpr_eui_pair_storage.erl index 78b94ec7..863e8ed1 100644 --- a/src/grpc/iot_config/hpr_eui_pair_storage.erl +++ b/src/grpc/iot_config/hpr_eui_pair_storage.erl @@ -2,6 +2,7 @@ -export([ init_ets/0, + checkpoint/0, insert/1, lookup/2, @@ -22,24 +23,38 @@ -export([test_delete_ets/0, test_size/0, test_tab_name/0]). -endif. --define(ETS_EUI_PAIRS, hpr_route_eui_pairs_ets). +-define(ETS, hpr_route_eui_pairs_ets). +-define(DETS, hpr_route_eui_pairs_dets). -spec init_ets() -> ok. init_ets() -> - ?ETS_EUI_PAIRS = ets:new(?ETS_EUI_PAIRS, [public, named_table, bag, {read_concurrency, true}]), + ?ETS = ets:new(?ETS, [ + public, + named_table, + bag, + {read_concurrency, true} + ]), + ok = rehydrate_from_dets(), + ok. +-spec checkpoint() -> ok. +checkpoint() -> + with_open_dets(fun() -> + ok = dets:from_ets(?DETS, ?ETS) + end). + -spec lookup(AppEUI :: non_neg_integer(), DevEUI :: non_neg_integer()) -> [hpr_route_ets:route()]. lookup(AppEUI, 0) -> - EUIPairs = ets:lookup(?ETS_EUI_PAIRS, {AppEUI, 0}), + EUIPairs = ets:lookup(?ETS, {AppEUI, 0}), lists:flatten([ Route || {_, RouteID} <- EUIPairs, {ok, Route} <- [hpr_route_storage:lookup(RouteID)] ]); lookup(AppEUI, DevEUI) -> - EUIPairs = ets:lookup(?ETS_EUI_PAIRS, {AppEUI, DevEUI}), + EUIPairs = ets:lookup(?ETS, {AppEUI, DevEUI}), lists:usort( lists:flatten([ Route @@ -50,7 +65,7 @@ lookup(AppEUI, DevEUI) -> -spec insert(EUIPair :: hpr_eui_pair:eui_pair()) -> ok. insert(EUIPair) -> - true = ets:insert(?ETS_EUI_PAIRS, [ + true = ets:insert(?ETS, [ { {hpr_eui_pair:app_eui(EUIPair), hpr_eui_pair:dev_eui(EUIPair)}, hpr_eui_pair:route_id(EUIPair) @@ -68,7 +83,7 @@ insert(EUIPair) -> -spec delete(EUIPair :: hpr_eui_pair:eui_pair()) -> ok. delete(EUIPair) -> - true = ets:delete_object(?ETS_EUI_PAIRS, { + true = ets:delete_object(?ETS, { {hpr_eui_pair:app_eui(EUIPair), hpr_eui_pair:dev_eui(EUIPair)}, hpr_eui_pair:route_id(EUIPair) }), @@ -84,23 +99,23 @@ delete(EUIPair) -> -spec delete_all() -> ok. delete_all() -> - ets:delete_all_objects(?ETS_EUI_PAIRS), + ets:delete_all_objects(?ETS), ok. -ifdef(TEST). -spec test_delete_ets() -> ok. test_delete_ets() -> - ets:delete(?ETS_EUI_PAIRS), + ets:delete(?ETS), ok. -spec test_size() -> non_neg_integer(). test_size() -> - ets:info(?ETS_EUI_PAIRS, size). + ets:info(?ETS, size). -spec test_tab_name() -> atom(). test_tab_name() -> - ?ETS_EUI_PAIRS. + ?ETS. -endif. @@ -112,24 +127,24 @@ test_tab_name() -> list({AppEUI :: non_neg_integer(), DevEUI :: non_neg_integer()}). lookup_dev_eui(DevEUI) -> MS = [{{{'$1', DevEUI}, '_'}, [], [{{'$1', DevEUI}}]}], - ets:select(?ETS_EUI_PAIRS, MS). + ets:select(?ETS, MS). -spec lookup_app_eui(AppEUI :: non_neg_integer()) -> list({AppEUI :: non_neg_integer(), DevEUI :: non_neg_integer()}). lookup_app_eui(AppEUI) -> MS = [{{{AppEUI, '$1'}, '_'}, [], [{{AppEUI, '$1'}}]}], - ets:select(?ETS_EUI_PAIRS, MS). + ets:select(?ETS, MS). -spec lookup_for_route(RouteID :: hpr_route:id()) -> list({AppEUI :: non_neg_integer(), DevEUI :: non_neg_integer()}). lookup_for_route(RouteID) -> MS = [{{{'$1', '$2'}, RouteID}, [], [{{'$1', '$2'}}]}], - ets:select(?ETS_EUI_PAIRS, MS). + ets:select(?ETS, MS). -spec count_for_route(RouteID :: hpr_route:id()) -> non_neg_integer(). count_for_route(RouteID) -> MS = [{{'_', RouteID}, [], [true]}], - ets:select_count(?ETS_EUI_PAIRS, MS). + ets:select_count(?ETS, MS). %% ------------------------------------------------------------------- %% Route Stream Helpers @@ -138,7 +153,7 @@ count_for_route(RouteID) -> -spec delete_route(hpr_route:id()) -> non_neg_integer(). delete_route(RouteID) -> MS2 = [{{'_', RouteID}, [], [true]}], - ets:select_delete(?ETS_EUI_PAIRS, MS2). + ets:select_delete(?ETS, MS2). -spec replace_route(RouteID :: hpr_route:id(), EUIs :: list(hpr_eui_pair:eui_pair())) -> non_neg_integer(). @@ -146,3 +161,30 @@ replace_route(RouteID, EUIs) -> Removed = ?MODULE:delete_route(RouteID), lists:foreach(fun ?MODULE:insert/1, EUIs), Removed. + +-spec rehydrate_from_dets() -> ok. +rehydrate_from_dets() -> + with_open_dets(fun() -> + case dets:to_ets(?DETS, ?ETS) of + {error, _Reason} -> + lager:error("failed ot hydrate ets: ~p", [_Reason]); + _ -> + lager:info("ets hydrated") + end + end). + +with_open_dets(FN) -> + DataDir = hpr_utils:base_data_dir(), + DETSFile = filename:join([DataDir, "hpr_eui_pair_storage.dets"]), + ok = filelib:ensure_dir(DETSFile), + + case dets:open_file(?DETS, [{file, DETSFile}, {type, set}]) of + {ok, _Dets} -> + lager:info("~s opened by ~p", [DETSFile, self()]), + FN(), + dets:close(?DETS); + {error, Reason} -> + Deleted = file:delete(DETSFile), + lager:warning("failed to open dets file ~p: ~p, deleted: ~p", [?MODULE, Reason, Deleted]), + with_open_dets(FN) + end. diff --git a/src/grpc/iot_config/hpr_route_ets.erl b/src/grpc/iot_config/hpr_route_ets.erl index 0067619d..28b2baca 100644 --- a/src/grpc/iot_config/hpr_route_ets.erl +++ b/src/grpc/iot_config/hpr_route_ets.erl @@ -33,6 +33,7 @@ init() -> ok = hpr_route_storage:init_ets(), ok = hpr_devaddr_range_storage:init_ets(), + %% SKF hydration is handled by route hydration ok = hpr_eui_pair_storage:init_ets(), ok. diff --git a/src/grpc/iot_config/hpr_route_storage.erl b/src/grpc/iot_config/hpr_route_storage.erl index d9325d00..50fa5617 100644 --- a/src/grpc/iot_config/hpr_route_storage.erl +++ b/src/grpc/iot_config/hpr_route_storage.erl @@ -2,6 +2,7 @@ -export([ init_ets/0, + checkpoint/0, insert/1, insert/2, insert/3, delete/1, @@ -19,22 +20,40 @@ -export([test_delete_ets/0, test_size/0]). -endif. --define(ETS_ROUTES, hpr_routes_ets). +-define(ETS, hpr_routes_ets). +-define(DETS, hpr_routes_dets). -spec init_ets() -> ok. init_ets() -> - ?ETS_ROUTES = ets:new(?ETS_ROUTES, [ + ?ETS = ets:new(?ETS, [ public, named_table, set, {keypos, hpr_route_ets:ets_keypos()}, {read_concurrency, true} ]), + with_open_dets(fun() -> + [] = dets:traverse( + ?DETS, + fun(RouteETS) -> + Route = hpr_route_ets:route(RouteETS), + ok = ?MODULE:insert(Route), + continue + end + ) + end), + ok. +-spec checkpoint() -> ok. +checkpoint() -> + with_open_dets(fun() -> + ok = dets:from_ets(?DETS, ?ETS) + end). + -spec lookup(ID :: hpr_route:id()) -> {ok, hpr_route_ets:route()} | {error, not_found}. lookup(ID) -> - case ets:lookup(?ETS_ROUTES, ID) of + case ets:lookup(?ETS, ID) of [Route] -> {ok, Route}; _Other -> @@ -64,7 +83,7 @@ insert(Route, SKFETS) -> ) -> ok. insert(Route, SKFETS, Backoff) -> RouteETS = hpr_route_ets:new(Route, SKFETS, Backoff), - true = ets:insert(?ETS_ROUTES, RouteETS), + true = ets:insert(?ETS, RouteETS), Server = hpr_route:server(Route), RouteFields = [ {id, hpr_route:id(Route)}, @@ -88,7 +107,7 @@ delete(Route) -> EUIsEntries = hpr_eui_pair_storage:delete_route(RouteID), SKFEntries = hpr_skf_storage:delete_route(RouteID), - true = ets:delete(?ETS_ROUTES, RouteID), + true = ets:delete(?ETS, RouteID), lager:info( [{devaddr, DevAddrEntries}, {euis, EUIsEntries}, {skfs, SKFEntries}, {route_id, RouteID}], "route deleted" @@ -97,24 +116,24 @@ delete(Route) -> -spec delete_all() -> ok. delete_all() -> - ets:delete_all_objects(?ETS_ROUTES), + ets:delete_all_objects(?ETS), ok. -spec set_backoff(RouteID :: hpr_route:id(), Backoff :: hpr_route_ets:backoff()) -> ok. set_backoff(RouteID, Backoff) -> - true = ets:update_element(?ETS_ROUTES, RouteID, {5, Backoff}), + true = ets:update_element(?ETS, RouteID, {5, Backoff}), ok. -ifdef(TEST). -spec test_delete_ets() -> ok. test_delete_ets() -> - ets:delete(?ETS_ROUTES), + ets:delete(?ETS), ok. -spec test_size() -> non_neg_integer(). test_size() -> - ets:info(?ETS_ROUTES, size). + ets:info(?ETS, size). -endif. @@ -124,19 +143,40 @@ test_size() -> -spec all_routes() -> list(hpr_route:route()). all_routes() -> - [hpr_route_ets:route(R) || R <- ets:tab2list(?ETS_ROUTES)]. + [hpr_route_ets:route(R) || R <- ets:tab2list(?ETS)]. -spec all_route_ets() -> list(hpr_route_ets:route()). all_route_ets() -> - ets:tab2list(?ETS_ROUTES). + ets:tab2list(?ETS). -spec oui_routes(OUI :: non_neg_integer()) -> list(hpr_route_ets:route()). oui_routes(OUI) -> [ RouteETS - || RouteETS <- ets:tab2list(?ETS_ROUTES), OUI == hpr_route:oui(hpr_route_ets:route(RouteETS)) + || RouteETS <- ets:tab2list(?ETS), OUI == hpr_route:oui(hpr_route_ets:route(RouteETS)) ]. %% ------------------------------------------------------------------- %% Internal Functions %% ------------------------------------------------------------------- + +with_open_dets(FN) -> + DataDir = hpr_utils:base_data_dir(), + DETSFile = filename:join([DataDir, "hpr_routes_storage.dets"]), + ok = filelib:ensure_dir(DETSFile), + + case + dets:open_file(?DETS, [ + {file, DETSFile}, {type, set}, {keypos, hpr_route_ets:ets_keypos()} + ]) + of + {ok, _Dets} -> + lager:info("~s opened by ~p", [DETSFile, self()]), + FN(), + dets:close(?DETS); + %% ok; + {error, Reason} -> + Deleted = file:delete(DETSFile), + lager:warning("failed to open dets file ~p: ~p, deleted: ~p", [?MODULE, Reason, Deleted]), + with_open_dets(FN) + end. diff --git a/src/grpc/iot_config/hpr_route_stream_req.erl b/src/grpc/iot_config/hpr_route_stream_req.erl index 1d4da979..077db3d3 100644 --- a/src/grpc/iot_config/hpr_route_stream_req.erl +++ b/src/grpc/iot_config/hpr_route_stream_req.erl @@ -3,11 +3,12 @@ -include("../autogen/iot_config_pb.hrl"). -export([ - new/1, + new/1, new/2, timestamp/1, signer/1, signature/1, sign/2, + since/1, verify/1 ]). @@ -17,9 +18,14 @@ -spec new(Signer :: libp2p_crypto:pubkey_bin()) -> req(). new(Signer) -> + new(Signer, 0). + +-spec new(Signer :: libp2p_crypto:pubkey_bin(), Since :: non_neg_integer()) -> req(). +new(Signer, Since) -> #iot_config_route_stream_req_v1_pb{ signer = Signer, - timestamp = erlang:system_time(millisecond) + timestamp = erlang:system_time(millisecond), + since = Since }. -spec timestamp(RouteStreamReq :: req()) -> non_neg_integer(). @@ -41,6 +47,10 @@ sign(RouteStreamReq, SigFun) -> ), RouteStreamReq#iot_config_route_stream_req_v1_pb{signature = SigFun(EncodedRouteStreamReq)}. +-spec since(RouteStreamReq :: req()) -> non_neg_integer(). +since(RouteStreamReq) -> + RouteStreamReq#iot_config_route_stream_req_v1_pb.since. + -spec verify(RouteStreamReq :: req()) -> boolean(). verify(RouteStreamReq) -> EncodedRouteStreamReq = iot_config_pb:encode_msg( diff --git a/src/grpc/iot_config/hpr_route_stream_res.erl b/src/grpc/iot_config/hpr_route_stream_res.erl index 38dc04b8..f7f10f0e 100644 --- a/src/grpc/iot_config/hpr_route_stream_res.erl +++ b/src/grpc/iot_config/hpr_route_stream_res.erl @@ -4,7 +4,8 @@ -export([ action/1, - data/1 + data/1, + timestamp/1 ]). -ifdef(TEST). @@ -30,6 +31,17 @@ action(RouteStreamRes) -> data(RouteStreamRes) -> RouteStreamRes#iot_config_route_stream_res_v1_pb.data. +-spec timestamp(RouteStreamRes :: res()) -> non_neg_integer(). +timestamp(RouteStreamRes) -> + %% NOTE: All requests are sent with a millisecond timestamp. + %% Responses have Second timestamps. + %% HPR speaks exclusively in millisecond. + erlang:convert_time_unit( + RouteStreamRes#iot_config_route_stream_res_v1_pb.timestamp, + second, + millisecond + ). + %% ------------------------------------------------------------------ %% Tests Functions %% ------------------------------------------------------------------ @@ -39,7 +51,8 @@ data(RouteStreamRes) -> test_new(Map) -> #iot_config_route_stream_res_v1_pb{ action = maps:get(action, Map), - data = maps:get(data, Map) + data = maps:get(data, Map), + timestamp = maps:get(timestamp, Map, 0) }. -endif. diff --git a/src/grpc/iot_config/hpr_route_stream_worker.erl b/src/grpc/iot_config/hpr_route_stream_worker.erl index 1600dbd8..f2b4de8f 100644 --- a/src/grpc/iot_config/hpr_route_stream_worker.erl +++ b/src/grpc/iot_config/hpr_route_stream_worker.erl @@ -61,9 +61,24 @@ %% ------------------------------------------------------------------ -export([ start_link/1, - refresh_route/1 + refresh_route/1, + checkpoint/0, + schedule_checkpoint/0 ]). +-export([ + do_checkpoint/1, + reset_timestamp/0, + checkpoint_timer/0, + print_next_checkpoint/0, + last_timestamp/0, + reset_connection/0 +]). + +-ifdef(TEST). +-export([test_counts/0, test_stream/0]). +-endif. + %% ------------------------------------------------------------------ %% gen_server Function Exports %% ------------------------------------------------------------------ @@ -99,13 +114,24 @@ devaddr_added := non_neg_integer() }. +-type counts_map() :: #{ + route := non_neg_integer(), + eui_pair := non_neg_integer(), + skf := non_neg_integer(), + devaddr_range := non_neg_integer() +}. + -record(state, { stream :: grpcbox_client:stream() | undefined, - conn_backoff :: backoff:backoff() + conn_backoff :: backoff:backoff(), + counts :: counts_map(), + last_timestamp = 0 :: non_neg_integer(), + checkpoint_timer :: undefined | {TimeScheduled :: non_neg_integer(), timer:tref()} }). -define(SERVER, ?MODULE). -define(INIT_STREAM, init_stream). +-define(DETS, hpr_route_stream_worker_dets). %% ------------------------------------------------------------------ %% API Function Definitions @@ -121,6 +147,69 @@ start_link(Args) -> refresh_route(RouteID) -> gen_server:call(?MODULE, {refresh_route, RouteID}, timer:seconds(120)). +-spec checkpoint() -> ok. +checkpoint() -> + gen_server:call(?MODULE, checkpoint). + +-spec last_timestamp() -> non_neg_integer(). +last_timestamp() -> + gen_server:call(?MODULE, last_timetstamp). + +-spec do_checkpoint(LastTimestamp :: non_neg_integer()) -> ok. +do_checkpoint(LastTimestamp) -> + ok = hpr_route_storage:checkpoint(), + ok = hpr_eui_pair_storage:checkpoint(), + ok = hpr_devaddr_range_storage:checkpoint(), + ok = hpr_skf_storage:checkpoint(), + ok = dets:insert(?DETS, {timestamp, LastTimestamp}). + +-spec schedule_checkpoint() -> {TimeScheduled :: non_neg_integer(), timer:tref()}. +schedule_checkpoint() -> + Delay = hpr_utils:get_env_int(ics_stream_worker_checkpoint_secs, 300), + lager:info([{timer_secs, Delay}], "scheduling checkpoint"), + {ok, Timer} = timer:apply_after(timer:seconds(Delay), ?MODULE, checkpoint, []), + {erlang:system_time(millisecond), Timer}. + +-spec reset_timestamp() -> ok. +reset_timestamp() -> + gen_server:call(?MODULE, reset_timestamp). + +-spec checkpoint_timer() -> undefined | {TimeScheduled :: non_neg_integer(), timer:tref()}. +checkpoint_timer() -> + gen_server:call(?MODULE, checkpoint_timer). + +-spec print_next_checkpoint() -> string(). +print_next_checkpoint() -> + Msg = + case ?MODULE:checkpoint_timer() of + undefined -> + "Timer not active"; + {TimeScheduled, _TimerRef} -> + Now = erlang:system_time(millisecond), + TimeLeft = Now - TimeScheduled, + TotalSeconds = erlang:convert_time_unit(TimeLeft, millisecond, second), + {_Hour, Minute, Seconds} = calendar:seconds_to_time(TotalSeconds), + io_lib:format("Running again in T- ~pm ~ps", [Minute, Seconds]) + end, + lager:info(Msg), + Msg. + +-spec reset_connection() -> ok. +reset_connection() -> + gen_server:call(?MODULE, reset_connection, timer:seconds(30)). + +-ifdef(TEST). + +-spec test_counts() -> counts_map(). +test_counts() -> + gen_server:call(?MODULE, test_counts). + +-spec test_stream() -> undefined | grpcbox_client:stream(). +test_stream() -> + gen_server:call(?MODULE, test_stream). + +-endif. + %% ------------------------------------------------------------------ %% gen_server Function Definitions %% ------------------------------------------------------------------ @@ -129,9 +218,24 @@ init(Args) -> Backoff = backoff:type(backoff:init(?BACKOFF_MIN, ?BACKOFF_MAX), normal), lager:info("starting ~p with ~p", [?MODULE, Args]), self() ! ?INIT_STREAM, + + ok = open_dets(), + LastTimestamp = + case dets:lookup(?DETS, timestamp) of + [{timestamp, T}] -> T; + _ -> 0 + end, + {ok, #state{ stream = undefined, - conn_backoff = Backoff + conn_backoff = Backoff, + counts = #{ + route => 0, + eui_pair => 0, + skf => 0, + devaddr_range => 0 + }, + last_timestamp = LastTimestamp }}. handle_call({refresh_route, RouteID}, _From, State) -> @@ -166,26 +270,64 @@ handle_call({refresh_route, RouteID}, _From, State) -> end, {reply, Reply, State}; +handle_call(reset_timestamp, _From, #state{} = State) -> + ok = dets:insert(?DETS, {timestamp, 0}), + {reply, ok, State#state{last_timestamp = 0}}; +handle_call(last_timetstamp, _From, #state{last_timestamp = LastTimestamp} = State) -> + {reply, LastTimestamp, State}; +handle_call(checkpoint_timer, _From, #state{checkpoint_timer = CheckpointTimerRef} = State) -> + {reply, CheckpointTimerRef, State}; +handle_call(test_counts, _From, State) -> + {reply, State#state.counts, State}; +handle_call(test_stream, _From, State) -> + {reply, State#state.stream, State}; +handle_call(checkpoint, _From, #state{last_timestamp = LastTimestamp} = State) -> + lager:info([{timestamp, LastTimestamp}], "checkpointing configuration"), + + %% We don't spawn the checkpoints to reduce the chance of continued updates + %% causing weirdness in the DB. + ok = ?MODULE:do_checkpoint(LastTimestamp), + CheckpointTimerRef = ?MODULE:schedule_checkpoint(), + lager:info([{timestamp, LastTimestamp}], "checkpoint done"), + + {reply, ok, State#state{checkpoint_timer = CheckpointTimerRef}}; +handle_call(reset_connection, _From, State) -> + {stop, manual_connection_reset, ok, State}; handle_call(Msg, _From, State) -> {stop, {unimplemented_call, Msg}, State}. handle_cast(Msg, State) -> {stop, {unimplemented_cast, Msg}, State}. -handle_info(?INIT_STREAM, #state{conn_backoff = Backoff0} = State) -> - lager:info("connecting"), +handle_info(checkpoint, #state{} = State) -> + ok = ?MODULE:checkpoint(), + {noreply, State}; +handle_info( + ?INIT_STREAM, + #state{ + conn_backoff = Backoff0, + last_timestamp = LastTimestamp, + checkpoint_timer = PreviousCheckpointTimerRef + } = State +) -> + lager:info([{from, LastTimestamp}], "connecting"), + ok = maybe_cancel_timer(PreviousCheckpointTimerRef), SigFun = hpr_utils:sig_fun(), PubKeyBin = hpr_utils:pubkey_bin(), - RouteStreamReq = hpr_route_stream_req:new(PubKeyBin), + + RouteStreamReq = hpr_route_stream_req:new(PubKeyBin, LastTimestamp), SignedRouteStreamReq = hpr_route_stream_req:sign(RouteStreamReq, SigFun), StreamOptions = #{channel => ?IOT_CONFIG_CHANNEL}, + case helium_iot_config_route_client:stream(SignedRouteStreamReq, StreamOptions) of {ok, Stream} -> - lager:info("stream initialized"), + lager:info([{from, LastTimestamp}], "stream initialized"), {_, Backoff1} = backoff:succeed(Backoff0), - ok = hpr_route_ets:delete_all(), + Timer = ?MODULE:schedule_checkpoint(), {noreply, State#state{ - stream = Stream, conn_backoff = Backoff1 + stream = Stream, + conn_backoff = Backoff1, + checkpoint_timer = Timer }}; {error, undefined_channel} -> lager:error( @@ -199,18 +341,28 @@ handle_info(?INIT_STREAM, #state{conn_backoff = Backoff0} = State) -> {noreply, State#state{conn_backoff = Backoff1}} end; %% GRPC stream callbacks -handle_info({data, _StreamID, RouteStreamRes}, #state{} = State) -> +handle_info({data, _StreamID, RouteStreamRes}, #state{counts = Counts0} = State) -> Action = hpr_route_stream_res:action(RouteStreamRes), Data = hpr_route_stream_res:data(RouteStreamRes), + Timestamp = hpr_route_stream_res:timestamp(RouteStreamRes), {Type, _} = Data, lager:debug([{action, Action}, {type, Type}], "got route stream update"), - _ = erlang:spawn( - fun() -> + Counts1 = Counts0#{Type => maps:get(Type, Counts0, 0) + 1}, + case Type of + %% Routes are required for many updates, we don't spawn them to make + %% sure everything is setup by the time updates start coming in for the route. + route -> ok = process_route_stream_res(Action, Data), - ok = hpr_metrics:ics_update(Type, Action) - end - ), - {noreply, State}; + ok = hpr_metrics:ics_update(Type, Action); + _ -> + _ = erlang:spawn( + fun() -> + ok = process_route_stream_res(Action, Data), + ok = hpr_metrics:ics_update(Type, Action) + end + ) + end, + {noreply, State#state{counts = Counts1, last_timestamp = Timestamp}}; handle_info({headers, _StreamID, _Headers}, State) -> %% noop on headers {noreply, State}; @@ -263,6 +415,7 @@ handle_info(_Msg, State) -> terminate(_Reason, _State) -> lager:error("terminate ~p", [_Reason]), + dets:close(?DETS), ok. %% ------------------------------------------------------------------ @@ -317,7 +470,7 @@ refresh_skfs(RouteID) -> SKFs when erlang:is_list(SKFs) -> Previous = hpr_skf_storage:lookup_route(RouteID), PreviousCnt = hpr_skf_storage:replace_route(RouteID, SKFs), - ct:print( + lager:info( "route refresh skfs ~p", [{{previous, PreviousCnt}, {current, length(SKFs)}}] ), @@ -430,3 +583,24 @@ do_recv_from_stream(stream_finished, _Stream, Acc) -> do_recv_from_stream(Msg, _Stream, _Acc) -> lager:warning("unhandled msg from stream: ~p", [Msg]), {error, {unhandled_message, Msg}}. + +-spec open_dets() -> ok. +open_dets() -> + DataDir = hpr_utils:base_data_dir(), + DETSFile = filename:join(DataDir, "stream_worker.dets"), + ok = filelib:ensure_dir(DETSFile), + case dets:open_file(?DETS, [{file, DETSFile}, {type, set}]) of + {ok, _DETS} -> + ok; + {error, _Reason} -> + Deleted = file:delete(DETSFile), + lager:error("failed to open dets ~p deleting file ~p", [_Reason, Deleted]), + open_dets() + end. + +-spec maybe_cancel_timer(undefined | {TimeScheduled :: non_neg_integer(), timer:tref()}) -> ok. +maybe_cancel_timer(undefined) -> + ok; +maybe_cancel_timer({_TimeScheduled, Timer}) -> + lager:info([{t, Timer}, {timer, timer:cancel(Timer)}], "maybe cancelling timer"), + ok. diff --git a/src/grpc/iot_config/hpr_skf_storage.erl b/src/grpc/iot_config/hpr_skf_storage.erl index 0875d078..d225bba5 100644 --- a/src/grpc/iot_config/hpr_skf_storage.erl +++ b/src/grpc/iot_config/hpr_skf_storage.erl @@ -2,6 +2,8 @@ -export([ make_ets/1, + checkpoint/0, + dets_filename/1, insert/1, update/4, @@ -27,18 +29,69 @@ -endif. -define(ETS_SKFS, hpr_route_skfs_ets). +-define(DETS_FILENAME(ROUTE_ID), io_lib:format("hpr_skf_~s.dets", [ROUTE_ID])). -define(SKF_HEIR, hpr_sup). -spec make_ets(hpr_route:id()) -> ets:tab(). make_ets(RouteID) -> - ets:new(?ETS_SKFS, [ + Ref = ets:new(?ETS_SKFS, [ public, set, {read_concurrency, true}, {write_concurrency, true}, {heir, erlang:whereis(?SKF_HEIR), RouteID} - ]). + ]), + + lager:info("rehydrating SKF from dets: ~p", [RouteID]), + ok = rehydrate_from_dets(RouteID, Ref), + + Ref. + +-spec dets_filename(Route :: hpr_route:id()) -> list(). +dets_filename(RouteID) -> + DataDir = hpr_utils:base_data_dir(), + Filename = ?DETS_FILENAME(RouteID), + filename:join([DataDir, Filename]). + +-spec checkpoint() -> ok. +checkpoint() -> + lists:foreach( + fun(RouteETS) -> + Route = hpr_route_ets:route(RouteETS), + RouteID = hpr_route:id(Route), + DETSFile = ?MODULE:dets_filename(RouteID), + + ETS = hpr_route_ets:skf_ets(RouteETS), + with_open_dets(DETSFile, fun() -> ok = dets:from_ets(DETSFile, ETS) end) + end, + hpr_route_storage:all_route_ets() + ), + ok. + +-spec with_open_dets(Filename :: list(), Fn :: fun()) -> ok. +with_open_dets(Filename, Fn) -> + ok = filelib:ensure_dir(Filename), + + case dets:open_file(Filename, [{type, set}]) of + {ok, _} -> + lager:info("opened dets: ~s~n", [Filename]), + Fn(), + dets:close(Filename); + {error, _Reason} -> + Deleted = file:delete(Filename), + lager:warning("failed to open file ~p: ~p, deleted: ~p", [Filename, _Reason, Deleted]), + with_open_dets(Filename, Fn) + end. + +rehydrate_from_dets(RouteID, EtsRef) -> + Filename = ?MODULE:dets_filename(RouteID), + with_open_dets(Filename, fun() -> + [] = dets:traverse(Filename, fun(SKF) -> + ok = do_rehydrate_insert_skf(EtsRef, SKF), + continue + end) + end). -spec lookup(ETS :: ets:table(), DevAddr :: non_neg_integer()) -> [{SessionKey :: binary(), MaxCopies :: non_neg_integer()}]. @@ -159,7 +212,21 @@ delete_route(RouteID) -> SKFETS = hpr_route_ets:skf_ets(RouteETS), Size = ets:info(SKFETS, size), ets:delete(SKFETS), + DetsFilename = ?MODULE:dets_filename(RouteID), + _ = file:delete(DetsFilename), Size; + {error, not_found} = Err -> + DetsFilename = ?MODULE:dets_filename(RouteID), + Deleted = file:delete(DetsFilename), + lager:info( + [ + {route_id, RouteID}, + {deleted, Deleted}, + {filename, DetsFilename} + ], + "route not found, skf file maybe deleted" + ), + Err; Other -> lager:warning("failed to delete skf table ~p for ~s", [Other, RouteID]), {error, Other} @@ -182,7 +249,6 @@ replace_route(RouteID, NewSKFs) -> OldSize = ets:info(OldTab, size), - ct:print("replace on ~s: ~p", [RouteID, [{old, OldSize}, {new, length(NewSKFs)}]]), ets:delete(OldTab), {ok, OldSize}; Other -> @@ -223,6 +289,14 @@ do_insert_skf(SKFETS, SKF) -> true = ets:insert(SKFETS, {hpr_utils:hex_to_bin(SessionKey), {DevAddr, MaxCopies}}), ok. +-spec do_rehydrate_insert_skf( + Table :: ets:table(), + Entry :: {SessionKey :: binary(), {DevAddr :: binary(), MaxCopies :: non_neg_integer()}} +) -> ok. +do_rehydrate_insert_skf(SKFETS, SKF) -> + true = ets:insert(SKFETS, SKF), + ok. + -spec skf_md(hpr_route:id(), hpr_skf:skf()) -> proplists:proplist(). skf_md(RouteID, SKF) -> [ diff --git a/src/hpr_sup.erl b/src/hpr_sup.erl index 1cf59991..656c0ccb 100644 --- a/src/hpr_sup.erl +++ b/src/hpr_sup.erl @@ -54,12 +54,12 @@ init([]) -> ok = filelib:ensure_dir(KeyFileName), ok = hpr_utils:load_key(KeyFileName), - ok = hpr_routing_cache:init_ets(), - ok = hpr_routing:init(), - ok = hpr_multi_buy:init(), - ok = hpr_protocol_router:init(), - ok = hpr_route_ets:init(), - ok = hpr_gateway_location:init(), + ok = timing("packet routing cache", fun() -> hpr_routing_cache:init_ets() end), + ok = timing("routing throttles", fun() -> hpr_routing:init() end), + ok = timing("multi buy", fun() -> hpr_multi_buy:init() end), + ok = timing("packet_router streams", fun() -> hpr_protocol_router:init() end), + ok = timing("config service", fun() -> hpr_route_ets:init() end), + ok = timing("gw location", fun() -> hpr_gateway_location:init() end), PacketReporterConfig = application:get_env(?APP, packet_reporter, #{}), ConfigServiceConfig = application:get_env(?APP, iot_config_service, #{}), @@ -125,3 +125,11 @@ maybe_start_channel(Config, ChannelName) -> _ -> lager:error("no host/port/transport to start ~s", [ChannelName]) end. + +-spec timing(Label:: string(), Fn :: fun()) -> ok. +timing(Label, Fn) -> + Start = erlang:system_time(millisecond), + Result = Fn(), + End = erlang:system_time(millisecond), + lager:info("~s took ~w ms", [Label, End - Start]), + Result. diff --git a/src/hpr_utils.erl b/src/hpr_utils.erl index 74b0c76a..05347595 100644 --- a/src/hpr_utils.erl +++ b/src/hpr_utils.erl @@ -233,7 +233,6 @@ b58() -> -spec base_data_dir() -> string(). base_data_dir() -> DataDir = application:get_env(?APP, data_dir, ?DATA_DIR), - lager:info("base data dir ~s", [DataDir]), ok = filelib:ensure_dir(DataDir), DataDir. diff --git a/src/metrics/hpr_metrics.erl b/src/metrics/hpr_metrics.erl index 615ede2b..1e4b1fc5 100644 --- a/src/metrics/hpr_metrics.erl +++ b/src/metrics/hpr_metrics.erl @@ -20,6 +20,10 @@ observe_gateway_location/2 ]). +-export([ + counts/0 +]). + %% ------------------------------------------------------------------ %% gen_server Function Exports %% ------------------------------------------------------------------ @@ -138,6 +142,29 @@ observe_gateway_location(Start, Status) -> erlang:system_time(millisecond) - Start ). +%% ------------------------------------------------------------------ +%% CLI Function Definitions +%% ------------------------------------------------------------------ + +-spec counts() -> proplists:proplist(). +counts() -> + [ + {routes, ets:info(hpr_routes_ets, size)}, + {eui_pairs, ets:info(hpr_route_eui_pairs_ets, size)}, + {devaddr_ranges, ets:info(hpr_route_devaddr_ranges_ets, size)}, + {skfs, + lists:foldl( + fun(RouteETS, Acc) -> + case ets:info(hpr_route_ets:skf_ets(RouteETS), size) of + undefined -> Acc; + N -> N + Acc + end + end, + 0, + ets:tab2list(hpr_routes_ets) + )} + ]. + %% ------------------------------------------------------------------ %% gen_server Function Definitions %% ------------------------------------------------------------------ @@ -213,6 +240,7 @@ declare_metrics() -> ?METRICS ). + -spec record_routes() -> ok. record_routes() -> case ets:info(hpr_routes_ets, size) of diff --git a/test/hpr_route_stream_worker_SUITE.erl b/test/hpr_route_stream_worker_SUITE.erl index a31c422a..92736283 100644 --- a/test/hpr_route_stream_worker_SUITE.erl +++ b/test/hpr_route_stream_worker_SUITE.erl @@ -3,7 +3,6 @@ -include_lib("eunit/include/eunit.hrl"). -include("hpr_metrics.hrl"). --include("../src/grpc/autogen/iot_config_pb.hrl"). -export([ all/0, @@ -13,7 +12,10 @@ -export([ main_test/1, - refresh_route_test/1 + refresh_route_test/1, + stream_crash_resume_updates_test/1, + app_restart_rehydrate_test/1, + route_remove_delete_skf_dets_test/1 ]). %%-------------------------------------------------------------------- @@ -29,7 +31,10 @@ all() -> [ main_test, - refresh_route_test + refresh_route_test, + stream_crash_resume_updates_test, + app_restart_rehydrate_test, + route_remove_delete_skf_dets_test ]. %%-------------------------------------------------------------------- @@ -50,6 +55,276 @@ end_per_testcase(TestCase, Config) -> %% TEST CASES %%-------------------------------------------------------------------- +route_remove_delete_skf_dets_test(_Config) -> + #{ + route_id := Route1ID, + route := Route1, + eui_pair := EUIPair1, + devaddr_range := DevAddrRange1, + skf := SessionKeyFilter1 + } = test_data("7d502f32-4d58-4746-965e-001"), + #{ + route_id := Route2ID, + route := Route2, + eui_pair := EUIPair2, + devaddr_range := DevAddrRange2, + skf := SessionKeyFilter2 + } = test_data("7d502f32-4d58-4746-965e-002"), + + Updates0 = [ + {route, Route1}, + {eui_pair, EUIPair1}, + {devaddr_range, DevAddrRange1}, + {skf, SessionKeyFilter1}, + {route, Route2}, + {eui_pair, EUIPair2}, + {devaddr_range, DevAddrRange2}, + {skf, SessionKeyFilter2} + ], + Updates = [ + hpr_route_stream_res:test_new(#{ + action => add, data => Data, timestamp => 100 + }) + || Data <- Updates0 + ], + [ok = hpr_test_ics_route_service:stream_resp(Update) || Update <- Updates], + timer:sleep(20), + + ok = check_config_counts(Route1ID, 2, 2, 2, 1), + ok = check_config_counts(Route2ID, 2, 2, 2, 1), + + Route1SKFFileName = hpr_skf_storage:dets_filename(Route1ID), + Route2SKFFileName = hpr_skf_storage:dets_filename(Route2ID), + + ?assert(filelib:is_file(Route1SKFFileName)), + ?assert(filelib:is_file(Route2SKFFileName)), + + %% Remove a route and ensure the skf file is removed. + ok = hpr_test_ics_route_service:stream_resp( + hpr_route_stream_res:test_new(#{ + action => remove, + data => {route, Route1}, + timestamp => 200 + }) + ), + + timer:sleep(20), + ?assertNot(filelib:is_file(Route1SKFFileName)), + ok = check_config_counts(Route2ID, 1, 1, 1, 1), + + ok. + +app_restart_rehydrate_test(_Config) -> + %% Fill up the app with a few config things. + ?assertMatch( + #{ + route := 0, + eui_pair := 0, + devaddr_range := 0, + skf := 0 + }, + hpr_route_stream_worker:test_counts() + ), + + %% Create a bunch of data to ingest + #{ + route_id := Route1ID, + route := Route1, + eui_pair := EUIPair1, + devaddr_range := DevAddrRange1, + skf := SessionKeyFilter1 + } = test_data("7d502f32-4d58-4746-965e-001"), + + hpr_test_ics_route_service:stream_resp( + hpr_route_stream_res:test_new(#{action => add, data => {route, Route1}, timestamp => 100}) + ), + timer:sleep(100), + + Updates1 = [ + hpr_route_stream_res:test_new(#{ + action => add, data => {eui_pair, EUIPair1}, timestamp => 100 + }), + hpr_route_stream_res:test_new(#{ + action => add, data => {devaddr_range, DevAddrRange1}, timestamp => 100 + }), + hpr_route_stream_res:test_new(#{ + action => add, data => {skf, SessionKeyFilter1}, timestamp => 100 + }) + ], + [ok = hpr_test_ics_route_service:stream_resp(Update) || Update <- Updates1], + ct:print("all updates sent"), + timer:sleep(timer:seconds(1)), + + %% make sure all the data was received + %% ok = timer:sleep(150), + ok = check_config_counts(Route1ID, 1, 1, 1, 1), + ?assertMatch( + #{ + route := 1, + eui_pair := 1, + devaddr_range := 1, + skf := 1 + }, + hpr_route_stream_worker:test_counts() + ), + + %% Timestamps are inclusive, send a route update to bump the last timestamp + hpr_test_ics_route_service:stream_resp( + hpr_route_stream_res:test_new(#{action => add, data => {route, Route1}, timestamp => 150}) + ), + ok = timer:sleep(150), + ct:print("checkpointing"), + %% Make sure the latest config timestamp is saved + ok = hpr_route_stream_worker:checkpoint(), + + %% Stop the app. + ct:print("stopping the app"), + ok = application:stop(hpr), + %% Restart the app. + timer:sleep(100), + ct:print("starting the app"), + {ok, _} = application:ensure_all_started(hpr), + ok = test_utils:wait_until( + fun() -> + Stream = hpr_route_stream_worker:test_stream(), + %% {state, Stream, _Backoff} = sys:get_state(hpr_route_stream_worker), + Stream =/= undefined andalso + erlang:is_pid(erlang:whereis(hpr_test_ics_route_service)) + end, + 20, + 500 + ), + ct:print("everything should be rehydrated"), + %% Make sure the config is still there. + ok = check_config_counts(Route1ID, 1, 1, 1, 1), + %% And no new updates have been received. + ?assertMatch( + #{ + route := 0, + eui_pair := 0, + devaddr_range := 0, + skf := 0 + }, + hpr_route_stream_worker:test_counts() + ), + + ok. + +stream_crash_resume_updates_test(_Config) -> + %% The first time the stream worker starts up, it should ingest all available config. + %% Then we kill it. + %% Then we start it up again, and it should ingest only new available config. + + ?assertMatch( + #{ + route := 0, + eui_pair := 0, + devaddr_range := 0, + skf := 0 + }, + hpr_route_stream_worker:test_counts() + ), + + %% Create a bunch of data to ingest + #{ + route_id := Route1ID, + route := Route1, + eui_pair := EUIPair1, + devaddr_range := DevAddrRange1, + skf := SessionKeyFilter1 + } = test_data("7d502f32-4d58-4746-965e-001"), + + Updates1 = [ + hpr_route_stream_res:test_new(#{action => add, data => {route, Route1}, timestamp => 100}), + hpr_route_stream_res:test_new(#{ + action => add, data => {eui_pair, EUIPair1}, timestamp => 100 + }), + hpr_route_stream_res:test_new(#{ + action => add, data => {devaddr_range, DevAddrRange1}, timestamp => 100 + }), + hpr_route_stream_res:test_new(#{ + action => add, data => {skf, SessionKeyFilter1}, timestamp => 100 + }) + ], + [ok = hpr_test_ics_route_service:stream_resp(Update) || Update <- Updates1], + + %% make sure all the data was received + %% ok = timer:sleep(150), + ok = check_config_counts(Route1ID, 1, 1, 1, 1), + ?assertMatch( + #{ + route := 1, + eui_pair := 1, + devaddr_range := 1, + skf := 1 + }, + hpr_route_stream_worker:test_counts() + ), + + %% Timestamps are inclusive, send a route update to bump the last timestamp + hpr_test_ics_route_service:stream_resp( + hpr_route_stream_res:test_new(#{action => add, data => {route, Route1}, timestamp => 150}) + ), + ok = timer:sleep(150), + %% Make sure the latest config timestamp is saved + ok = hpr_route_stream_worker:checkpoint(), + %% Kill the stream worker + exit(whereis(hpr_route_stream_worker), kill), + ok = test_utils:wait_until( + fun() -> + whereis(hpr_route_stream_worker) =/= undefined andalso + erlang:is_process_alive(whereis(hpr_route_stream_worker)) andalso + hpr_route_stream_worker:test_stream() =/= undefined andalso + erlang:is_pid(erlang:whereis(hpr_test_ics_route_service)) + end, + 20, + 500 + ), + + %% Create a bunch of new data to ingest + #{ + route_id := Route2ID, + route := Route2, + eui_pair := EUIPair2, + devaddr_range := DevAddrRange2, + skf := SessionKeyFilter2 + } = test_data("7d502f32-4d58-4746-965e-002"), + + %% Send the old data, and the new data + Updates2 = + Updates1 ++ + [ + hpr_route_stream_res:test_new(#{ + action => add, data => {route, Route2}, timestamp => 200 + }), + hpr_route_stream_res:test_new(#{ + action => add, data => {eui_pair, EUIPair2}, timestamp => 200 + }), + hpr_route_stream_res:test_new(#{ + action => add, data => {devaddr_range, DevAddrRange2}, timestamp => 200 + }), + hpr_route_stream_res:test_new(#{ + action => add, data => {skf, SessionKeyFilter2}, timestamp => 200 + }) + ], + [ok = hpr_test_ics_route_service:stream_resp(Update) || Update <- Updates2], + + %% make sure only the new data was received + timer:sleep(timer:seconds(1)), + %% NOTE: we expect 2 of everything, but skfs are checked per route. + ok = check_config_counts(Route2ID, 2, 2, 2, 1), + ?assertMatch( + #{ + route := 1, + eui_pair := 1, + devaddr_range := 1, + skf := 1 + }, + hpr_route_stream_worker:test_counts() + ), + + ok. + main_test(_Config) -> %% Let it startup timer:sleep(500), @@ -95,19 +370,7 @@ main_test(_Config) -> ), %% Let time to process new routes - ok = test_utils:wait_until( - fun() -> - case hpr_route_storage:lookup(Route1ID) of - {ok, RouteETS} -> - 1 =:= ets:info(hpr_routes_ets, size) andalso - 1 =:= ets:info(hpr_route_eui_pairs_ets, size) andalso - 1 =:= ets:info(hpr_route_devaddr_ranges_ets, size) andalso - 1 =:= ets:info(hpr_route_ets:skf_ets(RouteETS), size); - _ -> - false - end - end - ), + ok = check_config_counts(Route1ID, 1, 1, 1, 1), ?assertEqual( 1, @@ -196,19 +459,9 @@ main_test(_Config) -> ok = hpr_test_ics_route_service:stream_resp( hpr_route_stream_res:test_new(#{action => add, data => {skf, SessionKeyFilter}}) ), - ok = test_utils:wait_until( - fun() -> - case hpr_route_storage:lookup(Route1ID) of - {ok, RouteETS} -> - 1 =:= ets:info(hpr_routes_ets, size) andalso - 1 =:= ets:info(hpr_route_eui_pairs_ets, size) andalso - 1 =:= ets:info(hpr_route_devaddr_ranges_ets, size) andalso - 1 =:= ets:info(hpr_route_ets:skf_ets(RouteETS), size); - _ -> - false - end - end - ), + + ok = check_config_counts(Route1ID, 1, 1, 1, 1), + ?assertMatch([RouteETS1], hpr_devaddr_range_storage:lookup(16#00000005)), ?assertEqual([RouteETS1], hpr_eui_pair_storage:lookup(1, 12)), ?assertEqual([RouteETS1], hpr_eui_pair_storage:lookup(1, 100)), @@ -279,19 +532,7 @@ refresh_route_test(_Config) -> ), %% Let time to process new routes - ok = test_utils:wait_until( - fun() -> - case hpr_route_storage:lookup(Route1ID) of - {ok, RouteETS} -> - 1 =:= ets:info(hpr_routes_ets, size) andalso - 1 =:= ets:info(hpr_route_eui_pairs_ets, size) andalso - 1 =:= ets:info(hpr_route_devaddr_ranges_ets, size) andalso - 1 =:= ets:info(hpr_route_ets:skf_ets(RouteETS), size); - _ -> - false - end - end - ), + ok = check_config_counts(Route1ID, 1, 1, 1, 1), {ok, RouteETS1} = hpr_route_storage:lookup(Route1ID), SKFETS1 = hpr_route_ets:skf_ets(RouteETS1), @@ -401,3 +642,78 @@ refresh_route_test(_Config) -> ?assertEqual([], hpr_skf_storage:lookup(SKFETS3, DevAddr1)), ok. + +%% =================================================================== +%% Helpers +%% =================================================================== + +test_data(RouteID) -> + Route1 = hpr_route:test_new(#{ + id => RouteID, + net_id => 0, + oui => 1, + server => #{ + host => "localhost", + port => 8080, + protocol => {packet_router, #{}} + }, + max_copies => 10 + }), + EUIPair1 = hpr_eui_pair:test_new(#{ + route_id => RouteID, app_eui => 1, dev_eui => 0 + }), + DevAddrRange1 = hpr_devaddr_range:test_new(#{ + route_id => RouteID, start_addr => 16#00000001, end_addr => 16#0000000A + }), + DevAddr1 = 16#00000001, + SessionKey1 = hpr_utils:bin_to_hex_string(crypto:strong_rand_bytes(16)), + SessionKeyFilter1 = hpr_skf:new(#{ + route_id => RouteID, + devaddr => DevAddr1, + session_key => SessionKey1, + max_copies => 1 + }), + #{ + route_id => RouteID, + route => Route1, + eui_pair => EUIPair1, + devaddr_range => DevAddrRange1, + skf => SessionKeyFilter1 + }. + +check_config_counts( + RouteID, + ExpectedRouteCount, + ExpectedEUIPairCount, + ExpectedDevaddrRangeCount, + %% NOTE: SKF are separated by Route, provide amount expected for RouteID + ExpectedSKFCount +) -> + ok = test_utils:wait_until( + fun() -> + case hpr_route_storage:lookup(RouteID) of + {ok, RouteETS} -> + RouteCount = ets:info(hpr_routes_ets, size), + EUIPairCount = ets:info(hpr_route_eui_pairs_ets, size), + DevaddrRangeCount = ets:info(hpr_route_devaddr_ranges_ets, size), + SKFCount = ets:info(hpr_route_ets:skf_ets(RouteETS), size), + + { + ExpectedRouteCount =:= RouteCount andalso + ExpectedEUIPairCount =:= EUIPairCount andalso + ExpectedDevaddrRangeCount =:= DevaddrRangeCount andalso + ExpectedSKFCount =:= SKFCount, + [ + {route_id, RouteID}, + {route, ExpectedRouteCount, RouteCount}, + {eui_pair, ExpectedEUIPairCount, EUIPairCount}, + {devaddr_range, ExpectedDevaddrRangeCount, DevaddrRangeCount}, + {skf, ExpectedSKFCount, SKFCount}, + {skf_items, ets:tab2list(hpr_route_ets:skf_ets(RouteETS))} + ] + }; + _ -> + {false, {route_not_found, RouteID}} + end + end + ). diff --git a/test/hpr_test_downlink_service_http_roaming.erl b/test/hpr_test_downlink_service_http_roaming.erl index b940d126..2b370744 100644 --- a/test/hpr_test_downlink_service_http_roaming.erl +++ b/test/hpr_test_downlink_service_http_roaming.erl @@ -24,6 +24,10 @@ -spec init(atom(), StreamState :: grpcbox_stream:t()) -> grpcbox_stream:t(). init(_RPC, StreamState) -> Self = self(), + case lists:member(?MODULE, erlang:registered()) of + false -> ok; + true -> erlang:unregister(?MODULE) + end, true = erlang:register(?MODULE, self()), PubKeyBin = hpr_utils:pubkey_bin(), lager:notice("init ~p @ ~p with signer ~p", [?MODULE, Self, PubKeyBin]), diff --git a/test/hpr_test_ics_route_service.erl b/test/hpr_test_ics_route_service.erl index b215a8f5..b9cbe6fa 100644 --- a/test/hpr_test_ics_route_service.erl +++ b/test/hpr_test_ics_route_service.erl @@ -31,11 +31,17 @@ stream_resp/1 ]). +-record(stream_handler_state, {since = 0 :: non_neg_integer()}). + -spec init(atom(), StreamState :: grpcbox_stream:t()) -> grpcbox_stream:t(). init(RPC, StreamState) -> case RPC of stream -> Self = self(), + case lists:member(?MODULE, erlang:registered()) of + false -> ok; + true -> erlang:unregister(?MODULE) + end, true = erlang:register(?MODULE, self()), lager:notice("init ~p @ ~p", [?MODULE, Self]); _ -> @@ -45,8 +51,19 @@ init(RPC, StreamState) -> -spec handle_info(Msg :: any(), StreamState :: grpcbox_stream:t()) -> grpcbox_stream:t(). handle_info({stream_resp, RouteStreamResp}, StreamState) -> - lager:notice("got RouteStreamResp ~p", [RouteStreamResp]), - grpcbox_stream:send(false, RouteStreamResp, StreamState); + %% If the timestamp of the message is after `since`, send the update. + #stream_handler_state{since = Since} = grpcbox_stream:stream_handler_state(StreamState), + Timestamp = hpr_route_stream_res:timestamp(RouteStreamResp), + MD = [{since, Since}, {timestamp, Timestamp}], + %% ct:print("got RouteStreamResp ~p~n~p", [RouteStreamResp, MD]), + case Since =< Timestamp of + true -> + lager:notice("sending RouteStreamResp ~p", [MD ++ [RouteStreamResp]]), + grpcbox_stream:send(false, RouteStreamResp, StreamState); + false -> + lager:notice("ignoring RouteStreamResp ~p", [MD ++ [RouteStreamResp]]), + StreamState + end; handle_info(_Msg, StreamState) -> StreamState. @@ -70,7 +87,10 @@ stream(RouteStreamReq, StreamState) -> false -> {grpc_error, {grpcbox_stream:code_to_status(7), <<"PERMISSION_DENIED">>}}; true -> - {ok, StreamState} + {ok, + grpcbox_stream:stream_handler_state(StreamState, #stream_handler_state{ + since = hpr_route_stream_req:since(RouteStreamReq) + })} end. get_euis(GetEUIsReq, StreamState) -> diff --git a/test/test_utils.erl b/test/test_utils.erl index f7a6ea91..8df646a4 100644 --- a/test/test_utils.erl +++ b/test/test_utils.erl @@ -89,7 +89,8 @@ init_per_testcase(TestCase, Config) -> ok = test_utils:wait_until( fun() -> - {state, Stream, _Backoff} = sys:get_state(hpr_route_stream_worker), + Stream = hpr_route_stream_worker:test_stream(), + %% {state, Stream, _Backoff} = sys:get_state(hpr_route_stream_worker), Stream =/= undefined andalso erlang:is_pid(erlang:whereis(hpr_test_ics_route_service)) end,