diff --git a/config/sys.config b/config/sys.config index f42d6dd..e30b1be 100644 --- a/config/sys.config +++ b/config/sys.config @@ -2,7 +2,9 @@ [ {pg_types, [{json_config, {jsone, [], [{keys, atom}]}}]}, {blockchain_node, [ - {jsonrpc_port, 4467} + {jsonrpc_port, 4467}, + {metrics, [block_metrics, txn_metrics, grpc_metrics]}, + {metrics_port, 9090} ]}, {kernel, [ %% force distributed erlang to only run on localhost diff --git a/rebar.config b/rebar.config index 6b7b86d..be10b68 100644 --- a/rebar.config +++ b/rebar.config @@ -28,6 +28,8 @@ jsone, {jsonrpc2, {git, "https://github.com/novalabsxyz/jsonrpc2-erlang.git", {branch, "master"}}}, {observer_cli, "1.7.1"}, + {telemetry, "1.1.0"}, + {prometheus, "4.8.2"}, pbkdf2 ]}. diff --git a/rebar.lock b/rebar.lock index a59a414..b752199 100644 --- a/rebar.lock +++ b/rebar.lock @@ -8,7 +8,7 @@ {<<"base64url">>,{pkg,<<"base64url">>,<<"1.0.1">>},1}, {<<"blockchain">>, {git,"https://github.com/helium/blockchain-core.git", - {ref,"4e94954d49a3ac24af50d01d58d8963b953ec3a3"}}, + {ref,"c7ff53f62a43b2b70badd4698d3ac302401de63d"}}, 0}, {<<"certifi">>,{pkg,<<"certifi">>,<<"2.9.0">>},2}, {<<"chatterbox">>, @@ -111,8 +111,8 @@ {<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.1">>},2}, {<<"pbkdf2">>,{pkg,<<"pbkdf2">>,<<"2.0.0">>},0}, {<<"procket">>,{pkg,<<"procket">>,<<"0.9.6">>},3}, - {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.8.1">>},2}, - {<<"quantile_estimator">>,{pkg,<<"quantile_estimator">>,<<"0.2.1">>},3}, + {<<"prometheus">>,{pkg,<<"prometheus">>,<<"4.8.2">>},0}, + {<<"quantile_estimator">>,{pkg,<<"quantile_estimator">>,<<"0.2.1">>},1}, {<<"ranch">>,{pkg,<<"ranch">>,<<"1.5.0">>},2}, {<<"rand_compat">>,{pkg,<<"rand_compat">>,<<"0.0.3">>},3}, {<<"recon">>,{pkg,<<"recon">>,<<"2.5.2">>},1}, @@ -128,7 +128,7 @@ {<<"small_ints">>,{pkg,<<"small_ints">>,<<"0.1.0">>},4}, {<<"splicer">>,{pkg,<<"splicer">>,<<"0.5.5">>},2}, {<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.6">>},2}, - 
{<<"telemetry">>,{pkg,<<"telemetry">>,<<"1.1.0">>},1}, + {<<"telemetry">>,{pkg,<<"telemetry">>,<<"1.1.0">>},0}, {<<"throttle">>,{pkg,<<"lambda_throttle">>,<<"0.2.0">>},2}, {<<"unicode_util_compat">>,{pkg,<<"unicode_util_compat">>,<<"0.7.0">>},2}, {<<"vincenty">>, @@ -169,7 +169,7 @@ {<<"parse_trans">>, <<"16328AB840CC09919BD10DAB29E431DA3AF9E9E7E7E6F0089DD5A2D2820011D8">>}, {<<"pbkdf2">>, <<"11C23279FDED5C0027AB3996CFAE77805521D7EF4BABDE2BD7EC04A9086CF499">>}, {<<"procket">>, <<"467014637052CAB030DD32D5269591B82763420C9572F61CD6C5657FE4579777">>}, - {<<"prometheus">>, <<"FA76B152555273739C14B06F09F485CF6D5D301FE4E9D31B7FF803D26025D7A0">>}, + {<<"prometheus">>, <<"B88F24279DD7A1F512CB090595FF6C88B50AAD0A6B394A4C4983725723DCD834">>}, {<<"quantile_estimator">>, <<"EF50A361F11B5F26B5F16D0696E46A9E4661756492C981F7B2229EF42FF1CD15">>}, {<<"ranch">>, <<"F04166F456790FEE2AC1AA05A02745CC75783C2BFB26D39FAF6AEFC9A3D3A58A">>}, {<<"rand_compat">>, <<"011646BC1F0B0C432FE101B816F25B9BBB74A085713CEE1DAFD2D62E9415EAD3">>}, @@ -210,7 +210,7 @@ {<<"parse_trans">>, <<"07CD9577885F56362D414E8C4C4E6BDF10D43A8767ABB92D24CBE8B24C54888B">>}, {<<"pbkdf2">>, <<"1E793CE6FDB0576613115714DEAE9DFC1D1537EABA74F07EFB36DE139774488D">>}, {<<"procket">>, <<"037287B8D10A747E54A69BDCE56724ECC8BE12BCB8233AE4D6F885F0ED71E895">>}, - {<<"prometheus">>, <<"6EDFBE928D271C7F657A6F2C46258738086584BD6CAE4A000B8B9A6009BA23A5">>}, + {<<"prometheus">>, <<"C3ABD6521E52CEC4F0D8ECA603CF214DFC84D8A27AA85946639F1424B8554D98">>}, {<<"quantile_estimator">>, <<"282A8A323CA2A845C9E6F787D166348F776C1D4A41EDE63046D72D422E3DA946">>}, {<<"ranch">>, <<"86D40FC42AA47BCB6952DDF1DBFD3DA04B5BA69AFB65C322C99845913250B11F">>}, {<<"rand_compat">>, <<"CDF7BE2B17308EC245B912C45FE55741F93B6E4F1A24BA6074F7137B0CC09BF4">>}, diff --git a/src/blockchain_node.app.src b/src/blockchain_node.app.src index fba043b..fce6a90 100644 --- a/src/blockchain_node.app.src +++ b/src/blockchain_node.app.src @@ -14,14 +14,16 @@ compiler, lager, jsone, - 
elli,
             jsonrpc2,
             base64url,
             pbkdf2,
             observer_cli,
             clique,
             recon,
-            grpcbox
+            grpcbox,
+            elli,
+            telemetry,
+            prometheus
         ]
     },
     {included_applications, [blockchain]},
