Skip to content

Commit

Permalink
Add compression controls (#417)
Browse files Browse the repository at this point in the history
* Add compression controls

Add configuration options to allow for a compression algorithm of `none` to disable compression altogether.  Also an option to change the point in the LSM tree when compression is applied.

* Handle configurable defaults consistently

Move them into leveled.hrl.  This forces double-definitions to be resolved.

There are some other constants in leveled_bookie that are relevant outside of leveled_bookie.  These are all now in the non-configurable startup defaults section.

* Clarify referred-to default is OTP not leveled
  • Loading branch information
martinsumner authored Nov 7, 2023
1 parent 1a66349 commit 87aba06
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 149 deletions.
114 changes: 86 additions & 28 deletions include/leveled.hrl
Original file line number Diff line number Diff line change
@@ -1,34 +1,79 @@
% File paths
%%%============================================================================
%%% File paths
%%%============================================================================
-define(JOURNAL_FP, "journal").
-define(LEDGER_FP, "ledger").
%%%============================================================================

%%%============================================================================
%%% Configurable startup defaults
%%%============================================================================
-define(CACHE_SIZE, 2500).
-define(MAX_CACHE_MULTTIPLE, 2).
-define(MIN_CACHE_SIZE, 100).
-define(MIN_PCL_CACHE_SIZE, 400).
-define(MAX_PCL_CACHE_SIZE, 28000).
% This is less than actual max - but COIN_SIDECOUNT
-define(ABSOLUTEMAX_JOURNALSIZE, 4000000000).
-define(COMPRESSION_METHOD, lz4).
-define(COMPRESSION_POINT, on_receipt).
-define(COMPRESSION_LEVEL, 1).
-define(LOG_LEVEL, info).
-define(DEFAULT_DBID, 65536).
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
-define(SNAPTIMEOUT_SHORT, 900). % 15 minutes
-define(SNAPTIMEOUT_LONG, 43200). % 12 hours
-define(SST_PAGECACHELEVEL_NOLOOKUP, 1).
-define(SST_PAGECACHELEVEL_LOOKUP, 4).
-define(DEFAULT_STATS_PERC, 10).
-define(DEFAULT_SYNC_STRATEGY, none).
%%%============================================================================

%%%============================================================================
%%% Non-configurable startup defaults
%%%============================================================================
-define(MAX_SSTSLOTS, 256).
-define(LOADING_PAUSE, 1000).
-define(LOADING_BATCH, 1000).
-define(CACHE_SIZE_JITTER, 25).
-define(JOURNAL_SIZE_JITTER, 20).
-define(LONG_RUNNING, 1000000).
% An individual task taking > 1s gets a specific log
-define(MAX_KEYCHECK_FREQUENCY, 100).
-define(MIN_KEYCHECK_FREQUENCY, 1).
-define(MAX_LEVELS, 8).
%% Should equal the length of the LEVEL_SCALEFACTOR
-define(CACHE_TYPE, skpl).
%%%============================================================================

%% Tag to be used on standard Riak KV objects
%%%============================================================================
%%% Tags
%%%============================================================================
-define(RIAK_TAG, o_rkv).
%% Tag to be used on K/V objects for non-Riak purposes
%% Tag to be used on standard Riak KV objects
-define(STD_TAG, o).
%% Tag used for secondary index keys
%% Tag to be used on K/V objects for non-Riak purposes
-define(IDX_TAG, i).
%% Tag used for head-only objects
%% Tag used for secondary index keys
-define(HEAD_TAG, h).
%% Tag used for head-only objects

%% Inker key type used for 'normal' objects
-define(INKT_STND, stnd).
%% Inker key type used for 'batch' objects
-define(INKT_STND, stnd).
%% Inker key type used for 'normal' objects
-define(INKT_MPUT, mput).
%% Inker key type used for objects which contain no value, only key changes
%% This is used currently for objects formed under a 'retain' strategy on Inker
%% compaction
%% Inker key type used for 'batch' objects
-define(INKT_KEYD, keyd).
%% Inker key type used for tombstones
%% Inker key type used for objects which contain no value, only key changes
%% This is used currently for objects formed under a 'retain' strategy
%% on Inker compaction
-define(INKT_TOMB, tomb).

-define(CACHE_TYPE, skpl).

-define(MAX_LEVELS, 8).
%% Should equal the length of the LEVEL_SCALEFACTOR
%% Inker key type used for tombstones
%%%============================================================================


%%%============================================================================
%%% Shared records
%%%============================================================================
-record(level,
{level :: integer(),
is_basement = false :: boolean(),
Expand All @@ -47,19 +92,26 @@
file_path :: string() | undefined,
waste_path :: string() | undefined,
binary_mode = false :: boolean(),
sync_strategy = sync,
% Default set by bookie to be `true`
% `false` set here due to legacy of unit tests
% using non-binary keys
sync_strategy = ?DEFAULT_SYNC_STRATEGY,
log_options = leveled_log:get_opts()
:: leveled_log:log_options(),
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
monitor = {no_monitor, 0}
:: leveled_monitor:monitor()}).

-record(sst_options,
{press_method = native
{press_method = ?COMPRESSION_METHOD
:: leveled_sst:press_method(),
press_level = ?COMPRESSION_LEVEL :: non_neg_integer(),
log_options = leveled_log:get_opts()
:: leveled_log:log_options(),
max_sstslots = 256 :: pos_integer(),
pagecache_level = 1 :: pos_integer(),
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
max_sstslots = ?MAX_SSTSLOTS :: pos_integer(),
pagecache_level = ?SST_PAGECACHELEVEL_NOLOOKUP
:: pos_integer(),
monitor = {no_monitor, 0}
:: leveled_monitor:monitor()}).

-record(inker_options,
{cdb_max_size :: integer() | undefined,
Expand All @@ -70,14 +122,16 @@
source_inker :: pid() | undefined,
reload_strategy = [] :: list(),
waste_retention_period :: integer() | undefined,
compression_method = native :: lz4|native,
compression_method = ?COMPRESSION_METHOD
:: lz4|native|none,
compress_on_receipt = false :: boolean(),
max_run_length,
singlefile_compactionperc :: float()|undefined,
maxrunlength_compactionperc :: float()|undefined,
score_onein = 1 :: pos_integer(),
snaptimeout_long :: pos_integer() | undefined,
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
monitor = {no_monitor, 0}
:: leveled_monitor:monitor()}).

-record(penciller_options,
{root_path :: string() | undefined,
Expand All @@ -89,19 +143,23 @@
bookies_mem :: tuple() | undefined,
source_penciller :: pid() | undefined,
snapshot_longrunning = true :: boolean(),
compression_method = native :: lz4|native,
compression_method = ?COMPRESSION_METHOD
:: lz4|native|none,
levelzero_cointoss = false :: boolean(),
snaptimeout_short :: pos_integer() | undefined,
snaptimeout_long :: pos_integer() | undefined,
monitor = {no_monitor, 0} :: leveled_monitor:monitor()}).
monitor = {no_monitor, 0}
:: leveled_monitor:monitor()}).

