From e1f398127845080c359bbe99c01725f16e1abcba Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 15 Jan 2024 03:10:04 +0100 Subject: [PATCH] feat: Add verify sequence to persistent_session scenario --- src/behaviors/emqttb_behavior_sub.erl | 27 +++++++++++++++++-- src/framework/emqttb_worker.erl | 6 ++++- src/metrics/emqttb_metrics.erl | 3 ++- .../emqttb_scenario_persistent_session.erl | 22 ++++++++++----- 4 files changed, 47 insertions(+), 11 deletions(-) diff --git a/src/behaviors/emqttb_behavior_sub.erl b/src/behaviors/emqttb_behavior_sub.erl index 36e6555..8391b7f 100644 --- a/src/behaviors/emqttb_behavior_sub.erl +++ b/src/behaviors/emqttb_behavior_sub.erl @@ -93,6 +93,13 @@ model(GroupId) -> , id => {emqttb_repeat_size, GroupId} , labels => [group] }} + , n_streams => + {[metric], + #{ oneliner => "Number of sequences" + , metric_type => gauge + , id => {emqttb_n_sequences, GroupId} + , labels => [group] + }} , e2e_latency => {[metric], @@ -122,8 +129,10 @@ init_per_group(_Group, , parse_metadata => ParseMetadata , verify_sequence => false }, + NStreams = emqttb_metrics:from_model(MetricsModelKey ++ [n_streams]), + emqttb_metrics:gauge_set(NStreams, 0), Conf = maps:merge(Defaults, Opts), - ets:new(?seq_tab, [named_table, public, {write_concurrency, true}]), + ensure_sequence_table(), Conf#{ conn_opstat => emqttb_metrics:opstat_from_model(MetricsModelKey ++ [conn_latency]) , sub_opstat => emqttb_metrics:opstat_from_model(MetricsModelKey ++ [sub_latency]) , e2e_latency => emqttb_metrics:from_model(MetricsModelKey ++ [e2e_latency]) @@ -132,6 +141,7 @@ init_per_group(_Group, , gap_size => emqttb_metrics:from_model(MetricsModelKey ++ [gap_size]) , number_of_repeats => emqttb_metrics:from_model(MetricsModelKey ++ [number_of_repeats]) , repeat_size => emqttb_metrics:from_model(MetricsModelKey ++ [repeat_size]) + , n_streams => NStreams }. init(SubOpts0 = #{ topic := T @@ -183,11 +193,13 @@ terminate(_Shared, Conn) -> %% Internal functions %%================================================================================ -verify_sequence(#{number_of_gaps := NGaps, gap_size := GapSize, number_of_repeats := NRepeats, repeat_size := RepeatSize}, +verify_sequence(#{ number_of_gaps := NGaps, gap_size := GapSize, number_of_repeats := NRepeats + , repeat_size := RepeatSize, n_streams := NStreams}, From, Topic, SeqNo) -> Key = {From, emqttb_worker:my_id(), Topic}, case ets:lookup(?seq_tab, Key) of [] -> + emqttb_metrics:counter_inc(NStreams, 1), ok; [{_, OldSeqNo}] when SeqNo =:= OldSeqNo + 1 -> ok; @@ -201,3 +213,14 @@ verify_sequence(#{number_of_gaps := NGaps, gap_size := GapSize, number_of_repeat emqttb_metrics:rolling_average_observe(RepeatSize, SeqNo - OldSeqNo + 1) end, ets:insert(?seq_tab, {Key, SeqNo}). + +ensure_sequence_table() -> + catch ets:new(?seq_tab, + [ named_table + , set + , public + , {write_concurrency, true} + , {read_concurrency, true} + , {heir, whereis(emqttb_metrics), ?seq_tab} + ]), + ok. diff --git a/src/framework/emqttb_worker.erl b/src/framework/emqttb_worker.erl index fe078bd..b62e14f 100644 --- a/src/framework/emqttb_worker.erl +++ b/src/framework/emqttb_worker.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -164,6 +164,10 @@ connect(ConnOpstat, Properties0, CustomOptions, CustomTcpOptions, CustomSslOptio , {keepalive, KeepAlive} ], {ok, Client} = emqtt:start_link(CustomOptions ++ Options), + %% dbg:tracer(), + %% dbg:p(Client, [c, s, r]), + %% dbg:tpl({emqtt, '_', '_'}, x), + %% logger:error("Debug ~p", [CustomOptions ++ Options]), ConnectFun = connect_fun(), {ok, _Properties} = emqttb_metrics:call_with_counter(ConnOpstat, emqtt, ConnectFun, [Client]), {ok, Client}. diff --git a/src/metrics/emqttb_metrics.erl b/src/metrics/emqttb_metrics.erl index 970c50c..08f9216 100644 --- a/src/metrics/emqttb_metrics.erl +++ b/src/metrics/emqttb_metrics.erl @@ -108,7 +108,8 @@ call_with_counter({AvgTime, NPending}, Mod, Fun, Args) -> T0 = os:system_time(microsecond), try apply(Mod, Fun, Args) catch - EC:Err -> + EC:Err:Stack -> + logger:error("~p:~p~p~n~p:~p~nStack:~p", [Mod, Fun, Args, EC, Err, Stack]), EC(Err) after T = os:system_time(microsecond), diff --git a/src/scenarios/emqttb_scenario_persistent_session.erl b/src/scenarios/emqttb_scenario_persistent_session.erl index b23b479..db13bd9 100644 --- a/src/scenarios/emqttb_scenario_persistent_session.erl +++ b/src/scenarios/emqttb_scenario_persistent_session.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -142,7 +142,7 @@ model() -> , expiry => {[value, cli_param], #{ oneliner => "Session expiry interval" - , type => non_neg_integer() + , type => range(0, 16#FFFFFFFF) , default => 16#FFFFFFFF , cli_operand => "expiry" }} @@ -181,6 +181,13 @@ model() -> , default_str => "10s" , cli_operand => "max-stuck-time" }} + , verify_sequence => + {[value, cli_param], + #{ oneliner => "Run message sequence number analysis to check for gaps and unexpected repeats" + , type => boolean() + , default => false + , cli_operand => "verify-sequence" + }} }. initial_config() -> @@ -224,11 +231,12 @@ do_run(S0, N) -> consume_stage(Cycle, S) -> TopicPrefix = topic_prefix(), - SubOpts = #{ topic => <> - , qos => my_conf([sub, qos]) - , expiry => my_conf([sub, expiry]) - , clean_start => Cycle =:= 0 - , metrics => my_conf_key([sub, metrics]) + SubOpts = #{ topic => <> + , qos => my_conf([sub, qos]) + , expiry => my_conf([sub, expiry]) + , clean_start => Cycle =:= 0 + , metrics => my_conf_key([sub, metrics]) + , verify_sequence => my_conf([verify_sequence]) }, emqttb_group:ensure(#{ id => ?SUB_GROUP , client_config => my_conf([group])