diff --git a/src/ra_log.erl b/src/ra_log.erl index c527fe97..6891bcb8 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -592,26 +592,35 @@ last_written(#?MODULE{last_written_index_term = LWTI}) -> %% forces the last index and last written index back to a prior index -spec set_last_index(ra_index(), state()) -> {ok, state()} | {not_found, state()}. -set_last_index(Idx, #?MODULE{cfg = Cfg, - last_written_index_term = {LWIdx0, _}} = State0) -> +set_last_index(Idx, State0) -> case fetch_term(Idx, State0) of {undefined, State} -> - {not_found, State}; - {Term, State1} -> - LWIdx = min(Idx, LWIdx0), - {LWTerm, State2} = fetch_term(LWIdx, State1), - %% this should always be found but still assert just in case - %% _if_ this ends up as a genuine reversal next time we try - %% to write to the mem table it will detect this and open - %% a new one - true = LWTerm =/= undefined, - put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), - put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LWIdx), - {ok, State2#?MODULE{last_index = Idx, - last_term = Term, - last_written_index_term = {LWIdx, LWTerm}}} + case snapshot_index_term(State) of + {Idx, SnapTerm} -> + set_last_index0(Idx, SnapTerm, State); + _ -> + {not_found, State} + end; + {Term, State} -> + set_last_index0(Idx, Term, State) end. +set_last_index0(Idx, Term, + #?MODULE{cfg = Cfg, + last_written_index_term = {LWIdx0, _}} = State0) -> + LWIdx = min(Idx, LWIdx0), + {LWTerm, State1} = fetch_term(LWIdx, State0), + %% this should always be found but still assert just in case + %% _if_ this ends up as a genuine reversal next time we try + %% to write to the mem table it will detect this and open + %% a new one + true = LWTerm =/= undefined, + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), + put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LWIdx), + {ok, State1#?MODULE{last_index = Idx, + last_term = Term, + last_written_index_term = {LWIdx, LWTerm}}}. + -spec handle_event(event_body(), state()) -> {state(), [effect()]}. handle_event({written, _Term, {FromIdx, _ToIdx}}, diff --git a/test/ra_log_memory.erl b/test/ra_log_memory.erl index b5bd6c6b..b09bb0a1 100644 --- a/test/ra_log_memory.erl +++ b/test/ra_log_memory.erl @@ -148,17 +148,28 @@ last_index_term(#state{last_index = LastIdx, -spec set_last_index(ra_index(), ra_log_memory_state()) -> {ok, ra_log_memory_state()} | {not_found, ra_log_memory_state()}. -set_last_index(Idx, #state{last_written = {LWIdx, _}} = State0) -> +set_last_index(Idx, State0) -> case fetch_term(Idx, State0) of {undefined, State} -> - {not_found, State}; - {Term, State1} when Idx < LWIdx -> + case snapshot_index_term(State) of + {Idx, SnapTerm} -> + set_last_index0(Idx, SnapTerm, State); + _ -> + {not_found, State} + end; + {Term, State} -> + set_last_index0(Idx, Term, State) + end. + +set_last_index0(Idx, Term, #state{last_written = {LWIdx, _}} = State0) -> + case Idx < LWIdx of + true -> %% need to revert last_written too - State = State1#state{last_index = Idx, + State = State0#state{last_index = Idx, last_written = {Idx, Term}}, {ok, State}; - {_, State1} -> - State = State1#state{last_index = Idx}, + false -> + State = State0#state{last_index = Idx}, {ok, State} end. diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index 23a4e402..63d97b79 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -18,6 +18,7 @@ all() -> election_timeout, follower_aer_diverged, follower_aer_term_mismatch, + follower_aer_term_mismatch_at_snapshot, follower_aer_term_mismatch_snapshot, follower_handles_append_entries_rpc, candidate_handles_append_entries_rpc, @@ -710,6 +711,62 @@ follower_aer_term_mismatch(_Config) -> last_term = 3}, Reply), ok. +follower_aer_term_mismatch_at_snapshot(_Config) -> + %% case where the last correct entry in the log is the snapshot + State0 = (base_state(3, ?FUNCTION_NAME))#{last_applied => 3, + commit_index => 3 + }, + Log0 = maps:get(log, State0), + Meta = #{index => 3, + term => 5, + cluster => #{}, + machine_version => 1}, + Data = <<"hi3">>, + {Log,_} = ra_log_memory:install_snapshot({3, 5}, {Meta, Data}, Log0), + State = maps:put(log, Log, State0), + + %% append entries from the current leader in the current term + AER = #append_entries_rpc{term = 5, + leader_id = ?N1, + prev_log_index = 3, + prev_log_term = 5, % same log term + leader_commit = 3, + entries = [ + {4, 5, usr(<<"hi4">>)}, + {5, 5, usr(<<"hi4">>)}, + {6, 5, usr(<<"hi4">>)} + ]}, + {follower, State1, _} = ra_server:handle_follower(AER, State), + ?assertMatch(#{last_applied := 3, + commit_index := 3}, State1), + {follower, State2, + [{cast, _, {_, #append_entries_reply{term = 5, + success = true, + next_index = 7}}}]} + = ra_server:handle_follower(written_evt(5, {4, 6}), State1), + + %% a new leader deposes the old one and the uncommitted entries must be + %% truncated down to the snapshot index + AER1 = #append_entries_rpc{term = 6, % higher term + leader_id = ?N2, + prev_log_index = 3, + prev_log_term = 5, % same log term + leader_commit = 3, + entries = []}, + + % term mismatch scenario follower has index 3 but for different term + % rewinds back to last_applied + 1 as next index and enters await condition + {follower, State3, + [{cast, _, {_, #append_entries_reply{term = 6, + success = true, + next_index = 4, + last_index = 3, + last_term = 5}}} | _]} + = ra_server:handle_follower(AER1, State2), + ?assertMatch(#{last_applied := 3, + commit_index := 3}, State3), + ok. + follower_aer_term_mismatch_snapshot(_Config) -> %% case when we have to revert all the way back to a snapshot State0 = (base_state(3, ?FUNCTION_NAME))#{last_applied => 3,