Skip to content

Commit

Permalink
Merge pull request #62 from emqx/dev/verify-sequence
Browse files Browse the repository at this point in the history
Gap and repeat analysis
  • Loading branch information
ieQu1 authored May 13, 2024
2 parents aed7d1d + e1f3981 commit 0bebf12
Show file tree
Hide file tree
Showing 9 changed files with 436 additions and 34 deletions.
19 changes: 19 additions & 0 deletions doc/src/schema.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,25 @@ WARNING: In order to measure latency accurately, the scenario should ensure that
Otherwise clock skew between different load generator instances will introduce a systematic error.


[id=scenarios.sub._.verify_sequence]
== Verify sequence of messages

When this option is enabled, emqttb will parse the metadata embedded in the messages and check for missing or duplicated messages.
This option implies <<value.scenarios.sub._.parse_metadata>>.

Errors about missing messages and warnings about duplicate messages are printed to the `emqttb.log`.
Relevant prometheus metrics include:

- `emqttb_repeats_number` -- number of times when the sequence number of the message goes backwards
- `emqttb_gaps_number` -- number of times when the sequence number of the message skips the messages (a gap)
- `emqttb_repeat_size` -- rolling average; size of the repeated sequence
- `emqttb_gap_size` -- rolling average; size of the gap


WARNING: Publishers should insert metadata into the payloads in order for this feature to work.

WARNING: This feature can use a lot of RAM to store the sequence numbers for each triple of sender client id, receiver client id, and MQTT topic.

=== Client groups

- `sub`
Expand Down
281 changes: 275 additions & 6 deletions emqttb-dashboard.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
"fiscalYearStartMonth": 0,
"gnetId": null,
"graphTooltip": 0,
"id": 7,
"iteration": 1702000316244,
"id": 1,
"iteration": 1705260864053,
"links": [],
"liveNow": false,
"panels": [
Expand Down Expand Up @@ -870,7 +870,7 @@
}
]
},
"unit": "none"
"unit": "µs"
},
"overrides": []
},
Expand All @@ -880,6 +880,90 @@
"x": 12,
"y": 33
},
"id": 13,
"interval": "1s",
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
},
"tooltip": {
"mode": "single"
}
},
"targets": [
{
"exemplar": true,
"expr": "emqttb_group_op_time",
"interval": "",
"legendFormat": "{{group}}/{{operation}}",
"refId": "A"
}
],
"title": "Average latency",
"type": "timeseries"
},
{
"datasource": "${Prometheus}",
"description": "",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 41
},
"id": 10,
"interval": "1s",
"options": {
Expand All @@ -895,13 +979,198 @@
"targets": [
{
"exemplar": true,
"expr": "emqttb_autorate_control",
"expr": "emqttb_gaps_number",
"interval": "",
"legendFormat": "Gap: {{group}}",
"refId": "A"
},
{
"exemplar": true,
"expr": "emqttb_repeats_number",
"hide": false,
"interval": "",
"legendFormat": "Repeat: {{group}}",
"refId": "B"
}
],
"title": "Number of gaps and repeats",
"type": "timeseries"
},
{
"datasource": "${Prometheus}",
"description": "",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 41
},
"id": 14,
"interval": "1s",
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
},
"tooltip": {
"mode": "single"
}
},
"targets": [
{
"exemplar": true,
"expr": "emqttb_gap_size",
"interval": "",
"legendFormat": "Gap: {{group}}",
"refId": "A"
},
{
"exemplar": true,
"expr": "emqttb_repeat_size",
"hide": false,
"interval": "",
"legendFormat": "Repeat: {{group}}",
"refId": "B"
}
],
"title": "Size of gaps and repeats",
"type": "timeseries"
},
{
"datasource": "${Prometheus}",
"description": "Count the number of triples sender client id, receiver client id, and topic. \nverify-sequence must be enabled.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "none"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 49
},
"id": 15,
"interval": "1s",
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom"
},
"tooltip": {
"mode": "single"
}
},
"targets": [
{
"exemplar": true,
"expr": "emqttb_n_sequences",
"hide": false,
"interval": "",
"legendFormat": "{{id}}/{{term}}",
"legendFormat": "{{group}}",
"refId": "A"
}
],
"title": "Autorate control",
"title": "Number of sequences",
"type": "timeseries"
}
],
Expand Down
2 changes: 1 addition & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
{validate_app_modules, true}.

{deps,
[ {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.2"}}}
[ {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.11.0"}}}
, {gproc, "0.9.1"}
, {lee, {git, "https://github.com/k32/lee", {tag, "0.4.4"}}}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe", {tag, "1.0.1"}}}
Expand Down
6 changes: 3 additions & 3 deletions src/behaviors/emqttb_behavior_pub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ model(Group) ->
ID :: integer(),
SeqNo :: non_neg_integer(),
TS :: integer().
parse_metadata(<<ID:32, SeqNo:32, TS:64, _/binary>>) ->
parse_metadata(<<ID:32, SeqNo:64, TS:64, _/binary>>) ->
{ID, SeqNo, TS}.

%%================================================================================
Expand All @@ -92,7 +92,7 @@ init_per_group(Group,
AddMetadata = maps:get(metadata, Conf, false),
PubRate = emqttb_autorate:get_counter(emqttb_autorate:from_model(PubInterval)),
MetadataSize = case AddMetadata of
true -> (32 + 32 + 64) div 8;
true -> (32 + 64 + 64) div 8;
false -> 0
end,
HostShift = maps:get(host_shift, Conf, 0),
Expand Down Expand Up @@ -168,7 +168,7 @@ message_metadata() ->
SeqNo = msg_seqno(),
ID = erlang:phash2({node(), self()}),
TS = os:system_time(microsecond),
<<ID:32, SeqNo:32, TS:64>>.
<<ID:32, SeqNo:64, TS:64>>.

msg_seqno() ->
case get(emqttb_behavior_pub_seqno) of
Expand Down
Loading

0 comments on commit 0bebf12

Please sign in to comment.