Skip to content

Commit

Permalink
Merge pull request #60 from emqx/dev/declarative-autorate
Browse files Browse the repository at this point in the history
feat: Declarative metrics and autorates
  • Loading branch information
ieQu1 authored Dec 14, 2023
2 parents a771c67 + 5a2338e commit 62a9cb1
Show file tree
Hide file tree
Showing 25 changed files with 857 additions and 455 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ dialyzer: $(REBAR)

.PHONY: test
test: $(REBAR)
$(REBAR) do eunit, ct
$(REBAR) do compile, eunit, ct

.PHONY: release
release: compile docs
Expand Down
62 changes: 38 additions & 24 deletions doc/src/schema.adoc
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
:!sectids:
:stem:
= Documentation

[id=cluster.node_name]
Expand All @@ -20,22 +21,53 @@ This can be very expensive in man-hours and computing resources.
In order to prevent that, emqttb can tune some parameters (such as message publishing interval)
automatically using https://controlguru.com/integral-reset-windup-jacketing-logic-and-the-velocity-pi-form/[PI controller].

The following formula is used for the error function:

stem:[e=(a_{SP} - a_{PV}) a_{coeff}]

=== Autoscale

A special autorate controlling the rate of spawning new clients is implicitly created for each client group.
Its name follows the pattern `<group_id>_autoscale`.
Its name usually follows the pattern `%scenario%/conn_interval`.


By default, the number of pending (unacked) connections is used as the process variable.
Number of pending connections is a metric that responds very fast to target overload, so it makes a reasonable default.

For example, the following command automatically adjusts the rate of connections:

[code,bash]
----
./emqttb @conn -I 10ms -N 10_000 \
@g --host 172.22.0.13,172.22.0.6 \
@a -a conn_group_autoscale -V 1000
./emqttb --pushgw @conn -I 10ms -N 5_000 \
@a -a conn/conninterval -V 1000 --setpoint 10
----

Note: autoscale can be applied to any group used by any scenario.
Additionally, autoscale is used for overload protection, see section about <<value.groups._.scram.threshold,SCRAM>>.

[id=autorate._.id]
== ID of the autorate configuration

Autorate configuration can be referred to by its ID.
This value must be equal to one of the elements returned by `emqttb @ls autorate` command.


[id=autorate._.speed]
== Maximum rate of change of the controlled parameter

Note: by default this parameter is set to 0 for each autorate, effectively locking the control parameter in place.


[id=autorate._.process_variable]
== Process variable

This parameter specifies the ID of the metric that senses pressure on the SUT and serves as the process variable (PV).
Its value must be equal to one of the metric IDs returned by `emqttb @ls metric` command.

[id=autorate._.setpoint]
== Setpoint

The desired value of the process variable (PV) is called the setpoint.
Autorate adjusts the value of the control variable (CV) to bring the PV close to the setpoint.


[id=interval]
== Default interval between events
Expand All @@ -50,11 +82,6 @@ Supported units:

If unit is not specified then `ms` is assumed.

[id=autorate._.id]
== ID of the autorate configuration

Autorate configuration can be referred to by its ID.

[id=scenarios.sub]
== Run scenario sub

Expand Down Expand Up @@ -277,14 +304,6 @@ The following substitutions are supported:

How often the clients will send `PING` MQTT message to the broker on idle connections.

[id=groups._.target_conn_pending]
== Target number of unacked connections

In order to optimize the connection rate, autoscale relies on the number of unacked (pending) connections.
This parameter configures the value that emqttb autoscale will try to approach.

Number of pending connections is a metric that responds very fast to target overload, so we use it.


[id=groups._.scram.threshold]
== Maximum unacked CONNECT packets
Expand All @@ -310,8 +329,3 @@ It's not desirable to switch between normal and SCRAM connection rate too often.
== How often autorate is updated

This parameter governs how often the error is calculated and the control parameter is updated.

[id=autorate._.speed]
== Maximum rate of change of the controlled parameter

Note: this parameter can be set to 0 to effectively disable autorate and lock the control parameter in place.
40 changes: 29 additions & 11 deletions src/behaviors/emqttb_behavior_conn.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,52 @@

-behavior(emqttb_worker).

%% API:
-export([model/1]).

