Skip to content

Commit

Permalink
Merge pull request #489 from rabbitmq/segment-fadvise
Browse files Browse the repository at this point in the history
Segment fadvise
  • Loading branch information
michaelklishin authored Dec 18, 2024
2 parents 7406ed2 + ddfbec9 commit ece627a
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 26 deletions.
13 changes: 8 additions & 5 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
fold/5,
sparse_read/2,
partial_read/3,
execute_read_plan/3,
execute_read_plan/4,
read_plan_info/1,
last_index_term/1,
set_last_index/2,
Expand Down Expand Up @@ -210,7 +210,7 @@ init(#{uid := UId,
Curr -> Curr
end,

AccessPattern = maps:get(initial_access_pattern, Conf, random),
AccessPattern = maps:get(initial_access_pattern, Conf, sequential),
{ok, Mt0} = ra_log_ets:mem_table_please(Names, UId),
% recover current range and any references to segments
% this queries the segment writer and thus blocks until any
Expand Down Expand Up @@ -558,12 +558,15 @@ partial_read(Indexes0, #?MODULE{cfg = Cfg,


-spec execute_read_plan(read_plan(), undefined | ra_flru:state(),
TransformFun :: transform_fun()) ->
TransformFun :: transform_fun(),
ra_log_reader:read_plan_options()) ->
{#{ra_index() => Command :: term()}, ra_flru:state()}.
execute_read_plan(#read_plan{dir = Dir,
read = Read,
plan = Plan}, Flru0, TransformFun) ->
ra_log_reader:exec_read_plan(Dir, Plan, Flru0, TransformFun, Read).
plan = Plan}, Flru0, TransformFun,
Options) ->
ra_log_reader:exec_read_plan(Dir, Plan, Flru0, TransformFun,
Options, Read).

-spec read_plan_info(read_plan()) -> map().
read_plan_info(#read_plan{read = Read,
Expand Down
12 changes: 11 additions & 1 deletion src/ra_log_read_plan.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,22 @@


-export([execute/2,
execute/3,
info/1]).

-spec execute(ra_log:read_plan(), undefined | ra_flru:state()) ->
{#{ra:index() => Command :: term()}, ra_flru:state()}.
execute(Plan, Flru) ->
ra_log:execute_read_plan(Plan, Flru, fun ra_server:transform_for_partial_read/3).
execute(Plan, Flru, #{access_pattern => random,
file_advise => normal}).

%% Executes a read plan with caller-supplied read plan options.
%% Delegates to ra_log:execute_read_plan/4, always using
%% ra_server:transform_for_partial_read/3 as the transform function.
-spec execute(ra_log:read_plan(), undefined | ra_flru:state(),
              ra_log_reader:read_plan_options()) ->
    {#{ra:index() => Command :: term()}, ra_flru:state()}.
execute(Plan, Flru, Options) ->
    Transform = fun ra_server:transform_for_partial_read/3,
    ra_log:execute_read_plan(Plan, Flru, Transform, Options).

-spec info(ra_log:read_plan()) -> map().
info(Plan) ->
Expand Down
29 changes: 17 additions & 12 deletions src/ra_log_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
fold/5,
sparse_read/3,
read_plan/2,
exec_read_plan/5,
exec_read_plan/6,
fetch_term/2
]).

Expand All @@ -47,11 +47,14 @@

-opaque state() :: #?STATE{}.
-type read_plan() :: [{BaseName :: file:filename_all(), [ra:index()]}].
%% Options accepted by exec_read_plan/6. When a segment has to be opened
%% to serve the plan, this map is passed on to ra_log_segment:open/2 with
%% `mode' set to `read'.
-type read_plan_options() :: #{access_pattern => random | sequential,
file_advise => ra_log_segment:posix_file_advise()}.


-export_type([
state/0,
read_plan/0
read_plan/0,
read_plan_options/0
]).

