@@ -84,11 +84,15 @@ module ScheduledMerges (
84
84
evalInvariant ,
85
85
treeInvariant ,
86
86
mergeDebtInvariant ,
87
+
88
+ -- * Run sizes
89
+ levelNumberToMaxRunSize ,
90
+ runSizeToLevelNumber ,
91
+ maxBufferSize
87
92
) where
88
93
89
94
import Prelude hiding (lookup )
90
95
91
- import Data.Bits
92
96
import Data.Foldable (for_ , toList , traverse_ )
93
97
import Data.Map.Strict (Map )
94
98
import qualified Data.Map.Strict as Map
@@ -102,6 +106,8 @@ import qualified Control.Monad.Trans.Except as E
102
106
import Control.Tracer (Tracer , contramap , traceWith )
103
107
import GHC.Stack (HasCallStack , callStack )
104
108
109
+ import Text.Printf (printf )
110
+
105
111
import qualified Test.QuickCheck as QC
106
112
107
113
data LSM s = LSMHandle ! (STRef s Counter )
@@ -129,6 +135,10 @@ data LSMContent s =
129
135
130
136
type Levels s = [Level s ]
131
137
138
+ -- | The number of the level. The write buffer lives at level 0, and all other
139
+ -- levels are number starting from 1.
140
+ type LevelNo = Int
141
+
132
142
-- | A level is a sequence of resident runs at this level, prefixed by an
133
143
-- incoming run, which is usually multiple runs that are being merged. Once
134
144
-- completed, the resulting run will become a resident run at this level.
@@ -284,32 +294,6 @@ resolveValue (V x) (V y) = V (x + y)
284
294
newtype Blob = B Int
285
295
deriving stock (Eq , Show )
286
296
287
- -- | The size of the 4 tiering runs at each level are allowed to be:
288
- -- @4^(level-1) < size <= 4^level@
289
- --
290
- tieringRunSize :: Int -> Int
291
- tieringRunSize n = 4 ^ n
292
-
293
- -- | Levelling runs take up the whole level, so are 4x larger.
294
- --
295
- levellingRunSize :: Int -> Int
296
- levellingRunSize n = 4 ^ (n+ 1 )
297
-
298
- tieringRunSizeToLevel :: Run -> Int
299
- tieringRunSizeToLevel r
300
- | s <= maxBufferSize = 1 -- level numbers start at 1
301
- | otherwise =
302
- 1 + (finiteBitSize s - countLeadingZeros (s- 1 ) - 1 ) `div` 2
303
- where
304
- s = runSize r
305
-
306
- levellingRunSizeToLevel :: Run -> Int
307
- levellingRunSizeToLevel r =
308
- max 1 (tieringRunSizeToLevel r - 1 ) -- level numbers start at 1
309
-
310
- maxBufferSize :: Int
311
- maxBufferSize = tieringRunSize 1 -- 4
312
-
313
297
-- | We use levelling on the last level, unless that is also the first level.
314
298
mergePolicyForLevel :: Int -> [Level s ] -> UnionLevel s -> MergePolicy
315
299
mergePolicyForLevel 1 _ _ = MergePolicyTiering
@@ -326,7 +310,7 @@ mergeTypeForLevel _ _ = MergeMidLevel
326
310
-- the last level.
327
311
--
328
312
invariant :: forall s . LSMConfig -> LSMContent s -> ST s ()
329
- invariant _conf (LSMContent _ levels ul) = do
313
+ invariant conf (LSMContent _ levels ul) = do
330
314
levelsInvariant 1 levels
331
315
case ul of
332
316
NoUnion -> return ()
@@ -364,7 +348,7 @@ invariant _conf (LSMContent _ levels ul) = do
364
348
MergePolicyLevelling -> assertST $ null rs
365
349
-- Runs in tiering levels usually fit that size, but they can be one
366
350
-- larger, if a run has been held back (creating a 5-way merge).
367
- MergePolicyTiering -> assertST $ all (\ r -> tieringRunSizeToLevel r `elem` [ln, ln+ 1 ]) rs
351
+ MergePolicyTiering -> assertST $ all (\ r -> runToLevelNumber MergePolicyTiering conf r `elem` [ln, ln+ 1 ]) rs
368
352
-- (This is actually still not really true, but will hold in practice.
369
353
-- In the pathological case, all runs passed to the next level can be
370
354
-- factor (5/4) too large, and there the same holding back can lead to
@@ -383,13 +367,13 @@ invariant _conf (LSMContent _ levels ul) = do
383
367
(Single r, m) -> do
384
368
assertST $ case m of CompletedMerge {} -> True
385
369
OngoingMerge {} -> False
386
- assertST $ levellingRunSizeToLevel r == ln
370
+ assertST $ runToLevelNumber MergePolicyLevelling conf r == ln
387
371
388
372
-- A completed merge for levelling can be of almost any size at all!
389
373
-- It can be smaller, due to deletions in the last level. But it
390
374
-- can't be bigger than would fit into the next level.
391
375
(_, CompletedMerge r) ->
392
- assertST $ levellingRunSizeToLevel r <= ln+ 1
376
+ assertST $ runToLevelNumber MergePolicyLevelling conf r <= ln+ 1
393
377
394
378
-- An ongoing merge for levelling should have 4 incoming runs of
395
379
-- the right size for the level below (or slightly larger due to
@@ -402,8 +386,8 @@ invariant _conf (LSMContent _ levels ul) = do
402
386
assertST $ all (\ r -> runSize r > 0 ) rs -- don't merge empty runs
403
387
let incoming = take 4 rs
404
388
let resident = drop 4 rs
405
- assertST $ all (\ r -> tieringRunSizeToLevel r `elem` [ln- 1 , ln]) incoming
406
- assertST $ all (\ r -> levellingRunSizeToLevel r <= ln+ 1 ) resident
389
+ assertST $ all (\ r -> runToLevelNumber MergePolicyTiering conf r `elem` [ln- 1 , ln]) incoming
390
+ assertST $ all (\ r -> runToLevelNumber MergePolicyLevelling conf r <= ln+ 1 ) resident
407
391
408
392
MergePolicyTiering ->
409
393
case (ir, mrs, mergeTypeForLevel ls ul) of
@@ -412,30 +396,30 @@ invariant _conf (LSMContent _ levels ul) = do
412
396
(Single r, m, _) -> do
413
397
assertST $ case m of CompletedMerge {} -> True
414
398
OngoingMerge {} -> False
415
- assertST $ tieringRunSizeToLevel r == ln
399
+ assertST $ runToLevelNumber MergePolicyTiering conf r == ln
416
400
417
401
-- A completed last level run can be of almost any smaller size due
418
402
-- to deletions, but it can't be bigger than the next level down.
419
403
-- Note that tiering on the last level only occurs when there is
420
404
-- a single level only.
421
405
(_, CompletedMerge r, MergeLastLevel ) -> do
422
406
assertST $ ln == 1
423
- assertST $ tieringRunSizeToLevel r <= ln+ 1
407
+ assertST $ runToLevelNumber MergePolicyTiering conf r <= ln+ 1
424
408
425
409
-- A completed mid level run is usually of the size for the
426
410
-- level it is entering, but can also be one smaller (in which case
427
411
-- it'll be held back and merged again) or one larger (because it
428
412
-- includes a run that has been held back before).
429
413
(_, CompletedMerge r, MergeMidLevel ) ->
430
- assertST $ tieringRunSizeToLevel r `elem` [ln- 1 , ln, ln+ 1 ]
414
+ assertST $ runToLevelNumber MergePolicyTiering conf r `elem` [ln- 1 , ln, ln+ 1 ]
431
415
432
416
-- An ongoing merge for tiering should have 4 incoming runs of
433
417
-- the right size for the level below, and at most 1 run held back
434
418
-- due to being too small (which would thus also be of the size of
435
419
-- the level below).
436
420
(_, OngoingMerge _ rs _, _) -> do
437
421
assertST $ length rs == 4 || length rs == 5
438
- assertST $ all (\ r -> tieringRunSizeToLevel r == ln- 1 ) rs
422
+ assertST $ all (\ r -> runToLevelNumber MergePolicyTiering conf r == ln- 1 ) rs
439
423
440
424
-- We don't make many assumptions apart from what the types already enforce.
441
425
-- In particular, there are no invariants on the progress of the merges,
@@ -534,6 +518,107 @@ assert p x = Exc.assert p (const x callStack)
534
518
assertST :: HasCallStack => Bool -> ST s ()
535
519
assertST p = assert p $ return ()
536
520
521
+ -------------------------------------------------------------------------------
522
+ -- Run sizes
523
+ --
524
+
525
+ -- | Compute the maximum size of a run for a given level.
526
+ --
527
+ -- The size of a tiering run at each level is allowed to be
528
+ -- @bufferSize*sizeRatio^(level-1) < size <= bufferSize*sizeRatio^level@.
529
+ --
530
+ -- >>> levelNumberToMaxRunSize MergePolicyTiering (LSMConfig 2) <$> [0, 1, 2, 3, 4]
531
+ -- [0,2,8,32,128]
532
+ --
533
+ -- The @size@ of a levelling run at each level is allowed to be
534
+ -- @bufferSize*sizeRatio^(level-1) < size <= bufferSize*sizeRatio^(level+1)@. A
535
+ -- levelling run can take take up a whole level, so the maximum size of a run is
536
+ -- @sizeRatio@ tmes larger than the maximum size of a tiering run on the same
537
+ -- level.
538
+ --
539
+ -- >>> levelNumberToMaxRunSize MergePolicyLevelling (LSMConfig 2) <$> [0, 1, 2, 3, 4]
540
+ -- [0,8,32,128,512]
541
+ levelNumberToMaxRunSize :: MergePolicy -> LSMConfig -> LevelNo -> Int
542
+ levelNumberToMaxRunSize = \ case
543
+ MergePolicyTiering -> levelNumberToMaxRunSizeTiering
544
+ MergePolicyLevelling -> levelNumberToMaxRunSizeLevelling
545
+
546
+ -- | See 'levelNumberToMaxRunSize'
547
+ levelNumberToMaxRunSizeTiering :: LSMConfig -> LevelNo -> Int
548
+ levelNumberToMaxRunSizeTiering LSMConfig {configMaxWriteBufferSize = bufSize} ln
549
+ | ln < 0 = error " level number must be non-negative"
550
+ | ln == 0 = 0
551
+ | otherwise = fromIntegerChecked (toInteger bufSize * 4 ^ pred (toInteger ln))
552
+ -- Perform the computation with arbitrary precision using 'Integers', but
553
+ -- throw an error if the result does not fit into an 'Int'.
554
+
555
+ -- | See 'maxRunSize'
556
+ levelNumberToMaxRunSizeLevelling :: LSMConfig -> LevelNo -> Int
557
+ levelNumberToMaxRunSizeLevelling conf ln
558
+ | ln < 0 = error " level number must be non-negative"
559
+ | ln == 0 = 0
560
+ | otherwise = levelNumberToMaxRunSizeTiering conf (succ ln)
561
+
562
+ runToLevelNumber :: MergePolicy -> LSMConfig -> Run -> LevelNo
563
+ runToLevelNumber mpl conf run = runSizeToLevelNumber mpl conf (runSize run)
564
+
565
+ -- | Compute the appropriate level for the size of the given run.
566
+ --
567
+ -- See 'maxRunSize' for the bounds on (tiering or levelling) run sizes at each level.
568
+ --
569
+ -- >>> runSizeToLevelNumber MergePolicyTiering (LSMConfig 2) <$> [0,2,8,32,128]
570
+ -- [0,1,2,3,4]
571
+ --
572
+ -- >>> runSizeToLevelNumber MergePolicyLevelling (LSMConfig 2) <$> [0,8,32,128,512]
573
+ -- [0,1,2,3,4]
574
+ runSizeToLevelNumber :: MergePolicy -> LSMConfig -> Int -> LevelNo
575
+ runSizeToLevelNumber = \ case
576
+ MergePolicyTiering -> runSizeToLevelNumberTiering
577
+ MergePolicyLevelling -> runSizeToLevelNumberLevelling
578
+
579
+ -- | See 'runSizeToLevelNumber'.
580
+ runSizeToLevelNumberTiering :: LSMConfig -> Int -> LevelNo
581
+ runSizeToLevelNumberTiering conf n
582
+ | n < 0 = error " run size must be positive"
583
+ | n == 0 = 0
584
+ -- TODO: enumerating level numbers is potentially costly, but it does gives a
585
+ -- precise answer, where we'd otherwise have to deal with Double rounding
586
+ -- errors in computing @ln = logBase 4 (n / configMaxWriteBufferSize) + 1@
587
+ | otherwise = head $ -- the list is guaranteed to be non-empty
588
+ [ ln
589
+ | ln <- [1 .. ]
590
+ , levelNumberToMaxRunSizeTiering conf (ln - 1 ) < n
591
+ , n <= levelNumberToMaxRunSizeTiering conf ln
592
+ ]
593
+
594
+ -- | See 'runSizeToLevelNumber'.
595
+ runSizeToLevelNumberLevelling :: LSMConfig -> Int -> LevelNo
596
+ runSizeToLevelNumberLevelling conf n
597
+ | n < 0 = error " run size must be positive"
598
+ | n == 0 = 0
599
+ | otherwise = head $
600
+ [ ln
601
+ | ln <- [1 .. ]
602
+ , levelNumberToMaxRunSizeLevelling conf (ln - 1 ) < n
603
+ , n <= levelNumberToMaxRunSizeLevelling conf ln
604
+ ]
605
+
606
+ maxBufferSize :: LSMConfig -> Int
607
+ maxBufferSize conf = levelNumberToMaxRunSizeTiering conf 1 -- equal to configMaxWriteBufferSize
608
+
609
+ {-# INLINABLE fromIntegerChecked #-}
610
+ -- | Like 'fromInteger', but throws an error when @(x :: Integer) /= toInteger
611
+ -- (fromInteger x :: b)@.
612
+ fromIntegerChecked :: (HasCallStack , Integral a ) => Integer -> a
613
+ fromIntegerChecked x
614
+ | x'' == x
615
+ = x'
616
+ | otherwise
617
+ = error $ printf " fromIntegerChecked: conversion failed, %s /= %s" (show x) (show x'')
618
+ where
619
+ x' = fromInteger x
620
+ x'' = toInteger x'
621
+
537
622
-------------------------------------------------------------------------------
538
623
-- Merging credits
539
624
--
@@ -803,7 +888,7 @@ update tr (LSMHandle scr conf lsmr) k op = do
803
888
supplyCreditsLevels (NominalCredit 1 ) ls
804
889
invariant conf content
805
890
let wb' = Map. insertWith combine k op wb
806
- if bufferSize wb' >= maxBufferSize
891
+ if bufferSize wb' >= maxBufferSize conf
807
892
then do
808
893
ls' <- increment tr sc conf (bufferToRun wb') ls unionLevel
809
894
let content' = LSMContent Map. empty ls' unionLevel
@@ -1222,7 +1307,7 @@ increment tr sc conf run0 ls0 ul = do
1222
1307
1223
1308
-- If r is still too small for this level then keep it and merge again
1224
1309
-- with the incoming runs.
1225
- MergePolicyTiering | tieringRunSizeToLevel r < ln -> do
1310
+ MergePolicyTiering | runToLevelNumber MergePolicyTiering conf r < ln -> do
1226
1311
ir' <- newLevelMerge tr' conf ln MergePolicyTiering (mergeTypeFor ls) (incoming ++ [r])
1227
1312
return (Level ir' rs : ls)
1228
1313
@@ -1246,7 +1331,7 @@ increment tr sc conf run0 ls0 ul = do
1246
1331
-- run is too large for this level, we promote the run to the next
1247
1332
-- level and start merging the incoming runs into this (otherwise
1248
1333
-- empty) level .
1249
- MergePolicyLevelling | levellingLevelIsFull ln incoming r -> do
1334
+ MergePolicyLevelling | levellingLevelIsFull conf ln incoming r -> do
1250
1335
assert (null rs && null ls) $ return ()
1251
1336
ir' <- newLevelMerge tr' conf ln MergePolicyTiering MergeMidLevel incoming
1252
1337
ls' <- go (ln+ 1 ) [r] []
@@ -1267,7 +1352,7 @@ newLevelMerge :: Tracer (ST s) EventDetail
1267
1352
-> Int -> MergePolicy -> LevelMergeType
1268
1353
-> [Run ] -> ST s (IncomingRun s )
1269
1354
newLevelMerge _ _ _ _ _ [r] = return (Single r)
1270
- newLevelMerge tr _conf level mergePolicy mergeType rs = do
1355
+ newLevelMerge tr conf level mergePolicy mergeType rs = do
1271
1356
assertST (length rs `elem` [4 , 5 ])
1272
1357
mergingRun@ (MergingRun _ physicalDebt _) <- newMergingRun mergeType rs
1273
1358
assertST (totalDebt physicalDebt <= maxPhysicalDebt)
@@ -1283,7 +1368,7 @@ newLevelMerge tr _conf level mergePolicy mergeType rs = do
1283
1368
-- The nominal debt equals the minimum of credits we will supply before we
1284
1369
-- expect the merge to complete. This is the same as the number of updates
1285
1370
-- in a run that gets moved to this level.
1286
- nominalDebt = NominalDebt (tieringRunSize level)
1371
+ nominalDebt = NominalDebt (levelNumberToMaxRunSize MergePolicyTiering conf level)
1287
1372
1288
1373
-- The physical debt is the number of actual merge steps we will need to
1289
1374
-- perform before the merge is complete. This is always the sum of the
@@ -1297,18 +1382,18 @@ newLevelMerge tr _conf level mergePolicy mergeType rs = do
1297
1382
-- includes the single run in the current level.
1298
1383
maxPhysicalDebt =
1299
1384
case mergePolicy of
1300
- MergePolicyLevelling -> 4 * tieringRunSize (level- 1 )
1301
- + levellingRunSize level
1302
- MergePolicyTiering -> length rs * tieringRunSize (level- 1 )
1385
+ MergePolicyLevelling -> 4 * levelNumberToMaxRunSize MergePolicyTiering conf (level- 1 )
1386
+ + levelNumberToMaxRunSize MergePolicyLevelling conf level
1387
+ MergePolicyTiering -> length rs * levelNumberToMaxRunSize MergePolicyTiering conf (level- 1 )
1303
1388
1304
1389
-- | Only based on run count, not their sizes.
1305
1390
tieringLevelIsFull :: Int -> [Run ] -> [Run ] -> Bool
1306
1391
tieringLevelIsFull _ln _incoming resident = length resident >= 4
1307
1392
1308
1393
-- | The level is only considered full once the resident run is /too large/ for
1309
1394
-- the level.
1310
- levellingLevelIsFull :: Int -> [Run ] -> Run -> Bool
1311
- levellingLevelIsFull ln _incoming resident = levellingRunSizeToLevel resident > ln
1395
+ levellingLevelIsFull :: LSMConfig -> Int -> [Run ] -> Run -> Bool
1396
+ levellingLevelIsFull conf ln _incoming resident = runToLevelNumber MergePolicyLevelling conf resident > ln
1312
1397
1313
1398
-------------------------------------------------------------------------------
1314
1399
-- MergingTree abstraction
0 commit comments