-record(iclerk_options,
{inker :: pid() | undefined,
max_run_length :: integer() | undefined,
cdb_options = #cdb_options{} :: #cdb_options{},
waste_retention_period :: integer() | undefined,
compression_method = native :: lz4|native,
compression_method = ?COMPRESSION_METHOD
:: lz4|native|none,
singlefile_compactionperc :: float()|undefined,
maxrunlength_compactionperc :: float()|undefined,
score_onein = 1 :: pos_integer(),
reload_strategy = [] :: list()}).
%%%============================================================================
15 changes: 12 additions & 3 deletions priv/leveled.schema
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@
%% Can be lz4 or native (which will use the Erlang native zlib compression)
%% within term_to_binary
{mapping, "leveled.compression_method", "leveled.compression_method", [
{default, native},
{datatype, atom}
{datatype, {enum, [native, lz4, none]}},
{default, native}
]}.


%% @doc Compression point
%% The point at which compression is applied to the Journal (the Ledger is
%% always compressed). Use on_receipt or on_compact. on_compact is suitable
Expand All @@ -61,6 +60,16 @@
{datatype, atom}
]}.

%% @doc Compresstion Level (Ledger LSM)
%% Specify the level in the LSM tree from which compression should occur.
%% Defaults to L1, so only L0 writes are uncompressed.
{mapping, "leveled.compression_level", "leveled.compression_level", [
{default, 1},
{datatype, integer},
{validators, ["range:0-7"]},
hidden
]}.