%% PUBLIC
Expand Down Expand Up @@ -209,28 +212,31 @@ read_plan(#?STATE{segment_refs = SegRefs}, Indexes) ->
%% TODO: add counter for number of read plans requested
segment_read_plan(SegRefs, Indexes, []).

-spec exec_read_plan(file:filename_all(), read_plan(), undefined | ra_flru:state(),
TransformFun :: fun(),
-spec exec_read_plan(file:filename_all(),
read_plan(),
undefined | ra_flru:state(),
TransformFun :: fun((ra_index(), ra_term(), binary()) -> term()),
read_plan_options(),
#{ra_index() => Command :: term()}) ->
{#{ra_index() => Command :: term()}, ra_flru:state()}.
exec_read_plan(Dir, Plan, undefined, TransformFun, Acc0) ->
exec_read_plan(Dir, Plan, undefined, TransformFun, Options, Acc0) ->
Open = ra_flru:new(1, fun({_, Seg}) -> ra_log_segment:close(Seg) end),
exec_read_plan(Dir, Plan, Open, TransformFun, Acc0);
exec_read_plan(Dir, Plan, Open0, TransformFun, Acc0)
exec_read_plan(Dir, Plan, Open, TransformFun, Options, Acc0);
exec_read_plan(Dir, Plan, Open0, TransformFun, Options, Acc0)
when is_list(Plan) ->
Fun = fun (I, T, B, Acc) ->
E = TransformFun(I, T, binary_to_term(B)),
Acc#{I => E}
end,
lists:foldl(
fun ({Idxs, BaseName}, {Acc1, Open1}) ->
{Seg, Open2} = get_segment_ext(Dir, Open1, BaseName),
{Seg, Open2} = get_segment_ext(Dir, Open1, BaseName, Options),
case ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1) of
{ok, _, Acc} ->
{Acc, Open2};
{error, modified} ->
{_, Open3} = ra_flru:evict(BaseName, Open2),
{SegNew, Open} = get_segment_ext(Dir, Open3, BaseName),
{SegNew, Open} = get_segment_ext(Dir, Open3, BaseName, Options),
{ok, _, Acc} = ra_log_segment:read_sparse(SegNew, Idxs, Fun, Acc1),
{Acc, Open}
end
Expand Down Expand Up @@ -382,15 +388,14 @@ get_segment(#cfg{directory = Dir,
end
end.

get_segment_ext(Dir, Open0, Fn) ->
get_segment_ext(Dir, Open0, Fn, Options) ->
case ra_flru:fetch(Fn, Open0) of
{ok, S, Open1} ->
{S, Open1};
error ->
AbsFn = filename:join(Dir, Fn),
case ra_log_segment:open(AbsFn,
#{mode => read,
access_pattern => random})
Options#{mode => read})
of
{ok, S} ->
{S, ra_flru:insert(Fn, S, Open0)};
Expand Down
32 changes: 29 additions & 3 deletions src/ra_log_segment.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
fd :: option(file:io_device()),
index_size :: pos_integer(),
access_pattern :: sequential | random,
file_advise = normal :: posix_file_advise(),
mode = append :: read | append,
compute_checksums = true :: boolean()}).

Expand All @@ -70,14 +71,19 @@
cache :: undefined | {non_neg_integer(), non_neg_integer(), binary()}
}).

