WIP: Constant space sort #61

base: master
Failed to load comments.
Failed to load files.
29 changes: 24 additions & 5 deletions ghc-events.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,11 @@ library
bytestring >= 0.10.4,
array >= 0.2 && < 0.6,
text >= && < 1.3,
vector >= 0.7 && < 0.13
vector >= 0.7 && < 0.13,
exposed-modules: GHC.RTS.Events,
Expand All @@ -101,28 +104,44 @@ library
hs-source-dirs: src
include-dirs: include
extensions: RecordWildCards, NamedFieldPuns, BangPatterns, PatternGuards
extensions: RecordWildCards, NamedFieldPuns, BangPatterns, PatternGuards
other-extensions: FlexibleContexts, CPP
ghc-options: -Wall

executable ghc-events
main-is: GhcEvents.hs
build-depends: ghc-events, base, containers
extensions: RecordWildCards, NamedFieldPuns, BangPatterns, PatternGuards
extensions: RecordWildCards, NamedFieldPuns, BangPatterns, PatternGuards

test-suite test-versions
type: exitcode-stdio-1.0
main-is: TestVersions.hs
other-modules: Utils
hs-source-dirs: ., test
build-depends: ghc-events, base
extensions: RecordWildCards, NamedFieldPuns, BangPatterns, PatternGuards
extensions: RecordWildCards, NamedFieldPuns, BangPatterns, PatternGuards

test-suite write-merge
type: exitcode-stdio-1.0
main-is: WriteMerge.hs
other-modules: Utils
hs-source-dirs: ., test
build-depends: ghc-events, base, bytestring
extensions: RecordWildCards, NamedFieldPuns, BangPatterns, PatternGuards
extensions: RecordWildCards, NamedFieldPuns, BangPatterns, PatternGuards
buildable: False

test-suite roundtrip
type: exitcode-stdio-1.0
main-is: Roundtrip.hs
other-modules: Utils
hs-source-dirs: ., test
build-depends: ghc-events, base
extensions: RecordWildCards, NamedFieldPuns, BangPatterns, PatternGuards