%% behavior callbacks:
-export([init_per_group/2, init/1, handle_message/3, terminate/2]).

-export_type([]).
-export_type([prototype/0, config/0]).

%%================================================================================
%% Type declarations
%%================================================================================

-type config() :: #{ expiry => non_neg_integer()
, clean_start => boolean()
, metrics := lee:model_key()
}.

-type prototype() :: {?MODULE, config()}.

%%================================================================================
%% behavior callbacks
%% API
%%================================================================================

init_per_group(_Group, Opts) ->
Expiry = maps:get(expiry, Opts, 0),
CleanStart = maps:get(clean_start, Opts, true),
HostShift = maps:get(host_shift, Opts, 0),
HostSelection = maps:get(host_selection, Opts, random),
#{ expiry => Expiry
, clean_start => CleanStart
model(GroupId) ->
#{ conn_latency =>
emqttb_metrics:opstat(GroupId, connect)
}.

init(Opts0 = #{clean_start := CleanStart, expiry := Expiry}) ->
%%================================================================================
%% behavior callbacks
%%================================================================================

init_per_group(_Group, Opts = #{metrics := MetricsKey}) ->
Defaults = #{ expiry => 0
, clean_start => true
},
Config = maps:merge(Defaults, Opts),
Config#{ conn_opstat => emqttb_metrics:opstat_from_model(MetricsKey ++ [conn_latency])
}.

init(#{clean_start := CleanStart, expiry := Expiry, conn_opstat := ConnOpstat}) ->
Props = case Expiry of
undefined -> #{};
_ -> #{'Session-Expiry-Interval' => Expiry}
end,
{ok, Conn} = emqttb_worker:connect(Props, [{clean_start, CleanStart}], [], []),
{ok, Conn} = emqttb_worker:connect(ConnOpstat, Props, [{clean_start, CleanStart}], [], []),
Conn.

handle_message(_, Conn, _) ->
Expand Down
87 changes: 42 additions & 45 deletions src/behaviors/emqttb_behavior_pub.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
-behavior(emqttb_worker).

%% API
-export([parse_metadata/1, my_autorate/1]).
-export([parse_metadata/1, model/1]).

%% behavior callbacks:
-export([init_per_group/2, init/1, handle_message/3, terminate/2]).

-export_type([]).
-export_type([prototype/0, config/0]).

-import(emqttb_worker, [send_after/2, send_after_rand/2, repeat/2,
my_group/0, my_id/0, my_clientid/0, my_cfg/1, connect/2]).
Expand All @@ -34,19 +34,38 @@
%% Type declarations
%%================================================================================

-type config() :: #{ topic := binary()
, pubinterval := counters:counters_ref()
, qos := emqttb:qos()
, retain := boolean()
, set_latency := lee:key()
, msg_size := non_neg_integer()
, metadata => boolean()
-type config() :: #{ topic := binary()
, pubinterval := lee:model_key()
, msg_size := non_neg_integer()
, metrics := lee:model_key()
, qos := emqttb:qos()
, retain => boolean()
, metadata => boolean()
, host_shift => integer()
, host_selection => random | round_robin
}.

-type prototype() :: {?MODULE, config()}.

%%================================================================================
%% API
%%================================================================================

-spec model(atom()) -> lee:namespace().
model(Group) ->
#{ conn_latency =>
emqttb_metrics:opstat(Group, connect)
, pub_latency =>
emqttb_metrics:opstat(Group, publish)
, n_published =>
{[metric],
#{ oneliner => "Total number of messages published by the group"
, id => {emqttb_published_messages, Group}
, labels => [group]
, metric_type => counter
}}
}.

