Skip to content

Commit 71f541a

Browse files
wenkokkejorisdral
andcommitted
fix(issue392): process feedback on PR
Co-authored-by: Joris Dral <[email protected]>
1 parent a4b5508 commit 71f541a

File tree

11 files changed

+265
-249
lines changed

11 files changed

+265
-249
lines changed

doc/format-directory.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ Each snapshot contains a metadata file for the snapshot overall and a checksum:
7777
* `${session}/snapshots/${name}/snapshot`
7878
* `${session}/snapshots/${name}/snapshot.checksum`
7979

80+
plus three files for the serialised write buffer:
81+
82+
* `${session}/snapshots/${name}/${n}.keyops`
83+
* `${session}/snapshots/${name}/${n}.blobs`
84+
* `${session}/snapshots/${name}/${n}.checksums`
85+
8086
plus the five files for each LSM run in the snapshot:
8187

8288
* `${session}/snapshots/${name}/${n}.keyops`
@@ -85,7 +91,7 @@ plus the five files for each LSM run in the snapshot:
8591
* `${session}/snapshots/${name}/${n}.index`
8692
* `${session}/snapshots/${name}/${n}.checksum`
8793

88-
In this case the LSM run files are numbered from 0 within each snapshot.
94+
In this case the serialised write buffer and the LSM run files are numbered from 0 within each snapshot.
8995

9096
## Snapshot metadata
9197

@@ -97,6 +103,7 @@ The snapshot metadata file contains the following information:
97103
- Page size for all the key/operations files
98104
- Index type and parameters
99105
* Key/value type information for dynamic-type sanity checking
106+
* The number of the serialised write buffer files (should always be 0).
100107
* The shape of the LSM tree overall (runs within levels etc), referencing
101108
the numbered LSM runs.
102109

