Skip to content

Commit

Permalink
rehydrate since (#276)
Browse files Browse the repository at this point in the history
* move cli functions to their modules

* config rehydrate, and updates since

- rehydrate config from dets files upon startup
- when starting the config update stream, send the timestamp of the last
  update we received

- todo: if any of the config cannot be rehydrated, remove everything and
start from the beginning.

* fix table names and tests passing

use with_open_dets helpers. Keeping them in individual files so it
doesn't become unnecessarily generic

* add some timing around ets initialization in supervisor

* do not spawn skf rehydration

to avoid conflicts of rehydrating while updates are streaming in.

* make sure all the ets/dets config is consistent

* remove from merge

* rebase test service rename

* process routes before other updates

* Start the checkpoint timer when a stream is successfully initialized.

maybe cancel any previous timers so we don't build up checkpoints if a
disconnect happens.

* don't get caught in a checkpoint loop or race condition

- add helpers for manually running checkpoints.
- cancel timers correctly.
- Don't make a timer until we have a connection.

* helpers for checking next checkpoint

* delete dets file when route is deleted

* Route Delete, ensure skf file deletion

A Route Delete can come through as part of the restreaming of events.
The Route may not exist in the ets table, and we want to make sure the
skf file does not stick around.

* HPR only deals in milliseconds

timestamps from routing responses are unix epoch seconds, requests are
sent in milliseconds.

* helper counts function for metrics CLI

* fix counts table name

* return message to log and print

* reset connection by killing the worker

* add cli commands

* proto has been merged upstream
  • Loading branch information
michaeldjeffrey authored Jan 16, 2024
1 parent d80223c commit 0b1ef6e
Show file tree
Hide file tree
Showing 19 changed files with 959 additions and 120 deletions.
3 changes: 2 additions & 1 deletion config/ct.config
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
}},
{multi_buy_enabled, true},
{routing_cache_window_secs, 5},
{routing_cache_timeout_secs, 10}
{routing_cache_timeout_secs, 10},
{ics_stream_worker_checkpoint_secs, 3}
]},
{aws_credentials, [
{credential_providers, [aws_credentials_env]},
Expand Down
3 changes: 2 additions & 1 deletion config/sys.config.src
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
port => "${HPR_MULTI_BUY_SERVICE_PORT}"
}},
{routing_cache_timeout_secs, 15},
{routing_cache_window_secs, 120}
{routing_cache_window_secs, 120},
{ics_stream_worker_checkpoint_secs, 300}
]},
{aws_credentials, [
{credential_providers, [aws_credentials_env, aws_credentials_ec2]}
Expand Down
2 changes: 1 addition & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
{<<"hackney">>,{pkg,<<"hackney">>,<<"1.17.0">>},0},
{<<"helium_proto">>,
{git,"https://github.com/helium/proto.git",
{ref,"8d81732f55aa34442ada818c93f504f1a16ca659"}},
{ref,"585704c871d32846a6e35d186a08443883545687"}},
0},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},2},
{<<"http2_client">>,
Expand Down
73 changes: 72 additions & 1 deletion src/cli/hpr_cli_config.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ config_usage() ->
"config route deactivate <route_id> - Deactivate route\n",
"config skf <DevAddr/Session Key> - List all Session Key Filters for Devaddr or Session Key\n",
"config eui --app <app_eui> --dev <dev_eui> - List all Routes with EUI pair\n"
"\n\n",
"config counts - Simple Counts of Configuration\n",
"config checkpoint next - Time until next writing of configuration to disk\n"
"config checkpoint write - Write current configuration to disk\n",
"config checkpoint reset - Set checkpoint timestamp to beginning of time (0)\n"
"config reconnect - Reset connection to Configuration Service\n"
]
].

Expand Down Expand Up @@ -102,7 +108,17 @@ config_cmd() ->
]}
],
fun config_eui/3
]
],
[["config", "counts"], [], [], fun config_counts/3],
[["config", "checkpoint", "next"], [], [], fun config_checkpoint_next/3],
[["config", "checkpoint", "write"], [], [], fun config_checkpoint_write/3],
[
["config", "checkpoint", "reset"],
[],
[{commit, [{longname, "commit"}, {datatype, boolean}]}],
fun config_checkpoint_reset/3
],
[["config", "reconnect"], [], [], fun config_reconnect/3]
].

