From eb29efa1fa5613736233cb358bbbab32a8714ef5 Mon Sep 17 00:00:00 2001 From: Macpie Date: Mon, 4 Dec 2023 13:20:18 -0800 Subject: [PATCH 1/5] * Location now has its own channel * Requests are done in order --- include/hpr.hrl | 1 + src/grpc/iot_config/hpr_gateway_location.erl | 144 ++++++++++++++---- src/hpr_sup.erl | 3 + src/metrics/hpr_metrics.erl | 2 +- test/hpr_gateway_location_SUITE.erl | 20 ++- test/hpr_metrics_SUITE.erl | 2 +- ...hpr_protocol_http_roaming_packet_SUITE.erl | 5 + 7 files changed, 142 insertions(+), 35 deletions(-) diff --git a/include/hpr.hrl b/include/hpr.hrl index 7b7f146d..ae7184f0 100644 --- a/include/hpr.hrl +++ b/include/hpr.hrl @@ -3,6 +3,7 @@ -define(DATA_DIR, "/var/data"). -define(IOT_CONFIG_CHANNEL, iot_config_channel). +-define(LOCATION_CHANNEL, location_channel). -define(DOWNLINK_CHANNEL, downlink_channel). -define(MULTI_BUY_CHANNEL, multi_buy_channel). diff --git a/src/grpc/iot_config/hpr_gateway_location.erl b/src/grpc/iot_config/hpr_gateway_location.erl index 9e38436a..55389c63 100644 --- a/src/grpc/iot_config/hpr_gateway_location.erl +++ b/src/grpc/iot_config/hpr_gateway_location.erl @@ -5,34 +5,67 @@ %%%------------------------------------------------------------------- -module(hpr_gateway_location). +-behaviour(gen_server). + -include("hpr.hrl"). -include("../autogen/iot_config_pb.hrl"). +%% ------------------------------------------------------------------ +%% API Function Exports +%% ------------------------------------------------------------------ -export([ + start_link/1, init/0, get/1, + update_location/1, expire_locations/0 ]). +%% ------------------------------------------------------------------ +%% gen_server Function Exports +%% ------------------------------------------------------------------ +-export([ + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2 +]). + +-define(SERVER, ?MODULE). -define(ETS, hpr_gateway_location_ets). -define(DETS, hpr_gateway_location_dets). -define(DEFAULT_DETS_FILE, "hpr_gateway_location_dets"). -define(CLEANUP_INTERVAL, timer:hours(1)). -define(CACHE_TIME, timer:hours(24)). +-define(ERROR_CACHE_TIME, timer:hours(1)). -define(NOT_FOUND, not_found). +-define(REQUESTED, requested). + +-record(state, {}). -record(location, { + status :: ok | ?NOT_FOUND | error | ?REQUESTED, gateway :: libp2p_crypto:pubkey_bin(), timestamp :: non_neg_integer(), - h3_index :: h3:index() | undefined, - lat :: float() | undefined, - long :: float() | undefined + h3_index = undefined :: h3:index() | undefined, + lat = undefined :: float() | undefined, + long = undefined :: float() | undefined }). +-type state() :: #state{}. -type loc() :: {h3:index(), float(), float()} | undefined. -export_type([loc/0]). +%% ------------------------------------------------------------------ +%%% API Function Definitions +%% ------------------------------------------------------------------ + +-spec start_link(map()) -> any(). +start_link(Args) -> + gen_server:start_link({local, ?SERVER}, ?SERVER, Args, []). + -spec init() -> ok. init() -> ?ETS = ets:new(?ETS, [ @@ -54,62 +87,109 @@ init() -> -spec get(libp2p_crypto:pubkey_bin()) -> {ok, h3:index(), float(), float()} | {error, any()}. get(PubKeyBin) -> - Yesterday = erlang:system_time(millisecond) - ?CACHE_TIME, + Now = erlang:system_time(millisecond), + Yesterday = Now - ?CACHE_TIME, + LastHour = Now - ?ERROR_CACHE_TIME, case ets:lookup(?ETS, PubKeyBin) of [] -> - update_location(PubKeyBin); - [#location{timestamp = T}] when T < Yesterday -> - update_location(PubKeyBin); - [#location{h3_index = undefined}] -> + ok = ?MODULE:update_location(PubKeyBin), + {error, ?NOT_FOUND}; + [#location{status = ok, timestamp = T, h3_index = H3Index, lat = Lat, long = Long}] when + T < Yesterday + -> + ok = ?MODULE:update_location(PubKeyBin), + {ok, H3Index, Lat, Long}; + [#location{status = _, timestamp = T}] when T < Yesterday -> + ok = ?MODULE:update_location(PubKeyBin), {error, ?NOT_FOUND}; - [#location{h3_index = H3Index, lat = Lat, long = Long}] -> + [#location{status = error, timestamp = T}] when T < LastHour -> + ok = ?MODULE:update_location(PubKeyBin), + {error, undefined}; + [#location{status = error}] -> + {error, undefined}; + [#location{status = requested, timestamp = T}] when T < LastHour -> + %% LOG + ok = ?MODULE:update_location(PubKeyBin), + {error, ?REQUESTED}; + [#location{status = requested}] -> + {error, ?REQUESTED}; + [#location{status = ?NOT_FOUND}] -> + {error, ?NOT_FOUND}; + [#location{status = ok, h3_index = H3Index, lat = Lat, long = Long}] -> {ok, H3Index, Lat, Long} end. +-spec update_location(libp2p_crypto:pubkey_bin()) -> ok. +update_location(PubKeyBin) -> + gen_server:cast(?SERVER, {update_location, PubKeyBin}). + -spec expire_locations() -> ok. expire_locations() -> Time = erlang:system_time(millisecond) - ?CACHE_TIME, DETSDeleted = dets:select_delete(?DETS, [ - {{'_', '_', '$3', '_', '_', '_'}, [{'<', '$3', Time}], [true]} + {{'_', '_', '_', '$3', '_', '_', '_'}, [{'<', '$3', Time}], [true]} ]), lager:info("expiring ~w dets keys", [DETSDeleted]). +% ------------------------------------------------------------------ +%%% gen_server Function Definitions %% ------------------------------------------------------------------ -%% Internal Function Definitions -%% ------------------------------------------------------------------ --spec update_location(libp2p_crypto:pubkey_bin()) -> - {ok, h3:index(), float(), float()} | {error, any()}. -update_location(PubKeyBin) -> - Start = erlang:system_time(millisecond), +-spec init(map()) -> {ok, state()}. +init(_Args) -> + {ok, #state{}}. + +handle_call(_Msg, _From, State) -> + {reply, ok, State}. + +handle_cast({update_location, PubKeyBin}, State) -> NewLoc = #location{ + status = ?REQUESTED, gateway = PubKeyBin, - timestamp = erlang:system_time(millisecond), - h3_index = undefined, - lat = undefined, - long = undefined + timestamp = erlang:system_time(millisecond) }, + true = ets:insert(?ETS, NewLoc), + Start = erlang:system_time(millisecond), case get_location_from_ics(PubKeyBin) of + {error, ?NOT_FOUND} -> + hpr_metrics:observe_gateway_location(Start, ?NOT_FOUND), + GatewayName = hpr_utils:gateway_name(PubKeyBin), + lager:info( + "fail to get_location_from_ics ~p for ~s", + [?NOT_FOUND, GatewayName] + ), + ok = insert(NewLoc#location{status = ?NOT_FOUND}); {error, Reason} -> hpr_metrics:observe_gateway_location(Start, error), - GatewauName = hpr_utils:gateway_name(PubKeyBin), + GatewayName = hpr_utils:gateway_name(PubKeyBin), lager:warning( "fail to get_location_from_ics ~p for ~s", - [Reason, GatewauName] + [Reason, GatewayName] ), - ok = insert(NewLoc), - {error, not_found}; + ok = insert(NewLoc#location{status = error}); {ok, H3IndexString} -> hpr_metrics:observe_gateway_location(Start, ok), H3Index = h3:from_string(H3IndexString), {Lat, Long} = h3:to_geo(H3Index), ok = insert(NewLoc#location{ + status = ok, h3_index = H3Index, lat = Lat, long = Long - }), - {ok, H3Index, Lat, Long} - end. + }) + end, + {noreply, State}; +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _state) -> + ok. +%% ------------------------------------------------------------------ +%% Internal Function Definitions +%% ------------------------------------------------------------------ -spec insert(Loc :: #location{}) -> ok. insert(Loc) -> true = ets:insert(?ETS, Loc), @@ -131,11 +211,17 @@ get_location_from_ics(PubKeyBin) -> SignedReq = Req#iot_config_gateway_location_req_v1_pb{signature = SigFun(EncodedReq)}, case helium_iot_config_gateway_client:location(SignedReq, #{ - channel => ?IOT_CONFIG_CHANNEL + channel => ?LOCATION_CHANNEL }) of {error, {Status, Reason}, _} when is_binary(Status) -> - {error, {grpcbox_utils:status_to_string(Status), Reason}}; + StringStatus = grpcbox_utils:status_to_string(Status), + case StringStatus of + <<"NOT_FOUND">> -> + {error, ?NOT_FOUND}; + _ -> + {error, {StringStatus, Reason}} + end; {grpc_error, Reason} -> {error, Reason}; {error, Reason} -> diff --git a/src/hpr_sup.erl b/src/hpr_sup.erl index 1cf59991..769b9bcf 100644 --- a/src/hpr_sup.erl +++ b/src/hpr_sup.erl @@ -69,6 +69,7 @@ init([]) -> %% Starting config service client channel here because of the way we get %% .env vars into the app. _ = maybe_start_channel(ConfigServiceConfig, ?IOT_CONFIG_CHANNEL), + _ = maybe_start_channel(ConfigServiceConfig, ?LOCATION_CHANNEL), _ = maybe_start_channel(DownlinkServiceConfig, ?DOWNLINK_CHANNEL), _ = maybe_start_channel(MultiBuyServiceConfig, ?MULTI_BUY_CHANNEL), @@ -84,6 +85,8 @@ init([]) -> ?WORKER(hpr_packet_reporter, [PacketReporterConfig]), + ?WORKER(hpr_gateway_location, [#{}]), + ?WORKER(hpr_route_stream_worker, [#{}]), ?WORKER(hpr_protocol_router, [#{}]), diff --git a/src/metrics/hpr_metrics.erl b/src/metrics/hpr_metrics.erl index 615ede2b..8c7fd252 100644 --- a/src/metrics/hpr_metrics.erl +++ b/src/metrics/hpr_metrics.erl @@ -129,7 +129,7 @@ ics_update(Type, Action) -> -spec observe_gateway_location( Start :: non_neg_integer(), - Status :: ok | error + Status :: ok | error | not_found ) -> ok. observe_gateway_location(Start, Status) -> prometheus_histogram:observe( diff --git a/test/hpr_gateway_location_SUITE.erl b/test/hpr_gateway_location_SUITE.erl index b6ff3739..557685e3 100644 --- a/test/hpr_gateway_location_SUITE.erl +++ b/test/hpr_gateway_location_SUITE.erl @@ -13,12 +13,16 @@ main_test/1 ]). +-define(NOT_FOUND, not_found). +-define(REQUESTED, requested). + -record(location, { + status :: ok | ?NOT_FOUND | error | ?REQUESTED, gateway :: libp2p_crypto:pubkey_bin(), timestamp :: non_neg_integer(), - h3_index :: h3:index() | undefined, - lat :: float() | undefined, - long :: float() | undefined + h3_index = undefined :: h3:index() | undefined, + lat = undefined :: float() | undefined, + long = undefined :: float() | undefined }). %%-------------------------------------------------------------------- @@ -65,8 +69,14 @@ main_test(_Config) -> ), {ExpectedLat, ExpectedLong} = h3:to_geo(ExpectedIndex), - %% Make request to get gateway location + %% The location update is now async Before = erlang:system_time(millisecond) - 1, + ?assertEqual( + {error, not_found}, hpr_gateway_location:get(PubKeyBin1) + ), + timer:sleep(15), + + %% Make request to get gateway location ?assertEqual( {ok, ExpectedIndex, ExpectedLat, ExpectedLong}, hpr_gateway_location:get(PubKeyBin1) ), @@ -77,6 +87,7 @@ main_test(_Config) -> %% Verify ETS data [ETSLocationRec] = ets:lookup(hpr_gateway_location_ets, PubKeyBin1), + ?assertEqual(ok, ETSLocationRec#location.status), ?assertEqual(PubKeyBin1, ETSLocationRec#location.gateway), ?assertEqual(ExpectedIndex, ETSLocationRec#location.h3_index), ?assertEqual(ExpectedLat, ETSLocationRec#location.lat), @@ -86,6 +97,7 @@ main_test(_Config) -> %% Verify DETS data [DETSLocationRec] = dets:lookup(hpr_gateway_location_dets, PubKeyBin1), + ?assertEqual(ok, DETSLocationRec#location.status), ?assertEqual(PubKeyBin1, DETSLocationRec#location.gateway), ?assertEqual(ExpectedIndex, DETSLocationRec#location.h3_index), ?assertEqual(ExpectedLat, DETSLocationRec#location.lat), diff --git a/test/hpr_metrics_SUITE.erl b/test/hpr_metrics_SUITE.erl index e15a5a01..91bc78aa 100644 --- a/test/hpr_metrics_SUITE.erl +++ b/test/hpr_metrics_SUITE.erl @@ -191,7 +191,7 @@ main_test(_Config) -> fun() -> undefined =/= prometheus_histogram:value(?METRICS_ICS_GATEWAY_LOCATION_HISTOGRAM, [ - error + not_found ]) end ), diff --git a/test/hpr_protocol_http_roaming_packet_SUITE.erl b/test/hpr_protocol_http_roaming_packet_SUITE.erl index c5c370ed..cb3a960a 100644 --- a/test/hpr_protocol_http_roaming_packet_SUITE.erl +++ b/test/hpr_protocol_http_roaming_packet_SUITE.erl @@ -760,6 +760,11 @@ uplink_with_gateway_location_test(_Config) -> IndexString ), + %% Trigger an update location + ?assertEqual( + {error, not_found}, hpr_gateway_location:get(PubKeyBin) + ), + ok = start_uplink_listener(), SendPacketFun = fun(DevAddr) -> From 6b0804b73d2ade1190c6a8be71f9ef35fa166006 Mon Sep 17 00:00:00 2001 From: Macpie Date: Mon, 4 Dec 2023 13:22:49 -0800 Subject: [PATCH 2/5] Add log --- src/grpc/iot_config/hpr_gateway_location.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/grpc/iot_config/hpr_gateway_location.erl b/src/grpc/iot_config/hpr_gateway_location.erl index 55389c63..87dde382 100644 --- a/src/grpc/iot_config/hpr_gateway_location.erl +++ b/src/grpc/iot_config/hpr_gateway_location.erl @@ -107,8 +107,9 @@ get(PubKeyBin) -> {error, undefined}; [#location{status = error}] -> {error, undefined}; - [#location{status = requested, timestamp = T}] when T < LastHour -> - %% LOG + [#location{status = requested, timestamp = T, gateway = PubKeyBin}] when T < LastHour -> + GatewayName = hpr_utils:gateway_name(PubKeyBin), + lager:warning("got an old request for ~p ~s", [PubKeyBin, GatewayName]), ok = ?MODULE:update_location(PubKeyBin), {error, ?REQUESTED}; [#location{status = requested}] -> From f753991a831d57c3d05a341335b2401ba69205f0 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Mon, 4 Dec 2023 16:01:41 -0700 Subject: [PATCH 3/5] fixup gwmp location test, wait for location request --- test/hpr_protocol_gwmp_SUITE.erl | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/test/hpr_protocol_gwmp_SUITE.erl b/test/hpr_protocol_gwmp_SUITE.erl index 570a9aa4..8dd35af3 100644 --- a/test/hpr_protocol_gwmp_SUITE.erl +++ b/test/hpr_protocol_gwmp_SUITE.erl @@ -112,6 +112,17 @@ with_location_test(_Config) -> h3_index_str => IndexString }), + %% Wait until the location has been fetched + PubKeyBin = hpr_test_gateway:pubkey_bin(GatewayPid), + ok = test_utils:wait_until(fun() -> + case hpr_gateway_location:get(PubKeyBin) of + {ok, _, _, _} -> + true; + Other -> + {false, Other} + end + end), + %% Send packet and route directly through interface ok = hpr_test_gateway:send_packet(GatewayPid, #{}), From b64d01aef213a14c6b489aac455e39f3c8687f6d Mon Sep 17 00:00:00 2001 From: Macpie Date: Mon, 4 Dec 2023 15:31:08 -0800 Subject: [PATCH 4/5] Fix test --- test/hpr_protocol_http_roaming_packet_SUITE.erl | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/test/hpr_protocol_http_roaming_packet_SUITE.erl b/test/hpr_protocol_http_roaming_packet_SUITE.erl index cb3a960a..6e28ac34 100644 --- a/test/hpr_protocol_http_roaming_packet_SUITE.erl +++ b/test/hpr_protocol_http_roaming_packet_SUITE.erl @@ -760,10 +760,15 @@ uplink_with_gateway_location_test(_Config) -> IndexString ), - %% Trigger an update location - ?assertEqual( - {error, not_found}, hpr_gateway_location:get(PubKeyBin) - ), + %% Trigger an update location and wait until the location has been fetched + ok = test_utils:wait_until(fun() -> + case hpr_gateway_location:get(PubKeyBin) of + {ok, _, _, _} -> + true; + Other -> + {false, Other} + end + end), ok = start_uplink_listener(), From adbcafb4a0ebe68a8edbc12db56004180c13fb98 Mon Sep 17 00:00:00 2001 From: Macpie Date: Mon, 4 Dec 2023 16:03:22 -0800 Subject: [PATCH 5/5] Improve test --- test/hpr_gateway_location_SUITE.erl | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/test/hpr_gateway_location_SUITE.erl b/test/hpr_gateway_location_SUITE.erl index 557685e3..d19cb13d 100644 --- a/test/hpr_gateway_location_SUITE.erl +++ b/test/hpr_gateway_location_SUITE.erl @@ -71,10 +71,16 @@ main_test(_Config) -> %% The location update is now async Before = erlang:system_time(millisecond) - 1, - ?assertEqual( - {error, not_found}, hpr_gateway_location:get(PubKeyBin1) - ), - timer:sleep(15), + + %% Wait until the location has been fetched + ok = test_utils:wait_until(fun() -> + case hpr_gateway_location:get(PubKeyBin1) of + {ok, _, _, _} -> + true; + Other -> + {false, Other} + end + end), %% Make request to get gateway location ?assertEqual(