diff --git a/src/framework/emqttb_group.erl b/src/framework/emqttb_group.erl index 59e1adc..27c23a3 100644 --- a/src/framework/emqttb_group.erl +++ b/src/framework/emqttb_group.erl @@ -18,7 +18,7 @@ -behavior(gen_server). %% API: --export([ensure/1, stop/1, set_target/3, set_target_async/3, broadcast/2]). +-export([ensure/1, stop/1, set_target/3, set_target_async/3, broadcast/2, report_dead_id/2]). %% gen_server callbacks: -export([init/1, handle_call/3, handle_cast/2, terminate/2, handle_info/2]). @@ -94,6 +94,11 @@ broadcast(Group, Message) -> [], Group). +%% @doc Add an expired ID to the pool +-spec report_dead_id(emqttb:group(), integer()) -> true. +report_dead_id(Group, Id) -> + ets:insert(dead_id_pool(Group), {Id, []}). + %%================================================================================ %% behavior callbacks %%================================================================================ @@ -129,6 +134,7 @@ init([Conf]) -> #{ id => ID , group_conf => ConfID }), + ets:new(dead_id_pool(ID), [ordered_set, public, named_table]), StartN = maps:get(start_n, Conf, 0), persistent_term:put(?GROUP_LEADER_TO_GROUP_ID(self()), ID), persistent_term:put(?GROUP_BEHAVIOR(self()), Behavior), @@ -272,10 +278,18 @@ do_scale(S0) -> scale_up(0, S) -> S; -scale_up(NRepeats, S = #s{behavior = Behavior, id = _Id, next_id = WorkerId}) -> +scale_up(NRepeats, S = #s{behavior = Behavior, id = Group}) -> + case ets:first(dead_id_pool(Group)) of + '$end_of_table' -> + WorkerId = S#s.next_id, + NextId = WorkerId + 1; + WorkerId when is_integer(WorkerId) -> + ets:delete(dead_id_pool(Group), WorkerId), + NextId = S#s.next_id + end, _Pid = emqttb_worker:start(Behavior, self(), WorkerId), - ?tp(start_worker, #{group => _Id, pid => _Pid}), - scale_up(NRepeats - 1, S#s{next_id = WorkerId + 1}). + ?tp(start_worker, #{group => Group, pid => _Pid}), + scale_up(NRepeats - 1, S#s{next_id = NextId}). scale_down(_N, S0) -> S0. @@ -388,3 +402,6 @@ autoscale_error(Group) -> my_autorate(Group) -> list_to_atom(atom_to_list(Group) ++ "_autoscale"). + +dead_id_pool(Group) -> + Group. diff --git a/src/framework/emqttb_worker.erl b/src/framework/emqttb_worker.erl index fa09564..c722673 100644 --- a/src/framework/emqttb_worker.erl +++ b/src/framework/emqttb_worker.erl @@ -445,6 +445,7 @@ terminate(State, Reason) -> -spec terminate(_Reason) -> no_return(). terminate(Reason) -> + emqttb_group:report_dead_id(my_group(), my_id()), emqttb_metrics:counter_dec(?GROUP_N_WORKERS(my_group()), 1), ?tp(emqttb_worker_terminate, #{gl => group_leader(), number => my_id()}), exit(Reason).