Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

supurrt otp 18 #13

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions src/rd_core.erl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%%%-------------------------------------------------------------------
%%% @author Martin Logan
%%% @author Martin Logan
%%% @copyright 2008 Erlware
%%% @doc
%%% Cache and distribute resources.
Expand Down Expand Up @@ -52,7 +52,7 @@

-include("../include/resource_discovery.hrl").

-define(SERVER, ?MODULE).
-define(SERVER, ?MODULE).

-record(state, {}).

Expand Down Expand Up @@ -92,7 +92,7 @@ make_callbacks(NewResources) ->
%%--------------------------------------------------------------------
-spec filter_resource_tuples_by_types([resource_type()], [resource_tuple()]) -> [resource_tuple()].
filter_resource_tuples_by_types(TargetTypes, Resources) ->
Fun =
Fun =
fun({Type, _Instance} = Resource, Acc) ->
case lists:member(Type, TargetTypes) of
true -> [Resource|Acc];
Expand Down Expand Up @@ -182,7 +182,7 @@ trade_resources() ->
-spec round_robin_get(resource_type()) -> {ok, resource()} | {error, not_found}.
round_robin_get(Type) ->
gen_server:call(?SERVER, {round_robin_get, Type}).

%%--------------------------------------------------------------------
%% @doc
%% Gets all cached resources of a given type
Expand All @@ -198,7 +198,7 @@ all_of_type_get(Type) ->
%%-----------------------------------------------------------------------
%%-spec sync_resources(node(), {LocalResourceTuples, TargetTypes, DeletedTuples}) -> ok.
sync_resources(Node, {LocalResourceTuples, TargetTypes, DeletedTuples}) ->
error_logger:info_msg("synch resources for node: ~p", [Node]),
%error_logger:info_msg("synch resources for node: ~p", [Node]),
{ok, FilteredRemotes} = gen_server:call({?SERVER, Node}, {sync_resources, {LocalResourceTuples, TargetTypes, DeletedTuples}}),
rd_store:store_resource_tuples(FilteredRemotes),
make_callbacks(FilteredRemotes),
Expand Down Expand Up @@ -256,12 +256,12 @@ init([]) ->
{ok, #state{}}.

handle_call({sync_resources, {Remotes, RemoteTargetTypes, RemoteDeletedTuples}}, _From, State) ->
error_logger:info_msg("sync_resources, got remotes: ~p deleted: ~p", [Remotes, RemoteDeletedTuples]),
%error_logger:info_msg("sync_resources, got remotes: ~p deleted: ~p", [Remotes, RemoteDeletedTuples]),
LocalResourceTuples = rd_store:get_local_resource_tuples(),
TargetTypes = rd_store:get_target_resource_types(),
FilteredRemotes = filter_resource_tuples_by_types(TargetTypes, Remotes),
FilteredLocals = filter_resource_tuples_by_types(RemoteTargetTypes, LocalResourceTuples),
error_logger:info_msg("sync_resources, storing filted remotes: ~p", [FilteredRemotes]),
%error_logger:info_msg("sync_resources, storing filted remotes: ~p", [FilteredRemotes]),
rd_store:store_resource_tuples(FilteredRemotes),
[rd_store:delete_resource_tuple(DR) || DR <- RemoteDeletedTuples],
make_callbacks(FilteredRemotes),
Expand Down Expand Up @@ -327,14 +327,14 @@ handle_cast(trade_resources, State) ->
rd_store:delete_deleted_resource_tuple(),
{noreply, State};
handle_cast({trade_resources, {ReplyTo, {Remotes, RemoteDeletedTuples}}}, State) ->
error_logger:info_msg("trade_resources, got remotes ~p: deleted: ~p", [Remotes, RemoteDeletedTuples]),
%error_logger:info_msg("trade_resources, got remotes ~p: deleted: ~p", [Remotes, RemoteDeletedTuples]),
Locals = rd_store:get_local_resource_tuples(),
LocalsDeleted = rd_store:get_deleted_resource_tuples(),
TargetTypes = rd_store:get_target_resource_types(),
FilteredRemotes = filter_resource_tuples_by_types(TargetTypes, Remotes),
error_logger:info_msg("got remotes and filtered ~p", [FilteredRemotes]),
%error_logger:info_msg("got remotes and filtered ~p", [FilteredRemotes]),
rd_store:store_resource_tuples(FilteredRemotes),
error_logger:info_msg("trade_resources, deleting ~p", [RemoteDeletedTuples]),
%error_logger:info_msg("trade_resources, deleting ~p", [RemoteDeletedTuples]),
[rd_store:delete_resource_tuple(DR) || DR <- RemoteDeletedTuples],
make_callbacks(FilteredRemotes),
reply(ReplyTo, {Locals, LocalsDeleted}),
Expand Down
29 changes: 14 additions & 15 deletions src/rd_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,33 @@ do_until(F, [H|T]) ->
false -> do_until(F, T);
Return -> Return
end.

%% @doc Pings a node and returns only after the net kernal distributes the nodes.
-spec sync_ping(node(), timeout()) -> pang | pong.
sync_ping(Node, Timeout) ->
error_logger:info_msg("pinging node: ~p", [Node]),
%%error_logger:info_msg("pinging node: ~p", [Node]),
case net_adm:ping(Node) of
pong ->
Resp =
poll_until(fun() ->
length(get_remote_nodes(Node)) == length(nodes())
end,
Resp =
poll_until(fun() ->
length(get_remote_nodes(Node)) == length(nodes())
end,
10, Timeout div 10),
case Resp of
true -> pong;
false -> pang
end;
pang ->
pang ->
pang
end.

%% @doc This is a higher order function that allows for Iterations
%% number of executions of Fun until false is not returned
%% number of executions of Fun until false is not returned
%% from the Fun pausing for PauseMS after each execution.
%% <pre>
%% Variables:
%% Fun - A fun to execute per iteration.
%% Iterations - The maximum number of iterations to try getting Reply out of Fun.
%% Iterations - The maximum number of iterations to try getting Reply out of Fun.
%% PauseMS - The number of miliseconds to wait inbetween each iteration.
%% Return - What ever the fun returns.
%% </pre>
Expand All @@ -65,13 +65,13 @@ poll_until(Fun, 0, _PauseMS) ->
Fun();
poll_until(Fun, Iterations, PauseMS) ->
case Fun() of
false ->
false ->
timer:sleep(PauseMS),
case Iterations of
case Iterations of
infinity -> poll_until(Fun, Iterations, PauseMS);
Iterations -> poll_until(Fun, Iterations - 1, PauseMS)
end;
Reply ->
Reply ->
Reply
end.

Expand All @@ -82,7 +82,7 @@ get_env(Key, Default) ->
{ok, Value} -> {ok, Value};
undefined -> {ok, Default}
end.



%%%===================================================================
Expand All @@ -93,11 +93,10 @@ get_env(Key, Default) ->
get_remote_nodes(Node) ->
try
Nodes = rpc:call(Node, erlang, nodes, []),
error_logger:info_msg("contact node has ~p", [Nodes]),
%%error_logger:info_msg("contact node has ~p", [Nodes]),
Nodes
catch
_C:E ->
error_logger:info_msg("failed to connect to contact node ~p", [Node]),
throw(E)
end.

60 changes: 30 additions & 30 deletions src/resource_discovery.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%%%-------------------------------------------------------------------
%%% File : resource_discovery.erl
%%% Author : Martin J. Logan <[email protected]>
%%% @doc
%%% @doc
%%% Resource Discovery has 3 major types. They are listed here.
%%% @type resource_tuple() = {resource_type(), resource()}. The type
%%% of a resource followed by the actual resource. Local
Expand All @@ -10,7 +10,7 @@
%%% @type resource_type() = atom(). The name of a resource, how it is identified. For example
%%% a type of service that you have on the network may be identified by it's node name
%%% in which case you might have a resource type of 'my_service' of which there may be
%%% many node names representing resources such as {my_service, myservicenode@myhost}.
%%% many node names representing resources such as {my_service, myservicenode@myhost}.
%%% @type resource() = term(). Either a concrete resource or a reference to one like a pid().
%%% @end
%%%-------------------------------------------------------------------
Expand Down Expand Up @@ -38,9 +38,9 @@
]).
% Get
-export([
get_resource/1,
get_resources/1,
get_num_resource/1,
get_resource/1,
get_resources/1,
get_num_resource/1,
get_resource_types/0,
get_num_resource_types/0
]).
Expand Down Expand Up @@ -86,9 +86,9 @@ start() ->
%% @end
%%--------------------------------------------------------------------
start(_Type, _StartArgs) ->
% Create the storage for the local parameters; i.e. LocalTypes
% Create the storage for the local parameters; i.e. LocalTypes
% and TargetTypes.
random:seed(now()),
random:seed(erlang:timestamp()),
rd_store:new(),
rd_sup:start_link().