%% @doc Log level
%% Can be debug, info, warn, error or critical
%% Set the minimum log level to be used within leveled. Leveled will log many
Expand Down
14 changes: 12 additions & 2 deletions priv/leveled_multi.schema
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
%% Can be lz4 or native (which will use the Erlang native zlib compression)
%% within term_to_binary
{mapping, "multi_backend.$name.leveled.compression_method", "riak_kv.multi_backend", [
{default, lz4},
{datatype, atom},
{datatype, {enum, [native, lz4, none]}},
{default, native}
hidden
]}.

Expand All @@ -55,6 +55,16 @@
hidden
]}.

%% @doc Compresstion Level (Ledger LSM)
%% Specify the level in the LSM tree from which compression should occur.
%% Defaults to L1, so only L0 writes are uncompressed.
{mapping, "multi_backend.$name.leveled.compression_level", "riak_kv.multi_backend", [
{default, 1},
{datatype, integer},
{validators, ["range:0-7"]},
hidden
]}.


%% @doc The approximate size (in bytes) when a Journal file should be rolled.
%% Normally keep this as around the size of o(100K) objects. Default is 1GB.
Expand Down
43 changes: 13 additions & 30 deletions src/leveled_bookie.erl
Original file line number Diff line number Diff line change
Expand Up @@ -104,34 +104,8 @@

-include_lib("eunit/include/eunit.hrl").

-define(LOADING_PAUSE, 1000).
-define(CACHE_SIZE, 2500).
-define(MAX_CACHE_MULTTIPLE, 2).
-define(MIN_CACHE_SIZE, 100).
-define(MIN_PCL_CACHE_SIZE, 400).
-define(MAX_PCL_CACHE_SIZE, 28000).
% This is less than actual max - but COIN_SIDECOUNT
-define(CACHE_SIZE_JITTER, 25).
-define(JOURNAL_SIZE_JITTER, 20).
-define(ABSOLUTEMAX_JOURNALSIZE, 4000000000).
-define(LONG_RUNNING, 1000000).
% An individual task taking > 1s gets a specific log
-define(COMPRESSION_METHOD, lz4).
-define(COMPRESSION_POINT, on_receipt).
-define(LOG_LEVEL, info).
-define(TIMING_SAMPLESIZE, 100).
-define(DEFAULT_DBID, 65536).
-define(TIMING_SAMPLECOUNTDOWN, 50000).
-define(DUMMY, dummy). % Dummy key used for mput operations
-define(MAX_KEYCHECK_FREQUENCY, 100).
-define(MIN_KEYCHECK_FREQUENCY, 1).
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
-define(SNAPTIMEOUT_SHORT, 900). % 15 minutes
-define(SNAPTIMEOUT_LONG, 43200). % 12 hours
-define(SST_PAGECACHELEVEL_NOLOOKUP, 1).
-define(SST_PAGECACHELEVEL_LOOKUP, 4).
-define(CACHE_LOGPOINT, 50000).
-define(DEFAULT_STATS_PERC, 10).

