Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mas d31 i416 #418

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 86 additions & 28 deletions include/leveled.hrl
Original file line number Diff line number Diff line change
@@ -1,34 +1,79 @@
% File paths
%%%============================================================================
%%% File paths
%%%============================================================================
-define(JOURNAL_FP, "journal").
-define(LEDGER_FP, "ledger").
%%%============================================================================

%%%============================================================================
%%% Configurable startup defaults
%%%============================================================================
-define(CACHE_SIZE, 2500).
-define(MAX_CACHE_MULTTIPLE, 2).
-define(MIN_CACHE_SIZE, 100).
-define(MIN_PCL_CACHE_SIZE, 400).
-define(MAX_PCL_CACHE_SIZE, 28000).
% This is less than actual max - but COIN_SIDECOUNT
-define(ABSOLUTEMAX_JOURNALSIZE, 4000000000).
-define(COMPRESSION_METHOD, lz4).
-define(COMPRESSION_POINT, on_receipt).
-define(COMPRESSION_LEVEL, 1).
-define(LOG_LEVEL, info).
-define(DEFAULT_DBID, 65536).
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
-define(SNAPTIMEOUT_SHORT, 900). % 15 minutes
-define(SNAPTIMEOUT_LONG, 43200). % 12 hours
-define(SST_PAGECACHELEVEL_NOLOOKUP, 1).
-define(SST_PAGECACHELEVEL_LOOKUP, 4).
-define(DEFAULT_STATS_PERC, 10).
-define(DEFAULT_SYNC_STRATEGY, none).
%%%============================================================================

%%%============================================================================
%%% Non-configurable startup defaults
%%%============================================================================
-define(MAX_SSTSLOTS, 256).
-define(LOADING_PAUSE, 1000).
-define(LOADING_BATCH, 1000).
-define(CACHE_SIZE_JITTER, 25).
-define(JOURNAL_SIZE_JITTER, 20).
-define(LONG_RUNNING, 1000000).
% An individual task taking > 1s gets a specific log
-define(MAX_KEYCHECK_FREQUENCY, 100).
-define(MIN_KEYCHECK_FREQUENCY, 1).
-define(MAX_LEVELS, 8).
%% Should equal the length of the LEVEL_SCALEFACTOR
-define(CACHE_TYPE, skpl).
%%%============================================================================

%% Tag to be used on standard Riak KV objects
%%%============================================================================
%%% Tags
%%%============================================================================
-define(RIAK_TAG, o_rkv).
%% Tag to be used on K/V objects for non-Riak purposes
%% Tag to be used on standard Riak KV objects
-define(STD_TAG, o).
%% Tag used for secondary index keys
%% Tag to be used on K/V objects for non-Riak purposes
-define(IDX_TAG, i).
%% Tag used for head-only objects
%% Tag used for secondary index keys
-define(HEAD_TAG, h).
%% Tag used for head-only objects

%% Inker key type used for 'normal' objects
-define(INKT_STND, stnd).
%% Inker key type used for 'batch' objects
-define(INKT_STND, stnd).
%% Inker key type used for 'normal' objects
-define(INKT_MPUT, mput).
%% Inker key type used for objects which contain no value, only key changes
%% This is used currently for objects formed under a 'retain' strategy on Inker
%% compaction
%% Inker key type used for 'batch' objects
-define(INKT_KEYD, keyd).
%% Inker key type used for tombstones
%% Inker key type used for objects which contain no value, only key changes
%% This is used currently for objects formed under a 'retain' strategy
%% on Inker compaction
-define(INKT_TOMB, tomb).

-define(CACHE_TYPE, skpl).

-define(MAX_LEVELS, 8).
%% Should equal the length of the LEVEL_SCALEFACTOR
%% Inker key type used for tombstones
%%%============================================================================