test-suite merge-sort
type: exitcode-stdio-1.0
main-is: Sort.hs
other-modules: Utils
hs-source-dirs: ., test
build-depends: ghc-events, base, bytestring, filepath, temporary
extensions: RecordWildCards, NamedFieldPuns, BangPatterns, PatternGuards
45 changes: 27 additions & 18 deletions src/GHC/RTS/Events/Binary.hs
Original file line number Diff line number Diff line change
Expand Up @@ -916,8 +916,9 @@ putHeader (Header ets) = do
putType n
putE $ fromMaybe 0xffff msz
putE (fromIntegral $ T.length d :: EventTypeDescLen)
putE d
let d' = TE.encodeUtf8 d
putE (fromIntegral $ B.length d' :: EventTypeDescLen)
putByteString d'
-- the event type header allows for extra data, which we don't use:
putE (0 :: Word32)
putMarker EVENT_ET_END
Expand Down Expand Up @@ -1136,9 +1137,10 @@ putEventSpec (WakeupThread t c) = do
putCap c

-- Serialise a 'ThreadLabel' event: a 'Word16' payload size, the thread id,
-- then the label itself.
-- NOTE(review): this block is a rendered diff, so it shows both the removed
-- lines (`T.length l`, `putE l`) and the added lines (`TE.encodeUtf8`,
-- `B.length l'`, `putByteString l'`); presumably only the added lines remain
-- in the resulting file -- confirm against the repository.
-- The added lines size the payload by the byte length of the UTF-8 encoding
-- instead of the character count of the 'Text'.
putEventSpec (ThreadLabel t l) = do
putE (fromIntegral (T.length l) + sz_tid :: Word16)
let l' = TE.encodeUtf8 l
putE (fromIntegral (B.length l') + sz_tid :: Word16)
putE t
putE l
putByteString l'

-- A 'Shutdown' event carries no payload, so nothing is written.
putEventSpec Shutdown = return ()
Expand Down Expand Up @@ -1245,21 +1247,24 @@ putEventSpec (CapsetRemoveCap cs cp) = do
putCap cp

-- Serialise an 'RtsIdentifier' event: a 'Word16' payload size, the capset,
-- then the RTS identifier string.
-- NOTE(review): rendered diff -- the `T.length rts` / `putE rts` lines look
-- like the removed side and the `TE.encodeUtf8` / `B.length rts'` /
-- `putByteString rts'` lines the added side; confirm against the repository.
-- The added lines measure the UTF-8 encoded byte length rather than the
-- number of characters.
putEventSpec (RtsIdentifier cs rts) = do
putE (fromIntegral (T.length rts) + sz_capset :: Word16)
let rts' = TE.encodeUtf8 rts
putE (fromIntegral (B.length rts') + sz_capset :: Word16)
putE cs
putE rts
putByteString rts'

-- Serialise a 'ProgramArgs' event: a 'Word16' payload size, the capset, then
-- the arguments with a NUL byte between consecutive arguments.
-- NOTE(review): rendered diff -- the `T.length` / `putE` lines look like the
-- removed side and the `TE.encodeUtf8` / `B.length` / `putByteString` lines
-- the added side; confirm against the repository.
-- The added size computation counts UTF-8 bytes and subtracts one because
-- 'intersperse' emits one separator fewer than there are arguments.
-- NOTE(review): for an empty argument list the added computation gives
-- @sz_args = -1@, which wraps around when converted to 'Word16'; confirm
-- that an empty list cannot occur here, or guard against it.
putEventSpec (ProgramArgs cs as) = do
let sz_args = sum $ map ((+ 1) {- for \0 -} . T.length) as
let as' = map TE.encodeUtf8 as
let sz_args = sum (map ((+ 1) {- for \0 -} . B.length) as') - 1
putE (fromIntegral sz_args + sz_capset :: Word16)
putE cs
mapM_ putE (intersperse "\0" as)
mapM_ putByteString (intersperse "\0" as')

-- Serialise a 'ProgramEnv' event: a 'Word16' payload size, the capset, then
-- the environment entries with a NUL byte between consecutive entries.
-- NOTE(review): rendered diff -- the `T.length` / `putE` lines look like the
-- removed side and the `TE.encodeUtf8` / `B.length` / `putByteString` lines
-- the added side; confirm against the repository.
-- The added size computation counts UTF-8 bytes and subtracts one because
-- 'intersperse' emits one separator fewer than there are entries.
-- NOTE(review): for an empty environment the added computation gives
-- @sz_env = -1@, which wraps around when converted to 'Word16'; confirm that
-- an empty list cannot occur here, or guard against it.
putEventSpec (ProgramEnv cs es) = do
let sz_env = sum $ map ((+ 1) {- for \0 -} . T.length) es
let es' = map TE.encodeUtf8 es
let sz_env = sum (map ((+ 1) {- for \0 -} . B.length) es') - 1
putE (fromIntegral sz_env + sz_capset :: Word16)
putE cs
mapM_ putE $ intersperse "\0" es
mapM_ putByteString $ intersperse "\0" es'

putEventSpec (OsProcessPid cs pid) = do
putE cs
Expand All @@ -1275,16 +1280,19 @@ putEventSpec (WallClockTime cs sec nsec) = do
putE nsec

-- Serialise a 'Message' event: a 'Word16' length followed by the message.
-- NOTE(review): rendered diff -- the `T.length s` / `putE s` lines look like
-- the removed side and the `TE.encodeUtf8` / `B.length s'` /
-- `putByteString s'` lines the added side; confirm against the repository.
-- The added lines write the UTF-8 byte length, not the character count.
putEventSpec (Message s) = do
putE (fromIntegral (T.length s) :: Word16)
putE s
let s' = TE.encodeUtf8 s
putE (fromIntegral (B.length s') :: Word16)
putByteString s'

-- Serialise a 'UserMessage' event: a 'Word16' length followed by the message.
-- NOTE(review): rendered diff -- the `T.length s` / `putE s` lines look like
-- the removed side and the `TE.encodeUtf8` / `B.length s'` /
-- `putByteString s'` lines the added side; confirm against the repository.
-- The added lines write the UTF-8 byte length, not the character count.
putEventSpec (UserMessage s) = do
putE (fromIntegral (T.length s) :: Word16)
putE s
let s' = TE.encodeUtf8 s
putE (fromIntegral (B.length s') :: Word16)
putByteString s'

-- Serialise a 'UserMarker' event: a 'Word16' length followed by the marker.
-- NOTE(review): rendered diff -- the `T.length s` / `putE s` lines look like
-- the removed side and the `TE.encodeUtf8` / `B.length s'` /
-- `putByteString s'` lines the added side; confirm against the repository.
-- The added lines write the UTF-8 byte length, not the character count.
putEventSpec (UserMarker s) = do
putE (fromIntegral (T.length s) :: Word16)
putE s
let s' = TE.encodeUtf8 s
putE (fromIntegral (B.length s') :: Word16)
putByteString s'

-- Unknown events cannot be serialised; attempting to do so is an error.
putEventSpec UnknownEvent{} = error "putEventSpec UnknownEvent"

Expand Down Expand Up @@ -1388,9 +1396,10 @@ putEventSpec MerCapSleeping = return ()
-- 'MerCallingMain' has no payload, so nothing is written.
putEventSpec MerCallingMain = return ()

-- Serialise a 'PerfName' event: a 'Word16' payload size, the perf counter
-- number, then the counter name.
-- NOTE(review): rendered diff -- the `T.length name` / `putE name` lines look
-- like the removed side and the `TE.encodeUtf8` / `B.length name'` /
-- `putByteString name'` lines the added side; confirm against the repository.
-- The added lines measure the UTF-8 encoded byte length rather than the
-- number of characters.
putEventSpec PerfName{..} = do
putE (fromIntegral (T.length name) + sz_perf_num :: Word16)
let name' = TE.encodeUtf8 name
putE (fromIntegral (B.length name') + sz_perf_num :: Word16)
putE perfNum
putE name
putByteString name'

putEventSpec PerfCounter{..} = do
putE perfNum
Expand Down
3 changes: 3 additions & 0 deletions src/GHC/RTS/Events/Incremental.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ module GHC.RTS.Events.Incremental
, readHeader
, readEvents
, readEventLog

-- * Low-level API
, mkEventDecoder
) where
import Control.Monad
import Data.Either
Expand Down
188 changes: 188 additions & 0 deletions src/GHC/RTS/Events/Sort.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
-- | Constant-space sorting.
-- This module provides a routine for sorting events in constant-space via
-- on-disk merge sort.
module GHC.RTS.Events.Sort
( sortEvents
, sortEvents'
, SortParams(..)
, defaultSortParams
) where

import Data.Traversable
import Data.Coerce
import Data.Function (on)
import Data.List (sortBy, minimumBy)
import Data.Maybe
import Data.Foldable (toList)
import System.IO
import System.IO.Temp
import System.Directory
import Prelude

import Data.Binary.Put as P
import qualified Data.ByteString.Lazy as BSL
import qualified Data.Sequence as S

import GHC.RTS.Events hiding (sortEvents)
import GHC.RTS.Events.Binary (putEventLog)

-- | Path of a temporary file holding an eventlog whose events have already
-- been sorted by time.
type SortedChunk = FilePath

-- | Wrapper around 'Event' whose 'Eq' and 'Ord' instances look only at the
-- event's timestamp ('evTime').
newtype OnTime = OnTime Event

instance Ord OnTime where
  compare (OnTime a) (OnTime b) = compare (evTime a) (evTime b)

instance Eq OnTime where
  OnTime a == OnTime b = evTime a == evTime b

-- | Parameters which determine the behavior of the merge sort.
data SortParams = SortParams
  { -- | The size (in events) of the chunks which the input eventlog is
    -- broken into. This determines the upper bound on memory usage during
    -- the sorting process. The value used by 'defaultSortParams' is a
    -- reasonable trade-off between memory and computation, requiring
    -- approximately 100MBytes while sorting a "typical" eventlog.
    chunkSize :: !Int

    -- | Maximum number of chunks to merge at once. Determined by the largest
    -- number of file descriptors we can safely open at once.
  , maxFanIn :: !Int
  }

-- | A reasonable set of sorting parameters: chunks of 500,000 events, with
-- at most 256 chunks merged at once.
defaultSortParams :: SortParams
defaultSortParams =
  SortParams
    { chunkSize = 500 * 1000
    , maxFanIn = 256
    }

-- | @sortEvents outPath eventlog@ sorts @eventlog@ via on-disk merge
-- sort. The sorted eventlog is written to @outPath@. The system's temporary
-- directory is used for temporary data. See 'sortEvents\'' for more control.
sortEvents
  :: FilePath -- ^ output eventlog file path
  -> EventLog -- ^ eventlog to sort
  -> IO ()
sortEvents outPath eventLog =
  withSystemTempDirectory "sort-events" $ \tmpDir ->
    sortEvents' defaultSortParams tmpDir outPath eventLog

-- | @sortEvents' params tmpDir outPath eventlog@ sorts
-- @eventlog@ via on-disk merge sort, using @tmpDir@ for
-- intermediate data. The caller is responsible for deleting @tmpDir@ upon
-- return.
-- The sorted eventlog is written to @outPath@.
--
-- Fails (via 'fail') when the eventlog contains no events, when 'chunkSize'
-- is less than 1 or 'maxFanIn' is less than 2, or when a temporary chunk
-- cannot be read back.
sortEvents'
  :: SortParams
  -> FilePath -- ^ temporary directory
  -> FilePath -- ^ output eventlog file path
  -> EventLog -- ^ eventlog to sort
  -> IO ()
sortEvents' _params _tmpDir _outPath (EventLog _ (Data [])) = fail "sortEvents: no events"
sortEvents' params tmpDir outPath (EventLog hdr (Data events0))
  -- A chunk size below 1 would make the chunking step produce empty chunks
  -- forever; a fan-in below 2 would make the merge recurse on the same set
  -- of chunks forever.
  | chunkLen < 1 = fail "sortEvents: chunkSize must be at least 1"
  | fanIn < 2 = fail "sortEvents: maxFanIn must be at least 2"
  | otherwise = do
      chunks <- toSortedChunks events0
      -- 'withBinaryFile' closes the output handle even if merging throws.
      withBinaryFile outPath WriteMode $ \hdl ->
        mergeChunks' hdl chunks
  where
    chunkLen = chunkSize params
    fanIn = maxFanIn params

    -- Split the input into chunks, sort each one in memory and write it to
    -- its own temporary file.
    toSortedChunks :: [Event] -> IO (S.Seq SortedChunk)
    toSortedChunks =
      fmap S.fromList
        . mapM (writeTempChunk . sortEventsInMem)
        . chunksOf chunkLen

    -- Merge the given sorted chunk files into @destFile@ and delete them.
    -- The handle is left open; closing it is the caller's job.
    mergeChunks' :: Handle -> S.Seq SortedChunk -> IO ()
    mergeChunks' destFile chunks
      | S.null chunks =
          fail "sortEvents: this can't happen"
      | S.length chunks <= fanIn = do
          events <- mapM readChunk chunks
          let sorted = mergeSort $ toList (coerce events :: S.Seq [OnTime])
          writeChunk destFile (coerce sorted)
          mapM_ removeFile chunks
      | otherwise = do
          -- Too many chunks to merge at once: merge groups of them into
          -- larger temporary chunks first, then merge those.
          merged <- forM (nChunks fanIn chunks) $ \fps -> do
            (fp, hdl) <- createTempChunk
            mergeChunks' hdl fps
            -- Close before the file is read back by the next merge level.
            hClose hdl
            return fp
          mergeChunks' destFile (S.fromList merged)

    -- Read back the events stored in a chunk file.
    readChunk :: SortedChunk -> IO [Event]
    readChunk fp = do
      result <- readEventLogFromFile fp
      case result of
        Left err -> fail $ "sortEvents: error reading chunk: " ++ fp ++ ": " ++ err
        Right (EventLog _ (Data events)) -> return events

    -- Create a fresh chunk file inside @tmpDir@.
    createTempChunk :: IO (FilePath, Handle)
    createTempChunk =
      openBinaryTempFile tmpDir "chunk"

    -- Write the events to a fresh chunk file and return its path.
    writeTempChunk :: [Event] -> IO FilePath
    writeTempChunk evs = do
      (fp, hdl) <- createTempChunk
      writeChunk hdl evs
      hClose hdl
      return fp

    -- Serialise the events as a complete eventlog, reusing the header of
    -- the input eventlog.
    writeChunk :: Handle -> [Event] -> IO ()
    writeChunk hdl events =
      BSL.hPutStr hdl
        $ P.runPut
        $ putEventLog
        $ EventLog hdr
        $ Data events

-- | An unordered collection, represented as a plain list: the position of an
-- element carries no meaning. Being a list it may hold duplicates, so it is
-- not a set in the strict sense.
type Bag a = [a]

-- | Break a list into consecutive chunks of the given size. Every chunk has
-- exactly @n@ elements except possibly the last, which may be shorter. An
-- empty list yields no chunks.
--
-- The chunk size must be positive for a non-empty list: a non-positive size
-- never consumes any input, so it is rejected with 'error' instead of
-- producing an endless stream of empty chunks.
chunksOf :: Int -> [a] -> [[a]]
chunksOf _ [] = []
chunksOf n xs
  | n < 1 = error "chunksOf: chunk size must be positive"
  | otherwise =
      let (ys, rest) = splitAt n xs
      in ys : chunksOf n rest

-- | Break a 'S.Seq' into at most \(n\) consecutive pieces. Every piece holds
-- @length xs `div` n + 1@ elements except possibly the last, which may be
-- shorter. An empty sequence yields no pieces.
--
-- \(n\) must be positive; otherwise splitting a non-empty sequence is
-- rejected with 'error' (a zero would divide by zero and a negative value
-- could give a piece length that never consumes any input).
nChunks :: Int -> S.Seq a -> [S.Seq a]
nChunks n xs0 = go xs0
  where
    go :: S.Seq a -> [S.Seq a]
    go xs
      | S.null xs = []
      | otherwise = let (x, y) = S.splitAt len xs in x : go y

    -- Piece length; the @+ 1@ keeps it positive and bounds the number of
    -- pieces by @n@.
    len
      | n < 1 = error "nChunks: number of chunks must be positive"
      | otherwise = S.length xs0 `div` n + 1

-- | Merge the given lists into a single list by repeatedly taking the
-- smallest of the lists' head elements. The result is in sorted order
-- provided each input list is itself sorted.
mergeSort :: Ord a => Bag [a] -> [a]
mergeSort = go
  where
    go xss =
      -- Each candidate pairs the head of one non-empty list with the bag of
      -- lists that would remain after removing that head.
      case catMaybes (mapZipper step xss) of
        [] -> []
        candidates ->
          let (x, remaining) = minimumBy (compare `on` fst) candidates
          in x : go remaining

    step :: Bag [a] -> [a] -> Maybe (a, Bag [a])
    step _ [] = Nothing
    step others (x:xs) = Just (x, xs : others)

-- | Apply a function to every element of a bag, passing it the bag of all
-- the /other/ elements as its first argument. Results are produced in the
-- order of the input list.
mapZipper :: (Bag a -> a -> b) -> Bag a -> [b]
mapZipper f = go []
  where
    -- @go prevs nexts@: @prevs@ holds the elements already visited (most
    -- recently visited first), @nexts@ those still to visit. No local type
    -- signature is given because it would have to mention the type
    -- variables of the enclosing signature.
    go _prevs [] = []
    go prevs (x:nexts) =
      f (prevs ++ nexts) x : go (x : prevs) nexts

-- | Sort a list of events in memory, ordering them by timestamp ('evTime').
sortEventsInMem :: [Event] -> [Event]
sortEventsInMem evs = sortBy byTime evs
  where
    byTime a b = compare (evTime a) (evTime b)

22 changes: 22 additions & 0 deletions test/Roundtrip.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import Control.Monad
import System.Exit

import GHC.RTS.Events
import GHC.RTS.Events.Incremental
import Utils (files, diffLines)

-- | Check that an eventlog round-trips through encoding/decoding.
--
-- Prints the file name, reads the eventlog, serialises it and decodes the
-- result again. Returns 'True' when the 'show' output of the re-decoded
-- eventlog equals that of the original. If the file cannot be read, the
-- re-decoding fails or the two differ, a message is printed and 'False' is
-- returned, so the remaining files are still checked.
checkRoundtrip :: FilePath -> IO Bool
checkRoundtrip logFile = do
  putStrLn logFile
  parsed <- readEventLogFromFile logFile
  case parsed of
    Left err -> do
      putStrLn ("bad: could not read eventlog: " ++ err)
      return False
    Right eventlog ->
      case readEventLog (serialiseEventLog eventlog) of
        Right (roundtripped, _)
          | show roundtripped == show eventlog -> return True
        _ -> putStrLn "bad" >> return False

-- | Run the round-trip check on every test file and exit with a failure
-- status if any of them did not round-trip.
main :: IO ()
main = do
  results <- mapM checkRoundtrip files
  if and results
    then return ()
    else exitFailure