Skip to content

Commit

Permalink
fix: Reuse client IDs when restarting
Browse files Browse the repository at this point in the history
  • Loading branch information
ieQu1 committed Jun 28, 2023
1 parent 326ae36 commit 72464f6
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
25 changes: 21 additions & 4 deletions src/framework/emqttb_group.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
-behavior(gen_server).

%% API:
-export([ensure/1, stop/1, set_target/3, set_target_async/3, broadcast/2]).
-export([ensure/1, stop/1, set_target/3, set_target_async/3, broadcast/2, report_dead_id/2]).

%% gen_server callbacks:
-export([init/1, handle_call/3, handle_cast/2, terminate/2, handle_info/2]).
Expand Down Expand Up @@ -94,6 +94,11 @@ broadcast(Group, Message) ->
[],
Group).

%% @doc Add an expired ID to the pool
-spec report_dead_id(emqttb:group(), integer()) -> true.
report_dead_id(Group, Id) ->
ets:insert(dead_id_pool(Group), {Id, []}).

%%================================================================================
%% behavior callbacks
%%================================================================================
Expand Down Expand Up @@ -129,6 +134,7 @@ init([Conf]) ->
#{ id => ID
, group_conf => ConfID
}),
ets:new(dead_id_pool(ID), [ordered_set, public, named_table]),
StartN = maps:get(start_n, Conf, 0),
persistent_term:put(?GROUP_LEADER_TO_GROUP_ID(self()), ID),
persistent_term:put(?GROUP_BEHAVIOR(self()), Behavior),
Expand Down Expand Up @@ -272,10 +278,18 @@ do_scale(S0) ->

scale_up(0, S) ->
S;
scale_up(NRepeats, S = #s{behavior = Behavior, id = _Id, next_id = WorkerId}) ->
scale_up(NRepeats, S = #s{behavior = Behavior, id = Group}) ->
case ets:first(dead_id_pool(Group)) of
'$end_of_table' ->
WorkerId = S#s.next_id,
NextId = WorkerId + 1;
WorkerId when is_integer(WorkerId) ->
ets:delete(dead_id_pool(Group), WorkerId),
NextId = S#s.next_id
end,
_Pid = emqttb_worker:start(Behavior, self(), WorkerId),
?tp(start_worker, #{group => _Id, pid => _Pid}),
scale_up(NRepeats - 1, S#s{next_id = WorkerId + 1}).
?tp(start_worker, #{group => Group, pid => _Pid}),
scale_up(NRepeats - 1, S#s{next_id = NextId}).

scale_down(_N, S0) ->
S0.
Expand Down Expand Up @@ -388,3 +402,6 @@ autoscale_error(Group) ->

my_autorate(Group) ->
list_to_atom(atom_to_list(Group) ++ "_autoscale").

dead_id_pool(Group) ->
Group.
1 change: 1 addition & 0 deletions src/framework/emqttb_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ terminate(State, Reason) ->

-spec terminate(_Reason) -> no_return().
terminate(Reason) ->
emqttb_group:report_dead_id(my_group(), my_id()),
emqttb_metrics:counter_dec(?GROUP_N_WORKERS(my_group()), 1),
?tp(emqttb_worker_terminate, #{gl => group_leader(), number => my_id()}),
exit(Reason).
Expand Down

0 comments on commit 72464f6

Please sign in to comment.