Expand All @@ -113,7 +113,7 @@ sync_resources(Timeout) ->
sync_locals(),
Self = self(),
Nodes = nodes(known),
error_logger:info_msg("synching resources to nodes: ~p", [Nodes]),
%%error_logger:info_msg("synching resources to nodes: ~p", [Nodes]),
LocalResourceTuples = rd_core:get_local_resource_tuples(),
DeletedTuples = rd_core:get_deleted_resource_tuples(),
TargetTypes = rd_core:get_target_resource_types(),
Expand Down Expand Up @@ -148,7 +148,7 @@ get_responses(Pids, Timeout) ->
after
Timeout -> {error, timeout}
end.

%%------------------------------------------------------------------------------
%% @doc Adds to the list of target types. Target types are the types
%% of resources that this instance of resource_discovery will cache following
Expand All @@ -157,23 +157,23 @@ get_responses(Pids, Timeout) ->
%% @end
%%------------------------------------------------------------------------------
-spec add_target_resource_types([resource_type()]) -> ok.
add_target_resource_types([H|_] = TargetTypes) when is_atom(H) ->
add_target_resource_types([H|_] = TargetTypes) when is_atom(H) ->
rd_core:store_target_resource_types(TargetTypes).

-spec add_target_resource_type(resource_type()) -> ok.
add_target_resource_type(TargetType) when is_atom(TargetType) ->
add_target_resource_type(TargetType) when is_atom(TargetType) ->
add_target_resource_types([TargetType]).

