Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

location improvement #278

Merged
merged 5 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/hpr.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
-define(DATA_DIR, "/var/data").

-define(IOT_CONFIG_CHANNEL, iot_config_channel).
-define(LOCATION_CHANNEL, location_channel).
-define(DOWNLINK_CHANNEL, downlink_channel).
-define(MULTI_BUY_CHANNEL, multi_buy_channel).

Expand Down
145 changes: 116 additions & 29 deletions src/grpc/iot_config/hpr_gateway_location.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,67 @@
%%%-------------------------------------------------------------------
-module(hpr_gateway_location).

-behaviour(gen_server).

-include("hpr.hrl").
-include("../autogen/iot_config_pb.hrl").

%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export([
start_link/1,
init/0,
get/1,
update_location/1,
expire_locations/0
]).

%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export([
init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2
]).

-define(SERVER, ?MODULE).
-define(ETS, hpr_gateway_location_ets).
-define(DETS, hpr_gateway_location_dets).
-define(DEFAULT_DETS_FILE, "hpr_gateway_location_dets").
-define(CLEANUP_INTERVAL, timer:hours(1)).
-define(CACHE_TIME, timer:hours(24)).
-define(ERROR_CACHE_TIME, timer:hours(1)).
-define(NOT_FOUND, not_found).
-define(REQUESTED, requested).

-record(state, {}).

-record(location, {
status :: ok | ?NOT_FOUND | error | ?REQUESTED,
gateway :: libp2p_crypto:pubkey_bin(),
timestamp :: non_neg_integer(),
h3_index :: h3:index() | undefined,
lat :: float() | undefined,
long :: float() | undefined
h3_index = undefined :: h3:index() | undefined,
lat = undefined :: float() | undefined,
long = undefined :: float() | undefined
}).

-type state() :: #state{}.
-type loc() :: {h3:index(), float(), float()} | undefined.

-export_type([loc/0]).

%% ------------------------------------------------------------------
%%% API Function Definitions
%% ------------------------------------------------------------------

-spec start_link(map()) -> any().
start_link(Args) ->
gen_server:start_link({local, ?SERVER}, ?SERVER, Args, []).

