Skip to content

Commit

Permalink
Merge pull request #496 from rabbitmq/extend-wal-segment-file-names
Browse files Browse the repository at this point in the history
Extend WAL and segment file names from 8 to 16 characters
  • Loading branch information
kjnilsson authored Jan 3, 2025
2 parents dce73f0 + cf8f270 commit 7bfc118
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 34 deletions.
46 changes: 30 additions & 16 deletions src/ra_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
zpad_filename/3,
zpad_filename_incr/1,
zpad_extract_num/1,
zpad_upgrade/3,
recursive_delete/1,
make_uid/0,
make_uid/1,
Expand Down Expand Up @@ -146,29 +147,44 @@ zpad_hex(Num) ->
lists:flatten(io_lib:format("~16.16.0B", [Num])).

zpad_filename("", Ext, Num) ->
lists:flatten(io_lib:format("~8..0B.~ts", [Num, Ext]));
lists:flatten(io_lib:format("~16..0B.~ts", [Num, Ext]));
zpad_filename(Prefix, Ext, Num) ->
lists:flatten(io_lib:format("~ts_~8..0B.~ts", [Prefix, Num, Ext])).
lists:flatten(io_lib:format("~ts_~16..0B.~ts", [Prefix, Num, Ext])).

zpad_filename_incr(Fn) ->
Base = filename:basename(Fn),
Dir = filename:dirname(Fn),
case re:run(Base, "(.*)([0-9]{8})(.*)",
case re:run(Base, "(.*)([0-9]{16})(.*)",
[{capture, all_but_first, list}]) of
{match, [Prefix, NumStr, Ext]} ->
Num = list_to_integer(NumStr),
filename:join(Dir,
lists:flatten(
io_lib:format("~ts~8..0B~ts", [Prefix, Num+1, Ext])));
NewFn = lists:flatten(io_lib:format("~ts~16..0B~ts",
[Prefix, Num + 1, Ext])),
filename:join(Dir, NewFn);
_ ->
undefined
end.

zpad_extract_num(Fn) ->
{match, [_, NumStr, _]} = re:run(Fn, "(.*)([0-9]{8})(.*)",
{match, [_, NumStr, _]} = re:run(Fn, "(.*)([0-9]{16})(.*)",
[{capture, all_but_first, list}]),
list_to_integer(NumStr).

zpad_upgrade(Dir, File, Ext) ->
B = filename:basename(File, Ext),
case length(B) of
8 ->
%% old format, convert and rename
F = "00000000" ++ B ++ Ext,
New = filename:join(Dir, F),
Old = filename:join(Dir, File),
ok = file:rename(Old, New),
F;
16 ->
File
end.


recursive_delete(Dir) ->
case is_dir(Dir) of
true ->
Expand Down Expand Up @@ -427,16 +443,14 @@ lists_shuffle(List0) ->

is_dir(Dir) ->
case prim_file:read_file_info(Dir) of
{ok, #file_info{type=directory}} ->
{ok, #file_info{type = directory}} ->
true;
_ ->
false
end.

is_file(File) ->
case prim_file:read_file_info(File) of
{ok, #file_info{type = directory}} ->
true;
{ok, #file_info{type = regular}} ->
true;
_ ->
Expand Down Expand Up @@ -516,17 +530,17 @@ make_uid_test() ->
ok.

zpad_filename_incr_test() ->
Fn = "/lib/blah/prefix_00000001.segment",
Ex = "/lib/blah/prefix_00000002.segment",
Fn = "/lib/blah/prefix_0000000000000001.segment",
Ex = "/lib/blah/prefix_0000000000000002.segment",
Ex = zpad_filename_incr(Fn),
undefined = zpad_filename_incr("0000001"),
undefined = zpad_filename_incr("000000000000001"),
ok.

zpad_filename_incr_utf8_test() ->
Fn = "/lib/🐰/prefix/00000001.segment",
Ex = "/lib/🐰/prefix/00000002.segment",
Fn = "/lib/🐰/prefix/0000000000000001.segment",
Ex = "/lib/🐰/prefix/0000000000000002.segment",
Ex = zpad_filename_incr(Fn),
undefined = zpad_filename_incr("0000001"),
undefined = zpad_filename_incr("000000000000001"),
ok.

derive_safe_string_test() ->
Expand Down
32 changes: 29 additions & 3 deletions src/ra_log_segment_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ await(SegWriter) ->
ok
end.

%%%===================================================================
-define(UPGRADE_MARKER, "segment_name_upgrade_marker").

%%%==================================================================
%%% gen_server callbacks
%%%===================================================================

Expand All @@ -115,6 +117,7 @@ init([#{data_dir := DataDir,
process_flag(trap_exit, true),
CRef = ra_counters:new(SegWriterName, ?COUNTER_FIELDS),
SegmentConf = maps:get(segment_conf, Conf, #{}),
maybe_upgrade_segment_file_names(System, DataDir),
{ok, #state{system = System,
data_dir = DataDir,
counter = CRef,
Expand Down Expand Up @@ -460,8 +463,8 @@ find_segment_files(Dir) ->
segment_files(Dir) ->
case prim_file:list_dir(Dir) of
{ok, Files0} ->
Files = [filename:join(Dir, F) || F <- Files0,
filename:extension(F) == ".segment"],
Files = [filename:join(Dir, F)
|| F <- Files0, filename:extension(F) =:= ".segment"],
lists:sort(Files);
{error, enoent} ->
[]
Expand Down Expand Up @@ -529,3 +532,26 @@ maybe_wait_for_segment_writer(SegWriter, TimeRemaining)
maybe_wait_for_segment_writer(_SegWriter, _TimeRemaining) ->
ok.

maybe_upgrade_segment_file_names(System, DataDir) ->
Marker = filename:join(DataDir, ?UPGRADE_MARKER),
case ra_lib:is_file(Marker) of
false ->
?INFO("segment_writer: upgrading segment file names to "
"new format in dirctory ~ts",
[DataDir]),
[begin
Dir = filename:join(DataDir, UId),
case prim_file:list_dir(Dir) of
{ok, Files} ->
[ra_lib:zpad_upgrade(Dir, F, ".segment")
|| F <- Files, filename:extension(F) =:= ".segment"];
{error, enoent} ->
ok
end
end || {_, UId} <- ra_directory:list_registered(System)],

ok = ra_lib:write_file(Marker, <<>>);
true ->
ok
end.

9 changes: 6 additions & 3 deletions src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,12 @@ recover_wal(Dir, #conf{segment_writer = SegWriter,
ok = ra_log_segment_writer:await(SegWriter),
post_boot
end,
{ok, Files} = file:list_dir(Dir),
WalFiles = lists:sort([F || F <- Files,
filename:extension(F) == ".wal"]),
{ok, Files0} = file:list_dir(Dir),
Files = [begin
ra_lib:zpad_upgrade(Dir, File, ".wal")
end || File <- Files0,
filename:extension(File) == ".wal"],
WalFiles = lists:sort(Files),
AllWriters =
[begin
?DEBUG("wal: recovering ~ts, Mode ~s", [F, Mode]),
Expand Down
2 changes: 1 addition & 1 deletion test/ra_dbg_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ execute_state_machine() ->

wal_file() ->
{ok, RaDataDir} = application:get_env(ra, data_dir),
filename:join([RaDataDir, node(), "00000001.wal"]).
filename:join([RaDataDir, node(), "0000000000000001.wal"]).

report(Pid, Count) ->
receive
Expand Down
63 changes: 53 additions & 10 deletions test/ra_log_segment_writer_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ all_tests() ->
truncate_segments_with_pending_update,
truncate_segments_with_pending_overwrite,
my_segments,
upgrade_segment_name_format,
skip_entries_lower_than_snapshot_index,
skip_all_entries_lower_than_snapshot_index
].
Expand All @@ -61,7 +62,6 @@ init_per_testcase(TestCase, Config) ->
SysCfg = ra_system:default_config(),
ra_system:store(SysCfg),
_ = ra_log_ets:start_link(SysCfg),
% ra_directory:init(default),
ra_counters:init(),
UId = atom_to_binary(TestCase, utf8),
ok = ra_directory:register_name(default, UId, self(), undefined,
Expand Down Expand Up @@ -314,23 +314,23 @@ accept_mem_tables_for_down_server(Config) ->
ets:new(ra_log_closed_mem_tables, [named_table, bag, public]),
Dir = ?config(wal_dir, Config),
UId = ?config(uid, Config),
FakeUId = <<"down-uid">>,
DownUId = <<"down-uid">>,
%% only insert into dets so that the server is shown as registered
%% but not running
ok = dets:insert(maps:get(directory_rev, get_names(default)),
{down_uid, FakeUId}),
ok = ra_lib:make_dir(filename:join(Dir, FakeUId)),
{down_uid, DownUId}),
ok = ra_lib:make_dir(filename:join(Dir, DownUId)),
application:start(sasl),
{ok, TblWriterPid} = ra_log_segment_writer:start_link(#{system => default,
name => ?SEGWR,
data_dir => Dir}),
% fake up a mem segment for Self
Entries = [{1, 42, a}, {2, 42, b}, {3, 43, c}],
Mt = make_mem_table(FakeUId, Entries),
Mt = make_mem_table(DownUId, Entries),
Mt2 = make_mem_table(UId, Entries),
Tid = ra_mt:tid(Mt),
Tid2 = ra_mt:tid(Mt2),
Ranges = #{FakeUId => [{Tid, {1, 3}}],
Ranges = #{DownUId => [{Tid, {1, 3}}],
UId => [{Tid2, {1, 3}}]},
WalFile = filename:join(Dir, "00001.wal"),
ok = file:write_file(WalFile, <<"waldata">>),
Expand All @@ -348,10 +348,11 @@ accept_mem_tables_for_down_server(Config) ->
end,
%% validate fake uid entries were written
ra_log_segment_writer:await(?SEGWR),
FakeSegmentFile = filename:join([?config(wal_dir, Config),
FakeUId,
"00000001.segment"]),
{ok, FakeSeg} = ra_log_segment:open(FakeSegmentFile, #{mode => read}),
DownFn = ra_lib:zpad_filename("", "segment", 1),
ct:pal("DownFn ~s", [DownFn]),
DownSegmentFile = filename:join([?config(wal_dir, Config),
DownUId, DownFn]),
{ok, FakeSeg} = ra_log_segment:open(DownSegmentFile, #{mode => read}),
% assert Entries have been fully transferred
Entries = [{I, T, binary_to_term(B)}
|| {I, T, B} <- read_sparse(FakeSeg, [1, 2, 3])],
Expand Down Expand Up @@ -701,6 +702,48 @@ my_segments(Config) ->
proc_lib:stop(TblWriterPid),
ok.

upgrade_segment_name_format(Config) ->
Dir = ?config(wal_dir, Config),
{ok, TblWriterPid} = ra_log_segment_writer:start_link(#{name => ?SEGWR,
system => default,
data_dir => Dir}),
UId = ?config(uid, Config),
% fake up a mem segment for Self
Entries = [{1, 42, a}, {2, 42, b}, {3, 43, c}],
Mt = make_mem_table(UId, Entries),
Ranges = #{UId => [{ra_mt:tid(Mt), ra_mt:range(Mt)}]},
TidRanges = maps:get(UId, Ranges),
WalFile = make_wal(Config, "00001.wal"),
ok = ra_log_segment_writer:accept_mem_tables(?SEGWR, Ranges, WalFile),
File =
receive
{ra_log_event, {segments, TidRanges, [{{1, 3}, _Fn}]}} ->
[MyFile] = ra_log_segment_writer:my_segments(?SEGWR,UId),
MyFile
after 2000 ->
flush(),
exit(ra_log_event_timeout)
end,

%% stop segment writer and rename existing segment to old format
proc_lib:stop(TblWriterPid),
Root = filename:dirname(File),
Base = filename:basename(File),
{_, FileOld} = lists:split(8, Base),
ok = file:rename(File, filename:join(Root, FileOld)),
%% also remove upgrade marker file
ok = file:delete(filename:join(Dir, "segment_name_upgrade_marker")),
%% restart segment writer which should trigger upgrade process
{ok, Pid2} = ra_log_segment_writer:start_link(#{name => ?SEGWR,
system => default,
data_dir => Dir}),
%% validate the renamed segment has been renamed back to the new
%% 16 character format
[File] = ra_log_segment_writer:my_segments(?SEGWR, UId),

proc_lib:stop(Pid2),
ok.

skip_entries_lower_than_snapshot_index(Config) ->
Dir = ?config(wal_dir, Config),
UId = ?config(uid, Config),
Expand Down
36 changes: 35 additions & 1 deletion test/ra_log_wal_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ all() ->
all_tests() ->
[
basic_log_writes,
wal_filename_upgrade,
same_uid_different_process,
consecutive_terms_in_batch_should_result_in_two_written_events,
overwrite_in_same_batch,
Expand Down Expand Up @@ -143,7 +144,7 @@ basic_log_writes(Config) ->
ra_log_wal:force_roll_over(Pid),
receive
{'$gen_cast',
{mem_tables, #{UId := [{Tid, {12, 13}}]}, "00000001.wal"}} ->
{mem_tables, #{UId := [{Tid, {12, 13}}]}, "0000000000000001.wal"}} ->
ok
after 5000 ->
flush(),
Expand All @@ -153,6 +154,39 @@ basic_log_writes(Config) ->
meck:unload(),
ok.

wal_filename_upgrade(Config) ->
meck:new(ra_log_segment_writer, [passthrough]),
meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),
Conf = ?config(wal_conf, Config),
#{dir := Dir} = Conf,
{UId, _} = WriterId = ?config(writer_id, Config),
Tid = ets:new(?FUNCTION_NAME, []),
{ok, Pid} = ra_log_wal:start_link(Conf#{segment_writer => self()}),
{ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 12, 1, "value"),
ok = await_written(WriterId, 1, {12, 12}),
{ok, _} = ra_log_wal:write(Pid, WriterId, Tid, 13, 1, "value2"),
ok = await_written(WriterId, 1, {13, 13}),
proc_lib:stop(Pid),
%% rename file to old 8 character format
Fn = filename:join(Dir, "0000000000000001.wal"),
FnOld = filename:join(Dir, "00000001.wal"),
ok = file:rename(Fn, FnOld),
% debugger:start(),
% int:i(ra_log_wal),
% int:break(ra_log_wal, 373),
{ok, Pid2} = ra_log_wal:start_link(Conf#{segment_writer => self()}),
receive
{'$gen_cast',
{mem_tables, #{UId := [{_Tid, {12, 13}}]}, "0000000000000001.wal"}} ->
ok
after 5000 ->
flush(),
ct:fail("receiving mem tables timed out")
end,
proc_lib:stop(Pid2),
meck:unload(),
ok.

same_uid_different_process(Config) ->
meck:new(ra_log_segment_writer, [passthrough]),
meck:expect(ra_log_segment_writer, await, fun(_) -> ok end),
Expand Down

0 comments on commit 7bfc118

Please sign in to comment.