Prioritisation support on selecting from pending changes
Always send changes to nodes with fewer vnodes first. Try to populate joining nodes before shuffling, and avoid temporary excesses caused by shuffling
martinsumner committed Oct 19, 2023
1 parent 797a2a9 commit f1aad0b
Showing 2 changed files with 286 additions and 11 deletions.
17 changes: 17 additions & 0 deletions priv/riak_core.schema
@@ -45,12 +45,29 @@
%% low value for the handoff_batch_threshold_count (e.g. 200), then raise the
%% handoff_timeout. Further, if using the leveled backend in Riak KV
%% investigate raising the backend_pause.
%% Note that there is an additional cluster_transfer_limit configuration option
%% which may prevent this limit from being reached.
{mapping, "transfer_limit", "riak_core.handoff_concurrency", [
{datatype, integer},
{default, 2},
{commented, 2}
]}.

%% @doc Number of concurrent cluster-wide prompted transfers allowed.
%% Ownership handoffs may be prompted by vnode inactivity, or by the vnode
%% manager's scheduled tick activity. Should the vnodes never become
%% inactive, only the manager will prompt, and this configuration option acts
%% as a cluster-wide limit on concurrent handoffs prompted by the management
%% tick.
%% While the number of ongoing or blocked handoffs in the cluster (regardless
%% of what prompted them) is at or above this limit, the management tick on
%% all nodes will be blocked from prompting further transfers.
{mapping, "cluster_transfer_limit", "riak_core.forced_ownership_handoff", [
{datatype, integer},
{default, 8},
{commented, 8}
]}.
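
%% A minimal sketch (an illustration, not part of this commit) of how these
%% two settings relate once cuttlefish has mapped them into the application
%% environment: transfer_limit caps concurrent handoffs on each node, while
%% cluster_transfer_limit caps how many transfers the management tick may
%% prompt across the whole cluster.
%%
%%   NodeLimit    = app_helper:get_env(riak_core, handoff_concurrency, 2),
%%   ClusterLimit = app_helper:get_env(riak_core, forced_ownership_handoff, 8)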

%% @doc Handoff batch threshold count
%% The maximum number of objects allowed in a single handoff batch. If there
%% are issues with handoff timeouts, then the first change should be to reduce
280 changes: 269 additions & 11 deletions src/riak_core_vnode_manager.erl
@@ -558,22 +558,95 @@ schedule_management_timer() ->
10000),
erlang:send_after(ManagementTick, ?MODULE, management_tick).


-type transfer() ::
{non_neg_integer(), node(), node(), [module()], awaiting | complete}.

-spec trigger_ownership_handoff(
list(transfer()),
list(module()),
riak_core_ring:riak_core_ring(),
#state{}) -> ok.
trigger_ownership_handoff(Transfers, Mods, Ring, State) ->
Limit =
app_helper:get_env(
riak_core, forced_ownership_handoff, ?DEFAULT_OWNERSHIP_TRIGGER),
Awaiting =
select_candidate_transfers(Transfers, Mods, Ring, Limit, node()),
_ = [maybe_trigger_handoff(Mod, Idx, State) || {Mod, Idx} <- Awaiting],
ok.

-spec select_candidate_transfers(
list(transfer()),
list(module()),
riak_core_ring:riak_core_ring(),
non_neg_integer(),
node()) -> list({module(), non_neg_integer()}).
select_candidate_transfers(Transfers, Mods, Ring, Limit, ThisNode) ->
IsResizing = riak_core_ring:is_resizing(Ring),
-    Throttle = limit_ownership_handoff(Transfers, IsResizing),
-    Awaiting = [{Mod, Idx} || {Idx, Node, _, CMods, S} <- Throttle,
    Throttle =
        limit_ownership_handoff(
            Limit,
            order_transfers(Transfers, Mods, Ring, IsResizing),
            IsResizing),
    [{Mod, Idx} || {Idx, Node, _, CMods, S} <- Throttle,
                   Mod <- Mods,
                   S =:= awaiting,
-                  Node =:= node(),
-                  not lists:member(Mod, CMods)],
-    _ = [maybe_trigger_handoff(Mod, Idx, State) || {Mod, Idx} <- Awaiting],
-    ok.
                   Node =:= ThisNode,
                   not lists:member(Mod, CMods)].

-spec order_transfers(
list(transfer()),
list(module()),
riak_core_ring:riak_core_ring(),
boolean()) -> list(transfer()).
order_transfers(Transfers, _Mods, _Ring, true) ->
Transfers;
order_transfers(Transfers, Mods, Ring, false) ->
Members = riak_core_ring:all_members(Ring),
Owners = riak_core_ring:all_owners(Ring),
MinVnodes = length(Owners) div length(Members),
MinWants =
lists:map(
fun(N) -> {N, MinVnodes - length(owned_partitions(Owners, N))} end,
Members
),
PriorityTransfers =
lists:foldl(
fun({N, NC}, Acc) ->
case NC > 0 of
true ->
CandidateTs = receiving_transfers(Transfers, N, Mods),
lists:sublist(CandidateTs, NC) ++ Acc;
false ->
Acc
end
end,
[],
MinWants
),
lists:keysort(1, PriorityTransfers)
++ lists:keysort(1, Transfers -- PriorityTransfers).
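
%% A minimal worked example of the prioritisation above (hypothetical data,
%% not part of the commit): with 8 partitions and 4 members, each node should
%% own at least 8 div 4 = 2 partitions, so a newly joined node that owns
%% nothing has a "want" of 2, and up to 2 of its inbound awaiting transfers
%% are moved to the front of the list; all remaining transfers follow in
%% index order.
%%
%%   Owners = [{0, n1}, {1, n1}, {2, n1}, {3, n2},
%%             {4, n2}, {5, n2}, {6, n3}, {7, n3}],
%%   Members = [n1, n2, n3, n4],
%%   MinVnodes = length(Owners) div length(Members),            %% = 2
%%   Want = MinVnodes - length(owned_partitions(Owners, n4)).   %% = 2 - 0 = 2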

-spec receiving_transfers(
list(transfer()), node(), list(module())) -> list(transfer()).
receiving_transfers(Transfers, RcvNode, Mods) ->
lists:filter(
fun({_Idx, _SndNode, TrgNode, CMods, S}) ->
case {TrgNode, lists:subtract(Mods, CMods), S} of
{RcvNode, ToDoMods, awaiting} when length(ToDoMods) > 0 ->
true;
_ ->
false
end
end,
Transfers
).
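
%% For example (hypothetical data): with Mods = [riak_kv],
%%   receiving_transfers(
%%       [{0, n1, n4, [], awaiting}, {3, n2, n5, [], awaiting}], n4, [riak_kv])
%% keeps only the first transfer, as the second is destined for a different
%% node; a transfer whose CMods already include riak_kv, or whose status is
%% complete, would likewise be excluded.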

-limit_ownership_handoff(Transfers, IsResizing) ->
-    Limit = app_helper:get_env(riak_core,
-                               forced_ownership_handoff,
-                               ?DEFAULT_OWNERSHIP_TRIGGER),
-    limit_ownership_handoff(Limit, Transfers, IsResizing).
%% Return the list of partitions owned by a node
-spec owned_partitions(list({integer(), node()}), node()) -> list(integer()).
owned_partitions(Owners, Node) ->
[P || {P, Owner} <- Owners, Owner =:= Node].
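%% For example: owned_partitions([{0, n1}, {1, n2}, {2, n1}], n1) =:= [0, 2].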

limit_ownership_handoff(Limit, Transfers, false) ->
lists:sublist(Transfers, Limit);
@@ -1058,3 +1131,188 @@ register_vnode_stats(Mod, Index, Pid) ->

unregister_vnode_stats(Mod, Index) ->
riak_core_stat:unregister_vnode_stats(Mod, Index).


%% ===================================================================
%% Unit tests
%% ===================================================================
-ifdef(TEST).

order_transfer_test_() ->
{timeout, 60, fun order_transfer_tester/0}.

order_transfer_tester() ->
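    %% Scenario: a 256-partition ring is claimed by node1 plus eight further
    %% nodes spread over three locations; those nodes are then marked valid,
    %% and node10, node11 and node12 join, producing a set of pending
    %% ownership changes. The assertions below check that the first waves of
    %% candidate transfers are directed at the three joining nodes, and that
    %% later waves mostly target established nodes.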
N1 = node1,
N1Loc = loc1,
TargetN = 4,
RingSize = 256,
InitJoiningNodes =
[{node2, loc2},
{node3, loc3},
{node4, loc1},
{node5, loc2},
{node6, loc3},
{node7, loc1},
{node8, loc2},
{node9, loc1}],

Params = [{target_n_val, TargetN}, {target_location_n_val, 3}],
R1 =
riak_core_ring:set_node_location(
N1,
N1Loc,
riak_core_ring:fresh(RingSize, N1)),

RClaimInit =
lists:foldl(
fun({N, L}, AccR) ->
AccR0 = riak_core_ring:add_member(N1, AccR, N),
riak_core_ring:set_node_location(N, L, AccR0)
end,
R1,
InitJoiningNodes),
%% Use the entry for ?MODULE here:
CurrentRing0 =
riak_core_membership_claim:claim(
RClaimInit,
{riak_core_membership_claim, default_wants_claim},
{riak_core_claim_swapping, choose_claim_v4, Params}),

CurrentRing1 =
lists:foldl(
fun(N, R) -> riak_core_ring:set_member(node1, R, N, valid) end,
CurrentRing0,
[node2, node3, node4, node5, node6, node7, node8, node9]
),

NextRingClaimInit =
lists:foldl(
fun({N, L}, AccR) ->
AccR0 = riak_core_ring:add_member(N1, AccR, N),
riak_core_ring:set_node_location(N, L, AccR0)
end,
CurrentRing1,
[{node10, loc3}, {node11, loc4}, {node12, loc4}]),

NextRing0 =
riak_core_membership_claim:claim(
NextRingClaimInit,
{riak_core_membership_claim, default_wants_claim},
{riak_core_claim_swapping, choose_claim_v4, Params}),

CurrentRing2 =
lists:foldl(
fun(N, R) -> riak_core_ring:set_member(node1, R, N, valid) end,
NextRingClaimInit,
[node10, node11, node12]
),

Changes =
lists:map(
fun({{Idx, CN}, {Idx, FN}}) ->
{Idx, CN, FN, [], awaiting}
end,
lists:filter(
fun({Current, Future}) -> Current =/= Future end,
lists:zip(
riak_core_ring:all_owners(CurrentRing1),
riak_core_ring:all_owners(NextRing0))
)
),

NodeList =
[node1, node2, node3, node4, node5, node6, node7, node8, node9,
node10, node11, node12],

CurrentRing3 = riak_core_ring:set_pending_changes(CurrentRing2, Changes),
TransferLimit = 4,
FirstTransfers =
fetch_transfers_all_nodes(CurrentRing3, TransferLimit, NodeList),

test_destinations(
FirstTransfers, TransferLimit, [node10, node11, node12], Changes),

MinVnodes = RingSize div length(NodeList),
NearlyAllTransferLimit = (MinVnodes * 3) - 1,
NearlyAllTransfersToNew =
fetch_transfers_all_nodes(
CurrentRing3, NearlyAllTransferLimit, NodeList),

test_destinations(
NearlyAllTransfersToNew,
NearlyAllTransferLimit,
[node10, node11, node12],
Changes
),

{CurrentRing4, UpdChanges} =
lists:foldl(
fun({riak_kv, Idx}, {R, Cs}) ->
{Idx, _SndNode, RcvNode, [], awaiting} =
lists:keyfind(Idx, 1, Cs),
{riak_core_ring:transfer_node(Idx, RcvNode, R),
lists:keydelete(Idx, 1, Cs)}
end,
{CurrentRing3, Changes},
NearlyAllTransfersToNew
),
NextTransfers =
fetch_transfers_all_nodes(
riak_core_ring:set_pending_changes(CurrentRing4, UpdChanges),
4,
NodeList
),
?assertMatch(4, length(NextTransfers)),
NextTransfersDetails =
lists:keysort(
1,
lists:map(
fun({riak_kv, Idx}) -> lists:keyfind(Idx, 1, UpdChanges) end,
NextTransfers
)
),
%% If a node has sent more than 3 vnodes as part of the first wave, it may
%% now be treated like a joiner - a node with outstanding wants. So it is not
%% possible to be fully deterministic about what to expect, as one joiner may
%% still have wants, and so may non-joining nodes.
%% In the next 4 changes, probably all will be to non-joining nodes, but at
%% least the second, third and fourth will be.
lists:foreach(
fun({_Idx, _SndNode, RcvNode, [], awaiting}) ->
?assert(not lists:member(RcvNode, [node10, node11, node12]))
end,
lists:sublist(NextTransfersDetails, 2, 3)
).


fetch_transfers_all_nodes(Ring, TransferLimit, NodeList) ->
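    %% Gather the candidate transfers that each node in NodeList would select
    %% on its own management tick, without actually triggering any handoff.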
lists:foldl(
fun(N, Acc) ->
lists:append(
Acc,
select_candidate_transfers(
riak_core_ring:pending_changes(Ring),
[riak_kv],
Ring,
TransferLimit,
N)
)
end,
[],
NodeList
).


test_destinations(Transfers, ExpectedLength, ValidDestinations, Changes) ->
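    %% Check that the expected number of transfers was selected, and that
    %% every selected transfer is destined for one of ValidDestinations.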
?assertMatch(ExpectedLength, length(Transfers)),
lists:foreach(
fun({riak_kv, Idx}) ->
{Idx, _SndNode, RcvNode, [], awaiting} =
lists:keyfind(Idx, 1, Changes),
?assert(lists:member(RcvNode, ValidDestinations))
end,
Transfers
).

-endif.
