diff --git a/priv/leveled.schema b/priv/leveled.schema index 1582f411..6f726931 100644 --- a/priv/leveled.schema +++ b/priv/leveled.schema @@ -26,6 +26,15 @@ hidden ]}. +%% @doc The maximum size of the bookie's cache before each new PUT results in +%% a slow-offer pause. Prior to Riak 3.0.10 this defaulted to 4 +{mapping, "leveled.cache_multiple", "leveled.cache_multiple", [ + {default, 2}, + {datatype, integer}, + hidden +]}. + + %% @doc The key size of the Penciller's in-memory cache {mapping, "leveled.penciller_cache_size", "leveled.penciller_cache_size", [ {default, 20000}, diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl index 41fd40bc..0842982b 100644 --- a/src/leveled_bookie.erl +++ b/src/leveled_bookie.erl @@ -106,6 +106,7 @@ -define(LOADING_PAUSE, 1000). -define(CACHE_SIZE, 2500). +-define(MAX_CACHE_MULTTIPLE, 2). -define(MIN_CACHE_SIZE, 100). -define(MIN_PCL_CACHE_SIZE, 400). -define(MAX_PCL_CACHE_SIZE, 28000). @@ -134,6 +135,7 @@ [{root_path, undefined}, {snapshot_bookie, undefined}, {cache_size, ?CACHE_SIZE}, + {cache_multiple, ?MAX_CACHE_MULTTIPLE}, {max_journalsize, 1000000000}, {max_journalobjectcount, 200000}, {max_sstslots, 256}, @@ -166,7 +168,8 @@ -record(state, {inker :: pid() | undefined, penciller :: pid() | undefined, - cache_size :: integer() | undefined, + cache_size :: pos_integer() | undefined, + cache_multiple :: pos_integer() |undefined, ledger_cache = #ledger_cache{} :: ledger_cache(), is_snapshot :: boolean() | undefined, slow_offer = false :: boolean(), @@ -236,6 +239,10 @@ % randomised jitter (randomised jitter will still be added to % configured values) % The minimum value is 100 - any lower value will be ignored + {cache_multiple, pos_integer()} | + % A multiple of the cache size beyond which the cache should not + % grow even if the penciller is busy. A pasue will be returned for + % every PUT when this multiple of the cache_size is reached {max_journalsize, pos_integer()} | % The maximum size of a journal file in bytes. The absolute % maximum must be 4GB due to 4 byte file pointers being used @@ -1208,6 +1215,8 @@ init([Opts]) -> max(1, ConfiguredCacheSize div (100 div ?CACHE_SIZE_JITTER)), CacheSize = ConfiguredCacheSize + erlang:phash2(self()) rem CacheJitter, + MaxCacheMultiple = + proplists:get_value(cache_multiple, Opts), PCLMaxSize = PencillerOpts#penciller_options.max_inmemory_tablesize, CacheRatio = PCLMaxSize div ConfiguredCacheSize, @@ -1242,10 +1251,13 @@ init([Opts]) -> PencillerOpts0 = PencillerOpts#penciller_options{sst_options = SSTOpts0}, - State0 = #state{cache_size=CacheSize, - is_snapshot=false, - head_only=HeadOnly, - head_lookup = HeadLookup}, + State0 = + #state{ + cache_size=CacheSize, + cache_multiple = MaxCacheMultiple, + is_snapshot=false, + head_only=HeadOnly, + head_lookup = HeadLookup}, {Inker, Penciller} = startup(InkerOpts, PencillerOpts0, State0), @@ -1297,9 +1309,11 @@ handle_call({put, Bucket, Key, Object, IndexSpecs, Tag, TTL, DataSync}, gen_server:reply(From, ok) end, maybe_longrunning(SW0, overall_put), - case maybepush_ledgercache(State#state.cache_size, - Cache0, - State#state.penciller) of + case maybepush_ledgercache( + State#state.cache_size, + State#state.cache_multiple, + Cache0, + State#state.penciller) of {ok, NewCache} -> {noreply, State#state{ledger_cache = NewCache, put_timings = Timings, @@ -1326,9 +1340,11 @@ handle_call({mput, ObjectSpecs, TTL}, From, State) false -> gen_server:reply(From, ok) end, - case maybepush_ledgercache(State#state.cache_size, - Cache0, - State#state.penciller) of + case maybepush_ledgercache( + State#state.cache_size, + State#state.cache_multiple, + Cache0, + State#state.penciller) of {ok, NewCache} -> {noreply, State#state{ledger_cache = NewCache, slow_offer = false}}; @@ -2356,8 +2372,9 @@ check_in_ledgercache(PK, Hash, Cache, loader) -> end. --spec maybepush_ledgercache(integer(), ledger_cache(), pid()) - -> {ok|returned, ledger_cache()}. +-spec maybepush_ledgercache( + pos_integer(), pos_integer(), ledger_cache(), pid()) + -> {ok|returned, ledger_cache()}. %% @doc %% Following an update to the ledger cache, check if this now big enough to be %% pushed down to the Penciller. There is some random jittering here, to @@ -2368,10 +2385,10 @@ check_in_ledgercache(PK, Hash, Cache, loader) -> %% in the reply. Try again later when it isn't busy (and also potentially %% implement a slow_offer state to slow down the pace at which PUTs are being %% received) -maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> +maybepush_ledgercache(MaxCacheSize, MaxCacheMult, Cache, Penciller) -> Tab = Cache#ledger_cache.mem, CacheSize = ets:info(Tab, size), - TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize), + TimeToPush = maybe_withjitter(CacheSize, MaxCacheSize, MaxCacheMult), if TimeToPush -> CacheToLoad = {Tab, @@ -2391,14 +2408,16 @@ maybepush_ledgercache(MaxCacheSize, Cache, Penciller) -> {ok, Cache} end. --spec maybe_withjitter(integer(), integer()) -> boolean(). +-spec maybe_withjitter( + non_neg_integer(), pos_integer(), pos_integer()) -> boolean(). %% @doc %% Push down randomly, but the closer to 4 * the maximum size, the more likely %% a push should be -maybe_withjitter(CacheSize, MaxCacheSize) when CacheSize > MaxCacheSize -> - R = leveled_rand:uniform(4 * MaxCacheSize), +maybe_withjitter( + CacheSize, MaxCacheSize, MaxCacheMult) when CacheSize > MaxCacheSize -> + R = leveled_rand:uniform(MaxCacheMult * MaxCacheSize), (CacheSize - MaxCacheSize) > R; -maybe_withjitter(_CacheSize, _MaxCacheSize) -> +maybe_withjitter(_CacheSize, _MaxCacheSize, _MaxCacheMult) -> false. diff --git a/src/leveled_pmanifest.erl b/src/leveled_pmanifest.erl index 68463aeb..bdfee8ae 100644 --- a/src/leveled_pmanifest.erl +++ b/src/leveled_pmanifest.erl @@ -149,8 +149,11 @@ copy_manifest(Manifest) -> % about is switched to undefined Manifest#manifest{snapshots = undefined, pending_deletes = undefined}. --spec load_manifest(manifest(), fun(), fun()) -> - {integer(), manifest(), list()}. +-spec load_manifest( + manifest(), + fun((file:name_all(), 1..7) -> {pid(), leveled_ebloom:bloom()}), + fun((pid()) -> pos_integer())) + -> {integer(), manifest(), list()}. %% @doc %% Roll over the manifest starting a process to manage each file in the %% manifest. The PidFun should be able to return the Pid of a file process @@ -182,7 +185,9 @@ load_manifest(Manifest, LoadFun, SQNFun) -> {0, Manifest, []}, lists:reverse(lists:seq(0, Manifest#manifest.basement))). --spec close_manifest(manifest(), fun()) -> ok. +-spec close_manifest( + manifest(), + fun((any()) -> ok)) -> ok. %% @doc %% Close all the files in the manifest (using CloseEntryFun to call close on %% a file). Firts all the files in the active manifest are called, and then diff --git a/src/leveled_pmem.erl b/src/leveled_pmem.erl index 17b92776..6f4c8736 100644 --- a/src/leveled_pmem.erl +++ b/src/leveled_pmem.erl @@ -140,7 +140,8 @@ add_to_cache(L0Size, {LevelMinus1, MinSQN, MaxSQN}, LedgerSQN, TreeList) -> end end. --spec to_list(integer(), fun()) -> list(). +-spec to_list( + integer(), fun((pos_integer()) -> leveled_tree:leveled_tree())) -> list(). %% @doc %% The cache is a list of leveled_trees of length Slots. This will fetch %% each tree in turn by slot ID and then produce a merged/sorted output of