Skip to content

Commit

Permalink
Merge pull request #407 from rabbitmq/ra-leaderboard-fixes
Browse files Browse the repository at this point in the history
Fixes to ra_leaderboard module
  • Loading branch information
kjnilsson authored Jan 12, 2024
2 parents dd0fd38 + b18ab08 commit 092ec87
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 49 deletions.
97 changes: 57 additions & 40 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -503,19 +503,19 @@ handle_leader({PeerId, #append_entries_reply{success = false,
{leader, State, Effects};
handle_leader({command, Cmd}, #{cfg := #cfg{log_id = LogId} = Cfg} = State00) ->
ok = incr_counter(Cfg, ?C_RA_SRV_COMMANDS, 1),
case append_log_leader(Cmd, State00) of
{not_appended, Reason, State} ->
case append_log_leader(Cmd, State00, []) of
{not_appended, Reason, State, Effects0} ->
?WARN("~ts command ~W NOT appended to log. Reason ~w",
[LogId, Cmd, 10, Reason]),
Effects = case Cmd of
{_, #{from := From}, _, _} ->
[{reply, From, {error, Reason}}];
[{reply, From, {error, Reason}} | Effects0];
_ ->
[]
Effects0
end,
{leader, State, Effects};
{ok, Idx, Term, State0} ->
{State, _, Effects0} = make_pipelined_rpc_effects(State0, []),
{ok, Idx, Term, State0, Effects00} ->
{State, _, Effects0} = make_pipelined_rpc_effects(State0, Effects00),
% check if a reply is required.
% TODO: refactor - can this be made a bit nicer/more explicit?
Effects = case Cmd of
Expand All @@ -531,8 +531,8 @@ handle_leader({commands, Cmds}, #{cfg := Cfg} = State00) ->
%% TODO: refactor to use wal batch API?
Num = length(Cmds),
{State0, Effects0} =
lists:foldl(fun(C, {S0, E}) ->
{ok, I, T, S} = append_log_leader(C, S0),
lists:foldl(fun(C, {S0, E0}) ->
{ok, I, T, S, E} = append_log_leader(C, S0, E0),
case C of
{_, #{from := From}, _, after_log_append} ->
{S, [{reply, From,
Expand Down Expand Up @@ -1023,18 +1023,27 @@ handle_follower(#append_entries_rpc{term = Term,
_ ->
State1 = lists:foldl(fun pre_append_log_follower/2,
State0, Entries),
%% if the cluster has changed we need to update
%% the leaderboard
Effects1 = case maps:get(cluster, State0) =/=
maps:get(cluster, State1) of
true ->
[update_leaderboard | Effects0];
false ->
Effects0
end,
case ra_log:write(Entries, Log1) of
{ok, Log2} ->
{NextState, State, Effects} =
evaluate_commit_index_follower(State1#{log => Log2},
Effects0),
Effects1),
{NextState, State,
[{next_event, {ra_log_event, flush_cache}} | Effects]};
{error, wal_down} ->
{await_condition,
State1#{log => Log1,
condition => fun wal_down_condition/2},
Effects0};
Effects1};
{error, _} = Err ->
exit(Err)
end
Expand Down Expand Up @@ -1248,8 +1257,9 @@ handle_follower(force_member_change,
Cluster = #{Id => new_peer()},
?WARN("~ts: Forcing cluster change. New cluster ~w",
[LogId, Cluster]),
{ok, _, _, State} = append_cluster_change(Cluster, undefined, no_reply, State0),
call_for_election(pre_vote, State, [{reply, ok}]);
{ok, _, _, State, Effects} =
append_cluster_change(Cluster, undefined, no_reply, State0, []),
call_for_election(pre_vote, State, [{reply, ok} | Effects]);
handle_follower(Msg, State) ->
log_unhandled_msg(follower, Msg, State),
{follower, State, []}.
Expand Down Expand Up @@ -1639,6 +1649,8 @@ filter_follower_effects(Effects) ->
[C | Acc];
(garbage_collection = C, Acc) ->
[C | Acc];
(update_leaderboard = C, Acc) ->
[C | Acc];
({delete_snapshot, _} = C, Acc) ->
[C | Acc];
({send_msg, _, _, _Opts} = C, Acc) ->
Expand Down Expand Up @@ -2495,65 +2507,67 @@ add_reply(_, _, _, % From, Reply, Mode
{Effects, Notifys}.

append_log_leader({CmdTag, _, _, _},
State = #{cluster_change_permitted := false})
#{cluster_change_permitted := false} = State,
Effects)
when CmdTag == '$ra_join' orelse
CmdTag == '$ra_leave' ->
{not_appended, cluster_change_not_permitted, State};
append_log_leader({'$ra_join', From,
#{id := JoiningNode, voter_status := Voter0},
ReplyMode},
State = #{cluster := OldCluster}) ->
{not_appended, cluster_change_not_permitted, State, Effects};
append_log_leader({'$ra_join', From, #{id := JoiningNode,
voter_status := Voter0}, ReplyMode},
#{cluster := OldCluster} = State, Effects) ->
case ensure_promotion_target(Voter0, State) of
{error, Reason} ->
{not_appended, Reason, State};
{ok, Voter} ->
case OldCluster of
#{JoiningNode := #{voter_status := Voter}} ->
already_member(State);
already_member(State, Effects);
#{JoiningNode := Peer} ->
% Update member status.
Cluster = OldCluster#{JoiningNode => Peer#{voter_status => Voter}},
append_cluster_change(Cluster, From, ReplyMode, State);
append_cluster_change(Cluster, From, ReplyMode, State, Effects);
_ ->
% Insert new member.
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter_status => Voter})},
append_cluster_change(Cluster, From, ReplyMode, State)
append_cluster_change(Cluster, From, ReplyMode, State, Effects)
end
end;
append_log_leader({'$ra_join', From, #{id := JoiningNode} = Config, ReplyMode},
State) ->
State, Effects) ->
append_log_leader({'$ra_join', From,
#{id => JoiningNode,
voter_status => maps:with([membership, uid, target],
Config)},
ReplyMode}, State);
ReplyMode}, State, Effects);
append_log_leader({'$ra_join', From, JoiningNode, ReplyMode},
State = #{cluster := OldCluster}) ->
#{cluster := OldCluster} = State,
Effects) ->
% Legacy $ra_join, join as voter if no such member in the cluster.
case OldCluster of
#{JoiningNode := _} ->
already_member(State);
already_member(State, Effects);
_ ->
append_log_leader({'$ra_join', From, #{id => JoiningNode}, ReplyMode}, State)
append_log_leader({'$ra_join', From, #{id => JoiningNode}, ReplyMode},
State, Effects)
end;
append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode},
State = #{cfg := #cfg{log_id = LogId},
cluster := OldCluster}) ->
#{cfg := #cfg{log_id = LogId},
cluster := OldCluster} = State, Effects) ->
case OldCluster of
#{LeavingServer := _} ->
Cluster = maps:remove(LeavingServer, OldCluster),
append_cluster_change(Cluster, From, ReplyMode, State);
append_cluster_change(Cluster, From, ReplyMode, State, Effects);
_ ->
?DEBUG("~ts: member ~w requested to leave but was not a member. "
"Members: ~w",
[LogId, LeavingServer, maps:keys(OldCluster)]),
% not a member - do nothing
{not_appended, not_member, State}
{not_appended, not_member, State, Effects}
end;
append_log_leader(Cmd, State = #{log := Log0, current_term := Term}) ->
append_log_leader(Cmd, State = #{log := Log0, current_term := Term}, Effects) ->
NextIdx = ra_log:next_index(Log0),
Log = ra_log:append({NextIdx, Term, Cmd}, Log0),
{ok, NextIdx, Term, State#{log => Log}}.
{ok, NextIdx, Term, State#{log => Log}, Effects}.

pre_append_log_follower({Idx, Term, Cmd} = Entry,
State = #{cluster_index_term := {Idx, CITTerm}})
Expand Down Expand Up @@ -2582,10 +2596,11 @@ pre_append_log_follower(_, State) ->
State.

append_cluster_change(Cluster, From, ReplyMode,
State = #{log := Log0,
cluster := PrevCluster,
cluster_index_term := {PrevCITIdx, PrevCITTerm},
current_term := Term}) ->
#{log := Log0,
cluster := PrevCluster,
cluster_index_term := {PrevCITIdx, PrevCITTerm},
current_term := Term} = State,
Effects0) ->
% turn join command into a generic cluster change command
% that include the new cluster configuration
Command = {'$ra_cluster_change', From, Cluster, ReplyMode},
Expand All @@ -2594,12 +2609,14 @@ append_cluster_change(Cluster, From, ReplyMode,
% TODO: is it safe to change the cluster config with an async write?
% what happens if the write fails?
Log = ra_log:append({NextIdx, Term, Command}, Log0),
Effects = [update_leaderboard | Effects0],
{ok, NextIdx, Term,
State#{log => Log,
cluster => Cluster,
cluster_change_permitted => false,
cluster_index_term => IdxTerm,
previous_cluster => {PrevCITIdx, PrevCITTerm, PrevCluster}}}.
previous_cluster => {PrevCITIdx, PrevCITTerm, PrevCluster}},
Effects}.

mismatch_append_entries_reply(Term, CommitIndex, State0) ->
{CITerm, State} = fetch_term(CommitIndex, State0),
Expand Down Expand Up @@ -2891,10 +2908,10 @@ meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) ->
meta_name(#{names := #{log_meta := Name}}) ->
Name.

already_member(State) ->
already_member(State, Effects) ->
% already a member do nothing
% TODO: reply? If we don't reply the caller may block until timeout
{not_appended, already_member, State}.
{not_appended, already_member, State, Effects}.

%%% ====================
%%% Voter status helpers
Expand All @@ -2909,7 +2926,7 @@ ensure_promotion_target(#{membership := promotable, uid := _} = Status,
#{log := Log}) ->
%% The next index in the log is used for a cluster change command:
%% the caller of `ensure_promotion_target/2' also calls
%% `append_cluster_change/4'. So even if a peer joins a cluster which isn't
%% `append_cluster_change/5'. So even if a peer joins a cluster which isn't
%% handling any other commands, this promotion target will be reachable.
Target = ra_log:next_index(Log),
{ok, Status#{target => Target}};
Expand Down
6 changes: 4 additions & 2 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -980,10 +980,8 @@ terminate(Reason, StateName,
Parent = ra_directory:where_is_parent(Names, UId),
case Reason of
{shutdown, delete} ->
catch ra_leaderboard:clear(ClusterName),
catch ra_directory:unregister_name(Names, UId),
catch ra_log_meta:delete_sync(MetaName, UId),
catch ets:delete(ra_state, UId),
catch ra_counters:delete(Id),
Self = self(),
%% we have to terminate the child spec from the supervisor as it
Expand All @@ -1004,6 +1002,7 @@ terminate(Reason, StateName,

_ -> ok
end,
catch ra_leaderboard:clear(ClusterName),
_ = ets:delete(ra_metrics, MetricsKey),
_ = ets:delete(ra_state, Key),
ok;
Expand Down Expand Up @@ -1340,6 +1339,9 @@ handle_effect(_, garbage_collection, _EvtType, State, Actions) ->
true = erlang:garbage_collect(),
incr_counter(State#state.conf, ?C_RA_SRV_GCS, 1),
{State, Actions};
handle_effect(_, update_leaderboard, _EvtType, State, Actions) ->
ok = record_leader_change(leader_id(State), State),
{State, Actions};
handle_effect(_, {monitor, _ProcOrNode, PidOrNode}, _,
#state{monitors = Monitors} = State, Actions0) ->
{State#state{monitors = ra_monitors:add(PidOrNode, machine, Monitors)},
Expand Down
11 changes: 6 additions & 5 deletions src/ra_server_sup_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,12 @@ prepare_server_stop_rpc(System, RaName) ->
{ok, Parent, SrvSup}
end.

-spec delete_server(atom(), NodeId :: ra_server_id()) ->
-spec delete_server(atom(), ServerId :: ra_server_id()) ->
ok | {error, term()} | {badrpc, term()}.
delete_server(System, NodeId) when is_atom(System) ->
Node = ra_lib:ra_server_id_node(NodeId),
Name = ra_lib:ra_server_id_to_local_name(NodeId),
case stop_server(System, NodeId) of
delete_server(System, ServerId) when is_atom(System) ->
Node = ra_lib:ra_server_id_node(ServerId),
Name = ra_lib:ra_server_id_to_local_name(ServerId),
case stop_server(System, ServerId) of
ok ->
rpc:call(Node, ?MODULE, delete_server_rpc, [System, Name]);
{error, _} = Err -> Err
Expand All @@ -151,6 +151,7 @@ delete_server_rpc(System, RaName) ->
?INFO("Deleting server ~w and its data directory.~n",
[RaName]),
%% TODO: better handle and report errors
%% UId could be `undefined' here
UId = ra_directory:uid_of(Names, RaName),
Pid = ra_directory:where_is(Names, RaName),
ra_log_meta:delete(Meta, UId),
Expand Down
63 changes: 63 additions & 0 deletions test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ all_tests() ->
delete_three_server_cluster_parallel,
start_cluster_majority,
start_cluster_minority,
grow_cluster,
send_local_msg,
local_log_effect,
leaderboard,
Expand Down Expand Up @@ -295,6 +296,68 @@ start_cluster_minority(Config) ->
[ok = slave:stop(S) || {_, S} <- NodeIds0],
ok.

%% Common Test case: verifies that ra_leaderboard membership/leader data is
%% kept up to date on every node as a cluster grows from one server to three
%% and then shrinks back down to one, and that a deleted member's node ends
%% up with no leaderboard entry for the cluster.
grow_cluster(Config) ->
PrivDir = ?config(data_dir, Config),
ClusterName = ?config(cluster_name, Config),
%% Start three peer nodes (s1..s3) and bind each server id and its node
%% name; A/B/C are {ClusterName, Node} server ids.
[{_, ANode} = A,
{_, BNode} = B,
{_, CNode} = C] =
ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
Machine = {module, ?MODULE, #{}},
%% Begin with a single-member cluster containing only A.
{ok, [A], []} = ra:start_cluster(?SYS, ClusterName, Machine, [A]),

%% Grow to two members: start B, add it, then commit a command so the
%% membership change is applied before checking the leaderboards.
ok = ra:start_server(?SYS, ClusterName, B, Machine, [A]),
{ok, _, _} = ra:add_member(A, B),
{ok, _, _} = ra:process_command(A, banana),
[A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),
[A, B] = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]),

%% Grow to three members the same way.
ok = ra:start_server(?SYS, ClusterName, C, Machine, [A, B]),
{ok, _, _} = ra:add_member(A, C),
{ok, _, _} = ra:process_command(A, banana),
{ok, _, L1} = ra:members(A),
[A, B, C] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),
L1 = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]),
%% TODO: handle race conditions
%% Followers learn of the change asynchronously, so poll (up to 20
%% iterations via await_condition) rather than assert immediately.
await_condition(
fun () ->
[A, B, C] == rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]) andalso
L1 == rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName])
end, 20),
await_condition(
fun () ->
[A, B, C] == rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]) andalso
L1 == rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName])
end, 20),