-spec parse_metadata(Msg) -> {ID, SeqNo, TS}
when Msg :: binary(),
ID :: integer(),
Expand All @@ -62,55 +81,48 @@ parse_metadata(<<ID:32, SeqNo:32, TS:64, _/binary>>) ->
init_per_group(Group,
#{ topic := Topic
, pubinterval := PubInterval
, pub_autorate := AutorateConf
, msg_size := MsgSize
, qos := QoS
, retain := Retain
, set_latency := SetLatencyKey
, metrics := MetricsKey
} = Conf) when is_binary(Topic),
is_integer(MsgSize),
is_list(SetLatencyKey) ->
is_integer(MsgSize) ->
AddMetadata = maps:get(metadata, Conf, false),
PubCnt = emqttb_metrics:new_counter(?CNT_PUB_MESSAGES(Group),
[ {help, <<"Number of published messages">>}
, {labels, [group]}
]),
emqttb_worker:new_opstat(Group, ?AVG_PUB_TIME),
{auto, PubRate} = emqttb_autorate:ensure(#{ id => my_autorate(Group)
, error => fun() -> error_fun(SetLatencyKey, Group) end
, init_val => PubInterval
, conf_root => AutorateConf
}),
PubRate = emqttb_autorate:get_counter(emqttb_autorate:from_model(PubInterval)),
MetadataSize = case AddMetadata of
true -> (32 + 32 + 64) div 8;
false -> 0
end,
HostShift = maps:get(host_shift, Conf, 0),
HostSelection = maps:get(host_selection, Conf, random),
Retain = maps:get(retain, Conf, false),
#{ topic => Topic
, message => message(max(0, MsgSize - MetadataSize))
, pub_opts => [{qos, QoS}, {retain, Retain}]
, pub_counter => PubCnt
, pubinterval => PubRate
, metadata => AddMetadata
, host_shift => HostShift
, host_selection => HostSelection
, pub_opstat => emqttb_metrics:opstat_from_model(MetricsKey ++ [pub_latency])
, conn_opstat => emqttb_metrics:opstat_from_model(MetricsKey ++ [conn_latency])
, pub_counter => emqttb_metrics:from_model(MetricsKey ++ [n_published])
}.

init(PubOpts = #{pubinterval := I}) ->
init(PubOpts = #{pubinterval := I, conn_opstat := ConnOpstat}) ->
rand:seed(default),
{SleepTime, N} = emqttb:get_duration_and_repeats(I),
send_after_rand(SleepTime, {publish, N}),
HostShift = maps:get(host_shift, PubOpts, 0),
HostSelection = maps:get(host_selection, PubOpts, random),
{ok, Conn} = emqttb_worker:connect(#{ host_shift => HostShift
{ok, Conn} = emqttb_worker:connect(ConnOpstat,
#{ host_shift => HostShift
, host_selection => HostSelection
}),
Conn.

handle_message(Shared, Conn, {publish, N1}) ->
#{ topic := TP, pubinterval := I, message := Msg0, pub_opts := PubOpts
, pub_counter := Cnt
, pub_counter := PubCounter
, pub_opstat := PubOpstat
, metadata := AddMetadata
} = Shared,
{SleepTime, N2} = emqttb:get_duration_and_repeats(I),
Expand All @@ -121,8 +133,8 @@ handle_message(Shared, Conn, {publish, N1}) ->
end,
T = emqttb_worker:format_topic(TP),
repeat(N1, fun() ->
emqttb_worker:call_with_counter(?AVG_PUB_TIME, emqtt, publish, [Conn, T, Msg, PubOpts]),
emqttb_metrics:counter_inc(Cnt, 1)
emqttb_metrics:call_with_counter(PubOpstat, emqtt, publish, [Conn, T, Msg, PubOpts]),
emqttb_metrics:counter_inc(PubCounter, 1)
end),
{ok, Conn};
handle_message(_, Conn, _) ->
Expand All @@ -135,21 +147,6 @@ terminate(_Shared, Conn) ->
%% Internal functions
%%================================================================================

error_fun(SetLatencyKey, Group) ->
SetLatency = ?CFG(SetLatencyKey),
AvgWindow = 250,
%% Note that dependency of latency on publish interval is inverse:
%% lesser interval -> messages are sent more often -> more load -> more latency
%%
%% So the control must be reversed, and error is the negative of what one usually
%% expects:
%% Current - Target instead of Target - Current.
(emqttb_metrics:get_rolling_average(?GROUP_OP_TIME(Group, ?AVG_PUB_TIME), AvgWindow) -
erlang:convert_time_unit(SetLatency, millisecond, microsecond)).

my_autorate(Group) ->
list_to_atom(atom_to_list(Group) ++ ".pub.rate").

message(Size) ->
list_to_binary([$A || _ <- lists:seq(1, Size)]).

Expand Down
Loading

0 comments on commit 62a9cb1

Please sign in to comment.