Skip to content

Commit

Permalink
Macpie/gateway location (#273)
Browse files Browse the repository at this point in the history
* Basic gateway location worker

* Redo data dir

* IMprove CT

* Use hpr_gateway_location

* Fix xref

* Fix eunits

* Fix CTs

* Rly fix cts

* Add gateway location to gwmp

* Add location info to http roaming

* Add some stats around gateway location calls
  • Loading branch information
macpie authored Nov 28, 2023
1 parent 4f3d053 commit 43c7047
Show file tree
Hide file tree
Showing 30 changed files with 855 additions and 189 deletions.
5 changes: 3 additions & 2 deletions config/ct.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[
{hpr, [
{key, "data/hpr.key"},
{data_dir, "data"},
{packet_reporter, #{
aws_bucket => <<"test-bucket">>,
aws_bucket_region => <<"local">>,
Expand Down Expand Up @@ -64,7 +64,8 @@
grpc_opts => #{
service_protos => [iot_config_pb],
services => #{
'helium.iot_config.route' => hpr_test_iot_config_service_route
'helium.iot_config.gateway' => hpr_test_ics_gateway_service,
'helium.iot_config.route' => hpr_test_ics_route_service
}
},
transport_opts => #{ssl => false},
Expand Down
2 changes: 1 addition & 1 deletion config/sys.config
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[
{hpr, [
{key, "/var/data/hpr.key"},
{data_dir, "/var/data"},
{no_routes, [{"localhost", 8080}, {"localhost", 8080}]},
{packet_reporter, #{
aws_bucket => <<"test-bucket">>,
Expand Down
2 changes: 1 addition & 1 deletion config/sys.config.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[
{hpr, [
{key, "/var/data/hpr.key"},
{data_dir, "/var/data"},
{http_roaming_sender_nsid, <<"${HPR_ROAMING_SENDER_NSID}">>},
{no_routes, [{"${HPR_PP_HOST_1}", 8080}, {"${HPR_PP_HOST_2}", 8080}]},
{packet_reporter, #{
Expand Down
2 changes: 2 additions & 0 deletions include/hpr.hrl
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
-define(APP, hpr).

-define(DATA_DIR, "/var/data").

-define(IOT_CONFIG_CHANNEL, iot_config_channel).
-define(DOWNLINK_CHANNEL, downlink_channel).
-define(MULTI_BUY_CHANNEL, multi_buy_channel).
Expand Down
7 changes: 6 additions & 1 deletion include/hpr_metrics.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
-define(METRICS_VM_ETS_MEMORY, "hpr_vm_ets_memory").
-define(METRICS_VM_PROC_Q, "hpr_vm_process_queue").
-define(METRICS_ICS_UPDATES_COUNTER, "hpr_iot_config_service_updates_counter").
-define(METRICS_ICS_GATEWAY_LOCATION_HISTOGRAM,
"hpr_iot_config_service_gateway_location_histogram"
).

-define(METRICS, [
{?METRICS_GRPC_CONNECTION_GAUGE, prometheus_gauge, [], "Number of active GRPC Connections"},
Expand All @@ -36,5 +39,7 @@
{?METRICS_FIND_ROUTES_HISTOGRAM, prometheus_histogram, [], "Find Routes"},
{?METRICS_VM_ETS_MEMORY, prometheus_gauge, [name], "HPR ets memory"},
{?METRICS_VM_PROC_Q, prometheus_gauge, [name], "Process queue"},
{?METRICS_ICS_UPDATES_COUNTER, prometheus_counter, [type, action], "ICS updates counter"}
{?METRICS_ICS_UPDATES_COUNTER, prometheus_counter, [type, action], "ICS updates counter"},
{?METRICS_ICS_GATEWAY_LOCATION_HISTOGRAM, prometheus_histogram, [status],
"ICS gateway location req"}
]).
3 changes: 2 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
{iso8601, ".*", {git, "https://github.com/erlsci/iso8601.git", {tag, "1.3.1"}}},
{xxhash, {git, "https://github.com/pierreis/erlang-xxhash", {branch, "master"}}},
{erl_angry_purple_tiger,
{git, "https://github.com/helium/erl_angry_purple_tiger.git", {branch, "master"}}}
{git, "https://github.com/helium/erl_angry_purple_tiger.git", {branch, "master"}}},
{h3, ".*", {git, "https://github.com/helium/erlang-h3.git", {branch, "master"}}}
]}.

{erl_opts, [
Expand Down
4 changes: 4 additions & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@
{git,"https://github.com/novalabsxyz/grpcbox.git",
{ref,"6243151a7c54c714a018b3d0b92dbf057c033730"}},
0},
{<<"h3">>,
{git,"https://github.com/helium/erlang-h3.git",
{ref,"90e1b6ebf93f88702ce8d24d9142833a8401e3ab"}},
0},
{<<"hackney">>,{pkg,<<"hackney">>,<<"1.17.0">>},0},
{<<"helium_proto">>,
{git,"https://github.com/helium/proto.git",
Expand Down
159 changes: 159 additions & 0 deletions src/grpc/iot_config/hpr_gateway_location.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
%%%-------------------------------------------------------------------
%% @doc
%% == HPR Gateway Location ==
%% @end
%%%-------------------------------------------------------------------
-module(hpr_gateway_location).

-include("hpr.hrl").
-include("../autogen/iot_config_pb.hrl").

-export([
init/0,
get/1,
expire_locations/0
]).

-define(ETS, hpr_gateway_location_ets).
-define(DETS, hpr_gateway_location_dets).
-define(DEFAULT_DETS_FILE, "hpr_gateway_location_dets").
-define(CLEANUP_INTERVAL, timer:hours(1)).
-define(CACHE_TIME, timer:hours(24)).
-define(NOT_FOUND, not_found).

-record(location, {
gateway :: libp2p_crypto:pubkey_bin(),
timestamp :: non_neg_integer(),
h3_index :: h3:index() | undefined,
lat :: float() | undefined,
long :: float() | undefined
}).

-type loc() :: {h3:index(), float(), float()} | undefined.

-export_type([loc/0]).

-spec init() -> ok.
init() ->
?ETS = ets:new(?ETS, [
public,
named_table,
set,
{read_concurrency, true},
{keypos, #location.gateway}
]),
ok = open_dets(),
case dets:to_ets(?DETS, ?ETS) of
{error, _Reason} ->
lager:error("failed to hydrate ets ~p", [_Reason]);
_ ->
lager:info("ets hydrated")
end,
{ok, _} = timer:apply_interval(?CLEANUP_INTERVAL, ?MODULE, expire_locations, []),
ok.

-spec get(libp2p_crypto:pubkey_bin()) -> {ok, h3:index(), float(), float()} | {error, any()}.
get(PubKeyBin) ->
Yesterday = erlang:system_time(millisecond) - ?CACHE_TIME,
case ets:lookup(?ETS, PubKeyBin) of
[] ->
update_location(PubKeyBin);
[#location{timestamp = T}] when T < Yesterday ->
update_location(PubKeyBin);
[#location{h3_index = undefined}] ->
{error, ?NOT_FOUND};
[#location{h3_index = H3Index, lat = Lat, long = Long}] ->
{ok, H3Index, Lat, Long}
end.

-spec expire_locations() -> ok.
expire_locations() ->
Time = erlang:system_time(millisecond) - ?CACHE_TIME,
DETSDeleted = dets:select_delete(?DETS, [
{{'_', '_', '$3', '_', '_', '_'}, [{'<', '$3', Time}], [true]}
]),
lager:info("expiring ~w dets keys", [DETSDeleted]).

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
-spec update_location(libp2p_crypto:pubkey_bin()) ->
{ok, h3:index(), float(), float()} | {error, any()}.
update_location(PubKeyBin) ->
Start = erlang:system_time(millisecond),
NewLoc = #location{
gateway = PubKeyBin,
timestamp = erlang:system_time(millisecond),
h3_index = undefined,
lat = undefined,
long = undefined
},
case get_location_from_ics(PubKeyBin) of
{error, Reason} ->
hpr_metrics:observe_gateway_location(Start, error),
GatewauName = hpr_utils:gateway_name(PubKeyBin),
lager:warning(
"fail to get_location_from_ics ~p for ~s",
[Reason, GatewauName]
),
ok = insert(NewLoc),
{error, not_found};
{ok, H3IndexString} ->
hpr_metrics:observe_gateway_location(Start, ok),
H3Index = h3:from_string(H3IndexString),
{Lat, Long} = h3:to_geo(H3Index),
ok = insert(NewLoc#location{
h3_index = H3Index,
lat = Lat,
long = Long
}),
{ok, H3Index, Lat, Long}
end.

-spec insert(Loc :: #location{}) -> ok.
insert(Loc) ->
true = ets:insert(?ETS, Loc),
_ = erlang:spawn(dets, insert, [?DETS, Loc]),
ok.

%% We have to do this because the call to `helium_iot_config_gateway_client:location` can return
%% `{error, {Status, Reason}, _}` but is not in the spec...
-dialyzer({nowarn_function, get_location_from_ics/1}).
-spec get_location_from_ics(PubKeyBin :: libp2p_crypto:pubkey_bin()) ->
{ok, string()} | {error, any()}.
get_location_from_ics(PubKeyBin) ->
SigFun = hpr_utils:sig_fun(),
Req = #iot_config_gateway_location_req_v1_pb{
gateway = PubKeyBin,
signer = hpr_utils:pubkey_bin()
},
EncodedReq = iot_config_pb:encode_msg(Req, iot_config_gateway_location_req_v1_pb),
SignedReq = Req#iot_config_gateway_location_req_v1_pb{signature = SigFun(EncodedReq)},
case
helium_iot_config_gateway_client:location(SignedReq, #{
channel => ?IOT_CONFIG_CHANNEL
})
of
{error, {Status, Reason}, _} when is_binary(Status) ->
{error, {grpcbox_utils:status_to_string(Status), Reason}};
{grpc_error, Reason} ->
{error, Reason};
{error, Reason} ->
{error, Reason};
{ok, #iot_config_gateway_location_res_v1_pb{location = Location}, _Meta} ->
{ok, Location}
end.

-spec open_dets() -> ok.
open_dets() ->
DataDir = hpr_utils:base_data_dir(),
DETSFile = filename:join(DataDir, ?DEFAULT_DETS_FILE),
ok = filelib:ensure_dir(DETSFile),
case dets:open_file(?DETS, [{file, DETSFile}, {keypos, #location.gateway}]) of
{error, _Reason} ->
Deleted = file:delete(DETSFile),
lager:error("failed to open dets ~p deleting file ~p", [_Reason, Deleted]),
open_dets();
{ok, _} ->
ok
end.
7 changes: 6 additions & 1 deletion src/grpc/packet_router/hpr_packet_router_service.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ register(PubKeyBin) ->
{error, not_found} ->
true = gproc:add_local_name(?REG_KEY(PubKeyBin)),
lager:debug("register"),
hpr_protocol_router:register(PubKeyBin, Self),
ok = hpr_protocol_router:register(PubKeyBin, Self),
ok;
{ok, Self} ->
lager:info("nothing to do, already registered"),
Expand Down Expand Up @@ -148,6 +148,8 @@ handle_register(Reg, StreamState0) ->
{stop, StreamState0};
true ->
ok = ?MODULE:register(PubKeyBin),
%% Atttempt to get location from ICS to pre-cache data
_ = hpr_gateway_location:get(PubKeyBin),
HandlerState = grpcbox_stream:stream_handler_state(StreamState0),
StreamState1 = grpcbox_stream:stream_handler_state(
StreamState0, HandlerState#handler_state{pubkey_bin = PubKeyBin}
Expand Down Expand Up @@ -267,6 +269,8 @@ route_packet_test() ->
route_register_test() ->
meck:new(hpr_metrics, [passthrough]),
meck:expect(hpr_metrics, observe_grpc_connection, fun(_, _) -> ok end),
meck:new(hpr_gateway_location, [passthrough]),
meck:expect(hpr_gateway_location, get, fun(_) -> ok end),
application:ensure_all_started(gproc),

Self = self(),
Expand Down Expand Up @@ -297,6 +301,7 @@ route_register_test() ->

application:stop(gproc),
meck:unload(hpr_metrics),
meck:unload(hpr_gateway_location),
ok.

handle_info_test() ->
Expand Down
Loading

0 comments on commit 43c7047

Please sign in to comment.