diff --git a/src/bn_sup.erl b/src/bn_sup.erl
index 4387154..84418ee 100644
--- a/src/bn_sup.erl
+++ b/src/bn_sup.erl
@@ -93,7 +93,8 @@ init([]) ->
                 [{follower_module, {bn_oracle_price, [{base_dir, BaseDir}]}}]
             ]),
             ?WORKER(bn_wallets, [[{base_dir, BaseDir}]]),
-            ?WORKER(elli, [[{callback, bn_jsonrpc_handler}, {port, NodePort}]])
+            ?WORKER(elli, [[{callback, bn_jsonrpc_handler}, {port, NodePort}]]),
+            ?WORKER(bn_metrics_server, [])
         ]}}.
 
 random_val_predicate(Peer) ->
diff --git a/src/metrics/bn_metrics_exporter.erl b/src/metrics/bn_metrics_exporter.erl
new file mode 100644
index 0000000..88cc351
--- /dev/null
+++ b/src/metrics/bn_metrics_exporter.erl
@@ -0,0 +1,24 @@
+-module(bn_metrics_exporter).
+
+-behaviour(elli_handler).
+
+-include_lib("elli/include/elli.hrl").
+
+-export([handle/2, handle_event/3]).
+
+%% elli_handler callback: dispatch on the request method and path segments.
+handle(Req, _Args) ->
+    handle(Req#req.method, elli_request:path(Req), Req).
+
+%% Expose `/metrics` for Prometheus as a scrape target. The Content-Type is
+%% the one prometheus_text_format reports for the body it renders.
+handle('GET', [<<"metrics">>], _Req) ->
+    Headers = [{<<"Content-Type">>, prometheus_text_format:content_type()}],
+    {ok, Headers, prometheus_text_format:format()};
+%% Every other method/path combination returns `ignore`.
+handle(_Verb, _Path, _Req) ->
+    ignore.
+
+%% elli_handler callback: request lifecycle events are not used here.
+handle_event(_Event, _Data, _Args) ->
+    ok.
diff --git a/src/metrics/bn_metrics_server.erl b/src/metrics/bn_metrics_server.erl
new file mode 100644
index 0000000..7c3cd76
--- /dev/null
+++ b/src/metrics/bn_metrics_server.erl
@@ -0,0 +1,175 @@
+-module(bn_metrics_server).
+
+-behaviour(gen_server).
+
+-include("metrics.hrl").
+
+-export([
+    handle_metric/4,
+    start_link/0
+]).
+
+-export([
+    init/1,
+    handle_call/3,
+    handle_cast/2,
+    handle_info/2,
+    terminate/2,
+    code_change/3
+]).
+
+-define(SERVER, ?MODULE).
+
+%% One metric spec as listed in ?METRICS: {Name, PrometheusModule, Labels, Help}.
+-type metric() :: {
+    Metric :: string(),
+    PrometheusHandler :: module(),
+    Labels :: [atom()],
+    Description :: string()
+}.
+
+-type metrics() :: [metric()].
+
+-type exporter_opts() :: [
+    {callback, module()} |
+    {callback_args, map()} |
+    {port, integer()}
+].
+
+%% metrics:       metric specs declared at start-up, de-registered in terminate/2
+%% exporter_opts: elli options, kept so the exporter can be started again
+%% exporter_pid:  the linked elli process started with those options
+-record(state, {
+    metrics :: metrics(),
+    exporter_opts :: exporter_opts(),
+    exporter_pid :: pid() | undefined
+}).
+
+%% Telemetry handler callback (attached in setup_metrics/1). The handler
+%% config is unused; the event is forwarded to handle_metric_event/3.
+-spec handle_metric([atom()], map(), map(), term()) -> ok.
+handle_metric(Event, Measurements, Metadata, _Config) ->
+    handle_metric_event(Event, Measurements, Metadata).
+
+%% Start the metrics server registered locally under the module name.
+-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?SERVER, [], []).
+
+%% Returns `ignore` when no known metric group is configured. Otherwise
+%% declares the metrics, attaches the telemetry handler and starts the elli
+%% exporter on `metrics_port` (default 9090).
+init(_Args) ->
+    case get_configs() of
+        {[], []} -> ignore;
+        {[_ | _], [_ | _] = Metrics} = MetricConfigs ->
+            %% Trap exits so an exporter crash arrives as an 'EXIT' message
+            %% (see handle_info/2) instead of taking this process down.
+            erlang:process_flag(trap_exit, true),
+
+            ok = setup_metrics(MetricConfigs),
+
+            ElliOpts = [
+                {callback, bn_metrics_exporter},
+                {callback_args, #{}},
+                {port, application:get_env(blockchain_node, metrics_port, 9090)}
+            ],
+            {ok, ExporterPid} = elli:start_link(ElliOpts),
+            {ok, #state{
+                metrics = Metrics,
+                exporter_opts = ElliOpts,
+                exporter_pid = ExporterPid}}
+    end.
+
+%% No synchronous API is defined; unknown calls are logged and answered `ok`.
+handle_call(Msg, From, State) ->
+    lager:debug("Received unknown call msg: ~p from ~p", [Msg, From]),
+    {reply, ok, State}.
+
+%% No asynchronous API is defined; casts are dropped.
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%% Restart the exporter with the saved options when the linked elli process
+%% exits; any other message is logged and ignored.
+handle_info({'EXIT', ExporterPid, Reason}, #state{exporter_pid=ExporterPid} = State) ->
+    lager:warning("Metrics exporter exited with reason ~p, restarting", [Reason]),
+    {ok, NewExporter} = elli:start_link(State#state.exporter_opts),
+    {noreply, State#state{exporter_pid = NewExporter}};
+handle_info(_Msg, State) ->
+    lager:debug("Received unknown info msg: ~p", [_Msg]),
+    {noreply, State}.
+
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%% Undo what init/1 set up: detach the telemetry handler, stop the exporter
+%% and de-register every declared metric.
+terminate(Reason, #state{metrics = Metrics, exporter_pid = Exporter}) ->
+    %% Telemetry handlers are not tied to the attaching process and outlive
+    %% it. Detach so a restarted server can attach again instead of failing
+    %% the `ok = telemetry:attach_many(...)` match in setup_metrics/1. The
+    %% result is discarded because the handler may already be gone.
+    _ = telemetry:detach(<<"bn-metrics-handler">>),
+    true = erlang:exit(Exporter, Reason),
+    lists:foreach(
+        fun({Metric, Module, _, _}) ->
+            lager:info("De-registering metric ~p as ~p", [Metric, Module]),
+            Module:deregister(Metric)
+        end,
+        Metrics
+    ).
+
+%% Declare every configured Prometheus metric, then attach one telemetry
+%% handler for all configured events. Histograms additionally get the shared
+%% bucket list from metrics.hrl.
+setup_metrics({EventNames, EventSpecs}) ->
+    lager:info("METRICS ~p", [EventSpecs]),
+    lists:foreach(
+        fun({Metric, Module, Meta, Description}) ->
+            lager:info("Declaring metric ~p as ~p meta=~p", [Metric, Module, Meta]),
+            MetricOpts = [{name, Metric}, {help, Description}, {labels, Meta}],
+            case Module of
+                prometheus_histogram ->
+                    Module:declare(MetricOpts ++ [{buckets, ?METRICS_HISTOGRAM_BUCKETS}]);
+                _ ->
+                    Module:declare(MetricOpts)
+            end
+        end,
+        EventSpecs
+    ),
+
+    %% A handler left behind by a previous incarnation of this server would
+    %% make attach_many/4 return {error, already_exists} and fail the match
+    %% below, so drop it first. The result is discarded because there is
+    %% normally nothing to detach.
+    _ = telemetry:detach(<<"bn-metrics-handler">>),
+    ok = telemetry:attach_many(
+        <<"bn-metrics-handler">>,
+        EventNames,
+        fun bn_metrics_server:handle_metric/4,
+        []
+    ).
+
+%% Collect {EventNames, MetricSpecs} for every metric group listed under the
+%% `metrics` key of the blockchain_node application env. Unknown group names
+%% are skipped; no configuration yields {[], []}.
+get_configs() ->
+    %% usort so a group listed twice does not contribute its events and
+    %% specs twice.
+    Configured = lists:usort(application:get_env(blockchain_node, metrics, [])),
+    lists:foldl(
+        fun(Metric, {Names, Specs} = Acc) ->
+            case maps:get(Metric, ?METRICS, undefined) of
+                undefined -> Acc;
+                {N, S} -> {Names ++ N, Specs ++ S}
+            end
+        end,
+        {[], []},
+        Configured
+    ).
+
+%% Translate one telemetry event into Prometheus updates. Label values are
+%% passed positionally, so their count must match the labels declared for
+%% the metric in ?METRICS.
+handle_metric_event([blockchain, block, absorb], #{duration := Duration}, #{stage := Stage}) ->
+    prometheus_histogram:observe(?METRICS_BLOCK_ABSORB, [Stage], Duration),
+    ok;
+handle_metric_event([blockchain, block, height], #{height := Height}, #{time := Time}) ->
+    prometheus_gauge:set(?METRICS_BLOCK_HEIGHT, [Time], Height),
+    ok;
+handle_metric_event([blockchain, block, unvalidated_absorb], #{duration := Duration}, #{stage := Stage}) ->
+    prometheus_histogram:observe(?METRICS_BLOCK_UNVAL_ABSORB, [Stage], Duration),
+    ok;
+handle_metric_event([blockchain, block, unvalidated_height], #{height := Height}, #{time := Time}) ->
+    prometheus_gauge:set(?METRICS_BLOCK_UNVAL_HEIGHT, [Time], Height),
+    ok;
+handle_metric_event([blockchain, txn, absorb], #{duration := Duration}, #{type := Type}) ->
+    prometheus_histogram:observe(?METRICS_TXN_ABSORB_DURATION, [Type], Duration),
+    ok;
+handle_metric_event([blockchain, txn_mgr, submit], _Measurements, #{type := Type}) ->
+    prometheus_counter:inc(?METRICS_TXN_SUBMIT_COUNT, [Type]),
+    ok;
+handle_metric_event([blockchain, txn_mgr, reject], #{block_span := Span}, #{type := Type}) ->
+    prometheus_counter:inc(?METRICS_TXN_REJECT_COUNT, [Type]),
+    prometheus_gauge:set(?METRICS_TXN_BLOCK_SPAN, [], Span),
+    ok;
+handle_metric_event([blockchain, txn_mgr, accept], #{block_span := Span, queue_len := QLen}, #{type := Type}) ->
+    prometheus_counter:inc(?METRICS_TXN_ACCEPT_COUNT, [Type]),
+    prometheus_gauge:set(?METRICS_TXN_BLOCK_SPAN, [], Span),
+    prometheus_gauge:set(?METRICS_TXN_QUEUE, [], QLen),
+    ok;
+handle_metric_event([blockchain, txn_mgr, update], #{block_span := Span, queue_len := QLen}, #{type := Type}) ->
+    prometheus_counter:inc(?METRICS_TXN_UPDATE_COUNT, [Type]),
+    prometheus_gauge:set(?METRICS_TXN_BLOCK_SPAN, [], Span),
+    prometheus_gauge:set(?METRICS_TXN_QUEUE, [], QLen),
+    ok;
+handle_metric_event([blockchain, txn_mgr, process], #{duration := Duration}, #{stage := Stage}) ->
+    prometheus_histogram:observe(?METRICS_TXN_PROCESS_DURATION, [Stage], Duration),
+    ok;
+handle_metric_event([blockchain, txn_mgr, add_block], #{cache := Cache, block_time := BlockTime, block_age := BlockAge}, #{height := Height}) ->
+    prometheus_gauge:set(?METRICS_TXN_CACHE_SIZE, [Height], Cache),
+    prometheus_gauge:set(?METRICS_TXN_BLOCK_TIME, [Height], BlockTime),
+    prometheus_gauge:set(?METRICS_TXN_BLOCK_AGE, [Height], BlockAge),
+    ok;
+handle_metric_event([grpcbox, server, rpc_end], #{server_latency := Latency}, #{grpc_server_method := Method, grpc_server_status := Status}) ->
+    prometheus_gauge:dec(?METRICS_GRPC_SESSIONS, [Method]),
+    prometheus_histogram:observe(?METRICS_GRPC_LATENCY, [Method, Status], Latency),
+    ok;
+handle_metric_event([grpcbox, server, rpc_begin], _Measurements, #{grpc_server_method := Method}) ->
+    prometheus_gauge:inc(?METRICS_GRPC_SESSIONS, [Method]),
+    ok;
+handle_metric_event(Event, Measurements, Metadata) ->
+    %% telemetry removes a handler whose callback raises, which would stop
+    %% every metric attached under the same handler id. An event that does
+    %% not carry the expected keys is therefore logged and skipped.
+    lager:warning("Ignoring unexpected telemetry event ~p measurements=~p metadata=~p", [Event, Measurements, Metadata]),
+    ok.
diff --git a/src/metrics/metrics.hrl b/src/metrics/metrics.hrl
new file mode 100644
index 0000000..b5e4f50
--- /dev/null
+++ b/src/metrics/metrics.hrl
@@ -0,0 +1,58 @@
+-define(METRICS_HISTOGRAM_BUCKETS, [50, 100, 250, 500, 1000, 2000, 5000, 10000, 30000, 60000]).
+
+%% Prometheus metric names. Each name is declared through the ?METRICS map
+%% and updated from bn_metrics_server:handle_metric_event/3.
+
+%% Block metrics.
+-define(METRICS_BLOCK_ABSORB, "blockchain_block_absorb_duration").
+-define(METRICS_BLOCK_UNVAL_ABSORB, "blockchain_block_unval_absorb_duration").
+-define(METRICS_BLOCK_HEIGHT, "blockchain_block_height").
+-define(METRICS_BLOCK_UNVAL_HEIGHT, "blockchain_block_unval_height").
+%% Transaction and transaction-manager metrics.
+-define(METRICS_TXN_ABSORB_DURATION, "blockchain_txn_absorb_duration").
+-define(METRICS_TXN_BLOCK_SPAN, "blockchain_txn_mgr_block_span").
+-define(METRICS_TXN_QUEUE, "blockchain_txn_mgr_queue").
+-define(METRICS_TXN_SUBMIT_COUNT, "blockchain_txn_mgr_submitted_count").
+-define(METRICS_TXN_REJECT_COUNT, "blockchain_txn_mgr_rejected_count").
+-define(METRICS_TXN_ACCEPT_COUNT, "blockchain_txn_mgr_accepted_count").
+-define(METRICS_TXN_UPDATE_COUNT, "blockchain_txn_mgr_updated_count").
+-define(METRICS_TXN_PROCESS_DURATION, "blockchain_txn_mgr_process_duration").
+-define(METRICS_TXN_CACHE_SIZE, "blockchain_txn_mgr_cache_size").
+-define(METRICS_TXN_BLOCK_TIME, "blockchain_txn_mgr_block_time").
+-define(METRICS_TXN_BLOCK_AGE, "blockchain_txn_mgr_block_age").
+%% gRPC server metrics.
+%% NOTE(review): the two names use different prefixes (`grpc_` vs
+%% `grpcbox_`) - confirm whether that is intentional.
+-define(METRICS_GRPC_SESSIONS, "grpc_session_count").
+-define(METRICS_GRPC_LATENCY, "grpcbox_session_latency").
+
+%% Metric groups selectable through the `metrics` application env key.
+%% Each group maps to {TelemetryEvents, MetricSpecs}, where a spec is
+%% {Name, PrometheusModule, LabelNames, HelpText}. Label values are supplied
+%% positionally by bn_metrics_server:handle_metric_event/3, so the label
+%% names here must describe the values passed there, in the same order.
+%% NOTE(review): the `time` and `height` labels look like they take a new
+%% value per block, which would create a new time series each time -
+%% confirm this cardinality is acceptable.
+-define(METRICS, #{
+    block_metrics => {
+        [ [blockchain, block, absorb],
+          [blockchain, block, height],
+          [blockchain, block, unvalidated_absorb],
+          [blockchain, block, unvalidated_height] ],
+        [ {?METRICS_BLOCK_ABSORB, prometheus_histogram, [stage], "Block absorb duration"},
+          {?METRICS_BLOCK_HEIGHT, prometheus_gauge, [time], "Most recent block height"},
+          {?METRICS_BLOCK_UNVAL_ABSORB, prometheus_histogram, [stage], "Block unvalidated absorb duration"},
+          {?METRICS_BLOCK_UNVAL_HEIGHT, prometheus_gauge, [time], "Most recent unvalidated block height"} ]
+    },
+    txn_metrics => {
+        [ [blockchain, txn, absorb],
+          [blockchain, txn_mgr, submit],
+          [blockchain, txn_mgr, reject],
+          [blockchain, txn_mgr, accept],
+          [blockchain, txn_mgr, update],
+          [blockchain, txn_mgr, process],
+          [blockchain, txn_mgr, add_block] ],
+        [ {?METRICS_TXN_ABSORB_DURATION, prometheus_histogram, [type], "Txn absorb duration"},
+          {?METRICS_TXN_BLOCK_SPAN, prometheus_gauge, [], "Block span on transactions"},
+          {?METRICS_TXN_QUEUE, prometheus_gauge, [], "Txn manager submission queue length"},
+          {?METRICS_TXN_SUBMIT_COUNT, prometheus_counter, [type], "Count of submitted transactions"},
+          {?METRICS_TXN_REJECT_COUNT, prometheus_counter, [type], "Count of rejected transactions"},
+          {?METRICS_TXN_ACCEPT_COUNT, prometheus_counter, [type], "Count of accepted transactions"},
+          {?METRICS_TXN_UPDATE_COUNT, prometheus_counter, [type], "Count of updated transaction"},
+          {?METRICS_TXN_PROCESS_DURATION, prometheus_histogram, [stage], "Transaction manager cache process duration"},
+          {?METRICS_TXN_CACHE_SIZE, prometheus_gauge, [height], "Transaction manager buffer size"},
+          {?METRICS_TXN_BLOCK_TIME, prometheus_gauge, [height], "Block time observed from the transaction manager"},
+          {?METRICS_TXN_BLOCK_AGE, prometheus_gauge, [height], "Block age observed from the transaction manager"} ]
+    },
+    grpc_metrics => {
+        [ [grpcbox, server, rpc_begin],
+          [grpcbox, server, rpc_end] ],
+        [ {?METRICS_GRPC_SESSIONS, prometheus_gauge, [method], "GRPC session count"},
+          {?METRICS_GRPC_LATENCY, prometheus_histogram, [method, status], "GRPC session latency"} ]
+    }
+}).