config_list(["config", "ls"], [], []) ->
Expand Down Expand Up @@ -342,6 +358,61 @@ config_eui(["config", "eui"], [], Flags) ->
config_eui(_, _, _) ->
usage.

%% CLI handler for `config counts`.
%% Builds one table row holding the value returned by `count/0` of each
%% configuration storage module and prints it with c_table/1.
%% Any other combination of command, keys, or flags returns `usage`.
config_counts(["config", "counts"], [], []) ->
    Row = [
        {" Routes ", hpr_route_storage:count()},
        {" EUI Pairs ", hpr_eui_pair_storage:count()},
        {" SKF ", hpr_skf_storage:count()},
        {" DevAddr Ranges ", hpr_devaddr_range_storage:count()}
    ],
    c_table([Row]);
config_counts(_Cmd, _Keys, _Flags) ->
    usage.

%% CLI handler for `config checkpoint next`.
%% Prints the text returned by hpr_route_stream_worker:print_next_checkpoint/0.
%% Any other combination of command, keys, or flags returns `usage`.
config_checkpoint_next(["config", "checkpoint", "next"], [], []) ->
    c_text(hpr_route_stream_worker:print_next_checkpoint());
config_checkpoint_next(_Cmd, _Keys, _Flags) ->
    usage.

%% CLI handler for `config checkpoint write`.
%% Runs hpr_route_stream_worker:do_checkpoint/1 with the current system time,
%% measures how long the call takes, and prints the elapsed milliseconds.
%% Anything other than an `ok` result is printed as a failure.
%% Any other combination of command, keys, or flags returns `usage`.
config_checkpoint_write(["config", "checkpoint", "write"], [], []) ->
    %% NOTE(review): the timestamp is taken in seconds here; confirm that
    %% do_checkpoint/1 expects seconds rather than milliseconds.
    WriteCheckpoint = fun() ->
        hpr_route_stream_worker:do_checkpoint(erlang:system_time(second))
    end,
    case timer:tc(WriteCheckpoint) of
        {Micros, ok} ->
            %% timer:tc/1 reports microseconds; convert for display.
            Millis = erlang:convert_time_unit(Micros, microsecond, millisecond),
            c_text("Wrote checkpoint in ~wms", [Millis]);
        Unexpected ->
            c_text("Something went wrong: ~p", [Unexpected])
    end;
config_checkpoint_write(_Cmd, _Keys, _Flags) ->
    usage.

%% CLI handler for `config checkpoint reset`.
%% Only acts when the `commit` flag is present in Flags; otherwise it prints
%% a reminder that --commit is required. When the flag is present it calls
%% hpr_route_stream_worker:reset_timestamp/0 and prints the outcome.
%% Any other combination of command and keys returns `usage`.
config_checkpoint_reset(["config", "checkpoint", "reset"], [], Flags) ->
    case maps:from_list(Flags) of
        #{commit := _} ->
            checkpoint_reset_reply(hpr_route_stream_worker:reset_timestamp());
        _WithoutCommit ->
            c_text("Must specify --commit to reset checkpoint")
    end;
config_checkpoint_reset(_Cmd, _Keys, _Flags) ->
    usage.

%% Turns the result of the timestamp reset into CLI output.
checkpoint_reset_reply(ok) ->
    c_text("Checkpoint reset");
checkpoint_reset_reply(Err) ->
    c_text("Something went wrong:~n~p", [Err]).

%% CLI handler for `config reconnect`.
%% Calls hpr_route_stream_worker:reset_connection/0 and prints "Reconnected"
%% when it returns `ok`, or the returned term otherwise.
%% Any other combination of command, keys, or flags returns `usage`.
config_reconnect(["config", "reconnect"], [], []) ->
    Outcome = hpr_route_stream_worker:reset_connection(),
    case Outcome of
        ok ->
            c_text("Reconnected");
        _ ->
            c_text("Something went wrong:~n~p", [Outcome])
    end;
