diff --git a/include/leveled.hrl b/include/leveled.hrl index 13b72cc4..8f79da6e 100644 --- a/include/leveled.hrl +++ b/include/leveled.hrl @@ -1,34 +1,79 @@ -% File paths +%%%============================================================================ +%%% File paths +%%%============================================================================ -define(JOURNAL_FP, "journal"). -define(LEDGER_FP, "ledger"). +%%%============================================================================ + +%%%============================================================================ +%%% Configurable startup defaults +%%%============================================================================ +-define(CACHE_SIZE, 2500). +-define(MAX_CACHE_MULTTIPLE, 2). +-define(MIN_CACHE_SIZE, 100). +-define(MIN_PCL_CACHE_SIZE, 400). +-define(MAX_PCL_CACHE_SIZE, 28000). + % This is less than actual max - but COIN_SIDECOUNT +-define(ABSOLUTEMAX_JOURNALSIZE, 4000000000). +-define(COMPRESSION_METHOD, lz4). +-define(COMPRESSION_POINT, on_receipt). +-define(COMPRESSION_LEVEL, 1). +-define(LOG_LEVEL, info). +-define(DEFAULT_DBID, 65536). +-define(OPEN_LASTMOD_RANGE, {0, infinity}). +-define(SNAPTIMEOUT_SHORT, 900). % 15 minutes +-define(SNAPTIMEOUT_LONG, 43200). % 12 hours +-define(SST_PAGECACHELEVEL_NOLOOKUP, 1). +-define(SST_PAGECACHELEVEL_LOOKUP, 4). +-define(DEFAULT_STATS_PERC, 10). +-define(DEFAULT_SYNC_STRATEGY, none). +%%%============================================================================ + +%%%============================================================================ +%%% Non-configurable startup defaults +%%%============================================================================ +-define(MAX_SSTSLOTS, 256). +-define(LOADING_PAUSE, 1000). -define(LOADING_BATCH, 1000). +-define(CACHE_SIZE_JITTER, 25). +-define(JOURNAL_SIZE_JITTER, 20). +-define(LONG_RUNNING, 1000000). + % An individual task taking > 1s gets a specific log +-define(MAX_KEYCHECK_FREQUENCY, 100). 
+-define(MIN_KEYCHECK_FREQUENCY, 1). +-define(MAX_LEVELS, 8). + %% Should equal the length of the LEVEL_SCALEFACTOR +-define(CACHE_TYPE, skpl). +%%%============================================================================ -%% Tag to be used on standard Riak KV objects +%%%============================================================================ +%%% Tags +%%%============================================================================ -define(RIAK_TAG, o_rkv). -%% Tag to be used on K/V objects for non-Riak purposes + %% Tag to be used on standard Riak KV objects -define(STD_TAG, o). -%% Tag used for secondary index keys + %% Tag to be used on K/V objects for non-Riak purposes -define(IDX_TAG, i). -%% Tag used for head-only objects + %% Tag used for secondary index keys -define(HEAD_TAG, h). + %% Tag used for head-only objects -%% Inker key type used for 'normal' objects --define(INKT_STND, stnd). -%% Inker key type used for 'batch' objects +-define(INKT_STND, stnd). + %% Inker key type used for 'normal' objects -define(INKT_MPUT, mput). -%% Inker key type used for objects which contain no value, only key changes -%% This is used currently for objects formed under a 'retain' strategy on Inker -%% compaction + %% Inker key type used for 'batch' objects -define(INKT_KEYD, keyd). -%% Inker key type used for tombstones + %% Inker key type used for objects which contain no value, only key changes + %% This is used currently for objects formed under a 'retain' strategy + %% on Inker compaction -define(INKT_TOMB, tomb). - --define(CACHE_TYPE, skpl). - --define(MAX_LEVELS, 8). 
-%% Should equal the length of the LEVEL_SCALEFACTOR + %% Inker key type used for tombstones +%%%============================================================================ +%%%============================================================================ +%%% Shared records +%%%============================================================================ -record(level, {level :: integer(), is_basement = false :: boolean(), @@ -47,19 +92,26 @@ file_path :: string() | undefined, waste_path :: string() | undefined, binary_mode = false :: boolean(), - sync_strategy = sync, + % Default set by bookie to be `true` + % `false` set here due to legacy of unit tests + % using non-binary keys + sync_strategy = ?DEFAULT_SYNC_STRATEGY, log_options = leveled_log:get_opts() :: leveled_log:log_options(), - monitor = {no_monitor, 0} :: leveled_monitor:monitor()}). + monitor = {no_monitor, 0} + :: leveled_monitor:monitor()}). -record(sst_options, - {press_method = native + {press_method = ?COMPRESSION_METHOD :: leveled_sst:press_method(), + press_level = ?COMPRESSION_LEVEL :: non_neg_integer(), log_options = leveled_log:get_opts() :: leveled_log:log_options(), - max_sstslots = 256 :: pos_integer(), - pagecache_level = 1 :: pos_integer(), - monitor = {no_monitor, 0} :: leveled_monitor:monitor()}). + max_sstslots = ?MAX_SSTSLOTS :: pos_integer(), + pagecache_level = ?SST_PAGECACHELEVEL_NOLOOKUP + :: pos_integer(), + monitor = {no_monitor, 0} + :: leveled_monitor:monitor()}). 
-record(inker_options, {cdb_max_size :: integer() | undefined, @@ -70,14 +122,16 @@ source_inker :: pid() | undefined, reload_strategy = [] :: list(), waste_retention_period :: integer() | undefined, - compression_method = native :: lz4|native, + compression_method = ?COMPRESSION_METHOD + :: lz4|native|none, compress_on_receipt = false :: boolean(), max_run_length, singlefile_compactionperc :: float()|undefined, maxrunlength_compactionperc :: float()|undefined, score_onein = 1 :: pos_integer(), snaptimeout_long :: pos_integer() | undefined, - monitor = {no_monitor, 0} :: leveled_monitor:monitor()}). + monitor = {no_monitor, 0} + :: leveled_monitor:monitor()}). -record(penciller_options, {root_path :: string() | undefined, @@ -89,19 +143,23 @@ bookies_mem :: tuple() | undefined, source_penciller :: pid() | undefined, snapshot_longrunning = true :: boolean(), - compression_method = native :: lz4|native, + compression_method = ?COMPRESSION_METHOD + :: lz4|native|none, levelzero_cointoss = false :: boolean(), snaptimeout_short :: pos_integer() | undefined, snaptimeout_long :: pos_integer() | undefined, - monitor = {no_monitor, 0} :: leveled_monitor:monitor()}). + monitor = {no_monitor, 0} + :: leveled_monitor:monitor()}). -record(iclerk_options, {inker :: pid() | undefined, max_run_length :: integer() | undefined, cdb_options = #cdb_options{} :: #cdb_options{}, waste_retention_period :: integer() | undefined, - compression_method = native :: lz4|native, + compression_method = ?COMPRESSION_METHOD + :: lz4|native|none, singlefile_compactionperc :: float()|undefined, maxrunlength_compactionperc :: float()|undefined, score_onein = 1 :: pos_integer(), reload_strategy = [] :: list()}). 
+%%%============================================================================ \ No newline at end of file diff --git a/priv/leveled.schema b/priv/leveled.schema index 85e4a397..3c404bbe 100644 --- a/priv/leveled.schema +++ b/priv/leveled.schema @@ -46,11 +46,10 @@ %% Can be lz4 or native (which will use the Erlang native zlib compression) %% within term_to_binary {mapping, "leveled.compression_method", "leveled.compression_method", [ -    {default, native}, -    {datatype, atom} +    {datatype, {enum, [native, lz4, none]}}, +    {default, native} ]}. - %% @doc Compression point %% The point at which compression is applied to the Journal (the Ledger is %% always compressed). Use on_receipt or on_compact. on_compact is suitable @@ -61,6 +60,16 @@ {datatype, atom} ]}. +%% @doc Compression Level (Ledger LSM) +%% Specify the level in the LSM tree from which compression should occur. +%% Defaults to L1, so only L0 writes are uncompressed. +{mapping, "leveled.compression_level", "leveled.compression_level", [ + {default, 1}, + {datatype, integer}, + {validators, ["range:0-7"]}, + hidden +]}. + %% @doc Log level %% Can be debug, info, warn, error or critical %% Set the minimum log level to be used within leveled. Leveled will log many diff --git a/priv/leveled_multi.schema b/priv/leveled_multi.schema index ff0f4a4b..486a09f1 100644 --- a/priv/leveled_multi.schema +++ b/priv/leveled_multi.schema @@ -39,8 +39,8 @@ %% Can be lz4 or native (which will use the Erlang native zlib compression) %% within term_to_binary {mapping, "multi_backend.$name.leveled.compression_method", "riak_kv.multi_backend", [ -    {default, lz4}, -    {datatype, atom}, +    {datatype, {enum, [native, lz4, none]}}, +    {default, native}, hidden ]}. @@ -55,6 +55,16 @@ hidden ]}. +%% @doc Compression Level (Ledger LSM) +%% Specify the level in the LSM tree from which compression should occur. +%% Defaults to L1, so only L0 writes are uncompressed. 
+{mapping, "multi_backend.$name.leveled.compression_level", "riak_kv.multi_backend", [ + {default, 1}, + {datatype, integer}, + {validators, ["range:0-7"]}, + hidden +]}. + %% @doc The approximate size (in bytes) when a Journal file should be rolled. %% Normally keep this as around the size of o(100K) objects. Default is 1GB. diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index a2c76c50..b03d9442 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -104,34 +104,8 @@ -include_lib("eunit/include/eunit.hrl"). --define(LOADING_PAUSE, 1000). --define(CACHE_SIZE, 2500). --define(MAX_CACHE_MULTTIPLE, 2). --define(MIN_CACHE_SIZE, 100). --define(MIN_PCL_CACHE_SIZE, 400). --define(MAX_PCL_CACHE_SIZE, 28000). - % This is less than actual max - but COIN_SIDECOUNT --define(CACHE_SIZE_JITTER, 25). --define(JOURNAL_SIZE_JITTER, 20). --define(ABSOLUTEMAX_JOURNALSIZE, 4000000000). --define(LONG_RUNNING, 1000000). - % An individual task taking > 1s gets a specific log --define(COMPRESSION_METHOD, lz4). --define(COMPRESSION_POINT, on_receipt). --define(LOG_LEVEL, info). --define(TIMING_SAMPLESIZE, 100). --define(DEFAULT_DBID, 65536). --define(TIMING_SAMPLECOUNTDOWN, 50000). -define(DUMMY, dummy). % Dummy key used for mput operations --define(MAX_KEYCHECK_FREQUENCY, 100). --define(MIN_KEYCHECK_FREQUENCY, 1). --define(OPEN_LASTMOD_RANGE, {0, infinity}). --define(SNAPTIMEOUT_SHORT, 900). % 15 minutes --define(SNAPTIMEOUT_LONG, 43200). % 12 hours --define(SST_PAGECACHELEVEL_NOLOOKUP, 1). --define(SST_PAGECACHELEVEL_LOOKUP, 4). --define(CACHE_LOGPOINT, 50000). --define(DEFAULT_STATS_PERC, 10). 
+ -define(OPTION_DEFAULTS, [{root_path, undefined}, {snapshot_bookie, undefined}, @@ -140,7 +114,7 @@ {max_journalsize, 1000000000}, {max_journalobjectcount, 200000}, {max_sstslots, 256}, - {sync_strategy, none}, + {sync_strategy, ?DEFAULT_SYNC_STRATEGY}, {head_only, false}, {waste_retention_period, undefined}, {max_run_length, undefined}, @@ -152,6 +126,7 @@ {ledger_preloadpagecache_level, ?SST_PAGECACHELEVEL_LOOKUP}, {compression_method, ?COMPRESSION_METHOD}, {compression_point, ?COMPRESSION_POINT}, + {compression_level, ?COMPRESSION_LEVEL}, {log_level, ?LOG_LEVEL}, {forced_logs, []}, {database_id, ?DEFAULT_DBID}, @@ -316,10 +291,12 @@ % To which level of the ledger should the ledger contents be % pre-loaded into the pagecache (using fadvise on creation and % startup) - {compression_method, native|lz4} | + {compression_method, native|lz4|none} | % Compression method and point allow Leveled to be switched from % using bif based compression (zlib) to using nif based compression - % (lz4). + % (lz4). To disable compression use none. This will disable in + % the ledger as well as the journal (both on_receipt and + % on_compact). % Defaults to ?COMPRESSION_METHOD {compression_point, on_compact|on_receipt} | % The =compression point can be changed between on_receipt (all % values are originally stored uncompressed (speeding PUT times), % and are only compressed when they are first subject to compaction % Defaults to ?COMPRESSION_POINT + {compression_level, 0..7} | + % At what level of the LSM tree in the ledger should compression be + % enabled. + % Defaults to ?COMPRESSION_LEVEL {log_level, debug|info|warn|error|critical} | % Set the log level. 
The default log_level of info is noisy - the % current implementation was targetted at environments that have @@ -1810,6 +1791,7 @@ set_options(Opts, Monitor) -> % If using lz4 this is not recommended false end, + CompressionLevel = proplists:get_value(compression_level, Opts), MaxSSTSlots = proplists:get_value(max_sstslots, Opts), @@ -1842,6 +1824,7 @@ set_options(Opts, Monitor) -> sst_options = #sst_options{ press_method = CompressionMethod, + press_level = CompressionLevel, log_options = leveled_log:get_opts(), max_sstslots = MaxSSTSlots, monitor = Monitor}, diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index ca1a3ddd..0251e1f1 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -108,7 +108,7 @@ -type object_spec() :: object_spec_v0()|object_spec_v1(). -type compression_method() :: - lz4|native. + lz4|native|none. -type index_specs() :: list({add|remove, any(), any()}). -type journal_keychanges() :: @@ -508,7 +508,9 @@ serialise_object(Object, true, Method) when is_binary(Object) -> {ok, Bin} = lz4:pack(Object), Bin; native -> - zlib:compress(Object) + zlib:compress(Object); + none -> + Object end; serialise_object(Object, false, _Method) -> term_to_binary(Object); @@ -554,7 +556,8 @@ encode_valuetype(IsBinary, IsCompressed, Method) -> Bit3 = case Method of lz4 -> 4; - native -> 0 + native -> 0; + none -> 0 end, Bit2 = case IsBinary of @@ -562,7 +565,7 @@ encode_valuetype(IsBinary, IsCompressed, Method) -> false -> 0 end, Bit1 = - case IsCompressed of + case IsCompressed and (Method =/= none) of true -> 1; false -> 0 end, diff --git a/src/leveled_iclerk.erl b/src/leveled_iclerk.erl index 7ae3b445..e286bc27 100644 --- a/src/leveled_iclerk.erl +++ b/src/leveled_iclerk.erl @@ -115,7 +115,7 @@ reload_strategy = ?DEFAULT_RELOAD_STRATEGY :: list(), singlefile_compactionperc = ?SINGLEFILE_COMPACTION_TARGET :: float(), maxrunlength_compactionperc = ?MAXRUNLENGTH_COMPACTION_TARGET ::float(), - compression_method = native :: lz4|native, + 
compression_method = native :: lz4|native|none, scored_files = [] :: list(candidate()), scoring_state :: scoring_state()|undefined, score_onein = 1 :: pos_integer()}). diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 961973be..bebac6a7 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -152,7 +152,7 @@ compaction_pending = false :: boolean(), bookie_monref :: reference() | undefined, is_snapshot = false :: boolean(), - compression_method = native :: lz4|native, + compression_method = native :: lz4|native|none, compress_on_receipt = false :: boolean(), snap_timeout :: pos_integer() | undefined, % in seconds source_inker :: pid() | undefined}). diff --git a/src/leveled_penciller.erl b/src/leveled_penciller.erl index f5ac54ba..89955b04 100644 --- a/src/leveled_penciller.erl +++ b/src/leveled_penciller.erl @@ -221,7 +221,6 @@ -define(ITERATOR_SCANWIDTH, 4). -define(TIMING_SAMPLECOUNTDOWN, 10000). -define(TIMING_SAMPLESIZE, 100). --define(OPEN_LASTMOD_RANGE, {0, infinity}). -define(SHUTDOWN_PAUSE, 10000). % How long to wait for snapshots to be released on shutdown % before forcing closure of snapshots diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl index b4bce3e7..2252f133 100644 --- a/src/leveled_sst.erl +++ b/src/leveled_sst.erl @@ -77,8 +77,12 @@ -define(LOOK_BLOCKSIZE, {24, 32}). % 4x + y = ?LOOK_SLOTSIZE -define(NOLOOK_SLOTSIZE, 256). -define(NOLOOK_BLOCKSIZE, {56, 32}). % 4x + y = ?NOLOOK_SLOTSIZE --define(COMPRESSION_LEVEL, 1). --define(BINARY_SETTINGS, [{compressed, ?COMPRESSION_LEVEL}]). +-define(COMPRESSION_FACTOR, 1). + % When using native compression - how hard should the compression code + % try to reduce the size of the compressed output. 1 Is to imply minimal + % effort, 6 is default in OTP: + % https://www.erlang.org/doc/man/erlang.html#term_to_binary-2 +-define(BINARY_SETTINGS, [{compressed, ?COMPRESSION_FACTOR}]). -define(MERGE_SCANWIDTH, 16). -define(DISCARD_EXT, ".discarded"). -define(DELETE_TIMEOUT, 10000). 
@@ -89,7 +93,6 @@ -define(BLOCK_LENGTHS_LENGTH, 20). -define(LMD_LENGTH, 4). -define(FLIPPER32, 4294967295). --define(COMPRESS_AT_LEVEL, 1). -define(DOUBLESIZE_LEVEL, 3). -define(INDEX_MODDATE, true). -define(TOMB_COUNT, true). @@ -288,12 +291,7 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST) -> sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) -> {ok, Pid} = gen_fsm:start_link(?MODULE, [], ?START_OPTS), - PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method), - MaxSlots0 = maxslots_level(Level, OptsSST#sst_options.max_sstslots), - OptsSST0 = - OptsSST#sst_options{press_method = PressMethod0, - max_sstslots = MaxSlots0}, - + OptsSST0 = update_options(OptsSST, Level), {[], [], SlotList, FK, _CountOfTombs} = merge_lists(KVList, OptsSST0, IndexModDate), case gen_fsm:sync_send_event(Pid, @@ -344,11 +342,7 @@ sst_newmerge(RootPath, Filename, sst_newmerge(RootPath, Filename, KVL1, KVL2, IsBasement, Level, MaxSQN, OptsSST, IndexModDate, TombCount) -> - PressMethod0 = compress_level(Level, OptsSST#sst_options.press_method), - MaxSlots0 = maxslots_level(Level, OptsSST#sst_options.max_sstslots), - OptsSST0 = - OptsSST#sst_options{press_method = PressMethod0, - max_sstslots = MaxSlots0}, + OptsSST0 = update_options(OptsSST, Level), {Rem1, Rem2, SlotList, FK, CountOfTombs} = merge_lists(KVL1, KVL2, {IsBasement, Level}, OptsSST0, IndexModDate, TombCount), @@ -391,11 +385,7 @@ sst_newmerge(RootPath, Filename, sst_newlevelzero(RootPath, Filename, Slots, Fetcher, Penciller, MaxSQN, OptsSST) -> - PressMethod0 = compress_level(0, OptsSST#sst_options.press_method), - MaxSlots0 = maxslots_level(0, OptsSST#sst_options.max_sstslots), - OptsSST0 = - OptsSST#sst_options{press_method = PressMethod0, - max_sstslots = MaxSlots0}, + OptsSST0 = update_options(OptsSST, 0), {ok, Pid} = gen_fsm:start_link(?MODULE, [], ?START_OPTS), % Initiate the file into the "starting" state ok = gen_fsm:sync_send_event(Pid, @@ -1271,6 +1261,16 @@ 
tune_seglist(SegList) -> %%% Internal Functions %%%============================================================================ + +-spec update_options(sst_options(), non_neg_integer()) -> sst_options(). +update_options(OptsSST, Level) -> + CompressLevel = OptsSST#sst_options.press_level, + PressMethod0 = + compress_level(Level, CompressLevel, OptsSST#sst_options.press_method), + MaxSlots0 = + maxslots_level(Level, OptsSST#sst_options.max_sstslots), + OptsSST#sst_options{press_method = PressMethod0, max_sstslots = MaxSlots0}. + -spec new_blockindex_cache(pos_integer()) -> blockindex_cache(). new_blockindex_cache(Size) -> {0, array:new([{size, Size}, {default, none}]), 0}. @@ -1512,12 +1512,14 @@ fetch_range(StartKey, EndKey, ScanWidth, SegList, LowLastMod, State) -> State#state.index_moddate), {NeededBlockIdx, SlotsToFetchBinList, SlotsToPoint}. --spec compress_level(integer(), press_method()) -> press_method(). +-spec compress_level( + non_neg_integer(), non_neg_integer(), press_method()) -> press_method(). %% @doc %% disable compression at higher levels for improved performance -compress_level(Level, _PressMethod) when Level < ?COMPRESS_AT_LEVEL -> +compress_level( + Level, LevelToCompress, _PressMethod) when Level < LevelToCompress -> none; -compress_level(_Level, PressMethod) -> +compress_level(_Level, _LevelToCompress, PressMethod) -> PressMethod. -spec maxslots_level(level(), pos_integer()) -> pos_integer(). 
diff --git a/test/end_to_end/basic_SUITE.erl b/test/end_to_end/basic_SUITE.erl index 3badd2f7..0c86ea9b 100644 --- a/test/end_to_end/basic_SUITE.erl +++ b/test/end_to_end/basic_SUITE.erl @@ -199,11 +199,12 @@ bigsst_littlesst(_Config) -> {compression_point, on_compact}], {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), ObjL1 = - testutil:generate_objects(80000, 1, [], + testutil:generate_objects(100000, 1, [], leveled_rand:rand_bytes(100), fun() -> [] end, <<"B">>), testutil:riakload(Bookie1, ObjL1), testutil:check_forlist(Bookie1, ObjL1), + timer:sleep(10000), % Wait for delete timeout JFP = RootPath ++ "/ledger/ledger_files/", {ok, FNS1} = file:list_dir(JFP), ok = leveled_bookie:book_destroy(Bookie1), @@ -213,6 +214,7 @@ bigsst_littlesst(_Config) -> {ok, Bookie2} = leveled_bookie:book_start(StartOpts2), testutil:riakload(Bookie2, ObjL1), testutil:check_forlist(Bookie2, ObjL1), + timer:sleep(10000), % Wait for delete timeout {ok, FNS2} = file:list_dir(JFP), ok = leveled_bookie:book_destroy(Bookie2), io:format("Big SST ~w files Little SST ~w files~n", @@ -1004,46 +1006,137 @@ many_put_fetch_switchcompression(_Config) -> RootPath = testutil:reset_filestructure(), StartOpts1 = [{root_path, RootPath}, {max_pencillercachesize, 16000}, - {sync_strategy, riak_sync}, + {max_journalobjectcount, 30000}, + {compression_level, 3}, + {sync_strategy, testutil:sync_strategy()}, {compression_method, native}], + StartOpts2 = [{root_path, RootPath}, + {max_pencillercachesize, 24000}, + {max_journalobjectcount, 30000}, + {sync_strategy, testutil:sync_strategy()}, + {compression_method, lz4}], + StartOpts3 = [{root_path, RootPath}, + {max_pencillercachesize, 16000}, + {max_journalobjectcount, 30000}, + {sync_strategy, testutil:sync_strategy()}, + {compression_method, none}], + + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), {TestObject, TestSpec} = testutil:generate_testobject(), ok = testutil:book_riakput(Bookie1, TestObject, TestSpec), 
testutil:check_forobject(Bookie1, TestObject), + CL1s = + testutil:load_objects( + 40000, + [2, 40002], + Bookie1, + TestObject, + fun testutil:generate_smallobjects/2), + + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie1, CL) end, CL1s), ok = leveled_bookie:book_close(Bookie1), - StartOpts2 = [{root_path, RootPath}, - {max_journalsize, 500000000}, - {max_pencillercachesize, 32000}, - {sync_strategy, testutil:sync_strategy()}, - {compression_method, lz4}], - %% Change compression method + %% Change compression method -> lz4 {ok, Bookie2} = leveled_bookie:book_start(StartOpts2), testutil:check_forobject(Bookie2, TestObject), - GenList = [2, 40002, 80002, 120002], - CLs = testutil:load_objects(40000, GenList, Bookie2, TestObject, - fun testutil:generate_smallobjects/2), - CL1A = lists:nth(1, CLs), - ChkListFixed = lists:nth(length(CLs), CLs), - testutil:check_forlist(Bookie2, CL1A), - ObjList2A = testutil:generate_objects(5000, 2), - testutil:riakload(Bookie2, ObjList2A), - ChkList2A = lists:sublist(lists:sort(ObjList2A), 1000), - testutil:check_forlist(Bookie2, ChkList2A), - testutil:check_forlist(Bookie2, ChkListFixed), - testutil:check_forobject(Bookie2, TestObject), - testutil:check_forlist(Bookie2, ChkList2A), - testutil:check_forlist(Bookie2, ChkListFixed), - testutil:check_forobject(Bookie2, TestObject), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie2, CL) end, CL1s), + + CL2s = + testutil:load_objects( + 40000, + [80002, 120002], + Bookie2, + TestObject, + fun testutil:generate_smallobjects/2), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie2, CL) end, CL2s), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie2, CL) end, CL1s), ok = leveled_bookie:book_close(Bookie2), %% Change method back again {ok, Bookie3} = leveled_bookie:book_start(StartOpts1), - testutil:check_forlist(Bookie3, ChkList2A), - testutil:check_forlist(Bookie3, ChkListFixed), - testutil:check_forobject(Bookie3, TestObject), 
testutil:check_formissingobject(Bookie3, "Bookie1", "MissingKey0123"), - ok = leveled_bookie:book_destroy(Bookie3). + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie3, CL) end, CL2s), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie3, CL) end, CL1s), + + CL3s = + testutil:load_objects( + 40000, + [160002, 200002], + Bookie3, + TestObject, + fun testutil:generate_smallobjects/2, + 30000 + ), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie3, CL) end, CL3s), + ok = leveled_bookie:book_close(Bookie3), + + % Change method to no compression + {ok, Bookie4} = leveled_bookie:book_start(StartOpts3), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie4, CL) end, CL2s), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie4, CL) end, CL1s), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie4, CL) end, CL3s), + + CL4s = + testutil:load_objects( + 40000, + [240002, 280002], + Bookie4, + TestObject, + fun testutil:generate_smallobjects/2 + ), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie4, CL) end, CL3s), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie4, CL) end, CL4s), + testutil:delete_some_objects(Bookie4, lists:flatten(CL3s), 60000), + CL5s = + testutil:load_objects( + 40000, + [320002, 360002], + Bookie4, + TestObject, + fun testutil:generate_smallobjects/2 + ), + ok = leveled_bookie:book_compactjournal(Bookie4, 30000), + testutil:wait_for_compaction(Bookie4), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie4, CL) end, CL4s), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie4, CL) end, CL5s), + + ok = leveled_bookie:book_close(Bookie4), + + %% Change compression method -> lz4 + {ok, Bookie5} = leveled_bookie:book_start(StartOpts2), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie5, CL) end, CL1s), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie5, CL) end, CL4s), + lists:foreach( + fun(CL) -> 
ok = testutil:check_forlist(Bookie5, CL) end, CL5s), + ok = leveled_bookie:book_close(Bookie5), + + %% Change compression method -> native + {ok, Bookie6} = leveled_bookie:book_start(StartOpts1), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie6, CL) end, CL1s), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie6, CL) end, CL4s), + lists:foreach( + fun(CL) -> ok = testutil:check_forlist(Bookie6, CL) end, CL5s), + + ok = leveled_bookie:book_destroy(Bookie6). safereaderror_startup(_Config) -> diff --git a/test/end_to_end/iterator_SUITE.erl b/test/end_to_end/iterator_SUITE.erl index b4c570ba..c4bf475c 100644 --- a/test/end_to_end/iterator_SUITE.erl +++ b/test/end_to_end/iterator_SUITE.erl @@ -28,19 +28,21 @@ all() -> [ expiring_indexes(_Config) -> - % Add objects to ths tore with index entries, where the objects (and hence + % Add objects to the store with index entries, where the objects (and hence % the indexes have an expiry time. Confirm that the indexes and the % objects are no longer present after the expiry time (and are present % before). Confirm that replacing an object has the expected outcome, if % the IndexSpecs are updated as part of the request. 
KeyCount = 50000, - Future = 120, - % 2 minutes - if running tests on a slow machine, may need to increase + Future = 60, + % 1 minute - if running tests on a slow machine, may need to increase % this value RootPath = testutil:reset_filestructure(), - StartOpts1 = [{root_path, RootPath}, - {max_journalsize, 100000000}, - {sync_strategy, testutil:sync_strategy()}], + StartOpts1 = + [{root_path, RootPath}, + {max_pencillercachesize, 16000}, + {max_journalobjectcount, 30000}, + {sync_strategy, testutil:sync_strategy()}], {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), SW1 = os:timestamp(), @@ -48,12 +50,11 @@ expiring_indexes(_Config) -> LoadTime = timer:now_diff(os:timestamp(), SW1)/1000000, io:format("Load of ~w std objects in ~w seconds~n", [KeyCount, LoadTime]), - FilterFun = fun({I, _B, _K}) -> lists:member(I, [5, 6, 7, 8]) end, LoadedEntriesInRange = lists:sort(lists:filter(FilterFun, IBKL1)), - true = LoadTime < (Future - 30), - % need 30 seconds spare to run query + true = LoadTime < (Future - 20), + % need 20 seconds spare to run query % and add replacements {I0, B0, K0} = hd(IBKL1), @@ -62,12 +63,12 @@ expiring_indexes(_Config) -> CountI0Fold = fun() -> - leveled_bookie:book_indexfold(Bookie1, - B0, - {fun(_BF, _KT, Acc) -> Acc + 1 end, - 0}, - {<<"temp_int">>, I0, I0}, - {true, undefined}) + leveled_bookie:book_indexfold( + Bookie1, + B0, + {fun(_BF, _KT, Acc) -> Acc + 1 end, 0}, + {<<"temp_int">>, I0, I0}, + {true, undefined}) end, {async, I0Counter1} = CountI0Fold(), I0Count1 = I0Counter1(), @@ -76,29 +77,30 @@ expiring_indexes(_Config) -> InitAcc = [], IndexFold = fun() -> - leveled_bookie:book_indexfold(Bookie1, - B0, - {FoldFun, InitAcc}, - {<<"temp_int">>, 5, 8}, - {true, undefined}) + leveled_bookie:book_indexfold( + Bookie1, + B0, + {FoldFun, InitAcc}, + {<<"temp_int">>, 5, 8}, + {true, undefined}) end, {async, Folder1} = IndexFold(), QR1 = Folder1(), true = lists:sort(QR1) == LoadedEntriesInRange, % Replace object with one with an index 
value of 6 - testutil:stdload_object(Bookie1, B0, K0, 6, <<"value">>, - leveled_util:integer_now() + 600), + testutil:stdload_object( + Bookie1, B0, K0, 6, <<"value">>, leveled_util:integer_now() + 600), % Confirm that this has reduced the index entries in I0 by 1 {async, I0Counter2} = CountI0Fold(), I0Count2 = I0Counter2(), io:format("Count with index value ~w changed from ~w to ~w~n", [I0, I0Count1, I0Count2]), true = I0Count2 == (I0Count1 - 1), - % Now replace again, shortening the timeout to 15s, + % Now replace again, shortening the timeout to 10s, % this time index value of 6 - testutil:stdload_object(Bookie1, B0, K0, 5, <<"value">>, - leveled_util:integer_now() + 15), + testutil:stdload_object( + Bookie1, B0, K0, 5, <<"value">>, leveled_util:integer_now() + 10), {async, Folder2} = IndexFold(), leveled_bookie:book_indexfold(Bookie1, B0, @@ -108,8 +110,8 @@ expiring_indexes(_Config) -> QR2 = Folder2(), io:format("Query with additional entry length ~w~n", [length(QR2)]), true = lists:sort(QR2) == lists:sort([{5, B0, K0}|LoadedEntriesInRange]), - % Wait for a 15s timeout + lus a second to be sure - timer:sleep(15000 + 1000), + % Wait for a 10s timeout plus a second to be sure + timer:sleep(10000 + 1000), {async, Folder3} = IndexFold(), QR3 = Folder3(), % Now the entry should be missing (and the 600s TTL entry should not have @@ -118,16 +120,23 @@ expiring_indexes(_Config) -> true = lists:sort(QR3) == LoadedEntriesInRange, FoldTime = timer:now_diff(os:timestamp(), SW1)/1000000 - LoadTime, - io:format("Query returned ~w entries in ~w seconds - 3 queries + 15s wait~n", + io:format("Query returned ~w entries in ~w seconds - 3 queries + 10s wait~n", [length(QR1), FoldTime]), true = (LoadTime + FoldTime) < Future, - SleepTime = round((Future - (LoadTime + FoldTime)) * 1000 + 1000), % add a second + SleepTime = round((Future - (LoadTime + FoldTime)) * 1000), io:format("Sleeping ~w s for all to expire~n", [SleepTime/1000]), - timer:sleep(SleepTime), + 
timer:sleep(SleepTime + 1000), % add a second % Index entries should now have expired {async, Folder4} = IndexFold(), QR4 = Folder4(), + io:format("Unexpired indexes of length ~w~n", [length(QR4)]), + lists:foreach( + fun(I) -> + io:format("Unexpired index ~p~n", [I]) + end, + QR4 + ), true = QR4 == [], ok = leveled_bookie:book_close(Bookie1),