%% Shrink: remove and delete A; its node's leaderboard entry must be
%% cleared (lookups return undefined) while B and C see the new view.
ok = ra:leave_and_delete_server(?SYS, A, A),
{ok, _, _} = ra:process_command(B, banana),
{ok, _, L2} = ra:members(B),

%% check members
[B, C] = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]),
[B, C] = rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]),
undefined = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),
%% check leader
L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]),
L2 = rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName]),
undefined = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]),


%% Shrink to a single member: remove B, leaving only C, which must then
%% report itself as the sole member and the leader.
ok = ra:leave_and_delete_server(?SYS, B, B),
{ok, _, _} = ra:process_command(C, banana),
%% check members
[C] = rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]),
undefined = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),
undefined = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]),
%% check leader
C = rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName]),
undefined = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]),
undefined = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]),

%% Tear down all peer nodes regardless of which servers were deleted.
[ok = slave:stop(S) || {_, S} <- ServerIds],
ok.

send_local_msg(Config) ->
PrivDir = ?config(data_dir, Config),
ClusterName = ?config(cluster_name, Config),
Expand Down
4 changes: 2 additions & 2 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -876,8 +876,8 @@ append_entries_reply_success_promotes_nonvoter(_Config) ->
entries = [{4, 5, {'$ra_cluster_change', _,
#{N2 := #{voter_status := #{membership := voter,
uid := <<"uid">>}}},
_}}]}}
]} = ra_server:handle_leader(RaJoin, State2),
_}}]}} |
_]} = ra_server:handle_leader(RaJoin, State2),

Ack2 = #append_entries_reply{term = 5, success = true,
next_index = 5,
Expand Down

0 comments on commit 092ec87

Please sign in to comment.