%%%============================================================================
%%% Shared records
%%%============================================================================
-record(level,
{level :: integer(),
is_basement = false :: boolean(),
Expand All @@ -47,19 +92,26 @@
file_path :: string() | undefined,
waste_path :: string() | undefined,
binary_mode = false :: boolean(),
sync_strategy = sync,
% Default set by bookie to be `true`
% `false` set here due to legacy of unit tests
% using non-binary keys
sync_strategy = ?DEFAULT_SYNC_STRATEGY,
log_options = leveled_log:get_opts()
:: leveled_log:log_options(),
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
monitor = {no_monitor, 0}
:: leveled_monitor:monitor()}).

-record(sst_options,
{press_method = native
{press_method = ?COMPRESSION_METHOD
:: leveled_sst:press_method(),
press_level = ?COMPRESSION_LEVEL :: non_neg_integer(),
log_options = leveled_log:get_opts()
:: leveled_log:log_options(),
max_sstslots = 256 :: pos_integer(),
pagecache_level = 1 :: pos_integer(),
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
max_sstslots = ?MAX_SSTSLOTS :: pos_integer(),
pagecache_level = ?SST_PAGECACHELEVEL_NOLOOKUP
:: pos_integer(),
monitor = {no_monitor, 0}
:: leveled_monitor:monitor()}).

-record(inker_options,
{cdb_max_size :: integer() | undefined,
Expand All @@ -70,14 +122,16 @@
source_inker :: pid() | undefined,
reload_strategy = [] :: list(),
waste_retention_period :: integer() | undefined,
compression_method = native :: lz4|native,
compression_method = ?COMPRESSION_METHOD
:: lz4|native|none,
compress_on_receipt = false :: boolean(),
max_run_length,
singlefile_compactionperc :: float()|undefined,
maxrunlength_compactionperc :: float()|undefined,
score_onein = 1 :: pos_integer(),
snaptimeout_long :: pos_integer() | undefined,
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
monitor = {no_monitor, 0}
:: leveled_monitor:monitor()}).

-record(penciller_options,
{root_path :: string() | undefined,
Expand All @@ -89,19 +143,23 @@
bookies_mem :: tuple() | undefined,
source_penciller :: pid() | undefined,
snapshot_longrunning = true :: boolean(),
compression_method = native :: lz4|native,
compression_method = ?COMPRESSION_METHOD
:: lz4|native|none,
levelzero_cointoss = false :: boolean(),
snaptimeout_short :: pos_integer() | undefined,
snaptimeout_long :: pos_integer() | undefined,
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
monitor = {no_monitor, 0}
:: leveled_monitor:monitor()}).

-record(iclerk_options,
{inker :: pid() | undefined,
max_run_length :: integer() | undefined,
cdb_options = #cdb_options{} :: #cdb_options{},
waste_retention_period :: integer() | undefined,
compression_method = native :: lz4|native,
compression_method = ?COMPRESSION_METHOD
:: lz4|native|none,
singlefile_compactionperc :: float()|undefined,
maxrunlength_compactionperc :: float()|undefined,
score_onein = 1 :: pos_integer(),
reload_strategy = [] :: list()}).
%%%============================================================================
15 changes: 12 additions & 3 deletions priv/leveled.schema
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@
%% Can be lz4 or native (which will use the Erlang native zlib compression)
%% within term_to_binary
{mapping, "leveled.compression_method", "leveled.compression_method", [
{default, native},
{datatype, atom}
{datatype, {enum, [native, lz4, none]}},
{default, native}
]}.


%% @doc Compression point
%% The point at which compression is applied to the Journal (the Ledger is
%% always compressed). Use on_receipt or on_compact. on_compact is suitable
Expand All @@ -61,6 +60,16 @@
{datatype, atom}
]}.

%% @doc Compresstion Level (Ledger LSM)
%% Specify the level in the LSM tree from which compression should occur.
%% Defaults to L1, so only L0 writes are uncompressed.
{mapping, "leveled.compression_level", "leveled.compression_level", [
{default, 1},
{datatype, integer},
{validators, ["range:0-7"]},
hidden
]}.

