Merge pull request #324 from martinsumner/mas-updateto29
Mas updateto29
martinsumner authored Dec 6, 2020
2 parents b44b21f + 6922e73 commit 04f5c18
Showing 14 changed files with 904 additions and 382 deletions.
4 changes: 2 additions & 2 deletions docs/FUTURE.md
@@ -19,9 +19,9 @@ The store supports all the required Riak backend capabilities. A number of furt
- The potential to support Map/Reduce functions on object metadata not values, so that cache-pollution and disk i/o overheads of full object Map/Reduce can be eliminated by smarter object construction strategies that promote information designed for queries from values into metadata.


## Outstanding work
## Production Ready

There is some work required before LevelEd could be considered production ready:
Leveled has been running stably in production-like environments for over 9 months. The following requirements, which had previously been stated as the outstanding work items for production readiness, have all been completed:

- A strategy for the supervision and restart of processes, in particular for clerks.

3 changes: 2 additions & 1 deletion docs/STARTUP_OPTIONS.md
@@ -106,8 +106,9 @@ The `compaction_runs_perday` indicates for the leveled store how many times each

The `compaction_low_hour` and `compaction_high_hour` are the hours of the day which support the compaction window - set to 0 and 23 respectively if compaction is required to be a continuous process.

The `max_run_length` controls how many files can be compacted in a single compaction run. The scoring of files and runs is controlled through `maxrunlength_compactionpercentage` and `singlefile_compactionpercentage`.
The `max_run_length` controls how many files can be compacted in a single compaction run. The scoring of files and runs is controlled through `maxrunlength_compactionpercentage` and `singlefile_compactionpercentage`. The `singlefile_compactionpercentage` is the compaction score at which a file becomes eligible for compaction on its own, whereas the `maxrunlength_compactionpercentage` is the score required for a run of `max_run_length` files to be considered eligible. The higher the `maxrunlength_compactionpercentage` and the lower the `singlefile_compactionpercentage`, the more likely a longer run will be chosen over a shorter run.
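
As a minimal sketch of how a target score for runs of intermediate length could sit between these two settings, assuming simple linear interpolation - the module, function and variable names below are illustrative only, not the actual scoring code in `leveled_iclerk`:

```erlang
%% Illustration only - hypothetical names, assuming linear interpolation
%% between the single-file and max-run-length thresholds.
-module(compaction_target_sketch).
-export([target_score/4]).

%% RunLength is the number of files in the candidate run (1..MaxRunLength).
target_score(1, SingleFilePerc, _MaxRunPerc, _MaxRunLength) ->
    SingleFilePerc;
target_score(RunLength, SingleFilePerc, MaxRunPerc, MaxRunLength)
        when RunLength > 1, RunLength =< MaxRunLength ->
    % The target moves linearly from singlefile_compactionpercentage
    % towards maxrunlength_compactionpercentage as the run gets longer.
    SingleFilePerc +
        (MaxRunPerc - SingleFilePerc) * (RunLength - 1) / (MaxRunLength - 1).
```

For example, with the new defaults of 30.0 and 70.0 and an assumed `max_run_length` of 4, a run of 2 files would interpolate to a target of roughly 43.3.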

The `journalcompaction_scoreonein` option controls how frequently a file will be scored. If this is set to one, then every file will be scored on every compaction run. If this is set to an integer greater than one ('n'), then on average any given file will only be scored on one in 'n' runs. On other runs, a cached score for the file will be used. On startup all files will be scored on the first run. As journals get very large, and where frequent compaction is required due to mutating objects, this can save significant resource. In Riak, this option is controlled via `leveled.compaction_scores_perday`, with the number of `leveled.compaction_runs_perday` being divided by this to produce the `journalcompaction_scoreonein`. By default each file will only be scored once per day.
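
A rough sketch of how the one-in-'n' decision and the cached score might interact is given below. The `maybe_score/3` wrapper and its `ScoreFun` argument are hypothetical, although `cdb_getcachedscore/2` and `cdb_putcachedscore/2` are the functions added to `leveled_cdb` in this commit:

```erlang
%% Illustration only - maybe_score/3 and ScoreFun are hypothetical wrappers.
-module(score_onein_sketch).
-export([maybe_score/3]).

maybe_score(FilePid, ScoreOneIn, ScoreFun) ->
    % cdb_getcachedscore/2 returns undefined if no score has been cached,
    % or if the cached score has expired.
    CachedScore = leveled_cdb:cdb_getcachedscore(FilePid, os:timestamp()),
    ShouldScore =
        (CachedScore == undefined) orelse (rand:uniform(ScoreOneIn) == 1),
    case ShouldScore of
        true ->
            Score = ScoreFun(FilePid),                % expensive full scan
            ok = leveled_cdb:cdb_putcachedscore(FilePid, Score),
            Score;
        false ->
            CachedScore                               % reuse the cached score
    end.
```

Note that with `ScoreOneIn` set to 1 the `rand:uniform/1` check always succeeds, so every file is scored on every run, matching the behaviour described above.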

## Snapshot Timeouts

2 changes: 2 additions & 0 deletions include/leveled.hrl
@@ -69,6 +69,7 @@
max_run_length,
singlefile_compactionperc :: float()|undefined,
maxrunlength_compactionperc :: float()|undefined,
score_onein = 1 :: pos_integer(),
snaptimeout_long :: pos_integer() | undefined}).

-record(penciller_options,
@@ -94,4 +95,5 @@
compression_method = native :: lz4|native,
singlefile_compactionperc :: float()|undefined,
maxrunlength_compactionperc :: float()|undefined,
score_onein = 1 :: pos_integer(),
reload_strategy = [] :: list()}).
10 changes: 8 additions & 2 deletions priv/leveled.schema
@@ -100,6 +100,12 @@
{datatype, integer}
]}.

%% @doc The number of times per day to score an individual file for compaction
{mapping, "leveled.compaction_scores_perday", "leveled.compaction_scores_perday", [
{default, 1},
{datatype, integer}
]}.

%% @doc Compaction Low Hour
%% The hour of the day in which journal compaction can start. Use Low hour
%% of 0 and High hour of 23 to have no compaction window (i.e. always compact
@@ -140,10 +146,10 @@
%% @doc Target Percentage for Single File
%% What is the target score for a run of a single file, to qualify for
%% compaction. If less than this percentage would be retained after compaction
%% then it is a candidate (e.g. in default case if 50% of space would be
%% then it is a candidate (e.g. in default case if 70% of space would be
%% recovered)
{mapping, "leveled.singlefile_compactionpercentage", "leveled.singlefile_compactionpercentage", [
{default, 50.0},
{default, 30.0},
{datatype, float},
hidden
]}.
11 changes: 10 additions & 1 deletion src/leveled_bookie.erl
@@ -140,8 +140,9 @@
{head_only, false},
{waste_retention_period, undefined},
{max_run_length, undefined},
{singlefile_compactionpercentage, 50.0},
{singlefile_compactionpercentage, 30.0},
{maxrunlength_compactionpercentage, 70.0},
{journalcompaction_scoreonein, 1},
{reload_strategy, []},
{max_pencillercachesize, ?MAX_PCL_CACHE_SIZE},
{ledger_preloadpagecache_level, ?SST_PAGECACHELEVEL_LOOKUP},
@@ -292,6 +293,11 @@
% a run of max_run_length, before that run can be a compaction
% candidate. For runs between 1 and max_run_length, a
% proportionate score is calculated
{journalcompaction_scoreonein, pos_integer()} |
% When scoring files for compaction, apply a probability (1 in x) as to
% whether any given file will be scored on this run. If a file is not
% scored, a cached score will be used; the cached score is the average of
% the latest score and the rolling average of previous scores
{reload_strategy, list()} |
% The reload_strategy is exposed as an option as currently no firm
% decision has been made about how recovery from failure should
@@ -1757,6 +1763,8 @@ set_options(Opts) ->

MaxSSTSlots = proplists:get_value(max_sstslots, Opts),

ScoreOneIn = proplists:get_value(journalcompaction_scoreonein, Opts),

{#inker_options{root_path = JournalFP,
reload_strategy = ReloadStrategy,
max_run_length = proplists:get_value(max_run_length, Opts),
@@ -1766,6 +1774,7 @@
snaptimeout_long = SnapTimeoutLong,
compression_method = CompressionMethod,
compress_on_receipt = CompressOnReceipt,
score_onein = ScoreOneIn,
cdb_options =
#cdb_options{max_size=MaxJournalSize,
max_count=MaxJournalCount,
59 changes: 54 additions & 5 deletions src/leveled_cdb.erl
@@ -113,7 +113,9 @@
cdb_deletepending/1,
cdb_deletepending/3,
cdb_isrolling/1,
cdb_clerkcomplete/1]).
cdb_clerkcomplete/1,
cdb_getcachedscore/2,
cdb_putcachedscore/2]).

-export([finished_rolling/1,
hashtable_calc/2]).
@@ -133,6 +135,8 @@
-define(GETPOS_FACTOR, 8).
-define(MAX_OBJECT_SIZE, 1000000000).
% 1GB but really should be much smaller than this
-define(MEGA, 1000000).
-define(CACHE_LIFE, 86400).

-record(state, {hashtree,
last_position :: integer() | undefined,
@@ -152,7 +156,8 @@
timings = no_timing :: cdb_timings(),
timings_countdown = 0 :: integer(),
log_options = leveled_log:get_opts()
:: leveled_log:log_options()}).
:: leveled_log:log_options(),
cached_score :: {float(), erlang:timestamp()}|undefined}).

-record(cdb_timings, {sample_count = 0 :: integer(),
sample_cyclecount = 0 :: integer(),
@@ -164,6 +169,9 @@
-type cdb_timings() :: no_timing|#cdb_timings{}.
-type hashtable_index() :: tuple().
-type file_location() :: integer()|eof.
-type filter_fun() ::
fun((any(), binary(), integer(), any(), fun((binary()) -> any())) ->
{stop|loop, any()}).



@@ -369,7 +377,7 @@ cdb_deletepending(Pid) ->
cdb_deletepending(Pid, ManSQN, Inker) ->
gen_fsm:send_event(Pid, {delete_pending, ManSQN, Inker}).

-spec cdb_scan(pid(), fun(), any(), integer()|undefined) ->
-spec cdb_scan(pid(), filter_fun(), any(), integer()|undefined) ->
{integer()|eof, any()}.
%% @doc
%% cdb_scan returns {LastPosition, Acc}. Use LastPosition as StartPosiiton to
@@ -424,6 +432,20 @@
cdb_clerkcomplete(Pid) ->
gen_fsm:send_all_state_event(Pid, clerk_complete).

-spec cdb_getcachedscore(pid(), erlang:timestamp()) -> undefined|float().
%% @doc
%% Return the cached score for a CDB file
cdb_getcachedscore(Pid, Now) ->
gen_fsm:sync_send_all_state_event(Pid, {get_cachedscore, Now}, infinity).


-spec cdb_putcachedscore(pid(), float()) -> ok.
%% @doc
%% Cache a score for a CDB file
cdb_putcachedscore(Pid, Score) ->
gen_fsm:sync_send_all_state_event(Pid, {put_cachedscore, Score}, infinity).



%%%============================================================================
%%% gen_server callbacks
@@ -829,15 +851,32 @@ handle_sync_event(cdb_filename, _From, StateName, State) ->
{reply, State#state.filename, StateName, State};
handle_sync_event(cdb_isrolling, _From, StateName, State) ->
{reply, StateName == rolling, StateName, State};
handle_sync_event({get_cachedscore, {NowMega, NowSecs, _}},
_From, StateName, State) ->
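% A cached score is only considered valid for ?CACHE_LIFE seconds. If the
% cache timestamp plus ?CACHE_LIFE is earlier than Now, undefined is
% returned so that the caller will score the file afresh.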
ScoreToReturn =
case State#state.cached_score of
undefined ->
undefined;
{Score, {CacheMega, CacheSecs, _}} ->
case (NowMega * ?MEGA + NowSecs) >
(CacheMega * ?MEGA + CacheSecs + ?CACHE_LIFE) of
true ->
undefined;
false ->
Score
end
end,
{reply, ScoreToReturn, StateName, State};
handle_sync_event({put_cachedscore, Score}, _From, StateName, State) ->
{reply, ok, StateName, State#state{cached_score = {Score,os:timestamp()}}};
handle_sync_event(cdb_close, _From, delete_pending, State) ->
leveled_log:log("CDB05",
[State#state.filename, delete_pending, cdb_close]),
close_pendingdelete(State#state.handle,
State#state.filename,
State#state.waste_path),
{stop, normal, ok, State};
handle_sync_event(cdb_close, _From, StateName, State) ->
leveled_log:log("CDB05", [State#state.filename, StateName, cdb_close]),
handle_sync_event(cdb_close, _From, _StateName, State) ->
file:close(State#state.handle),
{stop, normal, ok, State}.

@@ -2396,6 +2435,16 @@ get_keys_byposition_manykeys_test_to() ->
SampleList3 = cdb_getpositions(P2, KeyCount + 1),
?assertMatch(KeyCount, length(SampleList3)),

?assertMatch(undefined, cdb_getcachedscore(P2, os:timestamp())),
ok = cdb_putcachedscore(P2, 80.0),
?assertMatch(80.0, cdb_getcachedscore(P2, os:timestamp())),
timer:sleep(1000),
{NowMega, NowSecs, _} = Now = os:timestamp(),
?assertMatch(80.0, cdb_getcachedscore(P2, Now)),
FutureEpoch = NowMega * ?MEGA + NowSecs + ?CACHE_LIFE,
Future = {FutureEpoch div ?MEGA, FutureEpoch rem ?MEGA, 0},
?assertMatch(undefined, cdb_getcachedscore(P2, Future)),

ok = cdb_close(P2),
ok = file:delete(F2).