%%------------------------------------------------------------------------------
%% @doc Adds to the list of local resource tuples.
%% @doc Adds to the list of local resource tuples.
%% @end
%%------------------------------------------------------------------------------
-spec add_local_resource_tuples([resource_tuple()]) -> ok.
add_local_resource_tuples([{T,_}|_] = LocalResourceTuples) when is_atom(T) ->
add_local_resource_tuples([{T,_}|_] = LocalResourceTuples) when is_atom(T) ->
rd_core:store_local_resource_tuples(LocalResourceTuples).

-spec add_local_resource_tuple(resource_tuple()) -> ok.
add_local_resource_tuple({T,_} = LocalResourceTuple) when is_atom(T) ->
add_local_resource_tuple({T,_} = LocalResourceTuple) when is_atom(T) ->
add_local_resource_tuples([LocalResourceTuple]).

%%------------------------------------------------------------------------------
Expand All @@ -196,15 +196,15 @@ add_callback_module(Module) when is_atom(Module) ->
%%------------------------------------------------------------------------------
-spec get_resource(resource_type()) -> {ok, resource()} | {error, not_found}.
get_resource(Type) when is_atom(Type) ->
rd_core:round_robin_get(Type).
rd_core:round_robin_get(Type).

%%------------------------------------------------------------------------------
%% @doc Returns ALL cached resources for a particular type.
%% @end
%%------------------------------------------------------------------------------
-spec get_resources(resource_type()) -> [resource()].
get_resources(Type) ->
rd_core:all_of_type_get(Type).
rd_core:all_of_type_get(Type).