config_reconnect(_Cmd, _Keys, _Flags) ->
    usage.

do_config_eui(AppEUI, DevEUI) ->
AppEUINum = erlang:list_to_integer(AppEUI, 16),
DevEUINum = erlang:list_to_integer(DevEUI, 16),
Expand Down
60 changes: 48 additions & 12 deletions src/grpc/iot_config/hpr_devaddr_range_storage.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

-export([
init_ets/0,
checkpoint/0,

lookup/1,
insert/1,
Expand All @@ -20,15 +21,23 @@
-export([test_delete_ets/0, test_size/0, test_tab_name/0]).
-endif.

-define(ETS_DEVADDR_RANGES, hpr_route_devaddr_ranges_ets).
-define(ETS, hpr_route_devaddr_ranges_ets).
-define(DETS, hpr_route_devaddr_ranges_dets).

-spec init_ets() -> ok.
init_ets() ->
?ETS_DEVADDR_RANGES = ets:new(?ETS_DEVADDR_RANGES, [
?ETS = ets:new(?ETS, [
public, named_table, bag, {read_concurrency, true}
]),
ok = rehydrate_from_dets(),
ok.

%% Copies the in-memory ETS table into the DETS table. The DETS file is
%% opened for the duration of the copy by with_open_dets/1.
-spec checkpoint() -> ok.
checkpoint() ->
    DumpToDisk = fun() ->
        %% Assert success so a failed dump crashes instead of passing silently.
        ok = dets:from_ets(?DETS, ?ETS)
    end,
    with_open_dets(DumpToDisk).

-spec lookup(DevAddr :: non_neg_integer()) -> [hpr_route_ets:route()].
lookup(DevAddr) ->
MS = [
Expand All @@ -40,7 +49,7 @@ lookup(DevAddr) ->
['$3']
}
],
RouteIDs = ets:select(?ETS_DEVADDR_RANGES, MS),
RouteIDs = ets:select(?ETS, MS),

lists:usort(
lists:flatten([
Expand All @@ -52,7 +61,7 @@ lookup(DevAddr) ->

-spec insert(DevAddrRange :: hpr_devaddr_range:devaddr_range()) -> ok.
insert(DevAddrRange) ->
true = ets:insert(?ETS_DEVADDR_RANGES, [
true = ets:insert(?ETS, [
{
{hpr_devaddr_range:start_addr(DevAddrRange), hpr_devaddr_range:end_addr(DevAddrRange)},
hpr_devaddr_range:route_id(DevAddrRange)
Expand All @@ -70,7 +79,7 @@ insert(DevAddrRange) ->

-spec delete(DevAddrRange :: hpr_devaddr_range:devaddr_range()) -> ok.
delete(DevAddrRange) ->
true = ets:delete_object(?ETS_DEVADDR_RANGES, {
true = ets:delete_object(?ETS, {
{hpr_devaddr_range:start_addr(DevAddrRange), hpr_devaddr_range:end_addr(DevAddrRange)},
hpr_devaddr_range:route_id(DevAddrRange)
}),
Expand All @@ -86,23 +95,23 @@ delete(DevAddrRange) ->

-spec delete_all() -> ok.
delete_all() ->
ets:delete_all_objects(?ETS_DEVADDR_RANGES),
ets:delete_all_objects(?ETS),
ok.

-ifdef(TEST).

-spec test_delete_ets() -> ok.
test_delete_ets() ->
ets:delete(?ETS_DEVADDR_RANGES),
ets:delete(?ETS),
ok.

-spec test_size() -> non_neg_integer().
test_size() ->
ets:info(?ETS_DEVADDR_RANGES, size).
ets:info(?ETS, size).

-spec test_tab_name() -> atom().
test_tab_name() ->
?ETS_DEVADDR_RANGES.
?ETS.

-endif.

Expand All @@ -114,12 +123,12 @@ test_tab_name() ->
list({non_neg_integer(), non_neg_integer()}).
lookup_for_route(RouteID) ->
MS = [{{{'$1', '$2'}, RouteID}, [], [{{'$1', '$2'}}]}],
ets:select(?ETS_DEVADDR_RANGES, MS).
ets:select(?ETS, MS).

-spec count_for_route(RouteID :: hpr_route:id()) -> non_neg_integer().
count_for_route(RouteID) ->
MS = [{{'_', RouteID}, [], [true]}],
ets:select_count(?ETS_DEVADDR_RANGES, MS).
ets:select_count(?ETS, MS).

%% -------------------------------------------------------------------
%% Route Stream Helpers
Expand All @@ -128,7 +137,7 @@ count_for_route(RouteID) ->
-spec delete_route(hpr_route:id()) -> non_neg_integer().
delete_route(RouteID) ->
MS1 = [{{'_', RouteID}, [], [true]}],
ets:select_delete(?ETS_DEVADDR_RANGES, MS1).
ets:select_delete(?ETS, MS1).

-spec replace_route(
RouteID :: hpr_route:id(),
Expand All @@ -138,3 +147,30 @@ replace_route(RouteID, DevAddrRanges) ->
Removed = hpr_devaddr_range_storage:delete_route(RouteID),
lists:foreach(fun ?MODULE:insert/1, DevAddrRanges),
Removed.

%% Loads the contents of the DETS table into the ETS table. The DETS file
%% is opened for the duration of the load by with_open_dets/1.
%% A failed load is logged rather than raised, so the ETS table is simply
%% left as it was.
-spec rehydrate_from_dets() -> ok.
rehydrate_from_dets() ->
    with_open_dets(fun() ->
        case dets:to_ets(?DETS, ?ETS) of
            {error, Reason} ->
                lager:error("failed to hydrate ets: ~p", [Reason]);
            _ ->
                lager:info("ets hydrated")
        end
    end).

%% Opens this module's DETS table, runs FN, and closes the table again.
%%
%% If the file cannot be opened it is deleted and the open is retried once,
%% so that a corrupt file is replaced by a fresh one. If the retry also
%% fails (for example because the data directory is not writable) the call
%% raises `{dets_open_failed, File, Reason}` instead of retrying forever.
%%
%% The table is closed even when FN raises; the exception is then re-raised
%% unchanged. On success the result of dets:close/1 is returned.
-spec with_open_dets(FN :: fun()) -> ok.
with_open_dets(FN) ->
    with_open_dets(FN, 1).

%% Same as with_open_dets/1, with an explicit budget of delete-and-retry
%% attempts left for a failing dets:open_file/2.
-spec with_open_dets(FN :: fun(), RetriesLeft :: non_neg_integer()) -> ok.
with_open_dets(FN, RetriesLeft) ->
    DataDir = hpr_utils:base_data_dir(),
    DETSFile = filename:join([DataDir, "hpr_devaddr_range_storage.dets"]),
    ok = filelib:ensure_dir(DETSFile),

    case dets:open_file(?DETS, [{file, DETSFile}, {type, bag}]) of
        {ok, _Dets} ->
            try FN() of
                _ ->
                    dets:close(?DETS)
            catch
                Class:Error:Stacktrace ->
                    %% Best-effort close: the original exception is the one
                    %% worth reporting, so the close result is dropped here.
                    _ = dets:close(?DETS),
                    erlang:raise(Class, Error, Stacktrace)
            end;
        {error, Reason} when RetriesLeft > 0 ->
            Deleted = file:delete(DETSFile),
            lager:warning("failed to open dets file ~p: ~p, deleted: ~p", [?MODULE, Reason, Deleted]),
            with_open_dets(FN, RetriesLeft - 1);
        {error, Reason} ->
            %% Deleting the file did not help; stop instead of looping.
            lager:error("giving up opening dets file ~p: ~p", [DETSFile, Reason]),
            erlang:error({dets_open_failed, DETSFile, Reason})
    end.
Loading

0 comments on commit 0b1ef6e

Please sign in to comment.