Skip to content

Commit

Permalink
feat: Add verify sequence to persistent_session scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
ieQu1 committed May 13, 2024
1 parent 4977603 commit e1f3981
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 11 deletions.
27 changes: 25 additions & 2 deletions src/behaviors/emqttb_behavior_sub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ model(GroupId) ->
, id => {emqttb_repeat_size, GroupId}
, labels => [group]
}}
, n_streams =>
{[metric],
#{ oneliner => "Number of sequences"
, metric_type => gauge
, id => {emqttb_n_sequences, GroupId}
, labels => [group]
}}

, e2e_latency =>
{[metric],
Expand Down Expand Up @@ -122,8 +129,10 @@ init_per_group(_Group,
, parse_metadata => ParseMetadata
, verify_sequence => false
},
NStreams = emqttb_metrics:from_model(MetricsModelKey ++ [n_streams]),
emqttb_metrics:gauge_set(NStreams, 0),
Conf = maps:merge(Defaults, Opts),
ets:new(?seq_tab, [named_table, public, {write_concurrency, true}]),
ensure_sequence_table(),
Conf#{ conn_opstat => emqttb_metrics:opstat_from_model(MetricsModelKey ++ [conn_latency])
, sub_opstat => emqttb_metrics:opstat_from_model(MetricsModelKey ++ [sub_latency])
, e2e_latency => emqttb_metrics:from_model(MetricsModelKey ++ [e2e_latency])
Expand All @@ -132,6 +141,7 @@ init_per_group(_Group,
, gap_size => emqttb_metrics:from_model(MetricsModelKey ++ [gap_size])
, number_of_repeats => emqttb_metrics:from_model(MetricsModelKey ++ [number_of_repeats])
, repeat_size => emqttb_metrics:from_model(MetricsModelKey ++ [repeat_size])
, n_streams => NStreams
}.

init(SubOpts0 = #{ topic := T
Expand Down Expand Up @@ -183,11 +193,13 @@ terminate(_Shared, Conn) ->
%% Internal functions
%%================================================================================

verify_sequence(#{number_of_gaps := NGaps, gap_size := GapSize, number_of_repeats := NRepeats, repeat_size := RepeatSize},
verify_sequence(#{ number_of_gaps := NGaps, gap_size := GapSize, number_of_repeats := NRepeats
, repeat_size := RepeatSize, n_streams := NStreams},
From, Topic, SeqNo) ->
Key = {From, emqttb_worker:my_id(), Topic},
case ets:lookup(?seq_tab, Key) of
[] ->
emqttb_metrics:counter_inc(NStreams, 1),
ok;
[{_, OldSeqNo}] when SeqNo =:= OldSeqNo + 1 ->
ok;
Expand All @@ -201,3 +213,14 @@ verify_sequence(#{number_of_gaps := NGaps, gap_size := GapSize, number_of_repeat
emqttb_metrics:rolling_average_observe(RepeatSize, SeqNo - OldSeqNo + 1)
end,
ets:insert(?seq_tab, {Key, SeqNo}).

ensure_sequence_table() ->
catch ets:new(?seq_tab,
[ named_table
, set
, public
, {write_concurrency, true}
, {read_concurrency, true}
, {heir, whereis(emqttb_metrics), ?seq_tab}
]),
ok.
6 changes: 5 additions & 1 deletion src/framework/emqttb_worker.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -164,6 +164,10 @@ connect(ConnOpstat, Properties0, CustomOptions, CustomTcpOptions, CustomSslOptio
, {keepalive, KeepAlive}
],
{ok, Client} = emqtt:start_link(CustomOptions ++ Options),
%% dbg:tracer(),
%% dbg:p(Client, [c, s, r]),
%% dbg:tpl({emqtt, '_', '_'}, x),
%% logger:error("Debug ~p", [CustomOptions ++ Options]),
ConnectFun = connect_fun(),
{ok, _Properties} = emqttb_metrics:call_with_counter(ConnOpstat, emqtt, ConnectFun, [Client]),
{ok, Client}.
Expand Down
3 changes: 2 additions & 1 deletion src/metrics/emqttb_metrics.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ call_with_counter({AvgTime, NPending}, Mod, Fun, Args) ->
T0 = os:system_time(microsecond),
try apply(Mod, Fun, Args)
catch
EC:Err ->
EC:Err:Stack ->
logger:error("~p:~p~p~n~p:~p~nStack:~p", [Mod, Fun, Args, EC, Err, Stack]),
EC(Err)
after
T = os:system_time(microsecond),
Expand Down
22 changes: 15 additions & 7 deletions src/scenarios/emqttb_scenario_persistent_session.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -142,7 +142,7 @@ model() ->
, expiry =>
{[value, cli_param],
#{ oneliner => "Session expiry interval"
, type => non_neg_integer()
, type => range(0, 16#FFFFFFFF)
, default => 16#FFFFFFFF
, cli_operand => "expiry"
}}
Expand Down Expand Up @@ -181,6 +181,13 @@ model() ->
, default_str => "10s"
, cli_operand => "max-stuck-time"
}}
, verify_sequence =>
{[value, cli_param],
#{ oneliner => "Run message sequence number analysis to check for gaps and unexpected repeats"
, type => boolean()
, default => false
, cli_operand => "verify-sequence"
}}
}.

initial_config() ->
Expand Down Expand Up @@ -224,11 +231,12 @@ do_run(S0, N) ->

consume_stage(Cycle, S) ->
TopicPrefix = topic_prefix(),
SubOpts = #{ topic => <<TopicPrefix/binary, "#">>
, qos => my_conf([sub, qos])
, expiry => my_conf([sub, expiry])
, clean_start => Cycle =:= 0
, metrics => my_conf_key([sub, metrics])
SubOpts = #{ topic => <<TopicPrefix/binary, "#">>
, qos => my_conf([sub, qos])
, expiry => my_conf([sub, expiry])
, clean_start => Cycle =:= 0
, metrics => my_conf_key([sub, metrics])
, verify_sequence => my_conf([verify_sequence])
},
emqttb_group:ensure(#{ id => ?SUB_GROUP
, client_config => my_conf([group])
Expand Down

0 comments on commit e1f3981

Please sign in to comment.