src-extras/Database/LSMTree/Extras/RunData.hs

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ module Database.LSMTree.Extras.RunData (
66
withRun
77
, withRuns
88
, unsafeFlushAsWriteBuffer
9+
-- * Serialise write buffers
10+
, withRunDataAsWriteBuffer
11+
, withSerialisedWriteBuffer
912
-- * RunData
1013
, RunData (..)
1114
, SerialisedRunData
@@ -20,18 +23,24 @@ module Database.LSMTree.Extras.RunData (
2023
, liftShrink2Map
2124
) where
2225

23-
import Control.Exception (bracket)
26+
import Control.Exception (bracket, bracket_)
2427
import Control.Monad
2528
import Control.RefCount
2629
import Data.Bifoldable (Bifoldable (bifoldMap))
2730
import Data.Bifunctor
31+
import Data.Foldable (for_)
32+
import qualified Data.Map as M
2833
import Data.Map.Strict (Map)
2934
import qualified Data.Map.Strict as Map
35+
import qualified Data.Vector as V
3036
import Data.Word (Word64)
3137
import Database.LSMTree.Extras (showPowersOf10)
3238
import Database.LSMTree.Extras.Generators ()
3339
import Database.LSMTree.Internal.Entry
40+
import Database.LSMTree.Internal.Lookup (ResolveSerialisedValue)
41+
import Database.LSMTree.Internal.MergeSchedule (addWriteBufferEntries)
3442
import Database.LSMTree.Internal.Paths
43+
import qualified Database.LSMTree.Internal.Paths as Paths
3544
import Database.LSMTree.Internal.Run (Run, RunDataCaching (..))
3645
import qualified Database.LSMTree.Internal.Run as Run
3746
import Database.LSMTree.Internal.RunAcc (RunBloomFilterAlloc (..),
@@ -40,7 +49,10 @@ import Database.LSMTree.Internal.RunNumber
4049
import Database.LSMTree.Internal.Serialise
4150
import qualified Database.LSMTree.Internal.WriteBuffer as WB
4251
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
52+
import Database.LSMTree.Internal.WriteBufferWriter (writeWriteBuffer)
53+
import qualified System.FS.API as FS
4354
import System.FS.API
55+
import qualified System.FS.BlockIO.API as FS
4456
import System.FS.BlockIO.API
4557
import Test.QuickCheck
4658

@@ -96,6 +108,44 @@ unsafeFlushAsWriteBuffer fs hbio fsPaths (RunData m) = do
96108
releaseRef wbblobs
97109
return run
98110

111+
{-------------------------------------------------------------------------------
112+
Serialise write buffers
113+
-------------------------------------------------------------------------------}
114+
115+
-- | Build a 'WriteBuffer' and 'WriteBufferBlobs' from 'SerialisedRunData' and run an action on them.
116+
withRunDataAsWriteBuffer ::
117+
FS.HasFS IO h
118+
-> ResolveSerialisedValue
119+
-> WriteBufferFsPaths
120+
-> SerialisedRunData
121+
-> (WB.WriteBuffer -> Ref (WBB.WriteBufferBlobs IO h) -> IO a)
122+
-> IO a
123+
withRunDataAsWriteBuffer hfs f fsPaths rd action = do
124+
let es = V.fromList . M.toList $ unRunData rd
125+
let maxn = NumEntries $ V.length es
126+
let wbbPath = Paths.writeBufferBlobPath fsPaths
127+
bracket (WBB.new hfs wbbPath) releaseRef $ \wbb -> do
128+
(wb, _) <- addWriteBufferEntries hfs f wbb maxn WB.empty es
129+
action wb wbb
130+
131+
-- | Serialise a 'WriteBuffer' and 'WriteBufferBlobs' to disk, perform an 'IO' action, and remove the serialised files afterwards.
132+
withSerialisedWriteBuffer ::
133+
FS.HasFS IO h
134+
-> FS.HasBlockIO IO h
135+
-> WriteBufferFsPaths
136+
-> WB.WriteBuffer
137+
-> Ref (WBB.WriteBufferBlobs IO h)
138+
-> IO a
139+
-> IO a
140+
withSerialisedWriteBuffer hfs hbio wbPaths wb wbb =
141+
bracket_ (writeWriteBuffer hfs hbio wbPaths wb wbb) $ do
142+
for_ [ Paths.writeBufferKOpsPath wbPaths
143+
, Paths.writeBufferBlobPath wbPaths
144+
, Paths.writeBufferChecksumsPath wbPaths
145+
] $ \fsPath -> do
146+
fsPathExists <- FS.doesFileExist hfs fsPath
147+
when fsPathExists $ FS.removeFile hfs fsPath
148+
99149
{-------------------------------------------------------------------------------
100150
RunData
101151
-------------------------------------------------------------------------------}

src/Database/LSMTree/Internal.hs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1160,8 +1160,10 @@ createSnapshot snap label tableType t = do
11601160
(FS.createDirectory hfs (Paths.getNamedSnapshotDir snapDir))
11611161
(\_ -> FS.removeDirectoryRecursive hfs (Paths.getNamedSnapshotDir snapDir))
11621162

1163-
-- Get the table content.
1164-
content <- RW.withReadAccess (tableContent thEnv) pure
1163+
-- Duplicate references to the table content, so that resources do not disappear
1164+
-- from under our feet while taking a snapshot. These references are released
1165+
-- again after the snapshot files/directories are written.
1166+
content <- RW.withReadAccess (tableContent thEnv) (duplicateTableContent reg)
11651167

11661168
-- Snapshot the write buffer.
11671169
let activeDir = Paths.activeDir (tableSessionRoot thEnv)
@@ -1180,6 +1182,9 @@ createSnapshot snap label tableType t = do
11801182
SnapshotMetaDataChecksumFile checksumPath = Paths.snapshotMetaDataChecksumFile snapDir
11811183
writeFileSnapshotMetaData hfs contentPath checksumPath snapMetaData
11821184

1185+
-- Release the table content
1186+
releaseTableContent reg content
1187+
11831188
{-# SPECIALISE openSnapshot ::
11841189
Session IO h
11851190
-> SnapshotLabel
@@ -1231,10 +1236,8 @@ openSnapshot sesh label tableType override snap resolve = do
12311236
let activeDir = Paths.activeDir (sessionRoot seshEnv)
12321237

12331238
-- Read write buffer
1234-
activeWriteBufferNumber <- uniqueToRunNumber <$> incrUniqCounter uc
1235-
let activeWriteBufferPaths = Paths.WriteBufferFsPaths (Paths.getActiveDir activeDir) activeWriteBufferNumber
12361239
let snapWriteBufferPaths = Paths.WriteBufferFsPaths (Paths.getNamedSnapshotDir snapDir) snapWriteBuffer
1237-
(tableWriteBuffer, tableWriteBufferBlobs) <- openWriteBuffer reg resolve hfs hbio snapWriteBufferPaths activeWriteBufferPaths
1240+
(tableWriteBuffer, tableWriteBufferBlobs) <- openWriteBuffer reg resolve hfs hbio uc activeDir snapWriteBufferPaths
12381241

12391242
-- Hard link runs into the active directory,
12401243
snapLevels' <- openRuns reg hfs hbio conf (sessionUniqCounter seshEnv) snapDir activeDir snapLevels

src/Database/LSMTree/Internal/Entry.hs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ module Database.LSMTree.Internal.Entry (
33
, hasBlob
44
, onValue
55
, onBlobRef
6-
, traverseBlobRef
76
, NumEntries (..)
87
, unNumEntries
98
-- * Value resolution/merging
@@ -48,17 +47,6 @@ onBlobRef def g = \case
4847
Mupdate{} -> def
4948
Delete -> def
5049

51-
traverseBlobRef ::
52-
Applicative t
53-
=> (blobref -> t blobref')
54-
-> Entry v blobref
55-
-> t (Entry v blobref')
56-
traverseBlobRef f = \case
57-
Insert v -> pure (Insert v)
58-
InsertWithBlob v blobref -> InsertWithBlob v <$> f blobref
59-
Mupdate v -> pure (Mupdate v)
60-
Delete -> pure Delete
61-
6250
instance Bifunctor Entry where
6351
first f = \case
6452
Insert v -> Insert (f v)

src/Database/LSMTree/Internal/Paths.hs

Lines changed: 19 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@ module Database.LSMTree.Internal.Paths (
3030
, toChecksumsFile
3131
, fromChecksumsFile
3232
-- * Checksums for WriteBuffer files
33-
, checksumFileNamesForWriteBufferFiles
3433
, toChecksumsFileForWriteBufferFiles
35-
, fromChecksumsFileForWriteBufferFiles
3634
-- * ForRunFiles abstraction
3735
, ForKOps (..)
3836
, ForBlob (..)
@@ -45,22 +43,18 @@ module Database.LSMTree.Internal.Paths (
4543
, forRunIndexRaw
4644
-- * WriteBuffer paths
4745
, WriteBufferFsPaths (WrapRunFsPaths, WriteBufferFsPaths, writeBufferDir, writeBufferNumber)
48-
, pathsForWriteBufferFiles
4946
, writeBufferKOpsPath
5047
, writeBufferBlobPath
5148
, writeBufferChecksumsPath
5249
, writeBufferFilePathWithExt
53-
-- * ForWriteBufferFiles abstraction
54-
, ForWriteBufferFiles (..)
55-
, forWriteBufferKOpsRaw
56-
, forWriteBufferBlobRaw
57-
, writeBufferFileExts
5850
) where
5951

6052
import Control.Applicative (Applicative (..))
6153
import Control.DeepSeq (NFData (..))
54+
import Data.Bifunctor (Bifunctor (..))
6255
import qualified Data.ByteString.Char8 as BS
6356
import Data.Foldable (toList)
57+
import Data.Function ((&))
6458
import qualified Data.Map as Map
6559
import Data.Maybe (fromMaybe)
6660
import Data.String (IsString (..))
@@ -228,7 +222,7 @@ runFileExts = ForRunFiles {
228222
}
229223

230224
{-------------------------------------------------------------------------------
231-
Checksums
225+
Checksums For Run Files
232226
-------------------------------------------------------------------------------}
233227

234228
checksumFileNamesForRunFiles :: ForRunFiles CRC.ChecksumsFileName
@@ -243,18 +237,6 @@ fromChecksumsFile file = for checksumFileNamesForRunFiles $ \name ->
243237
Just crc -> Right crc
244238
Nothing -> Left ("key not found: " <> show name)
245239

246-
checksumFileNamesForWriteBufferFiles :: ForWriteBufferFiles CRC.ChecksumsFileName
247-
checksumFileNamesForWriteBufferFiles = fmap (CRC.ChecksumsFileName . BS.pack) writeBufferFileExts
248-
249-
toChecksumsFileForWriteBufferFiles :: ForWriteBufferFiles CRC.CRC32C -> CRC.ChecksumsFile
250-
toChecksumsFileForWriteBufferFiles = Map.fromList . toList . liftA2 (,) checksumFileNamesForWriteBufferFiles
251-
252-
fromChecksumsFileForWriteBufferFiles :: CRC.ChecksumsFile -> Either String (ForWriteBufferFiles CRC.CRC32C)
253-
fromChecksumsFileForWriteBufferFiles file = for checksumFileNamesForWriteBufferFiles $ \name ->
254-
case Map.lookup name file of
255-
Just crc -> Right crc
256-
Nothing -> Left ("key not found: " <> show name)
257-
258240
{-------------------------------------------------------------------------------
259241
Marker newtypes for individual elements of the ForRunFiles and the
260242
ForWriteBufferFiles abstractions
@@ -314,15 +296,17 @@ pattern WriteBufferFsPaths {writeBufferDir, writeBufferNumber} = WrapRunFsPaths
314296

315297
{-# COMPLETE WriteBufferFsPaths #-}
316298

317-
-- | Paths to all files associated with this run, except 'runChecksumsPath'.
318-
pathsForWriteBufferFiles :: WriteBufferFsPaths -> ForWriteBufferFiles FsPath
319-
pathsForWriteBufferFiles fsPaths = fmap (writeBufferFilePathWithExt fsPaths) writeBufferFileExts
299+
writeBufferKOpsExt :: String
300+
writeBufferKOpsExt = unForKOps . forRunKOps $ runFileExts
301+
302+
writeBufferBlobExt :: String
303+
writeBufferBlobExt = unForBlob . forRunBlob $ runFileExts
320304

321305
writeBufferKOpsPath :: WriteBufferFsPaths -> FsPath
322-
writeBufferKOpsPath = unForKOps . forWriteBufferKOps . pathsForWriteBufferFiles
306+
writeBufferKOpsPath = flip writeBufferFilePathWithExt writeBufferKOpsExt
323307

324308
writeBufferBlobPath :: WriteBufferFsPaths -> FsPath
325-
writeBufferBlobPath = unForBlob . forWriteBufferBlob . pathsForWriteBufferFiles
309+
writeBufferBlobPath = flip writeBufferFilePathWithExt writeBufferBlobExt
326310

327311
writeBufferChecksumsPath :: WriteBufferFsPaths -> FsPath
328312
writeBufferChecksumsPath = flip writeBufferFilePathWithExt "checksums"
@@ -331,28 +315,16 @@ writeBufferFilePathWithExt :: WriteBufferFsPaths -> String -> FsPath
331315
writeBufferFilePathWithExt (WriteBufferFsPaths dir n) ext =
332316
dir </> mkFsPath [show n] <.> ext
333317

334-
writeBufferFileExts :: ForWriteBufferFiles String
335-
writeBufferFileExts = ForWriteBufferFiles
336-
{ forWriteBufferKOps = forRunKOps runFileExts
337-
, forWriteBufferBlob = forRunBlob runFileExts
338-
}
339318

340319
{-------------------------------------------------------------------------------
341-
ForWriteBuffer abstraction
320+
Checksums For WriteBuffer Files
342321
-------------------------------------------------------------------------------}
343322

344-
-- | Stores someting for each run file (except the checksums file), allowing to
345-
-- easily do something for all of them without mixing them up.
346-
data ForWriteBufferFiles a = ForWriteBufferFiles { forWriteBufferKOps :: !(ForKOps a), forWriteBufferBlob :: !(ForBlob a) }
347-
deriving stock (Show, Foldable, Functor, Traversable)
348-
349-
forWriteBufferKOpsRaw :: ForWriteBufferFiles a -> a
350-
forWriteBufferKOpsRaw = unForKOps . forWriteBufferKOps
351-
352-
forWriteBufferBlobRaw :: ForWriteBufferFiles a -> a
353-
forWriteBufferBlobRaw = unForBlob . forWriteBufferBlob
354-
355-
instance Applicative ForWriteBufferFiles where
356-
pure x = ForWriteBufferFiles (ForKOps x) (ForBlob x)
357-
ForWriteBufferFiles (ForKOps f1) (ForBlob f2) <*> ForWriteBufferFiles (ForKOps x1) (ForBlob x2) =
358-
ForWriteBufferFiles (ForKOps $ f1 x1) (ForBlob $ f2 x2)
323+
toChecksumsFileForWriteBufferFiles :: (ForKOps CRC.CRC32C, ForBlob CRC.CRC32C) -> CRC.ChecksumsFile
324+
toChecksumsFileForWriteBufferFiles checksums =
325+
Map.fromList . toList $ checksums & bimap
326+
((toChecksumsFileName writeBufferKOpsExt,) . unForKOps)
327+
((toChecksumsFileName writeBufferBlobExt,) . unForBlob)
328+
where
329+
toChecksumsFileName :: String -> CRC.ChecksumsFileName
330+
toChecksumsFileName = CRC.ChecksumsFileName . BS.pack

src/Database/LSMTree/Internal/RunReader.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ new !offsetKey
112112
let fileSizeInPages = fileSize `div` toEnum pageSize
113113
let indexedPages = getNumPages $ Run.sizeInPages readerRun
114114
assert (indexedPages == fileSizeInPages) $ pure h
115-
-- TODO: Why?
116-
-- Double the file readahead window (only applies to this file descriptor)
115+
-- Advise the OS that this file is being read sequentially, which will
116+
-- double the readahead window in response (only for this file descriptor)
117117
FS.hAdviseAll readerHasBlockIO readerKOpsHandle FS.AdviceSequential
118118

119119
(page, entryNo) <- seekFirstEntry readerKOpsHandle

0 commit comments

Comments
 (0)