diff --git a/doc/src/schema.adoc b/doc/src/schema.adoc index 9ebd813..2bccd3b 100644 --- a/doc/src/schema.adoc +++ b/doc/src/schema.adoc @@ -106,6 +106,25 @@ WARNING: In order to measure latency accurately, the scenario should ensure that Otherwise clock skew between different load generator instances will introduce a systematic error. +[id=scenarios.sub._.verify_sequence] +== Verify sequence of messages + +When this option is enabled, emqttb will parse the metadata embedded in the messages and check for missing or duplicated messages. +This option implies <<scenarios.sub._.parse_metadata,parse_metadata>>. + +Gaps (missing messages) are logged at the `warning` level and repeats (duplicate messages) at the `info` level in `emqttb.log`. +Relevant Prometheus metrics include: + +- `emqttb_repeats_number` -- number of times the sequence number of a received message did not advance (it repeated or went backwards) +- `emqttb_gaps_number` -- number of times the sequence number jumped ahead, skipping one or more messages (a gap) +- `emqttb_repeat_size` -- rolling average; size of the repeated sequence +- `emqttb_gap_size` -- rolling average; size of the gap + + +WARNING: Publishers must insert metadata into the payloads in order for this feature to work. + +WARNING: This feature can use a lot of RAM, because it stores the last sequence number for each triple of sender client id, receiver client id, and MQTT topic. 
+ === Client groups - `sub` diff --git a/emqttb-dashboard.json b/emqttb-dashboard.json index cd3a370..ab10d6d 100644 --- a/emqttb-dashboard.json +++ b/emqttb-dashboard.json @@ -24,8 +24,8 @@ "fiscalYearStartMonth": 0, "gnetId": null, "graphTooltip": 0, - "id": 7, - "iteration": 1702000316244, + "id": 1, + "iteration": 1705260864053, "links": [], "liveNow": false, "panels": [ @@ -870,7 +870,7 @@ } ] }, - "unit": "none" + "unit": "µs" }, "overrides": [] }, @@ -880,6 +880,90 @@ "x": 12, "y": 33 }, + "id": 13, + "interval": "1s", + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "exemplar": true, + "expr": "emqttb_group_op_time", + "interval": "", + "legendFormat": "{{group}}/{{operation}}", + "refId": "A" + } + ], + "title": "Average latency", + "type": "timeseries" + }, + { + "datasource": "${Prometheus}", + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 41 + }, "id": 10, "interval": "1s", "options": { @@ -895,13 +979,198 @@ "targets": [ { "exemplar": true, - "expr": "emqttb_autorate_control", + "expr": "emqttb_gaps_number", + "interval": "", + "legendFormat": "Gap: 
{{group}}", + "refId": "A" + }, + { + "exemplar": true, + "expr": "emqttb_repeats_number", + "hide": false, + "interval": "", + "legendFormat": "Repeat: {{group}}", + "refId": "B" + } + ], + "title": "Number of gaps and repeats", + "type": "timeseries" + }, + { + "datasource": "${Prometheus}", + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 41 + }, + "id": 14, + "interval": "1s", + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "exemplar": true, + "expr": "emqttb_gap_size", + "interval": "", + "legendFormat": "Gap: {{group}}", + "refId": "A" + }, + { + "exemplar": true, + "expr": "emqttb_repeat_size", + "hide": false, + "interval": "", + "legendFormat": "Repeat: {{group}}", + "refId": "B" + } + ], + "title": "Size of gaps and repeats", + "type": "timeseries" + }, + { + "datasource": "${Prometheus}", + "description": "Count the number of triples sender client id, receiver client id, and topic. 
\nverify-sequence must be enabled.", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 49 + }, + "id": 15, + "interval": "1s", + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "exemplar": true, + "expr": "emqttb_n_sequences", + "hide": false, "interval": "", - "legendFormat": "{{id}}/{{term}}", + "legendFormat": "{{group}}", "refId": "A" } ], - "title": "Autorate control", + "title": "Number of sequences", "type": "timeseries" } ], diff --git a/rebar.config b/rebar.config index e026289..6dc6b05 100644 --- a/rebar.config +++ b/rebar.config @@ -3,7 +3,7 @@ {validate_app_modules, true}. 
{deps, - [ {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.2"}}} + [ {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.11.0"}}} , {gproc, "0.9.1"} , {lee, {git, "https://github.com/k32/lee", {tag, "0.4.4"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe", {tag, "1.0.1"}}} diff --git a/src/behaviors/emqttb_behavior_pub.erl b/src/behaviors/emqttb_behavior_pub.erl index 2707026..ce9f093 100644 --- a/src/behaviors/emqttb_behavior_pub.erl +++ b/src/behaviors/emqttb_behavior_pub.erl @@ -74,7 +74,7 @@ model(Group) -> ID :: integer(), SeqNo :: non_neg_integer(), TS :: integer(). -parse_metadata(<>) -> +parse_metadata(<>) -> {ID, SeqNo, TS}. %%================================================================================ @@ -92,7 +92,7 @@ init_per_group(Group, AddMetadata = maps:get(metadata, Conf, false), PubRate = emqttb_autorate:get_counter(emqttb_autorate:from_model(PubInterval)), MetadataSize = case AddMetadata of - true -> (32 + 32 + 64) div 8; + true -> (32 + 64 + 64) div 8; false -> 0 end, HostShift = maps:get(host_shift, Conf, 0), @@ -168,7 +168,7 @@ message_metadata() -> SeqNo = msg_seqno(), ID = erlang:phash2({node(), self()}), TS = os:system_time(microsecond), - <>. + <>. msg_seqno() -> case get(emqttb_behavior_pub_seqno) of diff --git a/src/behaviors/emqttb_behavior_sub.erl b/src/behaviors/emqttb_behavior_sub.erl index 7b6e8e4..8391b7f 100644 --- a/src/behaviors/emqttb_behavior_sub.erl +++ b/src/behaviors/emqttb_behavior_sub.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. 
@@ -29,18 +29,23 @@ %% Type declarations %%================================================================================ --type config() :: #{ topic := binary() - , qos := 0..2 - , metrics := lee:model_key() - , clean_start => boolean() - , expiry => non_neg_integer() | undefined - , host_shift => integer() - , host_selection => _ - , parse_metadata => boolean() +-type config() :: #{ topic := binary() + , qos := 0..2 + , metrics := lee:model_key() + , clean_start => boolean() + , expiry => non_neg_integer() | undefined + , host_shift => integer() + , host_selection => _ + , parse_metadata => boolean() + , verify_sequence => boolean() }. -type prototype() :: {?MODULE, config()}. +-type sequence() :: {_From :: binary(), _To :: binary(), _Topic :: binary()}. + +-define(seq_tab, emqttb_behavior_sub_seq_tab). + %%================================================================================ %% API %%================================================================================ @@ -58,6 +63,44 @@ model(GroupId) -> emqttb_metrics:opstat(GroupId, 'connect') , sub_latency => emqttb_metrics:opstat(GroupId, 'subscribe') + + , number_of_gaps => + {[metric], + #{ oneliner => "Number of gaps in the sequence numbers" + , metric_type => counter + , id => {emqttb_gaps_number, GroupId} + , labels => [group] + }} + , gap_size => + {[metric], + #{ oneliner => "Average size of the gap in the sequence numbers" + , metric_type => rolling_average + , id => {emqttb_gap_size, GroupId} + , labels => [group] + }} + + , number_of_repeats => + {[metric], + #{ oneliner => "Number of repeats of the sequence numbers" + , metric_type => counter + , id => {emqttb_repeats_number, GroupId} + , labels => [group] + }} + , repeat_size => + {[metric], + #{ oneliner => "Average size of the repeated sequence of seqence numbers" + , metric_type => rolling_average + , id => {emqttb_repeat_size, GroupId} + , labels => [group] + }} + , n_streams => + {[metric], + #{ oneliner => "Number of sequences" + , 
metric_type => gauge + , id => {emqttb_n_sequences, GroupId} + , labels => [group] + }} + , e2e_latency => {[metric], #{ oneliner => "End-to-end latency" @@ -77,17 +120,28 @@ init_per_group(_Group, , qos := _QoS , metrics := MetricsModelKey } = Opts) when is_binary(Topic) -> + ParseMetadata = maps:get(parse_metadata, Opts, false) orelse + maps:get(verify_sequence, Opts, false), Defaults = #{ expiry => 0 , clean_start => true , host_shift => 0 , host_selection => random - , parse_metadata => false + , parse_metadata => ParseMetadata + , verify_sequence => false }, + NStreams = emqttb_metrics:from_model(MetricsModelKey ++ [n_streams]), + emqttb_metrics:gauge_set(NStreams, 0), Conf = maps:merge(Defaults, Opts), + ensure_sequence_table(), Conf#{ conn_opstat => emqttb_metrics:opstat_from_model(MetricsModelKey ++ [conn_latency]) , sub_opstat => emqttb_metrics:opstat_from_model(MetricsModelKey ++ [sub_latency]) , e2e_latency => emqttb_metrics:from_model(MetricsModelKey ++ [e2e_latency]) , sub_counter => emqttb_metrics:from_model(MetricsModelKey ++ [n_received]) + , number_of_gaps => emqttb_metrics:from_model(MetricsModelKey ++ [number_of_gaps]) + , gap_size => emqttb_metrics:from_model(MetricsModelKey ++ [gap_size]) + , number_of_repeats => emqttb_metrics:from_model(MetricsModelKey ++ [number_of_repeats]) + , repeat_size => emqttb_metrics:from_model(MetricsModelKey ++ [repeat_size]) + , n_streams => NStreams }. init(SubOpts0 = #{ topic := T @@ -106,16 +160,24 @@ init(SubOpts0 = #{ topic := T emqttb_metrics:call_with_counter(SubOpstat, emqtt, subscribe, [Conn, emqttb_worker:format_topic(T), QoS]), Conn. 
-handle_message(#{ parse_metadata := ParseMetadata, sub_counter := SubCnt, e2e_latency := E2ELatency}, +handle_message(#{ parse_metadata := ParseMetadata, verify_sequence := VerifySequence, + sub_counter := SubCnt, e2e_latency := E2ELatency + } = Conf, Conn, - {publish, #{client_pid := Pid, payload := Payload}} + {publish, #{client_pid := Pid, payload := Payload, topic := Topic}} ) when Pid =:= Conn -> emqttb_metrics:counter_inc(SubCnt, 1), case ParseMetadata of true -> - {_Id, _SeqNo, TS} = emqttb_behavior_pub:parse_metadata(Payload), + {Id, SeqNo, TS} = emqttb_behavior_pub:parse_metadata(Payload), Dt = os:system_time(microsecond) - TS, - emqttb_metrics:rolling_average_observe(E2ELatency, Dt); + emqttb_metrics:rolling_average_observe(E2ELatency, Dt), + case VerifySequence of + true -> + verify_sequence(Conf, Id, Topic, SeqNo); + false -> + ok + end; false -> ok end, @@ -130,3 +192,36 @@ terminate(_Shared, Conn) -> %%================================================================================ %% Internal functions %%================================================================================ + +verify_sequence(#{ number_of_gaps := NGaps, gap_size := GapSize, number_of_repeats := NRepeats + , repeat_size := RepeatSize, n_streams := NStreams}, + From, Topic, SeqNo) -> + Key = {From, emqttb_worker:my_id(), Topic}, + case ets:lookup(?seq_tab, Key) of + [] -> + emqttb_metrics:counter_inc(NStreams, 1), + ok; + [{_, OldSeqNo}] when SeqNo =:= OldSeqNo + 1 -> + ok; + [{_, OldSeqNo}] when SeqNo > OldSeqNo -> + logger:warning("Gap detected: ~p ~p; ~p", [OldSeqNo, SeqNo, Key]), + emqttb_metrics:counter_inc(NGaps, 1), + emqttb_metrics:rolling_average_observe(GapSize, SeqNo - OldSeqNo - 1); + [{_, OldSeqNo}] -> + logger:info("Repeat detected: ~p ~p; ~p", [OldSeqNo, SeqNo, Key]), + emqttb_metrics:counter_inc(NRepeats, 1), + %% Reaching this clause means SeqNo =< OldSeqNo: the replayed span runs from SeqNo up to OldSeqNo inclusive. + emqttb_metrics:rolling_average_observe(RepeatSize, OldSeqNo - SeqNo + 1) + end, + ets:insert(?seq_tab, {Key, SeqNo}). 
+ +ensure_sequence_table() -> + catch ets:new(?seq_tab, + [ named_table + , set + , public + , {write_concurrency, true} + , {read_concurrency, true} + , {heir, whereis(emqttb_metrics), ?seq_tab} + ]), + ok. diff --git a/src/framework/emqttb_worker.erl b/src/framework/emqttb_worker.erl index fe078bd..b62e14f 100644 --- a/src/framework/emqttb_worker.erl +++ b/src/framework/emqttb_worker.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -164,6 +164,10 @@ connect(ConnOpstat, Properties0, CustomOptions, CustomTcpOptions, CustomSslOptio , {keepalive, KeepAlive} ], {ok, Client} = emqtt:start_link(CustomOptions ++ Options), + %% dbg:tracer(), + %% dbg:p(Client, [c, s, r]), + %% dbg:tpl({emqtt, '_', '_'}, x), + %% logger:error("Debug ~p", [CustomOptions ++ Options]), ConnectFun = connect_fun(), {ok, _Properties} = emqttb_metrics:call_with_counter(ConnOpstat, emqtt, ConnectFun, [Client]), {ok, Client}. 
diff --git a/src/metrics/emqttb_metrics.erl b/src/metrics/emqttb_metrics.erl index 970c50c..08f9216 100644 --- a/src/metrics/emqttb_metrics.erl +++ b/src/metrics/emqttb_metrics.erl @@ -108,7 +108,8 @@ call_with_counter({AvgTime, NPending}, Mod, Fun, Args) -> T0 = os:system_time(microsecond), try apply(Mod, Fun, Args) catch - EC:Err -> + EC:Err:Stack -> + logger:error("~p:~p~p~n~p:~p~nStack:~p", [Mod, Fun, Args, EC, Err, Stack]), EC(Err) after T = os:system_time(microsecond), diff --git a/src/scenarios/emqttb_scenario_persistent_session.erl b/src/scenarios/emqttb_scenario_persistent_session.erl index b23b479..db13bd9 100644 --- a/src/scenarios/emqttb_scenario_persistent_session.erl +++ b/src/scenarios/emqttb_scenario_persistent_session.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -142,7 +142,7 @@ model() -> , expiry => {[value, cli_param], #{ oneliner => "Session expiry interval" - , type => non_neg_integer() + , type => range(0, 16#FFFFFFFF) , default => 16#FFFFFFFF , cli_operand => "expiry" }} @@ -181,6 +181,13 @@ model() -> , default_str => "10s" , cli_operand => "max-stuck-time" }} + , verify_sequence => + {[value, cli_param], + #{ oneliner => "Run message sequence number analysis to check for gaps and unexpected repeats" + , type => boolean() + , default => false + , cli_operand => "verify-sequence" + }} }. 
initial_config() -> @@ -224,11 +231,12 @@ do_run(S0, N) -> consume_stage(Cycle, S) -> TopicPrefix = topic_prefix(), - SubOpts = #{ topic => <> - , qos => my_conf([sub, qos]) - , expiry => my_conf([sub, expiry]) - , clean_start => Cycle =:= 0 - , metrics => my_conf_key([sub, metrics]) + SubOpts = #{ topic => <> + , qos => my_conf([sub, qos]) + , expiry => my_conf([sub, expiry]) + , clean_start => Cycle =:= 0 + , metrics => my_conf_key([sub, metrics]) + , verify_sequence => my_conf([verify_sequence]) }, emqttb_group:ensure(#{ id => ?SUB_GROUP , client_config => my_conf([group]) diff --git a/src/scenarios/emqttb_scenario_sub.erl b/src/scenarios/emqttb_scenario_sub.erl index 9fa315b..8a7e5ca 100644 --- a/src/scenarios/emqttb_scenario_sub.erl +++ b/src/scenarios/emqttb_scenario_sub.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%%Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -98,6 +98,12 @@ model() -> , default => false , cli_operand => "parse-metadata" }} + , verify_sequence => + {[value, cli_param], + #{ type => boolean() + , default => false + , cli_operand => "verify-sequence" + }} , clean_start => {[value, cli_param], #{ type => boolean() @@ -117,6 +123,7 @@ run() -> , qos => my_conf([qos]) , expiry => my_conf([expiry]) , parse_metadata => my_conf([parse_metadata]) + , verify_sequence => my_conf([verify_sequence]) , clean_start => my_conf([clean_start]) , metrics => my_conf_key([metrics]) },