%%------------------------------------------------------------------------------
%% @doc Gets a list of the types that have resources that have been cached.
Expand All @@ -224,7 +224,7 @@ delete_resource_tuple(ResourceTuple = {_,_}) ->
rd_core:delete_resource_tuple(ResourceTuple).

%%------------------------------------------------------------------------------
%% @doc Remove a target type and all associated resources.
%% @doc Remove a target type and all associated resources.
%% @end
%%------------------------------------------------------------------------------
-spec delete_target_resource_type(resource_type()) -> true.
Expand Down Expand Up @@ -274,7 +274,7 @@ get_num_resource_types() ->
-spec get_num_resource(resource_type()) -> integer().
get_num_resource(Type) ->
rd_core:get_num_resource(Type).

%%------------------------------------------------------------------------------
%% @doc Contacts resource discoveries initial contact node.
%%
Expand Down Expand Up @@ -316,7 +316,7 @@ ping_contact_nodes(Nodes, Timeout) ->
case rd_util:sync_ping(Node, Timeout) of
pong -> true;
pang ->
error_logger:info_msg("ping contact node at ~p failed", [Node]),
error_logger:info_msg("ping contact node at ~p failed", [Node]),
false
end
end,
Expand All @@ -325,10 +325,10 @@ ping_contact_nodes(Nodes, Timeout) ->
false -> {error, bad_contact_node};
true -> ok
end.

%%------------------------------------------------------------------------------
%% @doc Execute an rpc on a cached resource. If the result of the rpc is {badrpc, reason} the
%% resource is deleted and the next resource is tried, else the result is
%% @doc Execute an rpc on a cached resource. If the result of the rpc is {badrpc, reason} the
%% resource is deleted and the next resource is tried, else the result is
%% returned to the user.
%% <pre>
%% Varibles:
Expand All @@ -339,8 +339,8 @@ ping_contact_nodes(Nodes, Timeout) ->
-spec rpc_call(resource_type(), atom(), atom(), [term()], timeout()) -> term() | {error, not_found}.
rpc_call(Type, Module, Function, Args, Timeout) ->
case get_resource(Type) of
{ok, Resource} ->
error_logger:info_msg("got a resource ~p", [Resource]),
{ok, Resource} ->
%%error_logger:info_msg("got a resource ~p", [Resource]),
case rpc:call(Resource, Module, Function, Args, Timeout) of
{badrpc, Reason} ->
error_logger:info_msg("got a badrpc ~p", [Reason]),
Expand All @@ -358,17 +358,17 @@ rpc_call(Type, Module, Function, Args) ->
rpc_call(Type, Module, Function, Args, 60000).

%%------------------------------------------------------------------------------
%% @doc Execute an rpc on a cached resource. Any bad nodes are deleted.
%% resource is deleted and the next resource is tried, else the result is
%% @doc Execute an rpc on a cached resource. Any bad nodes are deleted.
%% resource is deleted and the next resource is tried, else the result is
%% returned to the user.
%% @end
%%------------------------------------------------------------------------------
-spec rpc_multicall(resource_type(), atom(), atom(), [term()], timeout()) -> {term(), [node()]} | {error, no_resources}.
rpc_multicall(Type, Module, Function, Args, Timeout) ->
case get_resources(Type) of
[] -> {error, no_resources};
Resources ->
error_logger:info_msg("got resources ~p", [Resources]),
Resources ->
%%error_logger:info_msg("got resources ~p", [Resources]),
{Resl, BadNodes} = rpc:multicall(Resources, Module, Function, Args, Timeout),
[delete_resource_tuple({Type, BadNode}) || BadNode <- BadNodes],
{Resl, BadNodes}
Expand Down