Skip to content

Commit

Permalink
Merge pull request #479 from rabbitmq/segment-writer-crash
Browse files Browse the repository at this point in the history
Handle case where mem table is deleted during segment writer flush
  • Loading branch information
kjnilsson authored Dec 2, 2024
2 parents 16d2996 + cbcdaaf commit d57b6f7
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 43 deletions.
4 changes: 2 additions & 2 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -683,8 +683,8 @@ handle_event({segments, TidRanges, NewSegs},
readers = Readers
} = State0) ->
Reader = ra_log_reader:update_segments(NewSegs, Reader0),
%% the tid ranges arrive in the order they were written so we need to
%% foldr here to process the oldest first
%% the tid ranges arrive in the reverse order they were written
%% (new -> old) so we need to foldr here to process the oldest first
Mt = lists:foldr(
fun ({Tid, Range}, Acc0) ->
{Spec, Acc} = ra_mt:record_flushed(Tid, Range, Acc0),
Expand Down
88 changes: 48 additions & 40 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ handle_cast({mem_tables, Ranges, WalFile}, #state{data_dir = Dir,
T1 = erlang:monotonic_time(),
_ = [begin
{_, Failures} = ra_lib:partition_parallel(
fun (E) ->
ok = flush_mem_table_ranges(E, State),
fun (TidRange) ->
ok = flush_mem_table_ranges(TidRange, State),
true
end, Tabs, infinity),
case Failures of
Expand Down Expand Up @@ -253,34 +253,46 @@ get_overview(#state{data_dir = Dir,
flush_mem_table_ranges({ServerUId, TidRanges0},
#state{system = System} = State) ->
SnapIdx = snap_idx(ServerUId),
%% TidRanges arrive here sorted new -> old.

%% truncate and limit all ranges to create a contiguous non-overlapping
%% list of tid ranges to flush to disk
TidRanges =
lists:foldl(fun ({T, Range}, []) ->
[{T, ra_range:truncate(SnapIdx, Range)}];
({T, Range0}, [{_T, {Start, _}} | _] = Acc) ->
Range1 = ra_range:truncate(SnapIdx, Range0),
case ra_range:limit(Start, Range1) of
undefined ->
Acc;
Range ->
[{T, Range} | Acc]
end
end, [], TidRanges0),
%% now TidRanges are sorted old -> new, i.e the correct order of
%% processing
TidRanges = lists:foldl(
fun ({T, Range0}, []) ->
case ra_range:truncate(SnapIdx, Range0) of
undefined ->
[];
Range ->
[{T, Range}]
end;
({T, Range0}, [{_T, {Start, _}} | _] = Acc) ->
Range1 = ra_range:truncate(SnapIdx, Range0),
case ra_range:limit(Start, Range1) of
undefined ->
Acc;
Range ->
[{T, Range} | Acc]
end
end, [], TidRanges0),

SegRefs0 = lists:append(
[flush_mem_table_range(ServerUId, TidRange, State)
|| {_Tid, Range} = TidRange <- TidRanges,
Range =/= undefined]),
lists:reverse(
%% segrefs are returned in appended order so new -> old
%% so we need to reverse them so that the final appended list
%% of segrefs is in the old -> new order
[flush_mem_table_range(ServerUId, TidRange, State)
|| TidRange <- TidRanges])),

%% compact cases where a segment was appended in a subsequent call to
%% flush_mem_table_range
%% the list of segrefs is returned in new -> old order which is the same
%% order they are kept by the ra_log
SegRefs = lists:reverse(
lists:foldl(
fun ({_, _, FILE} = New, [{_, _, FILE} | Rem]) ->
%% same
[New | Rem];
fun ({_, _, FILE}, [{_, _, FILE} | _] = Acc) ->
Acc;
(Seg, Acc) ->
[Seg | Acc]
end, [], SegRefs0)),
Expand Down Expand Up @@ -350,26 +362,14 @@ send_segments(System, ServerUId, TidRanges, SegRefs) ->
[ServerUId, "No Pid"]),
%% delete from the memtable on the non-running server's behalf
[begin
%% this looks a bit weird but
%% we dont need full init to run a delete
_ = ra_mt:delete({range, Tid, Range})
_ = catch ra_mt:delete({range, Tid, Range})
end || {Tid, Range} <- TidRanges],
ok;
Pid ->
Pid ! {ra_log_event, {segments, TidRanges, SegRefs}},
ok
end.

clean_closed_mem_tables(System, UId, Tid) ->
{ok, ClosedTbl} = ra_system:lookup_name(System, closed_mem_tbls),
Tables = ets:lookup(ClosedTbl, UId),
[begin
?DEBUG("~w: cleaning closed table for '~ts' range: ~b-~b",
[?MODULE, UId, From, To]),
%% delete the entry in the closed table lookup
true = ets:delete_object(ClosedTbl, O)
end || {_, _, From, To, T} = O <- Tables, T == Tid].

append_to_segment(UId, Tid, StartIdx0, EndIdx, Seg, State) ->
StartIdx = start_index(UId, StartIdx0),
% EndIdx + 1 because FP
Expand All @@ -379,13 +379,13 @@ append_to_segment(_, _, StartIdx, EndIdx, Seg, Closed, _State)
when StartIdx >= EndIdx ->
{Seg, Closed};
append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) ->
case ets:lookup(Tid, Idx) of
try ets:lookup(Tid, Idx) of
[] ->
StartIdx = start_index(UId, Idx),
case Idx < StartIdx of
true ->
%% TODO: we could combine registered check with this
%% although registered check could be slow
%% a snapshot must have been completed after we last checked
%% the start idx, continue flush from new start index.
append_to_segment(UId, Tid, StartIdx, EndIdx, Seg0,
Closed, State);
false ->
Expand All @@ -395,11 +395,11 @@ append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) ->
[Idx, UId, StartIdx]),
case ra_directory:is_registered_uid(State#state.system, UId) of
true ->
?ERROR("segment_writer: uid ~s is registered, exiting...",
?ERROR("segment_writer: uid ~ts is registered, exiting...",
[UId]),
exit({missing_index, UId, Idx});
false ->
?INFO("segment_writer: UId ~s was not registered, skipping",
?INFO("segment_writer: uid ~ts was not registered, skipping",
[UId]),
undefined
end
Expand All @@ -422,8 +422,6 @@ append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) ->
undefined ->
%% a successor cannot be opened - this is most likely due
%% to the directory having been deleted.
%% clear close mem tables here
_ = clean_closed_mem_tables(State#state.system, UId, Tid),
undefined;
Seg ->
ok = counters:add(State#state.counter, ?C_SEGMENTS, 1),
Expand All @@ -437,6 +435,16 @@ append_to_segment(UId, Tid, Idx, EndIdx, Seg0, Closed, State) ->
FileName = ra_log_segment:filename(Seg0),
exit({segment_writer_append_error, FileName, Posix})
end
catch _:badarg ->
?ERROR("segment_writer: uid ~s ets table deleted", [UId]),
%% ets table has been deleted.
%% this could be due to two reasons
%% 1. the ra server has been deleted.
%% 2. an old mem table has been deleted due to snapshotting
%% but the member is still active
%% skipping this table
undefined

end.

find_segment_files(Dir) ->
Expand Down
4 changes: 3 additions & 1 deletion src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,9 @@ gen_statem_safe_call(ServerId, Msg, Timeout) ->
exit:{{nodedown, _}, _} ->
{error, nodedown};
exit:{shutdown, _} ->
{error, shutdown}
{error, shutdown};
exit:{Reason, _} ->
{error, Reason}
end.

do_state_query(QueryName, #state{server_state = State}) ->
Expand Down
37 changes: 37 additions & 0 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ all_tests() ->
handle_overwrite,
handle_overwrite_append,
receive_segment,
delete_during_segment_flush,
read_one,
take_after_overwrite_and_init,
validate_sequential_fold,
Expand Down Expand Up @@ -212,6 +213,42 @@ receive_segment(Config) ->
ra_log:close(FinalLog),
ok.

delete_during_segment_flush(Config) ->
%% this test doesn't necessarily trigger the potential issue but is
%% worth keeping around
Log0 = ra_log_init(Config),
Data = crypto:strong_rand_bytes(4000),
% write a few entries
Entries = [{I, 1, Data} || I <- lists:seq(1, 100000)],

{PreWritten, _} = ra_log:last_written(Log0),
Log1 = lists:foldl(fun(E, Acc0) ->
ra_log:append(E, Acc0)
end, Log0, Entries),
Log2 = deliver_log_events_cond(
Log1, fun (L) ->
{PostWritten, _} = ra_log:last_written(L),
PostWritten >= (PreWritten + 10000)
end, 100),
Ref = monitor(process, ra_log_segment_writer),
% force wal roll over
ok = ra_log_wal:force_roll_over(ra_log_wal),

timer:sleep(0),
ra_log:delete_everything(Log2),


receive
{'DOWN', Ref, _, _, _} ->
flush(),
ct:fail("segment writer unexpectedly exited")
after 100 ->
ok
end,
flush(),

ok.

read_one(Config) ->
ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS),
Log0 = ra_log_init(Config, #{counter => ra_counters:fetch(?FUNCTION_NAME)}),
Expand Down
68 changes: 68 additions & 0 deletions test/ra_log_segment_writer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ all_tests() ->
accept_mem_tables_for_down_server,
accept_mem_tables_with_deleted_server,
accept_mem_tables_with_corrupt_segment,
accept_mem_tables_multiple_ranges,
accept_mem_tables_multiple_ranges_snapshot,
truncate_segments,
truncate_segments_with_pending_update,
truncate_segments_with_pending_overwrite,
Expand Down Expand Up @@ -259,6 +261,7 @@ accept_mem_tables_multi_segment(Config) ->
end,
ok = gen_server:stop(Pid),
ok.

accept_mem_tables_multi_segment_overwrite(Config) ->
Dir = ?config(wal_dir, Config),
UId = ?config(uid, Config),
Expand Down Expand Up @@ -462,6 +465,71 @@ accept_mem_tables_with_corrupt_segment(Config) ->
ok = gen_server:stop(TblWriterPid),
ok.

accept_mem_tables_multiple_ranges(Config)->
Dir = ?config(wal_dir, Config),
SegConf = #{max_count => 16},
{ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default,
name => ?SEGWR,
data_dir => Dir,
segment_conf => SegConf}),
UId = ?config(uid, Config),
Entries = [{N, 42, N} || N <- lists:seq(1, 32)],
Mt = make_mem_table(UId, Entries),
Entries2 = [{N, 42, N} || N <- lists:seq(33, 64)],
Mt2 = make_mem_table(UId, Entries2),
Ranges = #{UId => [
{ra_mt:tid(Mt2), ra_mt:range(Mt2)},
{ra_mt:tid(Mt), ra_mt:range(Mt)}
]},
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges,
make_wal(Config, "w1.wal")),
receive
{ra_log_event, {segments, _TidRanges, SegRefs}} ->
?assertMatch([
{49, 64, _},
{33, 48, _},
{17, 32, _},
{1, 16, _}
], SegRefs),
ok
after 3000 ->
flush(),
throw(ra_log_event_timeout)
end,
ok = gen_server:stop(TblWriterPid),
ok.

accept_mem_tables_multiple_ranges_snapshot(Config)->
Dir = ?config(wal_dir, Config),
SegConf = #{max_count => 16},
{ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default,
name => ?SEGWR,
data_dir => Dir,
segment_conf => SegConf}),
UId = ?config(uid, Config),
Entries = [{N, 42, N} || N <- lists:seq(1, 32)],
Mt = make_mem_table(UId, Entries),
Entries2 = [{N, 42, N} || N <- lists:seq(33, 64)],
Mt2 = make_mem_table(UId, Entries2),
Ranges = #{UId => [
{ra_mt:tid(Mt2), ra_mt:range(Mt2)},
{ra_mt:tid(Mt), ra_mt:range(Mt)}
]},
ets:insert(ra_log_snapshot_state, {UId, 64}),
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges,
make_wal(Config, "w1.wal")),

receive
{ra_log_event, {segments, _TidRanges, SegRefs}} ->
?assertMatch([], SegRefs),
ok
after 3000 ->
flush(),
throw(ra_log_event_timeout)
end,
ok = gen_server:stop(TblWriterPid),
ok.

truncate_segments(Config) ->
Dir = ?config(wal_dir, Config),
SegConf = #{max_count => 12},
Expand Down

0 comments on commit d57b6f7

Please sign in to comment.