-define(OPTION_DEFAULTS,
[{root_path, undefined},
{snapshot_bookie, undefined},
Expand All @@ -140,7 +114,7 @@
{max_journalsize, 1000000000},
{max_journalobjectcount, 200000},
{max_sstslots, 256},
{sync_strategy, none},
{sync_strategy, ?DEFAULT_SYNC_STRATEGY},
{head_only, false},
{waste_retention_period, undefined},
{max_run_length, undefined},
Expand All @@ -152,6 +126,7 @@
{ledger_preloadpagecache_level, ?SST_PAGECACHELEVEL_LOOKUP},
{compression_method, ?COMPRESSION_METHOD},
{compression_point, ?COMPRESSION_POINT},
{compression_level, ?COMPRESSION_LEVEL},
{log_level, ?LOG_LEVEL},
{forced_logs, []},
{database_id, ?DEFAULT_DBID},
Expand Down Expand Up @@ -316,17 +291,23 @@
% To which level of the ledger should the ledger contents be
% pre-loaded into the pagecache (using fadvise on creation and
% startup)
{compression_method, native|lz4} |
{compression_method, native|lz4|none} |
% Compression method and point allow Leveled to be switched from
% using bif based compression (zlib) to using nif based compression
% (lz4).
% (lz4). To disable compression use none. This will disable in
% the ledger as well as the journla (both on_receipt and
% on_compact).
% Defaults to ?COMPRESSION_METHOD
{compression_point, on_compact|on_receipt} |
% The =compression point can be changed between on_receipt (all
% values are compressed as they are received), to on_compact where
% values are originally stored uncompressed (speeding PUT times),
% and are only compressed when they are first subject to compaction
% Defaults to ?COMPRESSION_POINT
{compression_level, 0..7} |
% At what level of the LSM tree in the ledger should compression be
% enabled.
% Defaults to ?COMPRESSION_LEVEL
{log_level, debug|info|warn|error|critical} |
% Set the log level. The default log_level of info is noisy - the
% current implementation was targetted at environments that have
Expand Down Expand Up @@ -1810,6 +1791,7 @@ set_options(Opts, Monitor) ->
% If using lz4 this is not recommended
false
end,
CompressionLevel = proplists:get_value(compression_level, Opts),

MaxSSTSlots = proplists:get_value(max_sstslots, Opts),

Expand Down Expand Up @@ -1842,6 +1824,7 @@ set_options(Opts, Monitor) ->
sst_options =
#sst_options{
press_method = CompressionMethod,
press_level = CompressionLevel,
log_options = leveled_log:get_opts(),
max_sstslots = MaxSSTSlots,
monitor = Monitor},
Expand Down
11 changes: 7 additions & 4 deletions src/leveled_codec.erl
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@
-type object_spec() ::
object_spec_v0()|object_spec_v1().
-type compression_method() ::
lz4|native.
lz4|native|none.
-type index_specs() ::
list({add|remove, any(), any()}).
-type journal_keychanges() ::
Expand Down Expand Up @@ -508,7 +508,9 @@ serialise_object(Object, true, Method) when is_binary(Object) ->
{ok, Bin} = lz4:pack(Object),
Bin;
native ->
zlib:compress(Object)
zlib:compress(Object);
none ->
Object
end;
serialise_object(Object, false, _Method) ->
term_to_binary(Object);
Expand Down Expand Up @@ -554,15 +556,16 @@ encode_valuetype(IsBinary, IsCompressed, Method) ->
Bit3 =
case Method of
lz4 -> 4;
native -> 0
native -> 0;
none -> 0
end,
Bit2 =
case IsBinary of
true -> 2;
false -> 0
end,
Bit1 =
case IsCompressed of
case IsCompressed and (Method =/= none) of
true -> 1;
false -> 0
end,
Expand Down
2 changes: 1 addition & 1 deletion src/leveled_iclerk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@
reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(),
singlefile_compactionperc = ?SINGLEFILE_COMPACTION_TARGET :: float(),
maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(),
compression_method = native :: lz4|native,
compression_method = native :: lz4|native|none,
scored_files = [] :: list(candidate()),
scoring_state :: scoring_state()|undefined,
score_onein = 1 :: pos_integer()}).
Expand Down
2 changes: 1 addition & 1 deletion src/leveled_inker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@
compaction_pending = false :: boolean(),
bookie_monref :: reference() | undefined,
is_snapshot = false :: boolean(),
compression_method = native :: lz4|native,
compression_method = native :: lz4|native|none,
compress_on_receipt = false :: boolean(),
snap_timeout :: pos_integer() | undefined, % in seconds
source_inker :: pid() | undefined}).
Expand Down
1 change: 0 additions & 1 deletion src/leveled_penciller.erl
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@
-define(ITERATOR_SCANWIDTH, 4).
-define(TIMING_SAMPLECOUNTDOWN, 10000).
-define(TIMING_SAMPLESIZE, 100).
-define(OPEN_LASTMOD_RANGE, {0, infinity}).
-define(SHUTDOWN_PAUSE, 10000).
% How long to wait for snapshots to be released on shutdown
% before forcing closure of snapshots
Expand Down
Loading

0 comments on commit 87aba06

Please sign in to comment.