-spec init() -> ok.
init() ->
?ETS = ets:new(?ETS, [
Expand All @@ -54,62 +87,110 @@ init() ->

-spec get(libp2p_crypto:pubkey_bin()) -> {ok, h3:index(), float(), float()} | {error, any()}.
get(PubKeyBin) ->
Yesterday = erlang:system_time(millisecond) - ?CACHE_TIME,
Now = erlang:system_time(millisecond),
Yesterday = Now - ?CACHE_TIME,
LastHour = Now - ?ERROR_CACHE_TIME,
case ets:lookup(?ETS, PubKeyBin) of
[] ->
update_location(PubKeyBin);
[#location{timestamp = T}] when T < Yesterday ->
update_location(PubKeyBin);
[#location{h3_index = undefined}] ->
ok = ?MODULE:update_location(PubKeyBin),
{error, ?NOT_FOUND};
[#location{status = ok, timestamp = T, h3_index = H3Index, lat = Lat, long = Long}] when
T < Yesterday
->
ok = ?MODULE:update_location(PubKeyBin),
{ok, H3Index, Lat, Long};
[#location{status = _, timestamp = T}] when T < Yesterday ->
ok = ?MODULE:update_location(PubKeyBin),
{error, ?NOT_FOUND};
[#location{h3_index = H3Index, lat = Lat, long = Long}] ->
[#location{status = error, timestamp = T}] when T < LastHour ->
ok = ?MODULE:update_location(PubKeyBin),
{error, undefined};
[#location{status = error}] ->
{error, undefined};
[#location{status = requested, timestamp = T, gateway = PubKeyBin}] when T < LastHour ->
GatewayName = hpr_utils:gateway_name(PubKeyBin),
lager:warning("got an old request for ~p ~s", [PubKeyBin, GatewayName]),
ok = ?MODULE:update_location(PubKeyBin),
{error, ?REQUESTED};
[#location{status = requested}] ->
{error, ?REQUESTED};
[#location{status = ?NOT_FOUND}] ->
{error, ?NOT_FOUND};
[#location{status = ok, h3_index = H3Index, lat = Lat, long = Long}] ->
{ok, H3Index, Lat, Long}
end.

-spec update_location(libp2p_crypto:pubkey_bin()) -> ok.
update_location(PubKeyBin) ->
gen_server:cast(?SERVER, {update_location, PubKeyBin}).

-spec expire_locations() -> ok.
expire_locations() ->
Time = erlang:system_time(millisecond) - ?CACHE_TIME,
DETSDeleted = dets:select_delete(?DETS, [
{{'_', '_', '$3', '_', '_', '_'}, [{'<', '$3', Time}], [true]}
{{'_', '_', '_', '$3', '_', '_', '_'}, [{'<', '$3', Time}], [true]}
]),
lager:info("expiring ~w dets keys", [DETSDeleted]).

% ------------------------------------------------------------------
%%% gen_server Function Definitions
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
-spec update_location(libp2p_crypto:pubkey_bin()) ->
{ok, h3:index(), float(), float()} | {error, any()}.
update_location(PubKeyBin) ->
Start = erlang:system_time(millisecond),
-spec init(map()) -> {ok, state()}.
init(_Args) ->
{ok, #state{}}.

handle_call(_Msg, _From, State) ->
{reply, ok, State}.

handle_cast({update_location, PubKeyBin}, State) ->
NewLoc = #location{
status = ?REQUESTED,
gateway = PubKeyBin,
timestamp = erlang:system_time(millisecond),
h3_index = undefined,
lat = undefined,
long = undefined
timestamp = erlang:system_time(millisecond)
},
true = ets:insert(?ETS, NewLoc),
Start = erlang:system_time(millisecond),
case get_location_from_ics(PubKeyBin) of
{error, ?NOT_FOUND} ->
hpr_metrics:observe_gateway_location(Start, ?NOT_FOUND),
GatewayName = hpr_utils:gateway_name(PubKeyBin),
lager:info(
"fail to get_location_from_ics ~p for ~s",
[?NOT_FOUND, GatewayName]
),
ok = insert(NewLoc#location{status = ?NOT_FOUND});
{error, Reason} ->
hpr_metrics:observe_gateway_location(Start, error),
GatewauName = hpr_utils:gateway_name(PubKeyBin),
GatewayName = hpr_utils:gateway_name(PubKeyBin),
lager:warning(
"fail to get_location_from_ics ~p for ~s",
[Reason, GatewauName]
[Reason, GatewayName]
),
ok = insert(NewLoc),
{error, not_found};
ok = insert(NewLoc#location{status = error});
{ok, H3IndexString} ->
hpr_metrics:observe_gateway_location(Start, ok),
H3Index = h3:from_string(H3IndexString),
{Lat, Long} = h3:to_geo(H3Index),
ok = insert(NewLoc#location{
status = ok,
h3_index = H3Index,
lat = Lat,
long = Long
}),
{ok, H3Index, Lat, Long}
end.
})
end,
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(_Info, State) ->
{noreply, State}.

terminate(_Reason, _state) ->
ok.

%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
-spec insert(Loc :: #location{}) -> ok.
insert(Loc) ->
true = ets:insert(?ETS, Loc),
Expand All @@ -131,11 +212,17 @@ get_location_from_ics(PubKeyBin) ->
SignedReq = Req#iot_config_gateway_location_req_v1_pb{signature = SigFun(EncodedReq)},
case
helium_iot_config_gateway_client:location(SignedReq, #{
channel => ?IOT_CONFIG_CHANNEL
channel => ?LOCATION_CHANNEL
})
of
{error, {Status, Reason}, _} when is_binary(Status) ->
{error, {grpcbox_utils:status_to_string(Status), Reason}};
StringStatus = grpcbox_utils:status_to_string(Status),
case StringStatus of
<<"NOT_FOUND">> ->
{error, ?NOT_FOUND};
_ ->
{error, {StringStatus, Reason}}
end;
{grpc_error, Reason} ->
{error, Reason};
{error, Reason} ->
Expand Down
3 changes: 3 additions & 0 deletions src/hpr_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ init([]) ->
%% Starting config service client channel here because of the way we get
%% .env vars into the app.
_ = maybe_start_channel(ConfigServiceConfig, ?IOT_CONFIG_CHANNEL),
_ = maybe_start_channel(ConfigServiceConfig, ?LOCATION_CHANNEL),
_ = maybe_start_channel(DownlinkServiceConfig, ?DOWNLINK_CHANNEL),
_ = maybe_start_channel(MultiBuyServiceConfig, ?MULTI_BUY_CHANNEL),

Expand All @@ -84,6 +85,8 @@ init([]) ->

?WORKER(hpr_packet_reporter, [PacketReporterConfig]),

?WORKER(hpr_gateway_location, [#{}]),

?WORKER(hpr_route_stream_worker, [#{}]),

?WORKER(hpr_protocol_router, [#{}]),
Expand Down
2 changes: 1 addition & 1 deletion src/metrics/hpr_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ ics_update(Type, Action) ->

-spec observe_gateway_location(
Start :: non_neg_integer(),
Status :: ok | error
Status :: ok | error | not_found
) -> ok.
observe_gateway_location(Start, Status) ->
prometheus_histogram:observe(
Expand Down
26 changes: 22 additions & 4 deletions test/hpr_gateway_location_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
main_test/1
]).

-define(NOT_FOUND, not_found).
-define(REQUESTED, requested).

-record(location, {
status :: ok | ?NOT_FOUND | error | ?REQUESTED,
gateway :: libp2p_crypto:pubkey_bin(),
timestamp :: non_neg_integer(),
h3_index :: h3:index() | undefined,
lat :: float() | undefined,
long :: float() | undefined
h3_index = undefined :: h3:index() | undefined,
lat = undefined :: float() | undefined,
long = undefined :: float() | undefined
}).

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -65,8 +69,20 @@ main_test(_Config) ->
),
{ExpectedLat, ExpectedLong} = h3:to_geo(ExpectedIndex),

%% Make request to get gateway location
%% The location update is now async
Before = erlang:system_time(millisecond) - 1,

%% Wait until the location has been fetched
ok = test_utils:wait_until(fun() ->
case hpr_gateway_location:get(PubKeyBin1) of
{ok, _, _, _} ->
true;
Other ->
{false, Other}
end
end),

%% Make request to get gateway location
?assertEqual(
{ok, ExpectedIndex, ExpectedLat, ExpectedLong}, hpr_gateway_location:get(PubKeyBin1)
),
Expand All @@ -77,6 +93,7 @@ main_test(_Config) ->

%% Verify ETS data
[ETSLocationRec] = ets:lookup(hpr_gateway_location_ets, PubKeyBin1),
?assertEqual(ok, ETSLocationRec#location.status),
?assertEqual(PubKeyBin1, ETSLocationRec#location.gateway),
?assertEqual(ExpectedIndex, ETSLocationRec#location.h3_index),
?assertEqual(ExpectedLat, ETSLocationRec#location.lat),
Expand All @@ -86,6 +103,7 @@ main_test(_Config) ->

%% Verify DETS data
[DETSLocationRec] = dets:lookup(hpr_gateway_location_dets, PubKeyBin1),
?assertEqual(ok, DETSLocationRec#location.status),
?assertEqual(PubKeyBin1, DETSLocationRec#location.gateway),
?assertEqual(ExpectedIndex, DETSLocationRec#location.h3_index),
?assertEqual(ExpectedLat, DETSLocationRec#location.lat),
Expand Down
2 changes: 1 addition & 1 deletion test/hpr_metrics_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ main_test(_Config) ->
fun() ->
undefined =/=
prometheus_histogram:value(?METRICS_ICS_GATEWAY_LOCATION_HISTOGRAM, [
error
not_found
])
end
),
Expand Down
11 changes: 11 additions & 0 deletions test/hpr_protocol_gwmp_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ with_location_test(_Config) ->
h3_index_str => IndexString
}),

%% Wait until the location has been fetched
PubKeyBin = hpr_test_gateway:pubkey_bin(GatewayPid),
ok = test_utils:wait_until(fun() ->
case hpr_gateway_location:get(PubKeyBin) of
{ok, _, _, _} ->
true;
Other ->
{false, Other}
end
end),

%% Send packet and route directly through interface
ok = hpr_test_gateway:send_packet(GatewayPid, #{}),

Expand Down
10 changes: 10 additions & 0 deletions test/hpr_protocol_http_roaming_packet_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,16 @@ uplink_with_gateway_location_test(_Config) ->
IndexString
),

%% Trigger an update location and wait until the location has been fetched
ok = test_utils:wait_until(fun() ->
case hpr_gateway_location:get(PubKeyBin) of
{ok, _, _, _} ->
true;
Other ->
{false, Other}
end
end),

ok = start_uplink_listener(),

SendPacketFun = fun(DevAddr) ->
Expand Down
Loading