Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make current_peers empty when disabled (#10) #1881

Open
wants to merge 1 commit into
base: develop-3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 27 additions & 25 deletions src/riak_kv_replrtq_peer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -178,46 +178,48 @@ do_discovery(QueueName, PeerInfo, Type) ->
{SnkWorkerCount, PerPeerLimit} = riak_kv_replrtq_snk:get_worker_counts(),
StartDelayMS = riak_kv_replrtq_snk:starting_delay(),
CurrentPeers =
case Type of
count_change ->
case {Type, riak_kv_replrtq_snk:current_peers(QueueName)} of
{count_change, _} ->
%% Ignore current peers, to update worker counts, so all
%% discovered peers will have their worker counts updated as
%% the list returned from discover_peers/2 will never match
%% the atom count_change.
count_change;
_ ->
{_, PeerList} when is_list(PeerList) ->
lists:usort(
lists:map(
fun({ID, _D, H, P, Prot}) ->
{ID, StartDelayMS, H, P, Prot}
end,
riak_kv_replrtq_snk:current_peers(QueueName)))
PeerList));
{_, SnkResponse} ->
lager:info(
"Peer Discovery disabled as snk status ~w", [SnkResponse]),
SnkResponse
end,
case discover_peers(PeerInfo, StartDelayMS) of
CurrentPeers ->
case {discover_peers(PeerInfo, StartDelayMS), CurrentPeers} of
{CurrentPeers, CurrentPeers} ->
lager:info("Type=~w discovery led to no change", [Type]),
false;
[] ->
{[], CurrentPeers} when is_list(CurrentPeers) ->
lager:info("Type=~w discovery led to reset of peers", [Type]),
riak_kv_replrtq_snk:add_snkqueue(QueueName,
PeerInfo,
SnkWorkerCount,
PerPeerLimit),
riak_kv_replrtq_snk:add_snkqueue(
QueueName, PeerInfo, SnkWorkerCount, PerPeerLimit),
false;
DiscoveredPeers ->
case CurrentPeers of
count_change ->
ok;
CurrentPeers when is_list(CurrentPeers) ->
lager:info(
"Type=~w discovery old_peers=~w new_peers=~w",
[Type, length(CurrentPeers), length(DiscoveredPeers)])
end,
riak_kv_replrtq_snk:add_snkqueue(QueueName,
DiscoveredPeers,
SnkWorkerCount,
PerPeerLimit),
true
{DiscoveredPeers, count_change} ->
riak_kv_replrtq_snk:add_snkqueue(
QueueName, DiscoveredPeers, SnkWorkerCount, PerPeerLimit),
true;
{DiscoveredPeers, _} when is_list(CurrentPeers) ->
lager:info(
"Type=~w discovery old_peers=~w new_peers=~w",
[Type, length(CurrentPeers), length(DiscoveredPeers)]),
riak_kv_replrtq_snk:add_snkqueue(
QueueName, DiscoveredPeers, SnkWorkerCount, PerPeerLimit),
true;
{_, _} ->
lager:info("Type=~w discovery led to no change", [Type]),
false
end.

-spec discover_peers(list(riak_kv_replrtq_snk:peer_info()), pos_integer())
Expand Down
23 changes: 17 additions & 6 deletions src/riak_kv_replrtq_snk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ add_snkqueue(QueueName, Peers, WorkerCount, PerPeerLimit)
%% Return the current list of peers being used by this snk host, and the
%% settings currently being used for this host and he workers per peer.
%% Returns undefined if there are currently no peers defined.
-spec current_peers(queue_name()) -> list(peer_info())|undefined.
-spec current_peers(queue_name()) -> list(peer_info())|suspended|disabled.
current_peers(QueueName) ->
gen_server:call(?MODULE, {current_peers, QueueName}).

Expand All @@ -243,6 +243,7 @@ set_workercount(QueueName, WorkerCount, PerPeerLimit)
gen_server:call(?MODULE,
{worker_count, QueueName, WorkerCount, PerPeerLimit}).


%%%============================================================================
%%% gen_server callbacks
%%%============================================================================
Expand Down Expand Up @@ -356,11 +357,21 @@ handle_call({worker_count, QueueN, WorkerCount, PerPeerLimit}, _From, State) ->
{reply, ok, State#state{work = W0, iteration = Iteration}}
end;
handle_call({current_peers, QueueN}, _From, State) ->
case lists:keyfind(QueueN, 1, State#state.work) of
false ->
{reply, undefined, State};
{QueueN, _I, SinkWork} ->
{reply, SinkWork#sink_work.peer_list, State}
case State#state.enabled of
true ->
case lists:keyfind(QueueN, 1, State#state.work) of
false ->
{reply, [], State};
{QueueN, _I, SinkWork} ->
case SinkWork#sink_work.suspended of
false ->
{reply, SinkWork#sink_work.peer_list, State};
_ ->
{reply, suspended, State}
end
end;
_ ->
{reply, disabled, State}
end.


Expand Down
Loading