diff --git a/Makefile b/Makefile index 9085125..e487abc 100755 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ ## DEALINGS IN THE SOFTWARE. REBAR=$(shell which rebar || echo ./rebar) -.PHONY: all compile clean eunit test eqc doc check dialyzer +.PHONY: all compile clean eunit test eqc doc check dialyzer di DIRS=src @@ -46,5 +46,7 @@ test: eunit doc: $(REBAR) get-deps compile doc -dialyzer: +dialyzer: compile $(REBAR) skip_deps=true dialyze + +di: dialyzer \ No newline at end of file diff --git a/rebar.config b/rebar.config index ad19008..a6fe77d 100644 --- a/rebar.config +++ b/rebar.config @@ -6,11 +6,13 @@ {git, "https://github.com/garret-smith/gen_leader_revival.git", "HEAD"}} ]}. {dialyzer_opts, [{warnings, [no_unused, + no_undefined_callbacks, no_improper_lists, no_fun_app, no_match, no_opaque, no_fail_call, error_handling, no_match, unmatched_returns, - behaviours, underspecs]}]}. + behaviours, + underspecs]}]}. {edoc_opts, [{doclet, edown_doclet}, {app_default, "http://www.erlang.org/doc/man"}, {top_level_readme, diff --git a/src/gproc.erl b/src/gproc.erl index 5325176..9c2052d 100644 --- a/src/gproc.erl +++ b/src/gproc.erl @@ -206,7 +206,7 @@ start_link() -> %% @end %% add_local_name(Name) -> - ?CATCH_GPROC_ERROR(reg1({n,l,Name}, undefined, [], reg), [Name]). + ?CATCH_GPROC_ERROR(reg1({n,l,Name}, reg), [Name]). %% spec(Name::any()) -> true @@ -215,7 +215,7 @@ add_local_name(Name) -> %% @end %% add_global_name(Name) -> - ?CATCH_GPROC_ERROR(reg1({n,g,Name}, undefined, [], reg), [Name]). + ?CATCH_GPROC_ERROR(reg1({n,g,Name}, reg), [Name]). %% spec(Name::any(), Value::any()) -> true @@ -223,8 +223,8 @@ add_global_name(Name) -> %% @doc Registers a local (non-unique) property. @equiv reg({p,l,Name},Value) %% @end %% -add_local_property(Name , Value) -> - ?CATCH_GPROC_ERROR(reg1({p,l,Name}, Value, [], reg), [Name, Value]). +add_local_property(Name, Value) -> + ?CATCH_GPROC_ERROR(reg1({p,l,Name}, Value, reg), [Name, Value]). %% spec(Name::any(), Value::any()) -> true %% @@ -232,7 +232,7 @@ add_local_property(Name , Value) -> %% @end %% add_global_property(Name, Value) -> - ?CATCH_GPROC_ERROR(reg1({p,g,Name}, Value, [], reg), [Name, Value]). + ?CATCH_GPROC_ERROR(reg1({p,g,Name}, Value, reg), [Name, Value]). %% spec(Name::any(), Initial::integer()) -> true %% @@ -240,7 +240,7 @@ add_global_property(Name, Value) -> %% @end %% add_local_counter(Name, Initial) when is_integer(Initial) -> - ?CATCH_GPROC_ERROR(reg1({c,l,Name}, Initial, [], reg), [Name, Initial]). + ?CATCH_GPROC_ERROR(reg1({c,l,Name}, Initial, reg), [Name, Initial]). %% spec(Name::any(), Initial::integer()) -> true @@ -259,7 +259,7 @@ add_shared_local_counter(Name, Initial) when is_integer(Initial) -> %% @end %% add_global_counter(Name, Initial) when is_integer(Initial) -> - ?CATCH_GPROC_ERROR(reg1({c,g,Name}, Initial, [], reg), [Name, Initial]). + ?CATCH_GPROC_ERROR(reg1({c,g,Name}, Initial, reg), [Name, Initial]). %% spec(Name::any()) -> true %% @@ -267,7 +267,8 @@ add_global_counter(Name, Initial) when is_integer(Initial) -> %% @equiv reg({a,l,Name}) %% @end %% -add_local_aggr_counter(Name) -> ?CATCH_GPROC_ERROR(reg1({a,l,Name}), [Name]). +add_local_aggr_counter(Name) -> + ?CATCH_GPROC_ERROR(reg1({a,l,Name}, reg), [Name]). %% spec(Name::any()) -> true %% @@ -276,7 +277,7 @@ add_local_aggr_counter(Name) -> ?CATCH_GPROC_ERROR(reg1({a,l,Name}), [Name]). %% @end %% add_global_aggr_counter(Name) -> - ?CATCH_GPROC_ERROR(reg1({a,g,Name}), [Name]). + ?CATCH_GPROC_ERROR(reg1({a,g,Name}, reg), [Name]). %% @spec (Name::any()) -> pid() @@ -541,7 +542,7 @@ os_env_key(Key) -> string:to_upper(atom_to_list(Key)). lookup_env(Scope, App, Key, P) -> - case ets:lookup(?TAB, {{p, Scope, {gproc_env, App, Key}}, P}) of + case ets_lookup({{p, Scope, {gproc_env, App, Key}}, P}) of [] -> undefined; [{_, _, Value}] -> @@ -550,7 +551,7 @@ lookup_env(Scope, App, Key, P) -> cache_env(Scope, App, Key, Value) -> ?CATCH_GPROC_ERROR( - reg1({p, Scope, {gproc_env, App, Key}}, Value, [], reg), + reg1({p, Scope, {gproc_env, App, Key}}, Value, reg), [Scope,App,Key,Value]). update_cached_env(Scope, App, Key, Value) -> @@ -619,10 +620,7 @@ is_string(S) -> %% @equiv reg(Key, default(Key), []) %% @end reg(Key) -> - ?CATCH_GPROC_ERROR(reg1(Key), [Key]). - -reg1(Key) -> - reg1(Key, default(Key), [], reg). + ?CATCH_GPROC_ERROR(reg1(Key, reg), [Key]). %% @spec reg_or_locate(Key::key()) -> {pid(), NewValue} %% @@ -633,10 +631,20 @@ reg_or_locate(Key) -> ?CATCH_GPROC_ERROR(reg_or_locate1(Key), [Key]). reg_or_locate1(Key) -> - reg_or_locate1(Key, default(Key), self()). + Type = reg_type(Key), + reg_or_locate1(Key, Type, default(Type), self()). + +default({c,_,_}) -> + 0; +default(#{default := Default}) -> + Default; +default(#{type := c}) -> + 0; +default({T,_,N}) -> + default(gproc_lib:reg_type(T, N)); +default(_) -> + undefined. -default({T,_,_}) when T==c -> 0; -default(_) -> undefined. %% @spec await(Key::key()) -> {pid(),Value} %% @equiv await(Key,infinity) @@ -678,7 +686,7 @@ await1({T,g,_} = Key, Timeout) when T=:=n; T=:=a; T=:=rc -> ?CHK_DIST, request_wait(Key, Timeout); await1({T,l,_} = Key, Timeout) when T=:=n; T=:=a; T=:=rc -> - case ets:lookup(?TAB, {Key, T}) of + case ets_lookup({Key, T}) of [{_, Pid, Value}] -> case is_process_alive(Pid) of true -> @@ -782,7 +790,7 @@ wide_request_wait(Nodes, {Tk,l,_} = Key, Timeout) when Tk=:=n; Tk=:=a -> Refs = lists:map( fun(Node) -> S = {?MODULE, Node}, - Ref = erlang:monitor(process, S), + Ref = monitor_process(S), ?MAY_FAIL(erlang:send(S, {'$gen_call', {self(), Ref}, Req}, [noconnect])), {Node, Ref} @@ -959,7 +967,7 @@ demonitor1(_, _) -> %% %% reg(Key, Value) -> - ?CATCH_GPROC_ERROR(reg1(Key, Value, [], reg), [Key, Value]). + ?CATCH_GPROC_ERROR(reg1(Key, Value, reg), [Key, Value]). %% @spec reg(Key::key(), Value::value(), Attrs::attrs()) -> true %% @@ -1026,30 +1034,41 @@ ensure_reg(Key, Value, Attrs) -> ?CATCH_GPROC_ERROR(reg1(Key, Value, Attrs, ensure), [Key, Value, Attrs]). reg1(Key, Op) -> - reg1(Key, default(Key), [], Op). + Type = reg_type(Key), + reg1(Key, Type, default(Type), [], Op). reg1(Key, Value, Op) -> - reg1(Key, Value, [], Op). + reg1(Key, reg_type(Key), Value, [], Op). + +reg1(Key, Value, As, Op) -> + reg1(Key, reg_type(Key), Value, As, Op). -reg1({T,g,_} = Key, Value, As, Op) when T==p; T==a; T==c; T==n; T==r; T==rc -> +reg1({_,g,_} = Key, #{type := T} = R, Value, As, Op) + when T=:=p; T=:=a; T=:=c; T=:=n; T=:=r; T=:=rc -> %% anything global ?CHK_DIST, - gproc_dist:reg(Key, Value, As, Op); -reg1({p,l,_} = Key, Value, As, Op) -> - local_reg(Key, Value, As, Op); -reg1({a,l,_} = Key, undefined, As, Op) -> - call({reg, Key, undefined, As, Op}); -reg1({c,l,_} = Key, Value, As, Op) when is_integer(Value) -> - call({reg, Key, Value, As, Op}); -reg1({n,l,_} = Key, Value, As, Op) -> - call({reg, Key, Value, As, Op}); -reg1({r,l,_} = Key, Value, As, Op) -> - call({reg, Key, Value, As, Op}); -reg1({rc,l,_} = Key, Value, As, Op) -> - call({reg, Key, Value, As, Op}); -reg1(_, _, _, _) -> + gproc_dist:reg(Key, R, Value, As, Op); +reg1({a,l,_} = Key, R, undefined, As, Op) -> + call({reg, Key, R, undefined, As, Op}); +reg1({c,l,_} = Key, R, Value, As, Op) when is_integer(Value) -> + call({reg, Key, R, Value, As, Op}); +reg1({_,l,_} = Key, #{type := T} = R, Value, As, Op) + when T=:=p; T=:=n; T=:=c; T=:=a; T=:=r; T=:=rc -> + case reg_locally(T) of + true -> + local_reg(Key, R, Value, As, Op); + false -> + call({reg, Key, R, Value, As, Op}) + end; +reg1(_, _, _, _, _) -> ?THROW_GPROC_ERROR(badarg). +reg_type({T,_,N}) -> + gproc_lib:reg_type(T, N). + +reg_locally(p) -> true; +reg_locally(_) -> false. + %% @equiv reg_other(Key, Pid, default(Key), []) reg_other(Key, Pid) -> ?CATCH_GPROC_ERROR(reg_other1(Key, Pid, reg), [Key, Pid]). @@ -1101,14 +1120,19 @@ ensure_reg_other(Key, Pid, Value, Attrs) -> [Key, Pid, Value, Attrs]). reg_other1(Key, Pid, Op) -> - reg_other1(Key, Pid, default(Key), [], Op). + R = reg_type(Key), + reg_other1(Key, R, Pid, default(R), [], Op). + +reg_other1({_,_,_} = Key, Pid, Value, As, Op) -> + reg_other1(Key, reg_type(Key), Pid, Value, As, Op). -reg_other1({_,g,_} = Key, Pid, Value, As, Op) when is_pid(Pid) -> +reg_other1({_,g,_} = Key, R, Pid, Value, As, Op) when is_pid(Pid) -> ?CHK_DIST, - gproc_dist:reg_other(Key, Pid, Value, As, Op); -reg_other1({T,l,_} = Key, Pid, Value, As, Op) when is_pid(Pid) -> + gproc_dist:reg_other(Key, R, Pid, Value, As, Op); +reg_other1({_,l,_} = Key, #{type := T} = R, Pid, Value, As, Op) + when is_pid(Pid) -> if T==n; T==a; T==r; T==rc -> - call({reg_other, Key, Pid, Value, As, Op}); + call({reg_other, Key, R, Pid, Value, As, Op}); true -> ?THROW_GPROC_ERROR(badarg) end. @@ -1139,12 +1163,15 @@ reg_or_locate(Key, Value) -> reg_or_locate({n,_,_} = Key, Value, F) when is_function(F, 0) -> ?CATCH_GPROC_ERROR(reg_or_locate1(Key, Value, F), [Key, Value, F]). -reg_or_locate1({_,g,_} = Key, Value, P) -> +reg_or_locate1({_,_,_} = Key, Value, P) -> + reg_or_locate1(Key, reg_type(Key), Value, P). + +reg_or_locate1({_,g,_} = Key, R, Value, P) -> ?CHK_DIST, - gproc_dist:reg_or_locate(Key, Value, P); -reg_or_locate1({T,l,_} = Key, Value, P) when T==n; T==a; T==rc -> - call({reg_or_locate, Key, Value, P}); -reg_or_locate1(_, _, _) -> + gproc_dist:reg_or_locate(Key, R, Value, P); +reg_or_locate1({T,l,_} = Key, R, Value, P) when T==n; T==a; T==rc -> + call({reg_or_locate, Key, R, Value, P}); +reg_or_locate1(_, _, _,_) -> ?THROW_GPROC_ERROR(badarg). %% @spec reg_shared(Key::key()) -> true @@ -1158,8 +1185,10 @@ reg_shared(Key) -> ?CATCH_GPROC_ERROR(reg_shared1(Key), [Key]). %% @private -reg_shared1({T,_,_} = Key) when T==a; T==p; T==c -> - reg_shared(Key, default(Key)). +reg_shared1({_,_,_} = Key) -> + R = reg_type(Key), + reg_shared1(Key, R, default(R), []). + %% reg_shared(Key, default(Key)). %% @spec reg_shared(Key::key(), Value) -> true %% @@ -1177,25 +1206,28 @@ reg_shared1({T,_,_} = Key) when T==a; T==p; T==c -> %% @end %% reg_shared(Key, Value) -> - ?CATCH_GPROC_ERROR(reg_shared1(Key, Value, []), [Key, Value]). + ?CATCH_GPROC_ERROR( + reg_shared1(Key, reg_type(Key), Value, []), [Key, Value]). reg_shared(Key, Value, Attrs) when is_list(Attrs) -> - ?CATCH_GPROC_ERROR(reg_shared1(Key, Value, Attrs), [Key, Value, Attrs]). + ?CATCH_GPROC_ERROR( + reg_shared1(Key, reg_type(Key), Value, Attrs), [Key, Value, Attrs]). %% @private -reg_shared1({_,g,_} = Key, Value, As) -> +reg_shared1({_,g,_} = Key, R, Value, As) -> %% anything global ?CHK_DIST, - gproc_dist:reg_shared(Key, Value, As); -reg_shared1({a,l,_} = Key, undefined, As) -> - call({reg_shared, Key, undefined, As, reg}); -reg_shared1({c,l,_} = Key, Value, As) when is_integer(Value) -> - call({reg_shared, Key, Value, As, reg}); -reg_shared1({p,l,_} = Key, Value, As) -> - call({reg_shared, Key, Value, As, reg}); -reg_shared1({rc,l,_} = Key, undefined, As) -> - call({reg_shared, Key, undefined, As, reg}); -reg_shared1(_, _, _) -> + gproc_dist:reg_shared(Key, R, Value, As, reg); + %% gproc_dist:reg_shared(Key, Value, As); +reg_shared1({a,l,_} = Key, R, undefined, As) -> + call({reg_shared, Key, R, undefined, As, reg}); +reg_shared1({c,l,_} = Key, R, Value, As) when is_integer(Value) -> + call({reg_shared, Key, R, Value, As, reg}); +reg_shared1({rc,l,_} = Key, R, undefined, As) -> + call({reg_shared, Key, R, undefined, As, reg}); +reg_shared1({_,_,_} = Key, R, Value, As) -> + call({reg_shared, Key, R, Value, As, reg}); +reg_shared1(_, _, _, _) -> ?THROW_GPROC_ERROR(badarg). %% @spec mreg(type(), scope(), [{Key::any(), Value::any()}]) -> true @@ -1253,7 +1285,7 @@ existing(T,Scope,L) -> T==a; T==n -> [{{T,Scope,K}, T} || K <- L] end, - _ = [case ets:member(?TAB, K) of + _ = [case ets_member(K) of false -> erlang:error(badarg); true -> true end || K <- Keys], @@ -1275,7 +1307,7 @@ unreg1(Key) -> {T, l, _} when T == n; T == a; T == r; T == rc -> call({unreg, Key}); {_, l, _} -> - case ets:member(?TAB, {Key,self()}) of + case ets_member({Key,self()}) of true -> _ = gproc_lib:remove_reg(Key, self(), unreg), true; @@ -1408,7 +1440,7 @@ unregister_name(Key) -> %% {@type headpat()}. %% @end select({?TAB, _, _, _, _, _, _, _} = Continuation) -> - ets:select(Continuation); + ets_select_cont(Continuation); select(Pat) -> select(all, Pat). @@ -1429,7 +1461,7 @@ select(Pat) -> %% variable substitution and ensure that the scan is limited. %% @end select(Context, Pat) -> - ets:select(?TAB, pattern(Pat, Context)). + ets_select(pattern(Pat, Context)). %% @spec (Context::context(), Pat::sel_patten(), Limit::integer()) -> %% {[Match],Continuation} | '$end_of_table' @@ -1438,7 +1470,7 @@ select(Context, Pat) -> %% See [http://www.erlang.org/doc/man/ets.html#select-3]. %% @end select(Context, Pat, Limit) -> - ets:select(?TAB, pattern(Pat, Context), Limit). + ets_select(pattern(Pat, Context), Limit). %% @spec (sel_pattern()) -> list(sel_object()) @@ -1462,12 +1494,14 @@ select_count(Context, Pat) -> %%% Local properties can be registered in the local process, since %%% no other process can interfere. %%% -local_reg({_,Scope,_} = Key, Value, As, Op) -> - case gproc_lib:insert_reg(Key, Value, self(), l) of + +local_reg(Key, R, Value, As, Op) -> + case gproc_lib:insert_reg(Key, R, Value, self(), As, l, registered) of false -> - case ets:member(?TAB, {Key, self()}) of + TKey = {Key, self()}, + case ets_member(TKey) of true when Op == ensure -> - gproc_lib:do_set_value(Key, Value, self()), + ets_update_element(TKey, {3, Value}), set_attrs(As, Key, self()), updated; _ -> @@ -1476,7 +1510,7 @@ local_reg({_,Scope,_} = Key, Value, As, Op) -> true -> monitor_me(), if As =/= [] -> - gproc_lib:insert_attr(Key, As, self(), Scope), + gproc_lib:insert_attr(Key, As, self(), l), regged_new(Op); true -> regged_new(Op) @@ -1581,24 +1615,27 @@ get_value_shared(Key) -> get_value(Key, Pid) -> ?CATCH_GPROC_ERROR(get_value1(Key, Pid), [Key, Pid]). -get_value1({T,_,_} = Key, Pid) when is_pid(Pid) -> +get_value1({_,_,_} = Key, Pid) when is_pid(Pid) -> + #{type := T} = reg_type(Key), if T==n; T==a; T==rc -> - case ets:lookup(?TAB, {Key, T}) of + case ets_lookup({Key, T}) of [{_, P, Value}] when P == Pid -> Value; _ -> ?THROW_GPROC_ERROR(badarg) end; true -> ets:lookup_element(?TAB, {Key, Pid}, 3) end; -get_value1({T,_,_} = K, shared) when T==c; T==a; T==p; T==r -> +get_value1({_,_,_} = K, shared) -> + #{type := T} = reg_type(K), Key = case T of c -> {K, shared}; p -> {K, shared}; r -> {K, shared}; + n -> {K, n}; a -> {K, a}; rc -> {K, rc} end, - case ets:lookup(?TAB, Key) of + case ets_lookup(Key) of [{_, shared, Value}] -> Value; _ -> ?THROW_GPROC_ERROR(badarg) end; @@ -1614,13 +1651,7 @@ get_value1(_, _) -> %% An exception is raised if `Key' is not registered for the given process. %% @end get_attribute(Key, A) -> - Pid = case Key of - {T,_,_} when T==n; T==a; T==rc -> - where(Key); - {T,_,_} when T==p; T==c; T==r -> - self() - end, - ?CATCH_GPROC_ERROR(get_attribute1(Key, Pid, A), [Key, A]). + ?CATCH_GPROC_ERROR(get_attribute1(Key, A), [Key, A]). %% @spec (Key, Pid::pid() | shared, Attr::atom()) -> Value %% @doc Get the attribute value of `Attr' associated with `Key' for process Pid. @@ -1641,9 +1672,18 @@ get_attribute(Key, Pid, A) -> get_attribute_shared(Key, Attr) -> ?CATCH_GPROC_ERROR(get_attribute1(Key, shared, Attr), [Key, Attr]). +get_attribute1(Key, A) -> + #{type := T} = reg_type(Key), + Pid = if T==n; T==a; T==rc -> + where(Key); + T==p; T==c; T==r -> + self() + end, + get_attribute1(Key, Pid, A). + %% @private get_attribute1({_,_,_} = Key, Pid, A) when is_pid(Pid); Pid==shared -> - case ets:lookup(?TAB, {Pid, Key}) of + case ets_lookup({Pid, Key}) of [{_, Attrs}] -> case lists:keyfind(attrs, 1, Attrs) of false -> undefined; @@ -1676,7 +1716,7 @@ get_attributes(Key, Pid) -> ?CATCH_GPROC_ERROR(get_attributes1(Key, Pid), [Key, Pid]). get_attributes1({_,_,_} = Key, Pid) when is_pid(Pid); Pid==shared -> - case ets:lookup(?TAB, {Pid, Key}) of + case ets_lookup({Pid, Key}) of [{_, Attrs}] -> case lists:keyfind(attrs, 1, Attrs) of false -> []; @@ -1725,7 +1765,7 @@ where(Key) -> where1({T,_,_}=Key) -> if T==n orelse T==a orelse T==rc -> - case ets:lookup(?TAB, {Key,T}) of + case ets_lookup({Key,T}) of [{_, P, _Value}] -> case my_is_process_alive(P) of true -> P; @@ -1757,10 +1797,10 @@ whereis_name(Key) -> %% lookup_pids({T,_,_} = Key) -> L = if T==n orelse T==a orelse T==rc -> - ets:select(?TAB, [{{{Key,T}, '$1', '_'}, + ets_select([{{{Key,T}, '$1', '_'}, [{is_pid, '$1'}], ['$1']}]); true -> - ets:select(?TAB, [{{{Key,'_'}, '$1', '_'}, + ets_select([{{{Key,'_'}, '$1', '_'}, [{is_pid, '$1'}], ['$1']}]) end, [P || P <- L, my_is_process_alive(P)]. @@ -1785,9 +1825,9 @@ my_is_process_alive(_) -> %% lookup_values({T,_,_} = Key) -> L = if T==n orelse T==a orelse T==rc -> - ets:select(?TAB, [{{{Key,T}, '$1', '$2'},[],[{{'$1','$2'}}]}]); + ets_select([{{{Key,T}, '$1', '$2'},[],[{{'$1','$2'}}]}]); true -> - ets:select(?TAB, [{{{Key,'_'}, '$1', '$2'},[],[{{'$1','$2'}}]}]) + ets_select([{{{Key,'_'}, '$1', '$2'},[],[{{'$1','$2'}}]}]) end, [Pair || {P,_} = Pair <- L, my_is_process_alive(P)]. @@ -1815,24 +1855,33 @@ lookup_values({T,_,_} = Key) -> %% appropriate value type. %% @end %% --spec update_counter(key(), increment()) -> integer(). +-spec update_counter(key(), increment()) -> integer() | [integer()]. update_counter(Key, Incr) -> - Pid = case Key of - {n,_,_} -> n; - {c,_,_} -> self() - end, - ?CATCH_GPROC_ERROR(update_counter1(Key, Pid, Incr), [Key, Incr]). + ?CATCH_GPROC_ERROR(update_counter1(Key, Incr), [Key, Incr]). -update_counter(Key, Pid, Incr) when is_pid(Pid); - Pid == shared; Pid == n -> +update_counter(Key, Pid, Incr) when is_pid(Pid); Pid == shared; Pid == n -> ?CATCH_GPROC_ERROR(update_counter1(Key, Pid, Incr), [Key, Pid, Incr]). -update_counter1({T,l,_} = Key, Pid, Incr) when T==c; T==n -> - gproc_lib:update_counter(Key, Incr, Pid); -update_counter1({T,g,_} = Key, Pid, Incr) when T==c; T==n -> +update_counter1(Key, Incr) -> + #{type := T} = R = reg_type(Key), + Pid = if T =:= n -> + n; + T =:= c; T =:= r; T =:= p -> + self() + end, + update_counter1(Key, R, Pid, Incr). + +update_counter1({_,_,_} = Key, Pid, Incr) -> + update_counter1(Key, reg_type(Key), Pid, Incr). + +update_counter1({T,l,_} = Key, #{type := T} = R, Pid, Incr) + when T =:= n; T =:= c; T =:= r; T =:= p -> + gproc_lib:update_counter(Key, R, Incr, Pid); +update_counter1({_,g,_} = Key, #{type := T} = R, Pid, Incr) + when T =:= n; T =:= c; T =:= r; T =:= p -> ?CHK_DIST, - gproc_dist:update_counter(Key, Pid, Incr); -update_counter1(_, _, _) -> + gproc_dist:update_counter(Key, R, Pid, Incr); +update_counter1(_, _, _, _) -> ?THROW_GPROC_ERROR(badarg). %% @doc Update a list of counters @@ -1854,8 +1903,16 @@ update_counters(g, [_|_] = Cs) -> gproc_dist:update_counters(Cs). -update_counters1([{{c,l,_} = Key, Pid, Incr}|T]) -> - [{Key, Pid, gproc_lib:update_counter(Key, Incr, Pid)}|update_counters1(T)]; +update_counters1([{{_,l,_} = Key, Pid, Incr}|Cs]) when is_pid(Pid); + Pid =:= shared; + Pid =:= n -> + case reg_type(Key) of + #{type := T} = R when T =:= c; T =:= p; T =:= r -> + [{Key, Pid, gproc_lib:update_counter(Key, R, Incr, Pid)} + |update_counters1(Cs)]; + _ -> + ?THROW_GPROC_ERROR(badarg) + end; update_counters1([]) -> []; update_counters1(_) -> @@ -1891,14 +1948,14 @@ reset_counter1({T,g,_} = Key) when T==c; T==n -> ?CHK_DIST, gproc_dist:reset_counter(Key); reset_counter1({n,l,_} = Key) -> - [{_, Pid, Current}] = ets:lookup(?TAB, {Key, n}), + [{_, Pid, Current}] = ets_lookup({Key, n}), {Current, update_counter(Key, get_initial(Pid, Key) - Current)}; reset_counter1({c,l,_} = Key) -> Current = ets:lookup_element(?TAB, {Key, self()}, 3), {Current, update_counter(Key, get_initial(self(), Key) - Current)}. get_initial(Pid, Key) -> - case ets:lookup(?TAB, {Pid, Key}) of + case ets_lookup({Pid, Key}) of [{_, r}] -> 0; [{_, Opts}] -> proplists:get_value(initial, Opts, 0) @@ -1922,13 +1979,7 @@ get_initial(Pid, Key) -> %% @end %% update_shared_counter(Key, Incr) -> - ?CATCH_GPROC_ERROR(update_shared_counter1(Key, Incr), [Key, Incr]). - -update_shared_counter1({c,g,_} = Key, Incr) -> - ?CHK_DIST, - gproc_dist:update_shared_counter(Key, Incr); -update_shared_counter1({c,l,_} = Key, Incr) -> - gproc_lib:update_counter(Key, Incr, shared). + ?CATCH_GPROC_ERROR(update_counter1(Key, shared, Incr), [Key, Incr]). %% @spec (From::key(), To::pid() | key()) -> undefined | pid() %% @@ -1992,7 +2043,7 @@ send(Key, Msg) -> send1({T,C,_} = Key, Msg) when C==l; C==g -> if T == n orelse T == a orelse T == rc -> - case ets:lookup(?TAB, {Key, T}) of + case ets_lookup({Key, T}) of [{_, Pid, _}] -> Pid ! Msg; _ -> @@ -2050,7 +2101,7 @@ bcast1(Ns, {T,l,_} = Key, Msg) when T==p; T==a; T==c; T==n; T==r; T==rc -> first(Context) -> {S, T} = get_s_t(Context), {HeadPat,_} = headpat({S, T}, '_', '_', '_'), - case ets:select(?TAB, [{HeadPat,[],[{element,1,'$_'}]}], 1) of + case ets_select([{HeadPat,[],[{element,1,'$_'}]}], 1) of {[First], _} -> First; _ -> @@ -2200,7 +2251,7 @@ to_atom(S) -> end. gproc_info(Pid, Pat) -> - Keys = ets:select(?TAB, [{ {{Pid,Pat}, '_'}, [], [{element,2, + Keys = ets_select([{ {{Pid,Pat}, '_'}, [], [{element,2, {element,1,'$_'}}] }]), {?MODULE, lists:zf( fun(K) -> @@ -2224,7 +2275,7 @@ i() -> %% @hidden handle_cast({monitor_me, Pid}, S) -> - erlang:monitor(process, Pid), + monitor_process(Pid), {noreply, S}; handle_cast({audit_process, Pid}, S) -> case is_process_alive(Pid) of @@ -2235,7 +2286,7 @@ handle_cast({audit_process, Pid}, S) -> end, {noreply, S}; handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) -> - _ = case ets:lookup(?TAB, {Key,T}) of + _ = case ets_lookup({Key,T}) of [{_, Waiters}] -> gproc_lib:remove_wait(Key, Pid, Ref, Waiters); _ -> @@ -2243,7 +2294,7 @@ handle_cast({cancel_wait, Pid, {T,_,_} = Key, Ref}, S) -> end, {noreply, S}; handle_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S) -> - _ = case ets:lookup(?TAB, {Key, T}) of + _ = case ets_lookup({Key, T}) of [{_, Waiters}] -> gproc_lib:remove_wait(Key, Pid, all, Waiters); [{_, OtherPid, _}] -> @@ -2254,10 +2305,12 @@ handle_cast({cancel_wait_or_monitor, Pid, {T,_,_} = Key}, S) -> {noreply, S}. %% @hidden -handle_call({reg, {_T,l,_} = Key, Val, Attrs, Op}, {Pid,_}, S) -> - handle_reg_call(Key, Pid, Val, Attrs, Op, S); -handle_call({reg_other, {_T,l,_} = Key, Pid, Val, Attrs, Op}, _, S) -> - handle_reg_call(Key, Pid, Val, Attrs, Op, S); +handle_call({reg, {_T,l,_} = Key, #{} = R, Val, Attrs, Op}, {Pid,_}, S) -> + handle_reg_call(Key, R, Pid, Val, Attrs, Op, S); +handle_call({reg_other, Key, #{} = R, Pid, Val, Attrs, Op}, _, S) -> + handle_reg_call(Key, R, Pid, Val, Attrs, Op, S); +handle_call({reg_other, {_T,l,_} = Key, #{} = R, Pid, Val, Attrs, Op}, _, S) -> + handle_reg_call(Key, R, Pid, Val, Attrs, Op, S); handle_call({set_attributes, {_,l,_} = Key, Attrs}, {Pid,_}, S) -> case gproc_lib:insert_attr(Key, Attrs, Pid, l) of false -> {reply, badarg, S}; @@ -2271,18 +2324,19 @@ handle_call({set_attributes_shared, {_,l,_} = Key, Attrs}, _, S) -> L when is_list(L) -> {reply, true, S} end; -handle_call({reg_or_locate, {T,l,_} = Key, Val, P}, _, S) -> +handle_call({reg_or_locate, {T,l,_} = Key, R, Val, P}, _, S) -> Reg = fun() -> Pid = if is_function(P, 0) -> spawn(P); is_pid(P) -> P end, - true = gproc_lib:insert_reg(Key, Val, Pid, l), + true = gproc_lib:insert_reg( + Key, R, Val, Pid, [], l, registered), _ = gproc_lib:ensure_monitor(Pid, l), {reply, {Pid, Val}, S} end, - case ets:lookup(?TAB, {Key, T}) of + case ets_lookup({Key, T}) of [] -> Reg(); [{_, _Waiters}] -> @@ -2293,7 +2347,7 @@ handle_call({reg_or_locate, {T,l,_} = Key, Val, P}, _, S) -> handle_call({monitor, {T,l,_} = Key, Pid, Type}, _From, S) when T==n; T==a -> Ref = make_ref(), - Lookup = ets:lookup(?TAB, {Key, T}), + Lookup = ets_lookup({Key, T}), IsRegged = is_regged(Lookup), _ = case {IsRegged, Type} of {false, info} -> @@ -2305,10 +2359,10 @@ handle_call({monitor, {T,l,_} = Key, Pid, Type}, _From, S) [{K, Waiters}] -> NewWaiters = [{Pid,Ref,follow}|Waiters], ets:insert(?TAB, {K, NewWaiters}), - ets:insert_new(?TAB, {{Pid,Key}, []}); + ets_insert_new({{Pid,Key}, []}); [] -> ets:insert(?TAB, {{Key,T}, [{Pid,Ref,follow}]}), - ets:insert_new(?TAB, {{Pid,Key}, []}) + ets_insert_new({{Pid,Key}, []}) end; {false, standby} -> Evt = {failover, Pid}, @@ -2318,14 +2372,14 @@ handle_call({monitor, {T,l,_} = Key, Pid, Type}, _From, S) {true, _} -> [{_, RegPid, _}] = Lookup, _ = gproc_lib:ensure_monitor(Pid, l), - case ets:lookup(?TAB, {RegPid, Key}) of + case ets_lookup({RegPid, Key}) of [{K,r}] -> ets:insert(?TAB, {K, [{monitor, [{Pid,Ref,Type}]}]}), - ets:insert_new(?TAB, {{Pid,Key}, []}); + ets_insert_new({{Pid,Key}, []}); [{K, Opts}] -> ets:insert(?TAB, {K, gproc_lib:add_monitor( Opts, Pid, Ref, Type)}), - ets:insert_new(?TAB, {{Pid,Key}, []}) + ets_insert_new({{Pid,Key}, []}) end end, {reply, Ref, S}; @@ -2335,7 +2389,7 @@ handle_call({demonitor, {T,l,_} = Key, Ref, Pid}, _From, S) undefined -> ok; % be nice RegPid -> - case ets:lookup(?TAB, {RegPid, Key}) of + case ets_lookup({RegPid, Key}) of [{_K,r}] -> ok; % be nice [{K, Opts}] -> @@ -2344,30 +2398,14 @@ handle_call({demonitor, {T,l,_} = Key, Ref, Pid}, _From, S) end end, {reply, ok, S}; -handle_call({reg_shared, {_T,l,_} = Key, Val, Attrs, Op}, _From, S) -> - case try_insert_reg(Key, Val, shared) of - true -> - _ = if Attrs =/= [] -> - gproc_lib:insert_attr(Key, Attrs, shared, l); - true -> true - end, - {reply, true, S}; - already_registered when Op == ensure -> - case gproc_lib:do_set_value(Key, Val, shared) of - true -> - {reply, updated, S}; - false -> - {reply, badarg, S} - end; - _ -> - {reply, badarg, S} - end; +handle_call({reg_shared, {_T,l,_} = Key, R, Val, Attrs, Op}, _From, S) -> + handle_reg_call(Key, R, shared, Val, Attrs, Op, S); handle_call({unreg, {_,l,_} = Key}, {Pid,_}, S) -> handle_unreg_call(Key, Pid, S); handle_call({unreg_other, {_,l,_} = Key, Pid}, _, S) -> handle_unreg_call(Key, Pid, S); handle_call({unreg_shared, {_,l,_} = Key}, _, S) -> - _ = case ets:lookup(?TAB, {shared, Key}) of + _ = case ets_lookup({shared, Key}) of [{_, r}] -> _ = gproc_lib:remove_reg(Key, shared, unreg, []); [{_, Opts}] -> @@ -2436,7 +2474,7 @@ handle_info(_, S) -> %% @hidden code_change(_FromVsn, S, _Extra) -> %% We have changed local monitor markers from {Pid} to {Pid,l}. - _ = case ets:select(?TAB, [{{'$1'},[],['$1']}]) of + _ = case ets_select([{{'$1'},[],['$1']}]) of [] -> ok; Pids -> @@ -2451,11 +2489,10 @@ terminate(_Reason, _S) -> %% handle_call body common to reg and reg_other. %% -handle_reg_call(Key, Pid, Val, Attrs, Op, S) -> - case try_insert_reg(Key, Val, Pid) of +handle_reg_call(Key, R, Pid, Val, Attrs, Op, S) -> + case try_insert_reg(Key, R, Val, Attrs, Pid) of true -> _ = gproc_lib:ensure_monitor(Pid,l), - _ = set_attrs(Attrs, Key, Pid), {reply, regged_new(Op), S}; already_registered when Op == ensure -> case gproc_lib:do_set_value(Key, Val, Pid) of @@ -2476,7 +2513,7 @@ set_attrs([_|_] = Attrs, Key, Pid) -> gproc_lib:insert_attr(Key, Attrs, Pid, l). handle_unreg_call(Key, Pid, S) -> - case ets:lookup(?TAB, {Pid,Key}) of + case ets_lookup({Pid,Key}) of [{_, r}] -> _ = gproc_lib:remove_reg(Key, Pid, unreg, []), {reply, true, S}; @@ -2520,10 +2557,11 @@ cast(Msg, g) -> cast(N, Msg, l) -> gen_server:cast({?MODULE, N}, Msg). -try_insert_reg({T,l,_} = Key, Val, Pid) -> - case gproc_lib:insert_reg(Key, Val, Pid, l) of +try_insert_reg(Key, R, Val, Attrs, Pid) -> + T = element(1, Key), + case gproc_lib:insert_reg(Key, R, Val, Pid, Attrs, l, registered) of false -> - case ets:lookup(?TAB, {Key,T}) of + case ets_lookup({Key,T}) of %% In this particular case, the lookup cannot result in %% [{_, Waiters}], since the insert_reg/4 function would %% have succeeded then. @@ -2535,7 +2573,7 @@ try_insert_reg({T,l,_} = Key, Val, Pid) -> false; false -> process_is_down(OtherPid), % may result in failover - try_insert_reg(Key, Val, Pid) + try_insert_reg(Key, R, Val, Attrs, Pid) end; [] -> false @@ -2545,9 +2583,9 @@ try_insert_reg({T,l,_} = Key, Val, Pid) -> end. %% try_insert_shared({c,l,_} = Key, Val) -> -%% ets:insert_new(?TAB, [{{Key,shared}, shared, Val}, {{shared, Key}, []}]); +%% ets_insert_new([{{Key,shared}, shared, Val}, {{shared, Key}, []}]); %% try_insert_shared({a,l,_} = Key, Val) -> -%% ets:insert_new(?TAB, [{{Key, a}, shared, Val}, {{shared, Key}, []}]). +%% ets_insert_new([{{Key, a}, shared, Val}, {{shared, Key}, []}]). -spec audit_process(pid()) -> ok. @@ -2563,17 +2601,17 @@ process_is_down(Pid) when is_pid(Pid) -> %% delete the monitor marker %% io:fwrite(user, "process_is_down(~p) - ~p~n", [Pid,ets:tab2list(?TAB)]), Marker = {Pid,l}, - case ets:member(?TAB, Marker) of + case ets_member(Marker) of false -> ok; true -> - Revs = ets:select(?TAB, [{{{Pid,'$1'}, '$2'}, + Revs = ets_select([{{{Pid,'$1'}, '$2'}, [{'==',{element,2,'$1'},l}], [{{'$1','$2'}}]}]), lists:foreach( fun({{n,l,_}=K, R}) -> Key = {K,n}, - case ets:lookup(?TAB, Key) of + case ets_lookup(Key) of [{_, Pid, V}] -> ets:delete(?TAB, Key), opt_notify(R, K, Pid, V); @@ -2586,7 +2624,7 @@ process_is_down(Pid) when is_pid(Pid) -> ets:insert(?TAB, {Key, Waiters1}) end; [{_, OtherPid, _}] when Pid =/= OtherPid -> - case ets:lookup(?TAB, {OtherPid, K}) of + case ets_lookup({OtherPid, K}) of [{RK, Opts}] when is_list(Opts) -> Opts1 = gproc_lib:remove_monitor_pid( Opts, Pid), @@ -2599,7 +2637,7 @@ process_is_down(Pid) when is_pid(Pid) -> end; ({{c,l,C} = K, _}) -> Key = {K, Pid}, - [{_, _, Value}] = ets:lookup(?TAB, Key), + [{_, _, Value}] = ets_lookup(Key), ets:delete(?TAB, Key), gproc_lib:update_aggr_counter(l, C, -Value); ({{r,l,Rsrc} = K, _}) -> @@ -2611,20 +2649,51 @@ process_is_down(Pid) when is_pid(Pid) -> ({{a,l,_} = K, R}) -> remove_aggregate(a, K, R, Pid); ({{p,_,_} = K, _}) -> - ets:delete(?TAB, {K, Pid}) + ets:delete(?TAB, {K, Pid}); + ({K, R}) -> + case lists:keyfind(type, 1, R) of + {_, c} -> remove_counter(K, Pid, R); + {_, r} -> remove_resource(K, Pid, R); + {_, rc} -> remove_aggregate(element(1,K), K, R, Pid); + {_, a} -> remove_aggregate(element(1,K), K, R, Pid); + {_, p} -> ets:delete(?TAB, {K, Pid}) + end end, Revs), ets:select_delete(?TAB, [{{{Pid,{'_',l,'_'}},'_'}, [], [true]}]), ets:delete(?TAB, Marker), ok end. +remove_counter(K, Pid, R) -> + Key = {K, Pid}, + [{_, _, Value}] = ets_lookup(Key), + Aggr = keyfind(aggr, 1, R, []), + ets:delete(?TAB, Key), + [gproc_lib:update_aggr_counter(l, T, A, -Value) || {T,A} <- Aggr], + ok. + +remove_resource(K, Pid, R) -> + Key = {K, Pid}, + Rc = keyfind(rc, 1, R, []), + ets:delete(?TAB, Key), + [gproc_lib:decrement_resource_count(l, T, Rsrc) || {T, Rsrc} <- Rc], + ok. + +keyfind(K, Pos, L, Default) -> + case lists:keyfind(K, Pos, L) of + false -> + Default; + {_, V} -> + V + end. + remove_aggregate(T, K, R, Pid) -> - case ets:lookup(?TAB, {K,T}) of + case ets_lookup({K,T}) of [{_, Pid, V}] -> ets:delete(?TAB, {K,T}), opt_notify(R, K, Pid, V); [{_, OtherPid, _}] when Pid =/= OtherPid -> - case ets:lookup(?TAB, {OtherPid, K}) of + case ets_lookup({OtherPid, K}) of [{RK, Opts}] when is_list(Opts) -> Opts1 = gproc_lib:remove_monitor_pid( Opts, Pid), @@ -2686,7 +2755,7 @@ pick_standby([]) -> do_give_away({T,l,_} = K, To, Pid) when T==n; T==a; T==rc -> Key = {K, T}, - case ets:lookup(?TAB, Key) of + case ets_lookup(Key) of [{_, Pid, Value}] -> %% Pid owns the reg; allowed to give_away case pid_to_give_away_to(To) of @@ -2709,12 +2778,12 @@ do_give_away({T,l,_} = K, To, Pid) when T==n; T==a; T==rc -> end; do_give_away({T,l,_} = K, To, Pid) when T==c; T==p; T==r -> Key = {K, Pid}, - case ets:lookup(?TAB, Key) of + case ets_lookup(Key) of [{_, Pid, Value}] -> case pid_to_give_away_to(To) of ToPid when is_pid(ToPid) -> ToKey = {K, ToPid}, - case ets:member(?TAB, ToKey) of + case ets_member(ToKey) of true -> badarg; false -> @@ -2737,7 +2806,7 @@ do_give_away({T,l,_} = K, To, Pid) when T==c; T==p; T==r -> pid_to_give_away_to(P) when is_pid(P), node(P) == node() -> P; pid_to_give_away_to({T,l,_} = Key) when T==n; T==a; T==rc -> - case ets:lookup(?TAB, {Key, T}) of + case ets_lookup({Key, T}) of [{_, Pid, _}] -> Pid; _ -> @@ -2761,17 +2830,17 @@ init([]) -> set_monitors() -> - set_monitors(ets:select(?TAB, [{{{'$1',l}},[],['$1']}], 100)). + set_monitors(ets_select([{{{'$1',l}},[],['$1']}], 100)). set_monitors('$end_of_table') -> ok; set_monitors({Pids, Cont}) -> - _ = [erlang:monitor(process,Pid) || Pid <- Pids], - set_monitors(ets:select(Cont)). + _ = [monitor_process(Pid) || Pid <- Pids], + set_monitors(ets_select_cont(Cont)). monitor_me() -> - case ets:insert_new(?TAB, {{self(),l}}) of + case ets_insert_new({{self(),l}}) of false -> true; true -> cast({monitor_me,self()}), @@ -3025,7 +3094,7 @@ qlc_lookup(_Scope, 1, Keys, Check) -> fun(Key) -> remove_dead( Check, - ets:select(?TAB, [{ {{Key,'_'},'_','_'}, [], + ets_select([{ {{Key,'_'},'_','_'}, [], [{{ {element,1,{element,1,'$_'}}, {element,2,'$_'}, {element,3,'$_'} }}] }])) @@ -3050,14 +3119,14 @@ qlc_lookup_pid(Pid, Scope, Check) -> []; false -> Found = - ets:select(?TAB, [{{{Pid, rev_keypat(Scope)}, '_'}, + ets_select([{{{Pid, rev_keypat(Scope)}, '_'}, [], ['$_']}]), lists:flatmap( fun({{_,{T,_,_}=K}, _}) -> K2 = if T==n orelse T==a -> T; true -> Pid end, - case ets:lookup(?TAB, {K,K2}) of + case ets_lookup({K,K2}) of [{{Key,_},_,Value}] -> [{Key, Pid, Value}]; [] -> @@ -3069,7 +3138,7 @@ qlc_lookup_pid(Pid, Scope, Check) -> qlc_next(_, '$end_of_table', _) -> []; qlc_next(Scope, K, Check) -> - case ets:lookup(?TAB, K) of + case ets_lookup(K) of [{{Key,_}, Pid, V}] -> case Check andalso ?PID_IS_DEAD(Pid) of true -> @@ -3086,7 +3155,7 @@ qlc_next(Scope, K, Check) -> qlc_prev(_, '$end_of_table', _) -> []; qlc_prev(Scope, K, Check) -> - case ets:lookup(?TAB, K) of + case ets_lookup(K) of [{{Key,_},Pid,V}] -> case Check andalso ?PID_IS_DEAD(Pid) of true -> @@ -3108,12 +3177,12 @@ qlc_select(true, {Objects, Cont}) -> not ?PID_IS_DEAD(Pid)] of [] -> %% re-run search - qlc_select(true, ets:select(Cont)); + qlc_select(true, ets_select_cont(Cont)); Found -> - Found ++ fun() -> qlc_select(true, ets:select(Cont)) end + Found ++ fun() -> qlc_select(true, ets_select_cont(Cont)) end end; qlc_select(false, {Objects, Cont}) -> - Objects ++ fun() -> qlc_select(false, ets:select(Cont)) end. + Objects ++ fun() -> qlc_select(false, ets_select_cont(Cont)) end. is_unique(n) -> true; @@ -3124,3 +3193,15 @@ is_regged([{_, _, _}]) -> true; is_regged(_) -> false. + +%% function wrappers for easier tracing +ets_lookup(K) -> ets:lookup(?TAB, K). +ets_member(K) -> ets:member(?TAB, K). +ets_select(Pat) -> ets:select(?TAB, Pat). +ets_select(P,L) -> ets:select(?TAB, P, L). +ets_select_cont(Cont) -> ets:select(Cont). +ets_insert_new(X) -> ets:insert_new(?TAB, X). +ets_update_element(K,V) -> ets:update_element(?TAB, K, V). + +monitor_process(Pid) -> + erlang:monitor(process, Pid). diff --git a/src/gproc_dist.erl b/src/gproc_dist.erl index 2f78232..09c566d 100644 --- a/src/gproc_dist.erl +++ b/src/gproc_dist.erl @@ -23,10 +23,10 @@ -behaviour(gen_leader). -export([start_link/0, start_link/1, - reg/1, reg/4, unreg/1, - reg_other/5, unreg_other/2, - reg_or_locate/3, - reg_shared/3, unreg_shared/1, + reg/1, reg/5, unreg/1, + reg_other/6, unreg_other/2, + reg_or_locate/4, + reg_shared/5, unreg_shared/1, monitor/2, demonitor/2, set_attributes/2, @@ -36,9 +36,8 @@ set_value/2, set_value_shared/2, give_away/2, - update_counter/3, + update_counter/4, update_counters/1, - update_shared_counter/2, reset_counter/1]). -export([leader_call/1, @@ -93,21 +92,25 @@ start_link({Nodes, Opts}) -> %% {@see gproc:reg/1} %% reg(Key) -> - reg(Key, gproc:default(Key), [], reg). + R = reg_type(Key), + reg(Key, R, gproc:default(R), [], reg). + +reg_type({T,_,N}) -> + gproc_lib:reg_type(T, N). %% {@see gproc:reg_or_locate/2} %% -reg_or_locate({n,g,_} = Key, Value, Pid) when is_pid(Pid) -> - leader_call({reg_or_locate, Key, Value, Pid}); -reg_or_locate({n,g,_} = Key, Value, F) when is_function(F, 0) -> +reg_or_locate({n,g,_} = Key, R, Value, Pid) when is_pid(Pid) -> + leader_call({reg_or_locate, Key, R, Value, Pid}); +reg_or_locate({n,g,_} = Key, R, Value, F) when is_function(F, 0) -> MyGroupLeader = group_leader(), - leader_call({reg_or_locate, Key, Value, + leader_call({reg_or_locate, Key, R, Value, fun() -> %% leader will spawn on caller's node group_leader(MyGroupLeader, self()), F() end}); -reg_or_locate(_, _, _) -> +reg_or_locate(_, _, _, _) -> ?THROW_GPROC_ERROR(badarg). @@ -120,10 +123,10 @@ reg_or_locate(_, _, _) -> %%% | r - resource property %%% | rc - resource counter %%% @end -reg({_,g,_} = Key, Value, Attrs, Op) -> +reg({_,g,_} = Key, R, Value, Attrs, Op) -> %% anything global - leader_call({reg, Key, Value, self(), Attrs, Op}); -reg(_, _, _, _) -> + leader_call({reg, Key, R, Value, self(), Attrs, Op}); +reg(_, _, _, _, _) -> ?THROW_GPROC_ERROR(badarg). %% @spec ({Class,g,Key}, pid(), Value, Attrs, Op::reg | unreg) -> true @@ -135,13 +138,15 @@ reg(_, _, _, _) -> %% Value = term() %% Attrs = [{Key, Value}] %% @end -reg_other({T,g,_} = Key, Pid, Value, Attrs, Op) when is_pid(Pid) -> +reg_other({_,g,_} = Key, #{type := T} = R, Pid, Value, Attrs, Op) + when is_pid(Pid); + Pid =:= shared -> if T==n; T==a; T==r; T==rc -> - leader_call({reg_other, Key, Value, Pid, Attrs, Op}); + leader_call({reg, Key, R, Value, Pid, Attrs, Op}); true -> ?THROW_GPROC_ERROR(badarg) end; -reg_other(_, _, _, _, _) -> +reg_other(_, _, _, _, _, _) -> ?THROW_GPROC_ERROR(badarg). unreg_other({T,g,_} = Key, Pid) when is_pid(Pid) -> @@ -153,9 +158,9 @@ unreg_other({T,g,_} = Key, Pid) when is_pid(Pid) -> unreg_other(_, _) -> ?THROW_GPROC_ERROR(badarg). -reg_shared({_,g,_} = Key, Value, Attrs) -> - leader_call({reg, Key, Value, shared, Attrs, reg}); -reg_shared(_, _, _) -> +reg_shared({_,g,_} = Key, R, Value, Attrs, Op) -> + leader_call({reg, Key, R, Value, shared, Attrs, Op}); +reg_shared(_, _, _, _, _) -> ?THROW_GPROC_ERROR(badarg). monitor({_,g,_} = Key, Type) when Type==info; @@ -224,10 +229,13 @@ give_away({_,g,_} = Key, To) -> leader_call({give_away, Key, To, self()}). -update_counter({T,g,_} = Key, Pid, Incr) when is_integer(Incr), T==c; - is_integer(Incr), T==n -> - leader_call({update_counter, Key, Incr, Pid}); -update_counter(_, _, _) -> +update_counter({_,g,_} = Key, #{type := T} = R, Pid, Incr) + when is_integer(Incr), T =:= c; + is_integer(Incr), T =:= p; + is_integer(Incr), T =:= r; + is_integer(Incr), T =:= n -> + leader_call({update_counter, Key, R, Incr, Pid}); +update_counter(_, _, _, _) -> ?THROW_GPROC_ERROR(badarg). update_counters(List) when is_list(List) -> @@ -235,12 +243,6 @@ update_counters(List) when is_list(List) -> update_counters(_) -> ?THROW_GPROC_ERROR(badarg). -update_shared_counter({c,g,_} = Key, Incr) when is_integer(Incr) -> - leader_call({update_counter, Key, Incr, shared}); -update_shared_counter(_, _) -> - ?THROW_GPROC_ERROR(badarg). - - reset_counter({c,g,_} = Key) -> leader_call({reset_counter, Key, self()}); reset_counter(_) -> @@ -364,9 +366,9 @@ handle_leader_call(sync, From, #state{sync_requests = SReqs} = S, E) -> GenLeader:broadcast({from_leader, {sync, From}}, Alive, E), {noreply, S#state{sync_requests = [{From, Alive}|SReqs]}} end; -handle_leader_call({Reg, {_C,g,_Name} = K, Value, Pid, As, Op}, _From, S, _E) +handle_leader_call({Reg, {_C,g,_Name} = K, R, Value, Pid, As, Op}, _From, S, _E) when Reg==reg; Reg==reg_other -> - case gproc_lib:insert_reg(K, Value, Pid, g) of + case gproc_lib:insert_reg(K, R, Value, Pid, As, g, registered) of false when Op == reg -> {reply, badarg, S}; false when Op == ensure -> @@ -381,10 +383,6 @@ handle_leader_call({Reg, {_C,g,_Name} = K, Value, Pid, As, Op}, _From, S, _E) end; true -> _ = gproc_lib:ensure_monitor(Pid,g), - _ = if As =/= [] -> - gproc_lib:insert_attr(K, As, Pid, g); - true -> [] - end, Vals = mk_broadcast_insert_vals([{K, Pid, Value}]), {reply, regged_new(Op), [{insert, Vals}], S} end; @@ -456,8 +454,9 @@ handle_leader_call({set_attributes, {_,g,_} = K, Attrs, Pid}, _From, S, _E) -> NewAttrs when is_list(NewAttrs) -> {reply, true, [{insert, [{{Pid,K}, NewAttrs}]}], S} end; -handle_leader_call({reg_or_locate, {n,g,_} = K, Value, P}, - {FromPid, _}, S, _E) -> +handle_leader_call({reg_or_locate, {Tg,g,_} = K, #{type := T}, Value, P}, + {FromPid, _}, S, _E) + when T =:= n; T =:= a; T =:= rc -> FromNode = node(FromPid), Reg = fun() -> Pid = if is_function(P, 0) -> @@ -468,13 +467,13 @@ handle_leader_call({reg_or_locate, {n,g,_} = K, Value, P}, case gproc_lib:insert_reg(K, Value, Pid, g) of true -> _ = gproc_lib:ensure_monitor(Pid,g), - Vals = [{{K,n},Pid,Value}], + Vals = [{{K,Tg},Pid,Value}], {reply, {Pid, Value}, [{insert, Vals}], S}; false -> {reply, badarg, S} end end, - case ets:lookup(?TAB, {K, n}) of + case ets:lookup(?TAB, {K, Tg}) of [] -> Reg(); [{_, _Waiters}] -> @@ -482,9 +481,11 @@ handle_leader_call({reg_or_locate, {n,g,_} = K, Value, P}, [{_, OtherPid, OtherVal}] -> {reply, {OtherPid, OtherVal}, S} end; -handle_leader_call({update_counter, {T,g,_Ctr} = Key, Incr, Pid}, _From, S, _E) - when is_integer(Incr), T==c; - is_integer(Incr), T==n -> +handle_leader_call({update_counter, {_,g,_Ctr} = Key, + #{type := T}, Incr, Pid}, _From, S, _E) + when is_number(Incr), T==c; + is_number(Incr), T==r; + is_number(Incr), T==n -> try New = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}), RealPid = case Pid of n -> ets:lookup_element(?TAB, {Key,Pid}, 2); diff --git a/src/gproc_ext.erl b/src/gproc_ext.erl new file mode 100644 index 0000000..3d7e6fa --- /dev/null +++ b/src/gproc_ext.erl @@ -0,0 +1,21 @@ +-module(gproc_ext). + +-export([reg_type/2]). + + +reg_type(rw, _N) -> + #{tag => rw, + type => r, + unique => false, + scan => [], + aggr => [], + rc => [#{tag => rwc}, #{tag => rwc, wild => [0]}]}; +reg_type(rcw, _N) -> + #{tag => rcw, + type => rc, + unique => true, + scan => [#{tag => rc}], + aggr => [], + rc => []}; +reg_type(_, _) -> + undefined. diff --git a/src/gproc_int.hrl b/src/gproc_int.hrl index 83ea8c3..8ae26c1 100644 --- a/src/gproc_int.hrl +++ b/src/gproc_int.hrl @@ -29,3 +29,9 @@ %% Used to wrap operations that may fail, but we ignore the exception. %% Use instead of catch, to avoid building a stacktrace unnecessarily. -define(MAY_FAIL(Expr), try (Expr) catch _:_ -> '$caught_exception' end). + +-ifdef(GPROC_EXT). +-define(GPROC_EXT_CB, ?GPROC_EXT). +-else. +-define(GPROC_EXT_CB, gproc_ext). +-endif. diff --git a/src/gproc_lib.erl b/src/gproc_lib.erl index e4a9678..2f4b32b 100644 --- a/src/gproc_lib.erl +++ b/src/gproc_lib.erl @@ -21,12 +21,13 @@ %% @end -module(gproc_lib). --export([await/3, +-export([reg_type/2, + await/3, do_set_counter_value/3, do_set_value/3, ensure_monitor/2, insert_many/4, - insert_reg/4, insert_reg/5, + insert_reg/4, insert_reg/5, insert_reg/7, insert_attr/4, remove_many/4, remove_reg/3, remove_reg/4, @@ -40,21 +41,42 @@ remove_reverse_mapping/3, remove_reverse_mapping/4, notify/2, notify/3, remove_wait/4, - update_aggr_counter/3, - update_counter/3, - decrement_resource_count/2, + update_aggr_counter/3, update_aggr_counter/4, + update_counter/4, + decrement_resource_count/2, decrement_resource_count/3, valid_opts/2]). --export([dbg/1]). - -include("gproc_int.hrl"). -include("gproc.hrl"). -dbg(Mods) -> - dbg:tracer(), - [dbg:tpl(M,x) || M <- Mods], - dbg:tp(ets,'_',[{[gproc,'_'], [], [{message,{exception_trace}}]}]), - dbg:p(all,[c]). + +reg_type(n, _N) -> + #{tag => n, type => n, unique => true, scan => [], + aggr => [], rc => []}; +reg_type(p, _N) -> + #{tag => p, type => p, unique => false, scan => [], + aggr => [], rc => []}; +reg_type(c, _N) -> + #{tag => c, type => c, unique => false, scan => [], + aggr => [#{tag => a}], rc => []}; +reg_type(a, _N) -> + #{tag => a, type => a, unique => true, scan => [#{tag => c, + mode => sum}], + aggr => [], rc => []}; +reg_type(r, _N) -> + #{tag => r, type => r, unique => false, scan => [], + aggr => [], rc => [#{tag => rc}]}; +reg_type(rc, _N) -> + #{tag => rc, type => rc, unique => true, scan => [#{tag => r, + mode => count}], + aggr => [], rc => []}; +reg_type(T, N) -> + case ?GPROC_EXT_CB:reg_type(T, N) of + #{} = Type -> + Type; + undefined -> + ?THROW_GPROC_ERROR(badarg) + end. %% We want to store names and aggregated counters with the same %% structure as properties, but at the same time, we must ensure @@ -68,10 +90,10 @@ insert_reg(K, Value, Pid, Scope) -> insert_reg(K, Value, Pid, Scope, registered). insert_reg({T,_,Name} = K, Value, Pid, Scope, Event) when T==a; T==n; T==rc -> - Res = case ets:insert_new(?TAB, {{K,T}, Pid, Value}) of + Res = case ets_insert_new({{K,T}, Pid, Value}) of true -> %% Use insert_new to avoid overwriting existing entry - _ = ets:insert_new(?TAB, {{Pid,K}, []}), + _ = ets_insert_new({{Pid,K}, []}), true; false -> maybe_waiters(K, Pid, Value, T, Event) @@ -82,12 +104,12 @@ insert_reg({p,Scope,_} = K, Value, shared, Scope, _E) when Scope == g; Scope == l -> %% shared properties are unique Info = [{{K, shared}, shared, Value}, {{shared,K}, []}], - ets:insert_new(?TAB, Info); + ets_insert_new(Info); insert_reg({c,Scope,Ctr} = Key, Value, Pid, Scope, _E) when Scope==l; Scope==g -> %% Non-unique keys; store Pid in the key part K = {Key, Pid}, Kr = {Pid, Key}, - Res = ets:insert_new(?TAB, [{K, Pid, Value}, {Kr, [{initial, Value}]}]), + Res = ets_insert_new([{K, Pid, Value}, {Kr, [{initial, Value}]}]), case Res of true -> update_aggr_counter(Scope, Ctr, Value); @@ -98,7 +120,7 @@ insert_reg({c,Scope,Ctr} = Key, Value, Pid, Scope, _E) when Scope==l; Scope==g - insert_reg({r,Scope,R} = Key, Value, Pid, Scope, _E) when Scope==l; Scope==g -> K = {Key, Pid}, Kr = {Pid, Key}, - Res = ets:insert_new(?TAB, [{K, Pid, Value}, {Kr, [{initial, Value}]}]), + Res = ets_insert_new([{K, Pid, Value}, {Kr, [{initial, Value}]}]), case Res of true -> update_resource_count(Scope, R, 1); @@ -110,34 +132,160 @@ insert_reg({_,_,_} = Key, Value, Pid, _Scope, _E) when is_pid(Pid) -> %% Non-unique keys; store Pid in the key part K = {Key, Pid}, Kr = {Pid, Key}, - ets:insert_new(?TAB, [{K, Pid, Value}, {Kr, []}]). + ets_insert_new([{K, Pid, Value}, {Kr, []}]). + +insert_reg({T,Scope,N} = Key, + #{tag := T, + type := Ty, + unique := U, + scan := Scan, + aggr := Aggr, + rc := Rc}, V, Pid, As, Scope, Event) -> + K = case U of + true -> {Key, T}; + false -> {Key, Pid} + end, + V1 = if Ty =:= a; Ty =:= rc -> 0; + true -> V + end, + Kr = {Pid, Key}, + As1 = reg_attrs(Ty, Aggr, Rc), + Res = case ets_insert_new([{K, Pid, V1}, {Kr, [{initial, V1}, + {attrs, As}|As1]}]) of + true -> + update_aggr(Ty, Aggr, N, V1, Scope), + update_rc(Ty, Rc, N, 1, Scope), + true; + false -> + maybe_waiters(U, Key, Pid, V1, T, Event) + end, + maybe_scan_(Res, Scan, Scope, N, K), + Res. + +reg_attrs(c, Aggr, _) -> + [{type,c},{aggr, Aggr}]; +reg_attrs(r, _, Rc) -> + [{type,r},{rc, Rc}]; +reg_attrs(T, _, _) -> + [{type,T}]. + +update_aggr(c, [#{tag := Tag, wild := W}|T], N, Val, Scope) + when is_number(Val) -> + N1 = insert_wild(N, W, '\\_'), + ?MAY_FAIL(ets_update_counter({{Tag, Scope, N1}, Tag}, {3, Val})), + update_aggr(c, T, N, Val, Scope); +update_aggr(c, [#{tag := Tag}|T], N, Val, Scope) + when is_number(Val) -> + ?MAY_FAIL(ets_update_counter({{Tag, Scope, N}, Tag}, {3, Val})), + update_aggr(c, T, N, Val, Scope); +update_aggr(_, _, _, _, _) -> + ok. + +update_rc(r, [_|_] = Rc, N, Val, Scope) -> + update_rc(Rc, N, Val, Scope); +update_rc(_, _, _, _, _) -> + ok. + +update_rc([#{tag := Tag, wild := W}|T], N, Val, Scope) -> + update_one_rc(Tag, Scope, insert_wild(N, W, '\\_'), Val), + update_rc(T, N, Val, Scope); +update_rc([#{tag := Tag}|T], N, Val, Scope) -> + update_one_rc(Tag, Scope, N, Val), + update_rc(T, N, Val, Scope); +update_rc(_, _, _, _) -> + ok. + +update_one_rc(Tag, Scope, N, Val) -> + try ets_update_counter({{Tag, Scope, N}, Tag}, {3, Val}) of + 0 -> resource_count_zero(Scope, Tag, N); + _ -> ok + catch + _:_ -> ok + end. + +maybe_scan_(true, [_|_] = Tags, Scope, Name, K) -> + Mode = scan_mode(Tags), + Pat = scan_pattern(Tags, Mode, Scope, Name), + Sum = case Mode of + count -> ets_select_count(Pat); + _ -> + Vs = ets_select(Pat), + lists:sum(Vs) + end, + ets_update_counter(K, {3, Sum}); +maybe_scan_(_, _, _, _, _) -> + ok. + +scan_mode([#{mode := M}|T]) -> + scan_mode(T, M). + +scan_mode([#{mode := M}|T], M) -> + scan_mode(T, M); +scan_mode([#{mode := _}|_], _) -> + mixed; +scan_mode([], M) -> + M. + +scan_pattern([#{tag := Tag, wild := W, mode := M}|T], Mode, Scope, Name) -> + [{ {{{Tag,Scope,insert_wild(Name, W, '_')},'_'},'_', '_'}, + guard_pattern(M), + prod_pattern(M, Mode) } + | scan_pattern(T, Mode, Scope, Name)]; +scan_pattern([#{tag := Tag, mode := M}|T], Mode, Scope, Name) -> + [{ {{{Tag,Scope,Name},'_'},'_', '_'}, + guard_pattern(M), + prod_pattern(M, Mode) } + | scan_pattern(T, Mode, Scope, Name)]; +scan_pattern(_, _, _, _) -> + []. + +guard_pattern(sum ) -> [{is_number, {element, 3, '$_'}}]; +guard_pattern(count) -> []. + +prod_pattern(sum , _ ) -> [{element, 3, '$_'}]; +prod_pattern(count, count) -> [true]; +prod_pattern(count, _ ) -> [1]. + +insert_wild(Name, W, X) when is_tuple(Name) -> + Sz = size(Name), + insert_wild(W, Sz, Name, X). + +insert_wild([last|T], Sz, Nm, X) -> + insert_wild(T, Sz, setelement(Sz, Nm, X), X); +insert_wild([H|T], Sz, Nm, X) when is_integer(H) -> + P = if H > 0 -> H; + true -> Sz + H + end, + insert_wild(T, Sz, setelement(P, Nm, X), X); +insert_wild([], _, Nm, _) -> + Nm. maybe_scan(a, Pid, Scope, Name, K) -> Initial = scan_existing_counters(Scope, Name), - ets:insert(?TAB, {{K,a}, Pid, Initial}); + ets_insert({{K,a}, Pid, Initial}); maybe_scan(rc, Pid, Scope, Name, K) -> Initial = scan_existing_resources(Scope, Name), - ets:insert(?TAB, {{K,rc}, Pid, Initial}); + ets_insert({{K,rc}, Pid, Initial}); maybe_scan(_, _, _, _, _) -> true. insert_attr({_,Scope,_} = Key, Attrs, Pid, Scope) when Scope==l; Scope==g -> - case ets:lookup(?TAB, K = {Pid, Key}) of + case ets_lookup(K = {Pid, Key}) of [{_, Attrs0}] when is_list(Attrs) -> As = proplists:get_value(attrs, Attrs0, []), As1 = lists:foldl(fun({K1,_} = Attr, Acc) -> lists:keystore(K1, 1, Acc, Attr) end, As, Attrs), Attrs1 = lists:keystore(attrs, 1, Attrs0, {attrs, As1}), - ets:insert(?TAB, {K, Attrs1}), + ets_insert({K, Attrs1}), Attrs1; _ -> false end. get_attr(Attr, Pid, {_,_,_} = Key, Default) -> - case ets:lookup(?TAB, {Pid, Key}) of + case ets_lookup({Pid, Key}) of [{_, Opts}] when is_list(Opts) -> case lists:keyfind(attrs, 1, Opts) of {_, Attrs} -> @@ -159,14 +307,14 @@ get_attr(Attr, Pid, {_,_,_} = Key, Default) -> insert_many(T, Scope, KVL, Pid) -> Objs = mk_reg_objs(T, Scope, Pid, KVL), - case ets:insert_new(?TAB, Objs) of + case ets_insert_new(Objs) of true -> RevObjs = mk_reg_rev_objs(T, Scope, Pid, KVL), - ets:insert(?TAB, RevObjs), + ets_insert(RevObjs), _ = gproc_lib:ensure_monitor(Pid, Scope), {true, Objs}; false -> - Existing = [{Obj, ets:lookup(?TAB, K)} || {K,_,_} = Obj <- Objs], + Existing = [{Obj, ets_lookup(K)} || {K,_,_} = Obj <- Objs], case lists:any(fun({_, [{_, _, _}]}) -> true; (_) -> @@ -189,7 +337,7 @@ insert_many(T, Scope, KVL, Pid) -> insert_objects(Objs) -> lists:foreach( fun({{{Id,_} = _K, Pid, V} = Obj, Existing}) -> - ets:insert(?TAB, [Obj, {{Pid, Id}, []}]), + ets_insert([Obj, {{Pid, Id}, []}]), case Existing of [] -> ok; [{_, Waiters}] -> @@ -200,7 +348,7 @@ insert_objects(Objs) -> await({T,C,_} = Key, WPid, {_Pid, Ref} = From) -> Rev = {{WPid,Key}, []}, - case ets:lookup(?TAB, {Key,T}) of + case ets_lookup({Key,T}) of [{_, P, Value}] -> %% for symmetry, we always reply with Ref and then send a message if C == g -> @@ -215,25 +363,30 @@ await({T,C,_} = Key, WPid, {_Pid, Ref} = From) -> [{K, Waiters}] -> NewWaiters = [{WPid,Ref} | Waiters], W = {K, NewWaiters}, - ets:insert(?TAB, [W, Rev]), + ets_insert([W, Rev]), _ = gproc_lib:ensure_monitor(WPid,C), {reply, Ref, [W,Rev]}; [] -> W = {{Key,T}, [{WPid,Ref}]}, - ets:insert(?TAB, [W, Rev]), + ets_insert([W, Rev]), _ = gproc_lib:ensure_monitor(WPid,C), {reply, Ref, [W,Rev]} end. +maybe_waiters(_Unique = false, _, _, _, _, _) -> + false; +maybe_waiters(_Unique = true, K, Pid, Value, T, Event) -> + maybe_waiters(K, Pid, Value, T, Event). + maybe_waiters(_, _, _, _, []) -> false; maybe_waiters(K, Pid, Value, T, Event) -> - case ets:lookup(?TAB, {K,T}) of + case ets_lookup({K,T}) of [{_, Waiters}] when is_list(Waiters) -> Followers = [F || {_,_,follow} = F <- Waiters], - ets:insert(?TAB, [{{K,T}, Pid, Value}, - {{Pid,K}, [{monitor, Followers} - || Followers =/= []]}]), + ets_insert([{{K,T}, Pid, Value}, + {{Pid,K}, [{monitor, Followers} + || Followers =/= []]}]), notify_waiters(Waiters, K, Pid, Value, Event), true; _ -> @@ -255,18 +408,18 @@ remove_wait({T,_,_} = Key, Pid, Ref, Waiters) -> Rev = {Pid,Key}, case remove_from_waiters(Waiters, Pid, Ref) of [] -> - ets:delete(?TAB, {Key,T}), - ets:delete(?TAB, Rev), + ets_delete({Key,T}), + ets_delete(Rev), [{delete, [{Key,T}, Rev], []}]; NewWaiters -> - ets:insert(?TAB, {Key, NewWaiters}), + ets_insert({Key, NewWaiters}), case lists:keymember(Pid, 1, NewWaiters) of true -> %% should be extremely unlikely [{insert, [{Key, NewWaiters}]}]; false -> %% delete the reverse entry - ets:delete(?TAB, Rev), + ets_delete(Rev), [{insert, [{Key, NewWaiters}]}, {delete, [Rev], []}] end @@ -284,7 +437,7 @@ is_waiter(_, _, _) -> false. remove_monitors(Key, Pid, MPid) -> - case ets:lookup(?TAB, {Pid, Key}) of + case ets_lookup({Pid, Key}) of [{_, r}] -> []; [{K, Opts}] when is_list(Opts) -> @@ -295,7 +448,7 @@ remove_monitors(Key, Pid, MPid) -> Ms1 = [{P,R} || {P,R} <- Ms, P =/= MPid], NewMs = lists:keyreplace(monitors, 1, Opts, {monitors,Ms1}), - ets:insert(?TAB, {K, NewMs}), + ets_insert({K, NewMs}), [{insert, [{{Pid,Key}, NewMs}]}] end; _ -> @@ -326,7 +479,7 @@ ensure_monitor(Pid, _) when Pid == self() -> %% monitoring is ensured through a 'monitor_me' message ok; ensure_monitor(Pid, Scope) when Scope==g; Scope==l -> - case ets:insert_new(?TAB, {{Pid, Scope}}) of + case ets_insert_new({{Pid, Scope}}) of false -> ok; true -> erlang:monitor(process, Pid) end. @@ -342,7 +495,7 @@ remove_reg(Key, Pid, Event, Opts) -> [Reg, Rev]. remove_reverse_mapping(Event, Pid, Key) -> - Opts = case ets:lookup(?TAB, {Pid, Key}) of + Opts = case ets_lookup({Pid, Key}) of [] -> []; [{_, r}] -> []; [{_, L}] when is_list(L) -> @@ -355,7 +508,7 @@ remove_reverse_mapping(Event, Pid, Key, Opts) when Event==unreg; element(1,Event)==failover -> Rev = {Pid, Key}, _ = notify(Event, Key, Opts), - ets:delete(?TAB, Rev), + ets_delete(Rev), Rev. notify(Key, Opts) -> @@ -439,7 +592,7 @@ remove_many(T, Scope, L, Pid) -> end, L). unreg_opts(Key, Pid) -> - case ets:lookup(?TAB, {Pid, Key}) of + case ets_lookup({Pid, Key}) of [] -> []; [{_,r}] -> @@ -449,25 +602,25 @@ unreg_opts(Key, Pid) -> end. remove_reg_1({c,_,_} = Key, Pid) -> - remove_counter_1(Key, ets:lookup_element(?TAB, Reg = {Key,Pid}, 3), Pid), + remove_counter_1(Key, ets_lookup_element(Reg = {Key,Pid}, 3), Pid), Reg; remove_reg_1({r,_,_} = Key, Pid) -> - remove_resource_1(Key, ets:lookup_element(?TAB, Reg = {Key,Pid}, 3), Pid), + remove_resource_1(Key, ets_lookup_element(Reg = {Key,Pid}, 3), Pid), Reg; remove_reg_1({T,_,_} = Key, _Pid) when T==a; T==n; T==rc -> - ets:delete(?TAB, Reg = {Key,T}), + ets_delete(Reg = {Key,T}), Reg; remove_reg_1({_,_,_} = Key, Pid) -> - ets:delete(?TAB, Reg = {Key, Pid}), + ets_delete(Reg = {Key, Pid}), Reg. remove_counter_1({c,C,N} = Key, Val, Pid) -> - Res = ets:delete(?TAB, {Key, Pid}), + Res = ets_delete({Key, Pid}), update_aggr_counter(C, N, -Val), Res. remove_resource_1({r,C,N} = Key, _, Pid) -> - Res = ets:delete(?TAB, {Key, Pid}), + Res = ets_delete({Key, Pid}), update_resource_count(C, N, -1), Res. @@ -476,9 +629,9 @@ do_set_value({T,_,_} = Key, Value, Pid) -> T==n orelse T==a orelse T==rc -> T; true -> Pid end, - try ets:lookup_element(?TAB, {Key,K2}, 2) of + try ets_lookup_element({Key,K2}, 2) of Pid -> - ets:insert(?TAB, {{Key, K2}, Pid, Value}); + ets_insert({{Key, K2}, Pid, Value}); _ -> false catch @@ -486,48 +639,61 @@ do_set_value({T,_,_} = Key, Value, Pid) -> end. do_set_counter_value({_,C,N} = Key, Value, Pid) -> - OldVal = ets:lookup_element(?TAB, {Key, Pid}, 3), % may fail with badarg - Res = ets:insert(?TAB, {{Key, Pid}, Pid, Value}), + OldVal = ets_lookup_element({Key, Pid}, 3), % may fail with badarg + Res = ets_insert({{Key, Pid}, Pid, Value}), update_aggr_counter(C, N, Value - OldVal), Res. -update_counter({T,l,Ctr} = Key, Incr, Pid) when is_integer(Incr), T==c; - is_integer(Incr), T==n -> - Res = ets:update_counter(?TAB, {Key, Pid}, {3,Incr}), - if T==c -> - update_aggr_counter(l, Ctr, Incr); +update_counter({_,l,Ctr} = Key, #{type := T, aggr := Aggr}, Incr, Pid) + when is_integer(Incr), T =:= c; + is_integer(Incr), T =:= p; + is_integer(Incr), T =:= r; + is_integer(Incr), T =:= n -> + if is_pid(Pid); Pid =:= n; Pid =:= shared -> ok; + true -> ?THROW_GPROC_ERROR(badarg) + end, + Res = ets_update_counter({Key, Pid}, {3,Incr}), + if T =:= c -> + update_aggr(T, Aggr, Ctr, Incr, l); true -> ok end, Res; -update_counter({T,l,Ctr} = Key, {Incr, Threshold, SetValue}, Pid) - when is_integer(Incr), is_integer(Threshold), is_integer(SetValue), T==c; - is_integer(Incr), is_integer(Threshold), is_integer(SetValue), T==n -> - [Prev, New] = ets:update_counter(?TAB, {Key, Pid}, - [{3, 0}, {3, Incr, Threshold, SetValue}]), +update_counter({_,l,Ctr} = Key, #{type := T, aggr := Aggr}, + {Incr, Threshold, SetValue}, Pid) + when is_integer(Incr), is_integer(Threshold), is_integer(SetValue), T =:= c; + is_integer(Incr), is_integer(Threshold), is_integer(SetValue), T =:= r; + is_integer(Incr), is_integer(Threshold), is_integer(SetValue), T =:= p; + is_integer(Incr), is_integer(Threshold), is_integer(SetValue), T =:= n -> + if is_pid(Pid); Pid =:= n; Pid =:= shared -> ok; + true -> ?THROW_GPROC_ERROR(badarg) + end, + [Prev, New] = ets_update_counter( + {Key, Pid}, [{3, 0}, {3, Incr, Threshold, SetValue}]), if T==c -> - update_aggr_counter(l, Ctr, New - Prev); + update_aggr(T, Aggr, Ctr, New - Prev, l); true -> ok end, New; -update_counter({T,l,Ctr} = Key, Ops, Pid) when is_list(Ops), T==c; - is_list(Ops), T==r; - is_list(Ops), T==n -> - case ets:update_counter(?TAB, {Key, Pid}, - [{3, 0} | expand_ops(Ops)]) of +update_counter({_,l,Ctr} = Key, #{type := T, aggr := Aggr}, Ops, Pid) + when is_list(Ops), T==c; + is_list(Ops), T==r; + is_list(Ops), T==p; + is_list(Ops), T==n -> + case ets_update_counter({Key, Pid}, [{3, 0} | expand_ops(Ops)]) of [_] -> []; [Prev | Rest] -> [New | _] = lists:reverse(Rest), if T==c -> - update_aggr_counter(l, Ctr, New - Prev); + update_aggr(T, Aggr, Ctr, New - Prev, l); true -> ok end, Rest end; -update_counter(_, _, _) -> +update_counter(_, _, _, _) -> ?THROW_GPROC_ERROR(badarg). expand_ops([{Incr,Thr,SetV}|T]) @@ -541,52 +707,75 @@ expand_ops(_) -> ?THROW_GPROC_ERROR(badarg). update_aggr_counter(C, N, Val) -> - ?MAY_FAIL(ets:update_counter(?TAB, {{a,C,N},a}, {3, Val})). + update_aggr_counter(C, a, N, Val). + +update_aggr_counter(C, T, N, Val) -> + ?MAY_FAIL(ets_update_counter({{T,C,N},T}, {3, Val})). decrement_resource_count(C, N) -> - update_resource_count(C, N, -1). + update_resource_count(C, rc, N, -1). + +decrement_resource_count(C, T, N) -> + update_resource_count(C, T, N, -1). update_resource_count(C, N, Val) -> - try ets:update_counter(?TAB, {{rc,C,N},rc}, {3, Val}) of + update_resource_count(C, rc, N, Val). + +update_resource_count(C, T, N, Val) -> + try ets_update_counter({{T,C,N},T}, {3, Val}) of 0 -> - resource_count_zero(C, N); + resource_count_zero(T, C, N); _ -> ok catch _:_ -> ok end. -resource_count_zero(C, N) -> - case ets:lookup(?TAB, {K = {rc,C,N},rc}) of +%% resource_count_zero(C, N) -> +%% case ets:lookup(?TAB, {K = {rc,C,N},rc}) of +%% [{_, Pid, _}] -> +%% case get_attr(on_zero, Pid, K, undefined) of +%% undefined -> ok; +%% Actions -> +%% perform_on_zero(Actions, rc, C, N, Pid) +%% end; +%% _ -> ok +%% end. + +resource_count_zero(Tag, C, N) -> + case ets_lookup({K = {Tag,C,N},Tag}) of [{_, Pid, _}] -> case get_attr(on_zero, Pid, K, undefined) of undefined -> ok; Actions -> - perform_on_zero(Actions, C, N, Pid) + perform_on_zero(Actions, Tag, C, N, Pid) end; _ -> ok end. -perform_on_zero(Actions, C, N, Pid) -> +perform_on_zero(Actions, Tag, C, N, Pid) -> lists:foreach( fun(A) -> - try perform_on_zero_(A, C, N, Pid) + try perform_on_zero_(A, Tag, C, N, Pid) catch error:_ -> ignore end end, Actions). -perform_on_zero_({send, ToProc}, C, N, Pid) -> - gproc:send(ToProc, {gproc, resource_on_zero, C, N, Pid}), +perform_on_zero_({send, ToProc}, Tag, C, N, Pid) -> + gproc:send(ToProc, on_zero_msg(Tag, C, N, Pid)), ok; -perform_on_zero_({bcast, ToProc}, C, N, Pid) -> - gproc:bcast(ToProc, {gproc, resource_on_zero, C, N, Pid}), +perform_on_zero_({bcast, ToProc}, Tag, C, N, Pid) -> + gproc:bcast(ToProc, on_zero_msg(Tag, C, N, Pid)), ok; -perform_on_zero_(publish, C, N, Pid) -> +perform_on_zero_(publish, rc, C, N, Pid) -> gproc_ps:publish(C, gproc_resource_on_zero, {C, N, Pid}), ok; -perform_on_zero_({unreg_shared, T,N}, C, _, _) -> +perform_on_zero_(publish, Tag, C, N, Pid) -> + gproc_ps:publish(C, gproc_resource_on_zero, {Tag, C, N, Pid}), + ok; +perform_on_zero_({unreg_shared, T, N}, _, C, _, _) -> K = {T, C, N}, - case ets:member(?TAB, {K, shared}) of + case ets_member({K, shared}) of true -> Objs = remove_reg(K, shared, unreg), _ = if C == g -> self() ! {gproc_unreg, Objs}; @@ -596,17 +785,23 @@ perform_on_zero_({unreg_shared, T,N}, C, _, _) -> false -> ok end; -perform_on_zero_(_, _, _, _) -> +perform_on_zero_(_, _, _, _, _) -> ok. +on_zero_msg(rc, C, N, Pid) -> + {gproc, resource_on_zero, C, N, Pid}; +on_zero_msg(Tag, C, N, Pid) -> + {gproc, resource_on_zero, Tag, C, N, Pid}. + + scan_existing_counters(Ctxt, Name) -> Head = {{{c,Ctxt,Name},'_'},'_','$1'}, - Cs = ets:select(?TAB, [{Head, [], ['$1']}]), + Cs = ets_select([{Head, [], ['$1']}]), lists:sum(Cs). scan_existing_resources(Ctxt, Name) -> Head = {{{r,Ctxt,Name},'_'},'_','_'}, - ets:select_count(?TAB, [{Head, [], [true]}]). + ets_select_count([{Head, [], [true]}]). valid_opts(Type, Default) -> Opts = get_app_env(Type, Default), @@ -645,3 +840,15 @@ get_app_env(Key, Default) -> {ok, undefined} -> Default; {ok, Value} -> Value end. + +%% function wrappers for easier tracing +ets_insert(V) -> ets:insert(?TAB, V). +ets_insert_new(V) -> ets:insert_new(?TAB, V). +%%ets_update_element(K, X) -> ets:update_element(?TAB, K, X). +ets_update_counter(K, I) -> ets:update_counter(?TAB, K, I). +ets_lookup(K) -> ets:lookup(?TAB, K). +ets_member(K) -> ets:member(?TAB, K). +ets_lookup_element(K, P) -> ets:lookup_element(?TAB, K, P). +ets_select(Pat) -> ets:select(?TAB, Pat). +ets_select_count(Pat) -> ets:select_count(?TAB, Pat). +ets_delete(K) -> ets:delete(?TAB, K). diff --git a/test/gproc_dist_tests.erl b/test/gproc_dist_tests.erl index a8fde75..4bf1294 100644 --- a/test/gproc_dist_tests.erl +++ b/test/gproc_dist_tests.erl @@ -474,7 +474,7 @@ t_standby_monitor([A,B|_] = Ns) -> ?assertMatch({gproc,unreg,Ref1,Na}, got_msg(Pc, gproc)), ?assertMatch(ok, t_lookup_everywhere(Na, Ns, undefined)). -t_standby_monitor_unreg([A,B|_] = Ns) -> +t_standby_monitor_unreg([A|_] = Ns) -> Na = ?T_NAME, Pa = t_spawn(A, _Selective = true), Ref = t_call(Pa, {apply, gproc, monitor, [Na, standby]}), diff --git a/test/gproc_tests.erl b/test/gproc_tests.erl index 7399519..768546e 100644 --- a/test/gproc_tests.erl +++ b/test/gproc_tests.erl @@ -22,7 +22,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("stdlib/include/qlc.hrl"). --define(T_NAME, {n, l, {?MODULE, ?LINE, erlang:now()}}). +-define(T_NAME, {n, l, {?MODULE, ?LINE, os:timestamp()}}). conf_test_() -> {foreach,