%% @doc Log level
%% Can be debug, info, warn, error or critical
%% Set the minimum log level to be used within leveled. Leveled will log many
Expand Down
14 changes: 12 additions & 2 deletions priv/leveled_multi.schema
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
%% Can be lz4 or native (which will use the Erlang native zlib compression)
%% within term_to_binary
{mapping, "multi_backend.$name.leveled.compression_method", "riak_kv.multi_backend", [
{default, lz4},
{datatype, atom},
{datatype, {enum, [native, lz4, none]}},
{default, native}
hidden
]}.

Expand All @@ -55,6 +55,16 @@
hidden
]}.

%% @doc Compresstion Level (Ledger LSM)
%% Specify the level in the LSM tree from which compression should occur.
%% Defaults to L1, so only L0 writes are uncompressed.
{mapping, "multi_backend.$name.leveled.compression_level", "riak_kv.multi_backend", [
{default, 1},
{datatype, integer},
{validators, ["range:0-7"]},
hidden
]}.


%% @doc The approximate size (in bytes) when a Journal file should be rolled.
%% Normally keep this as around the size of o(100K) objects. Default is 1GB.
Expand Down
43 changes: 13 additions & 30 deletions src/leveled_bookie.erl
Original file line number Diff line number Diff line change
Expand Up @@ -102,34 +102,8 @@
-export([book_returnactors/1]).
-endif.

-define(LOADING_PAUSE, 1000).
-define(CACHE_SIZE, 2500).
-define(MAX_CACHE_MULTTIPLE, 2).
-define(MIN_CACHE_SIZE, 100).
-define(MIN_PCL_CACHE_SIZE, 400).
-define(MAX_PCL_CACHE_SIZE, 28000).
% This is less than actual max - but COIN_SIDECOUNT
-define(CACHE_SIZE_JITTER, 25).
-define(JOURNAL_SIZE_JITTER, 20).
-define(ABSOLUTEMAX_JOURNALSIZE, 4000000000).
-define(LONG_RUNNING, 1000000).
% An individual task taking > 1s gets a specific log
-define(COMPRESSION_METHOD, lz4).
-define(COMPRESSION_POINT, on_receipt).
-define(LOG_LEVEL, info).
-define(TIMING_SAMPLESIZE, 100).
-define(DEFAULT_DBID, 65536).
-define(TIMING_SAMPLECOUNTDOWN, 50000).
-define(DUMMY, dummy). % Dummy key used for mput operations
-define(MAX_KEYCHECK_FREQUENCY, 100).
-define(MIN_KEYCHECK_FREQUENCY, 1).
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
-define(SNAPTIMEOUT_SHORT, 900). % 15 minutes
-define(SNAPTIMEOUT_LONG, 43200). % 12 hours
-define(SST_PAGECACHELEVEL_NOLOOKUP, 1).
-define(SST_PAGECACHELEVEL_LOOKUP, 4).
-define(CACHE_LOGPOINT, 50000).
-define(DEFAULT_STATS_PERC, 10).