%% Advice values accepted for the `file_advise' segment option.
%% The configured value is stored in the segment cfg and handed to
%% file:advise/4 when a full segment opened in append mode is closed.
-type posix_file_advise() :: 'normal' | 'sequential' | 'random'
| 'no_reuse' | 'will_need' | 'dont_need'.

-type ra_log_segment_options() :: #{max_count => non_neg_integer(),
max_pending => non_neg_integer(),
compute_checksums => boolean(),
mode => append | read,
access_pattern => sequential | random}.
access_pattern => sequential | random,
file_advise => posix_file_advise()}.
-opaque state() :: #state{}.

-export_type([state/0,
posix_file_advise/0,
ra_log_segment_options/0]).

-spec open(Filename :: file:filename_all()) ->
Expand Down Expand Up @@ -116,6 +122,15 @@ open(Filename, Options) ->

process_file(true, Mode, Filename, Fd, Options) ->
AccessPattern = maps:get(access_pattern, Options, random),
FileAdvise = maps:get(file_advise, Options, normal),
if FileAdvise == random andalso
Mode == read ->
Offs = maps:get(max_count, Options, ?SEGMENT_MAX_ENTRIES) * ?INDEX_RECORD_SIZE_V2,
_ = file:advise(Fd, Offs, 0, random),
ok;
true ->
ok
end,
case read_header(Fd) of
{ok, Version, MaxCount} ->
MaxPending = maps:get(max_pending, Options, ?SEGMENT_MAX_PENDING),
Expand All @@ -124,7 +139,6 @@ process_file(true, Mode, Filename, Fd, Options) ->
{NumIndexRecords, DataOffset, Range, Index} =
recover_index(Fd, Version, MaxCount),
IndexOffset = ?HEADER_SIZE + NumIndexRecords * IndexRecordSize,
Mode = maps:get(mode, Options, append),
ComputeChecksums = maps:get(compute_checksums, Options, true),
{ok, #state{cfg = #cfg{version = Version,
max_count = MaxCount,
Expand All @@ -133,6 +147,7 @@ process_file(true, Mode, Filename, Fd, Options) ->
mode = Mode,
index_size = IndexSize,
access_pattern = AccessPattern,
file_advise = FileAdvise,
compute_checksums = ComputeChecksums,
fd = Fd},
data_start = ?HEADER_SIZE + IndexSize,
Expand All @@ -156,6 +171,7 @@ process_file(false, Mode, Filename, Fd, Options) ->
ComputeChecksums = maps:get(compute_checksums, Options, true),
IndexSize = MaxCount * ?INDEX_RECORD_SIZE_V2,
ok = write_header(MaxCount, Fd),
FileAdvise = maps:get(file_advise, Options, dont_need),
{ok, #state{cfg = #cfg{version = ?VERSION,
max_count = MaxCount,
max_pending = MaxPending,
Expand All @@ -164,6 +180,7 @@ process_file(false, Mode, Filename, Fd, Options) ->
index_size = IndexSize,
fd = Fd,
compute_checksums = ComputeChecksums,
file_advise = FileAdvise,
access_pattern = random},
index_write_offset = ?HEADER_SIZE,
index_offset = ?HEADER_SIZE,
Expand Down Expand Up @@ -422,10 +439,18 @@ is_same_as(#state{cfg = #cfg{filename = Fn0}}, Fn) ->
is_same_filename_all(Fn0, Fn).

-spec close(state()) -> ok.
close(#state{cfg = #cfg{fd = Fd, mode = append}} = State) ->
close(#state{cfg = #cfg{fd = Fd,
mode = append,
file_advise = FileAdvise}} = State) ->
% close needs to be defensive and idempotent so we ignore the return
% values here
_ = sync(State),
case is_full(State) of
true ->
_ = file:advise(Fd, 0, 0, FileAdvise);
false ->
ok
end,
_ = file:close(Fd),
ok;
close(#state{cfg = #cfg{fd = Fd}}) ->
Expand Down Expand Up @@ -669,3 +694,4 @@ cache_length_test() ->

-endif.


1 change: 1 addition & 0 deletions src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ roll_over(#state{wal = Wal0, file_num = Num0,
Half + rand:uniform(Half);
#wal{ranges = Ranges,
filename = Filename} ->
_ = file:advise(Wal0#wal.fd, 0, 0, dont_need),
ok = close_file(Wal0#wal.fd),
MemTables = Ranges,
%% TODO: only keep base name in state
Expand Down
8 changes: 3 additions & 5 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -409,12 +409,10 @@ recover(#{cfg := #cfg{log_id = LogId,
FromScan = CommitIndex + 1,
{ToScan, _} = ra_log:last_index_term(Log0),
?DEBUG("~ts: scanning for cluster changes ~b:~b ", [LogId, FromScan, ToScan]),
{State, Log1} = ra_log:fold(FromScan, ToScan,
fun cluster_scan_fun/2,
State1, Log0),
{State, Log} = ra_log:fold(FromScan, ToScan,
fun cluster_scan_fun/2,
State1, Log0),

%% disable segment read cache by setting random access pattern
Log = ra_log:release_resources(1, random, Log1),
put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, 0),
State#{log => Log,
%% reset commit latency as recovery may calculate a very old value
Expand Down
4 changes: 4 additions & 0 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,10 @@ read_plan(Config) ->
ReadPlan = ra_log:partial_read(Indexes, Log4, fun (_, _, Cmd) -> Cmd end),
?assert(is_map(ra_log_read_plan:info(ReadPlan))),
{EntriesOut, _} = ra_log_read_plan:execute(ReadPlan, undefined),
%% try again with different read plan options
{EntriesOut, _} = ra_log_read_plan:execute(ReadPlan, undefined,
#{access_pattern => sequential,
file_advise => random}),
?assertEqual(length(Indexes), maps:size(EntriesOut)),
%% assert the indexes requested were all returned in order
[] = Indexes -- [I || I <- maps:keys(EntriesOut)],
Expand Down
24 changes: 24 additions & 0 deletions test/ra_log_segment_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ all_tests() ->
[
open_close_persists_max_count,
write_then_read,
write_then_read_file_advise,
write_then_read_no_checksums,
read_cons,
write_close_open_write,
Expand Down Expand Up @@ -270,6 +271,29 @@ write_then_read(Config) ->
ok = ra_log_segment:close(SegR),
ok.

write_then_read_file_advise(Config) ->
    %% The file_advise setting has no effect we can assert on directly,
    %% so this case mostly exists to get coverage of those code paths.
    %% It still checks that entries are persisted and that the index can
    %% be recovered when the segment is reopened.
    SegFile = filename:join(?config(data_dir, Config), "seg1.seg"),
    Payload = make_data(1024),
    WriteOpts = #{compute_checksums => true,
                  file_advise => normal},
    {ok, W0} = ra_log_segment:open(SegFile, WriteOpts),
    {ok, W1} = ra_log_segment:append(W0, 1, 2, Payload),
    {ok, W2} = ra_log_segment:append(W1, 2, 2, Payload),
    {ok, W3} = ra_log_segment:sync(W2),
    ok = ra_log_segment:close(W3),

    %% reopen in read mode with a different file_advise value, then read
    %% both entries back, followed by only the second one
    ReadOpts = #{mode => read,
                 file_advise => random},
    {ok, Reader} = ra_log_segment:open(SegFile, ReadOpts),
    [{1, 2, Payload}, {2, 2, Payload}] = read_sparse(Reader, [1, 2]),
    [{2, 2, Payload}] = read_sparse(Reader, [2]),
    {1, 2} = ra_log_segment:range(Reader),
    ok = ra_log_segment:close(Reader),
    ok.

write_then_read_no_checksums(Config) ->
% tests items are being persisted and index can be recovered
Dir = ?config(data_dir, Config),
Expand Down

0 comments on commit ece627a

Please sign in to comment.