From 313b8ed0417c272d63f176bff7c876048567817a Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Sat, 30 Nov 2024 22:00:20 +0000
Subject: [PATCH] Mas d31 leveled.i459 partialmerge (#461)

* Mas d34 leveled.i459 partialmerge (#460)

* Add test to replicate issue 459

Nothing actually crashes due to the issue - but the logs show the
polarised stats associated with the issue. When merging into L3, you
would normally expect to merge into 4 files - but actually we see
FileCounter occasionally spiking.

* Add partial merge support

There is a `max_mergebelow` size which can be a positive integer, or
infinity. It defaults to 24.

If a merge from Level N covers fewer than `max_mergebelow` files in
Level N + 1, the merge will proceed as before. If it covers
`max_mergebelow` files or more, the merge will be curtailed when
`max_mergebelow div 2` files have been created at that level. The
remainder for Level N will then be written, as well as for Level N + 1
up to the next whole file that has not yet been touched by the merge.

The backlog that prompted the merge will still exist - as the files in
Level N have not been changed. However, it is likely the next file
picked will not be the same one, and will in all probability have
fewer files to merge (as the average is =< 8).

This will stop progress from being halted by long merge jobs, as they
will exit safely after partial completion.

In the case where the majority of files covered do not require a
merge, those files will be skipped the next time the remainder file is
picked up for merge at Level N.

* Optimise test

Test made faster through backporting testutil:get_compressiblevalue/0
from develop-3.4. Also use a lower max_sstslots to invoke the
condition with fewer keys, and reduce test time.
---
 include/leveled.hrl            |   4 +-
 rebar.config                   |   4 +-
 src/leveled_bookie.erl         |  11 +-
 src/leveled_log.erl            |   4 +-
 src/leveled_pclerk.erl         | 220 +++++++++++++++++++++++++--------
 src/leveled_sst.erl            |  52 +++++---
 test/end_to_end/riak_SUITE.erl | 162 +++++++++++++++++++++++-
 test/end_to_end/testutil.erl   |  32 +++--
 8 files changed, 404 insertions(+), 85 deletions(-)

diff --git a/include/leveled.hrl b/include/leveled.hrl
index 55b82816..cb645aa1 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -33,6 +33,7 @@
 %%% Non-configurable startup defaults
 %%%============================================================================
 -define(MAX_SSTSLOTS, 256).
+-define(MAX_MERGEBELOW, 24).
 -define(LOADING_PAUSE, 1000).
 -define(LOADING_BATCH, 1000).
 -define(CACHE_SIZE_JITTER, 25).
@@ -107,7 +108,8 @@
         press_level = ?COMPRESSION_LEVEL :: non_neg_integer(),
         log_options = leveled_log:get_opts()
             :: leveled_log:log_options(),
-        max_sstslots = ?MAX_SSTSLOTS :: pos_integer(),
+        max_sstslots = ?MAX_SSTSLOTS :: pos_integer()|infinity,
+        max_mergebelow = ?MAX_MERGEBELOW :: pos_integer()|infinity,
         pagecache_level = ?SST_PAGECACHELEVEL_NOLOOKUP
             :: pos_integer(),
         monitor = {no_monitor, 0}

diff --git a/rebar.config b/rebar.config
index f0b17482..0ac285aa 100644
--- a/rebar.config
+++ b/rebar.config
@@ -29,8 +29,8 @@
 ]}.
 
 {deps, [
-    {lz4, ".*", {git, "https://github.com/nhs-riak/erlang-lz4", {branch, "nhse-develop"}}},
-    {zstd, ".*", {git, "https://github.com/nhs-riak/zstd-erlang", {branch, "nhse-develop"}}}
+    {lz4, ".*", {git, "https://github.com/OpenRiak/erlang-lz4", {branch, "openriak-3.2"}}},
+    {zstd, ".*", {git, "https://github.com/OpenRiak/zstd-erlang", {branch, "openriak-3.2"}}}
 ]}.
 
 {ct_opts, [{dir, ["test/end_to_end"]}]}.
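The curtailment rule described in the commit message reduces to a single decision. The sketch below is illustrative only - the function name is hypothetical, and the patch's actual implementation is merge_limit/3 in leveled_pclerk.erl further down:

%% Illustrative sketch of the curtailment decision (hypothetical name;
%% see merge_limit/3 in leveled_pclerk.erl below for the real version).
-spec additions_limit(
    non_neg_integer(), non_neg_integer(), pos_integer()|infinity)
        -> pos_integer()|infinity.
additions_limit(SrcLevel, OverlapCount, MaxMergeBelow)
        when SrcLevel =< 1; OverlapCount < MaxMergeBelow ->
    % Top of the tree, or a small overlap - merge in full as before
    infinity;
additions_limit(_SrcLevel, _OverlapCount, MaxMergeBelow)
        when is_integer(MaxMergeBelow) ->
    % Large overlap - curtail after max_mergebelow div 2 new sink files,
    % writing remainders for both levels and exiting the job safely
    max(1, MaxMergeBelow div 2).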
diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 429480a0..e4f9bd92 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -114,6 +114,7 @@
          {max_journalsize, 1000000000},
          {max_journalobjectcount, 200000},
          {max_sstslots, 256},
+         {max_mergebelow, 24},
          {sync_strategy, ?DEFAULT_SYNC_STRATEGY},
          {head_only, false},
          {waste_retention_period, undefined},
@@ -201,6 +202,12 @@
             % The maximum number of slots in a SST file.  All testing is done
             % at a size of 256 (except for Quickcheck tests), altering this
             % value is not recommended
+        {max_mergebelow, pos_integer()|infinity} |
+            % The maximum number of files in the level below into which a
+            % single file may be merged within the ledger.  If fewer files
+            % than this overlap, the merge will proceed without a limit.
+            % If this many or more overlap below, only up to
+            % max_mergebelow div 2 additions should be created (a partial merge)
         {sync_strategy, sync_mode()} |
            % Should be sync if it is necessary to flush to disk after every
            % write, or none if not (allow the OS to schedule).  This has a
@@ -293,7 +300,7 @@
             % To which level of the ledger should the ledger contents be
             % pre-loaded into the pagecache (using fadvise on creation and
             % startup)
-        {compression_method, native|lz4|zstd|none} |
+        {compression_method, native|lz4|zstd|none} |
             % Compression method and point allow Leveled to be switched from
             % using bif based compression (zlib) to using nif based compression
             % (lz4 or zstd).
@@ -1836,6 +1843,7 @@ set_options(Opts, Monitor) ->
     CompressionLevel = proplists:get_value(compression_level, Opts),
     MaxSSTSlots = proplists:get_value(max_sstslots, Opts),
+    MaxMergeBelow = proplists:get_value(max_mergebelow, Opts),
     ScoreOneIn = proplists:get_value(journalcompaction_scoreonein, Opts),
@@ -1869,6 +1877,7 @@ set_options(Opts, Monitor) ->
                 press_level = CompressionLevel,
                 log_options = leveled_log:get_opts(),
                 max_sstslots = MaxSSTSlots,
+                max_mergebelow = MaxMergeBelow,
                 monitor = Monitor},
         monitor = Monitor}
     }.
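For reference, the new option is supplied at startup like any other bookie option. A minimal usage sketch (the root path is hypothetical; unlisted options keep their defaults):

%% Usage sketch for the new option (hypothetical path).
start_with_partial_merges() ->
    {ok, Bookie} =
        leveled_bookie:book_start(
            [
                {root_path, "/tmp/leveled_example"},
                {max_mergebelow, 24},
                    % the default; infinity disables partial merges
                {max_sstslots, 256}
            ]
        ),
    ok = leveled_bookie:book_close(Bookie).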
diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index dfdc43ec..2dced9aa 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -147,7 +147,7 @@
     pc010 =>
         {info, <<"Merge to be commenced for FileToMerge=~s with MSN=~w">>},
     pc011 =>
-        {info, <<"Merge completed with MSN=~w to Level=~w and FileCounter=~w">>},
+        {info, <<"Merge completed with MSN=~w to Level=~w and FileCounter=~w merge_type=~w">>},
     pc012 =>
         {debug, <<"File to be created as part of MSN=~w Filename=~s IsBasement=~w">>},
     pc013 =>
@@ -172,6 +172,8 @@
         {info, <<"Grooming compaction picked file with tomb_count=~w">>},
     pc025 =>
         {info, <<"At level=~w file_count=~w average words for heap_block_size=~w heap_size=~w recent_size=~w bin_vheap_size=~w">>},
+    pc026 =>
+        {info, <<"Performing potential partial merge to level=~w as FileCounter=~w restricting to MaxAdditions=~w">>},
     pm002 =>
         {info, <<"Completed dump of L0 cache to list of l0cache_size=~w">>},
     sst03 =>
diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl
index c242dbef..410217eb 100644
--- a/src/leveled_pclerk.erl
+++ b/src/leveled_pclerk.erl
@@ -262,61 +262,177 @@ perform_merge(Manifest,
     MaxSQN = leveled_sst:sst_getmaxsequencenumber(Src#manifest_entry.owner),
     SinkLevel = SrcLevel + 1,
     SinkBasement = leveled_pmanifest:is_basement(Manifest, SinkLevel),
-    Additions =
-        do_merge(SrcList, SinkList,
-                    SinkLevel, SinkBasement,
-                    RootPath, NewSQN, MaxSQN,
-                    OptsSST,
-                    []),
-    RevertPointerFun =
-        fun({next, ME, _SK}) ->
-            ME
+    MaxMergeBelow = OptsSST#sst_options.max_mergebelow,
+    MergeLimit = merge_limit(SrcLevel, length(SinkList), MaxMergeBelow),
+    {L2Additions, L1Additions, L2FileRemainder} =
+        do_merge(
+            SrcList, SinkList,
+            SinkLevel, SinkBasement,
+            RootPath, NewSQN, MaxSQN,
+            OptsSST,
+            [],
+            MergeLimit
+        ),
+    RevertPointerFun = fun({next, ME, _SK}) -> ME end,
+    SinkManifestRemovals =
+        lists:subtract(
+            lists:map(RevertPointerFun, SinkList),
+            lists:map(RevertPointerFun, L2FileRemainder)
+        ),
+    Man0 =
+        leveled_pmanifest:replace_manifest_entry(
+            Manifest,
+            NewSQN,
+            SinkLevel,
+            SinkManifestRemovals,
+            L2Additions
+        ),
+    Man1 =
+        case L1Additions of
+            [] ->
+                leveled_pmanifest:remove_manifest_entry(
+                    Man0,
+                    NewSQN,
+                    SrcLevel,
+                    Src
+                );
+            PartialFiles ->
+                leveled_pmanifest:replace_manifest_entry(
+                    Man0,
+                    NewSQN,
+                    SrcLevel,
+                    [Src],
+                    PartialFiles
+                )
         end,
-    SinkManifestList = lists:map(RevertPointerFun, SinkList),
-    Man0 = leveled_pmanifest:replace_manifest_entry(Manifest,
-                                                    NewSQN,
-                                                    SinkLevel,
-                                                    SinkManifestList,
-                                                    Additions),
-    Man2 = leveled_pmanifest:remove_manifest_entry(Man0,
-                                                    NewSQN,
-                                                    SrcLevel,
-                                                    Src),
-    {Man2, [Src|SinkManifestList]}.
-
-do_merge([], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions) ->
-    leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions)]),
-    Additions;
-do_merge(KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions) ->
-    FileName = leveled_penciller:sst_filename(NewSQN,
-                                                SinkLevel,
-                                                length(Additions)),
+    {Man1, [Src|SinkManifestRemovals]}.
+
+-spec merge_limit(
+    non_neg_integer(), non_neg_integer(), pos_integer()|infinity)
+        -> pos_integer()|infinity.
+merge_limit(SrcLevel, SinkListLength, MMB)
+        when SrcLevel =< 1; SinkListLength < MMB ->
+    infinity;
+merge_limit(SrcLevel, SinkListLength, MMB) when is_integer(MMB) ->
+    AdditionsLimit = max(1, MMB div 2),
+    leveled_log:log(pc026, [SrcLevel + 1, SinkListLength, AdditionsLimit]),
+    AdditionsLimit.
+
+-type merge_maybe_expanded_pointer() ::
+    leveled_codec:ledger_kv()|
+    leveled_sst:slot_pointer()|
+    leveled_sst:sst_pointer().
+    % Different to leveled_sst:maybe_expanded_pointer/0
+    % No sst_closed_pointer()
+
+do_merge(
+        [], [], SinkLevel, _SinkB, _RP, NewSQN, _MaxSQN, _Opts, Additions, _Max) ->
+    leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), full]),
+    {lists:reverse(Additions), [], []};
+do_merge(
+        KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max)
+        when length(Additions) >= Max ->
+    leveled_log:log(pc011, [NewSQN, SinkLevel, length(Additions), partial]),
+    FNSrc =
+        leveled_penciller:sst_filename(
+            NewSQN, SinkLevel - 1, 1
+        ),
+    FNSnk =
+        leveled_penciller:sst_filename(
+            NewSQN, SinkLevel, length(Additions) + 1
+        ),
+    {ExpandedKL1, []} = split_unexpanded_files(KL1),
+    {ExpandedKL2, L2FilePointersRem} = split_unexpanded_files(KL2),
+    TS1 = os:timestamp(),
+    InfOpts = OptsSST#sst_options{max_sstslots = infinity},
+    % Need to be careful to make sure all the remainder goes in one file.
+    % There could be situations whereby max_sstslots has been changed
+    % between restarts - and so there is too much data for one file in the
+    % remainder ... but don't want to loop round and consider more complex
+    % scenarios here.
+    NewMergeKL1 =
+        leveled_sst:sst_newmerge(
+            RP, FNSrc, ExpandedKL1, [], false, SinkLevel - 1, MaxSQN, InfOpts
+        ),
+    TS2 = os:timestamp(),
+    NewMergeKL2 =
+        leveled_sst:sst_newmerge(
+            RP, FNSnk, [], ExpandedKL2, SinkB, SinkLevel, MaxSQN, InfOpts
+        ),
+    {KL1Additions, [], []} = add_entry(NewMergeKL1, FNSrc, TS1, []),
+    {KL2Additions, [], []} = add_entry(NewMergeKL2, FNSnk, TS2, Additions),
+    {lists:reverse(KL2Additions), KL1Additions, L2FilePointersRem};
+do_merge(
+        KL1, KL2, SinkLevel, SinkB, RP, NewSQN, MaxSQN, OptsSST, Additions, Max) ->
+    FileName =
+        leveled_penciller:sst_filename(
+            NewSQN, SinkLevel, length(Additions)
+        ),
     leveled_log:log(pc012, [NewSQN, FileName, SinkB]),
     TS1 = os:timestamp(),
-    case leveled_sst:sst_newmerge(RP, FileName,
-                                    KL1, KL2, SinkB, SinkLevel, MaxSQN,
-                                    OptsSST) of
-        empty ->
-            leveled_log:log(pc013, [FileName]),
-            do_merge([], [],
-                        SinkLevel, SinkB,
-                        RP, NewSQN, MaxSQN,
-                        OptsSST,
-                        Additions);
-        {ok, Pid, Reply, Bloom} ->
-            {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
-            Entry = #manifest_entry{start_key=SmallestKey,
-                                    end_key=HighestKey,
-                                    owner=Pid,
-                                    filename=FileName,
-                                    bloom=Bloom},
-            leveled_log:log_timer(pc015, [], TS1),
-            do_merge(KL1Rem, KL2Rem,
-                        SinkLevel, SinkB,
-                        RP, NewSQN, MaxSQN,
-                        OptsSST,
-                        Additions ++ [Entry])
-    end.
+    NewMerge =
+        leveled_sst:sst_newmerge(
+            RP, FileName, KL1, KL2, SinkB, SinkLevel, MaxSQN, OptsSST),
+    {UpdAdditions, KL1Rem, KL2Rem} =
+        add_entry(NewMerge, FileName, TS1, Additions),
+    do_merge(
+        KL1Rem,
+        KL2Rem,
+        SinkLevel,
+        SinkB,
+        RP,
+        NewSQN,
+        MaxSQN,
+        OptsSST,
+        UpdAdditions,
+        Max
+    ).
+
+add_entry(empty, FileName, _TS1, Additions) ->
+    leveled_log:log(pc013, [FileName]),
+    {Additions, [], []};
+add_entry({ok, Pid, Reply, Bloom}, FileName, TS1, Additions) ->
+    {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply,
+    Entry =
+        #manifest_entry{
+            start_key=SmallestKey,
+            end_key=HighestKey,
+            owner=Pid,
+            filename=FileName,
+            bloom=Bloom
+        },
+    leveled_log:log_timer(pc015, [], TS1),
+    {[Entry|Additions], KL1Rem, KL2Rem}.
+
+
+-spec split_unexpanded_files(
+    list(merge_maybe_expanded_pointer())) ->
+        {
+            list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
+            list(leveled_sst:sst_pointer())
+        }.
+split_unexpanded_files(Pointers) ->
+    split_unexpanded_files(Pointers, [], []).
+
+-spec split_unexpanded_files(
+    list(merge_maybe_expanded_pointer()),
+    list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
+    list(leveled_sst:sst_pointer())) ->
+        {
+            list(leveled_codec:ledger_kv()|leveled_sst:slot_pointer()),
+            list(leveled_sst:sst_pointer())
+        }.
+split_unexpanded_files([], MaybeExpanded, FilePointers) ->
+    {lists:reverse(MaybeExpanded), lists:reverse(FilePointers)};
+split_unexpanded_files([{next, P, SK}|Rest], MaybeExpanded, FilePointers) ->
+    split_unexpanded_files(Rest, MaybeExpanded, [{next, P, SK}|FilePointers]);
+split_unexpanded_files([{LK, LV}|Rest], MaybeExpanded, []) ->
+    % Should never see this once a FilePointer has been seen
+    split_unexpanded_files(Rest, [{LK, LV}|MaybeExpanded], []);
+split_unexpanded_files([{pointer, P, SIV, SK, EK}|Rest], MaybeExpanded, []) ->
+    % Should never see this once a FilePointer has been seen
+    split_unexpanded_files(
+        Rest, [{pointer, P, SIV, SK, EK}|MaybeExpanded], []
+    ).
 
 -spec grooming_scorer(
     list(leveled_pmanifest:manifest_entry()))
diff --git a/src/leveled_sst.erl b/src/leveled_sst.erl
index 9f55b332..9d2dd680 100644
--- a/src/leveled_sst.erl
+++ b/src/leveled_sst.erl
@@ -231,7 +231,17 @@
 
 -type build_timings() :: no_timing|#build_timings{}.
 
--export_type([expandable_pointer/0, press_method/0, segment_check_fun/0]).
+-export_type(
+    [
+        expandable_pointer/0,
+        sst_closed_pointer/0,
+        sst_pointer/0,
+        slot_pointer/0,
+        press_method/0,
+        segment_check_fun/0,
+        sst_options/0
+    ]
+).
 
 %%%============================================================================
 %%% API
@@ -292,17 +302,29 @@ sst_new(RootPath, Filename, Level, KVList, MaxSQN, OptsSST, IndexModDate) ->
             {ok, Pid, {SK, EK}, Bloom}
     end.
 
--spec sst_newmerge(string(), string(),
-                    list(leveled_codec:ledger_kv()|sst_pointer()),
-                    list(leveled_codec:ledger_kv()|sst_pointer()),
-                    boolean(), leveled_pmanifest:lsm_level(),
-                    integer(), sst_options())
-        -> empty|{ok, pid(),
-                {{list(leveled_codec:ledger_kv()),
-                    list(leveled_codec:ledger_kv())},
-                    leveled_codec:ledger_key(),
-                    leveled_codec:ledger_key()},
-                binary()}.
+-spec sst_newmerge(
+    string(), string(),
+    list(leveled_codec:ledger_kv()|sst_pointer()),
+    list(leveled_codec:ledger_kv()|sst_pointer()),
+    boolean(),
+    leveled_pmanifest:lsm_level(),
+    integer(),
+    sst_options())
+        ->
+            empty|
+            {
+                ok,
+                pid(),
+                {
+                    {
+                        list(leveled_codec:ledger_kv()),
+                        list(leveled_codec:ledger_kv())
+                    },
+                    leveled_codec:ledger_key(),
+                    leveled_codec:ledger_key()
+                },
+                binary()
+            }.
 %% @doc
 %% Start a new SST file at the assigned level passing in two lists of
 %% {Key, Value} pairs to be merged.  The merge_lists function will use the
@@ -1433,7 +1455,9 @@ compress_level(_Level, _LevelToCompress, PressMethod) ->
     PressMethod.
 
 -spec maxslots_level(
-    leveled_pmanifest:lsm_level(), pos_integer()) -> pos_integer().
+    leveled_pmanifest:lsm_level(), pos_integer()|infinity) -> pos_integer()|infinity.
+maxslots_level(_Level, infinity) ->
+    infinity;
 maxslots_level(Level, MaxSlotCount) when Level < ?DOUBLESIZE_LEVEL ->
     MaxSlotCount;
 maxslots_level(_Level, MaxSlotCount) ->
@@ -2787,7 +2811,7 @@ merge_lists(
         list(binary_slot()),
         leveled_codec:ledger_key()|null,
         non_neg_integer(),
-        non_neg_integer(),
+        pos_integer()|infinity,
         press_method(),
         boolean(),
         non_neg_integer()|not_counted,
diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl
index 90419835..4c5cbab8 100644
--- a/test/end_to_end/riak_SUITE.erl
+++ b/test/end_to_end/riak_SUITE.erl
@@ -1,8 +1,12 @@
 -module(riak_SUITE).
 -include_lib("common_test/include/ct.hrl").
 -include("include/leveled.hrl").
+
 -export([all/0]).
--export([
+
+-export(
+    [
+        test_large_lsm_merge/1,
         basic_riak/1,
         fetchclocks_modifiedbetween/1,
         crossbucket_aae/1,
@@ -11,7 +15,8 @@
         dollar_key_index/1,
         bigobject_memorycheck/1,
         summarisable_sstindex/1
-    ]).
+    ]
+).
 
 all() -> [
     basic_riak,
@@ -21,17 +26,166 @@ all() -> [
     dollar_bucket_index,
     dollar_key_index,
     bigobject_memorycheck,
-    summarisable_sstindex
+    summarisable_sstindex,
+    test_large_lsm_merge
     ].
 
 -define(MAGIC, 53). % riak_kv -> riak_object
 
+test_large_lsm_merge(_Config) ->
+    lsm_merge_tester(12).
+
+lsm_merge_tester(LoopsPerBucket) ->
+    RootPath = testutil:reset_filestructure("lsmMerge"),
+    PutsPerLoop = 32000,
+    SampleOneIn = 100,
+    StartOpts1 =
+        [
+            {root_path, RootPath},
+            {max_pencillercachesize, 16000},
+            {max_sstslots, 48},
+                % Make SST files smaller, to accelerate merges
+            {max_mergebelow, 24},
+            {sync_strategy, testutil:sync_strategy()},
+            {log_level, warn},
+            {compression_method, zstd},
+            {
+                forced_logs,
+                [
+                    b0015, b0016, b0017, b0018, p0032, sst12,
+                    pc008, pc010, pc011, pc026,
+                    p0018, p0024
+                ]
+            }
+        ],
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+
+    LoadBucketFun =
+        fun(Book, Bucket, Loops) ->
+            V = testutil:get_compressiblevalue(),
+            lists:foreach(
+                fun(_I) ->
+                    {_, V} =
+                        testutil:put_indexed_objects(
+                            Book,
+                            Bucket,
+                            PutsPerLoop,
+                            V
+                        )
+                end,
+                lists:seq(1, Loops)
+            ),
+            V
+        end,
+
+    V1 = LoadBucketFun(Bookie1, <<"B1">>, LoopsPerBucket),
+    io:format("Completed load of ~s~n", [<<"B1">>]),
+    V2 = LoadBucketFun(Bookie1, <<"B2">>, LoopsPerBucket),
+    io:format("Completed load of ~s~n", [<<"B2">>]),
+    ValueMap = #{<<"B1">> => V1, <<"B2">> => V2},
+
+    CheckBucketFun =
+        fun(Book) ->
+            BookHeadFoldFun =
+                fun(B, K, _Hd, {SampleKeys, CountAcc}) ->
+                    UpdCntAcc =
+                        maps:update_with(B, fun(C) -> C + 1 end, 1, CountAcc),
+                    case rand:uniform(SampleOneIn) of
+                        R when R == 1 ->
+                            {[{B, K}|SampleKeys], UpdCntAcc};
+                        _ ->
+                            {SampleKeys, UpdCntAcc}
+                    end
+                end,
+            {async, HeadFolder} =
+                leveled_bookie:book_headfold(
+                    Book,
+                    ?RIAK_TAG,
+                    {BookHeadFoldFun, {[], maps:new()}},
+                    true,
+                    false,
+                    false
+                ),
+            {Time, R} = timer:tc(HeadFolder),
+            io:format(
+                "CheckBucketFold returned counts ~w in ~w ms~n",
+                [element(2, R), Time div 1000]
+            ),
+            R
+        end,
+
+    {SampleKeysF1, CountMapF1} = CheckBucketFun(Bookie1),
+    true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B1">>, CountMapF1),
+    true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B2">>, CountMapF1),
+
+    TestSampleKeyFun =
+        fun(Book, Values) ->
+            fun({B, K}) ->
+                ExpectedV = maps:get(B, Values),
+                {ok, Obj} = testutil:book_riakget(Book, B, K),
+                true = ExpectedV == testutil:get_value(Obj)
+            end
+        end,
+
+    {GT1, ok} =
+        timer:tc(
+            fun() ->
+                lists:foreach(TestSampleKeyFun(Bookie1, ValueMap), SampleKeysF1)
+            end
+        ),
+    io:format(
+        "Returned ~w sample gets in ~w ms~n",
+        [length(SampleKeysF1), GT1 div 1000]
+    ),
+
+    ok = leveled_bookie:book_close(Bookie1),
+    {ok, Bookie2} =
+        leveled_bookie:book_start(
+            lists:ukeysort(1, [{max_sstslots, 64}|StartOpts1])
+        ),
+
+    {SampleKeysF2, CountMapF2} = CheckBucketFun(Bookie2),
+    true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B1">>, CountMapF2),
+    true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B2">>, CountMapF2),
+
+    {GT2, ok} =
+        timer:tc(
+            fun() ->
+                lists:foreach(TestSampleKeyFun(Bookie2, ValueMap), SampleKeysF2)
+            end
+        ),
+    io:format(
+        "Returned ~w sample gets in ~w ms~n",
+        [length(SampleKeysF2), GT2 div 1000]
+    ),
+
+    V3 = LoadBucketFun(Bookie2, <<"B3">>, LoopsPerBucket),
<<"B3">>, LoopsPerBucket), + io:format("Completed load of ~s~n", [<<"B3">>]), + UpdValueMap = #{<<"B1">> => V1, <<"B2">> => V2, <<"B3">> => V3}, + + {SampleKeysF3, CountMapF3} = CheckBucketFun(Bookie2), + true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B1">>, CountMapF3), + true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B2">>, CountMapF3), + true = (LoopsPerBucket * PutsPerLoop) == maps:get(<<"B3">>, CountMapF3), + + {GT3, ok} = + timer:tc( + fun() -> + lists:foreach(TestSampleKeyFun(Bookie2, UpdValueMap), SampleKeysF3) + end + ), + io:format( + "Returned ~w sample gets in ~w ms~n", + [length(SampleKeysF3), GT3 div 1000] + ), + + ok = leveled_bookie:book_destroy(Bookie2). + basic_riak(_Config) -> basic_riak_tester(<<"B0">>, 640000), basic_riak_tester({<<"Type0">>, <<"B0">>}, 80000). - basic_riak_tester(Bucket, KeyCount) -> % Key Count should be > 10K and divisible by 5 io:format("Basic riak test with Bucket ~w KeyCount ~w~n", diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl index f003be3e..dc01ff9d 100644 --- a/test/end_to_end/testutil.erl +++ b/test/end_to_end/testutil.erl @@ -446,11 +446,15 @@ get_compressiblevalue() -> Selector = [{1, S1}, {2, S2}, {3, S3}, {4, S4}, {5, S5}, {6, S6}, {7, S7}, {8, S8}], L = lists:seq(1, 1024), - lists:foldl(fun(_X, Acc) -> - {_, Str} = lists:keyfind(leveled_rand:uniform(8), 1, Selector), - Acc ++ Str end, - "", - L). + iolist_to_binary( + lists:foldl( + fun(_X, Acc) -> + {_, Str} = lists:keyfind(rand:uniform(8), 1, Selector), + [Str|Acc] end, + [""], + L + ) + ). generate_smallobjects(Count, KeyNumber) -> generate_objects(Count, KeyNumber, [], leveled_rand:rand_bytes(512)). @@ -637,8 +641,10 @@ get_value(ObjectBin) -> <> = SibsBin, <> = Rest2, case ContentBin of - <<0, ContentBin0/binary>> -> - binary_to_term(ContentBin0) + <<0:8/integer, ContentBin0/binary>> -> + binary_to_term(ContentBin0); + <<1:8/integer, ContentAsIs/binary>> -> + ContentAsIs end; N -> io:format("SibCount of ~w with ObjectBin ~w~n", [N, ObjectBin]), @@ -696,8 +702,10 @@ get_randomindexes_generator(Count) -> lists:map( fun(X) -> {add, - list_to_binary("idx" ++ integer_to_list(X) ++ "_bin"), - list_to_binary(get_randomdate() ++ get_randomname())} + iolist_to_binary( + "idx" ++ integer_to_list(X) ++ "_bin"), + iolist_to_binary( + get_randomdate() ++ get_randomname())} end, lists:seq(1, Count)) end, @@ -780,7 +788,11 @@ put_indexed_objects(Book, Bucket, Count, V) -> KSpecL = lists:map( fun({_RN, Obj, Spc}) -> - book_riakput(Book, Obj, Spc), + R = book_riakput(Book,Obj, Spc), + case R of + ok -> ok; + pause -> timer:sleep(?SLOWOFFER_DELAY) + end, {testutil:get_key(Obj), Spc} end, ObjL1),