-define(OPTION_DEFAULTS,
[{root_path, undefined},
{snapshot_bookie, undefined},
Expand All @@ -138,7 +112,7 @@
{max_journalsize, 1000000000},
{max_journalobjectcount, 200000},
{max_sstslots, 256},
{sync_strategy, none},
{sync_strategy, ?DEFAULT_SYNC_STRATEGY},
{head_only, false},
{waste_retention_period, undefined},
{max_run_length, undefined},
Expand All @@ -150,6 +124,7 @@
{ledger_preloadpagecache_level, ?SST_PAGECACHELEVEL_LOOKUP},
{compression_method, ?COMPRESSION_METHOD},
{compression_point, ?COMPRESSION_POINT},
{compression_level, ?COMPRESSION_LEVEL},
{log_level, ?LOG_LEVEL},
{forced_logs, []},
{database_id, ?DEFAULT_DBID},
Expand Down Expand Up @@ -314,17 +289,23 @@
% To which level of the ledger should the ledger contents be
% pre-loaded into the pagecache (using fadvise on creation and
% startup)
{compression_method, native|lz4} |
{compression_method, native|lz4|none} |
% Compression method and point allow Leveled to be switched from
% using bif based compression (zlib) to using nif based compression
% (lz4).
% (lz4). To disable compression use none. This will disable in
% the ledger as well as the journla (both on_receipt and
% on_compact).
% Defaults to ?COMPRESSION_METHOD
{compression_point, on_compact|on_receipt} |
% The =compression point can be changed between on_receipt (all
% values are compressed as they are received), to on_compact where
% values are originally stored uncompressed (speeding PUT times),
% and are only compressed when they are first subject to compaction
% Defaults to ?COMPRESSION_POINT
{compression_level, 0..7} |
% At what level of the LSM tree in the ledger should compression be
% enabled.
% Defaults to ?COMPRESSION_LEVEL
{log_level, debug|info|warn|error|critical} |
% Set the log level. The default log_level of info is noisy - the
% current implementation was targetted at environments that have
Expand Down Expand Up @@ -1805,6 +1786,7 @@ set_options(Opts, Monitor) ->
% If using lz4 this is not recommended
false
end,
CompressionLevel = proplists:get_value(compression_level, Opts),

MaxSSTSlots = proplists:get_value(max_sstslots, Opts),

Expand Down Expand Up @@ -1837,6 +1819,7 @@ set_options(Opts, Monitor) ->
sst_options =
#sst_options{
press_method = CompressionMethod,
press_level = CompressionLevel,
log_options = leveled_log:get_opts(),
max_sstslots = MaxSSTSlots,
monitor = Monitor},
Expand Down
11 changes: 7 additions & 4 deletions src/leveled_codec.erl
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@
-type object_spec() ::
object_spec_v0()|object_spec_v1().
-type compression_method() ::
lz4|native.
lz4|native|none.
-type index_specs() ::
list({add|remove, any(), any()}).
-type journal_keychanges() ::
Expand Down Expand Up @@ -508,7 +508,9 @@ serialise_object(Object, true, Method) when is_binary(Object) ->
{ok, Bin} = lz4:pack(Object),
Bin;
native ->
zlib:compress(Object)
zlib:compress(Object);
none ->
Object
end;
serialise_object(Object, false, _Method) ->
term_to_binary(Object);
Expand Down Expand Up @@ -554,15 +556,16 @@ encode_valuetype(IsBinary, IsCompressed, Method) ->
Bit3 =
case Method of
lz4 -> 4;
native -> 0
native -> 0;
none -> 0
end,
Bit2 =
case IsBinary of
true -> 2;
false -> 0
end,
Bit1 =
case IsCompressed of
case IsCompressed and (Method =/= none) of
true -> 1;
false -> 0
end,
Expand Down
2 changes: 1 addition & 1 deletion src/leveled_iclerk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(),
singlefile_compactionperc = ?SINGLEFILE_COMPACTION_TARGET :: float(),
maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(),
compression_method = native :: lz4|native,
compression_method = native :: lz4|native|none,
scored_files = [] :: list(candidate()),
scoring_state :: scoring_state()|undefined,
score_onein = 1 :: pos_integer()}).
Expand Down
2 changes: 1 addition & 1 deletion src/leveled_inker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
compaction_pending = false :: boolean(),
bookie_monref :: reference() | undefined,
is_snapshot = false :: boolean(),
compression_method = native :: lz4|native,
compression_method = native :: lz4|native|none,
compress_on_receipt = false :: boolean(),
snap_timeout :: pos_integer() | undefined, % in seconds
source_inker :: pid() | undefined}).
Expand Down
1 change: 0 additions & 1 deletion src/leveled_penciller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@
-define(ITERATOR_SCANWIDTH, 4).
-define(TIMING_SAMPLECOUNTDOWN, 10000).
-define(TIMING_SAMPLESIZE, 100).
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
-define(SHUTDOWN_PAUSE, 10000).
% How long to wait for snapshots to be released on shutdown
% before forcing closure of snapshots
Expand Down
Loading
Loading