From 9a9a3f3c59caafcdb53b6def244ed13b7d4a228b Mon Sep 17 00:00:00 2001 From: Xander van der Goot Date: Tue, 24 May 2022 17:38:29 +0200 Subject: [PATCH 1/4] Add support for multiple LZ4 frames. Multiple LZ4 frames are useful whenever you want to chunk a ByteString into parts that are individually decompressable. --- src/Codec/Compression/LZ4/Conduit.hsc | 120 ++++++++++++++++++-------- test/Main.hs | 70 ++++++++++++--- 2 files changed, 146 insertions(+), 44 deletions(-) diff --git a/src/Codec/Compression/LZ4/Conduit.hsc b/src/Codec/Compression/LZ4/Conduit.hsc index 493a7f0..6e18d61 100644 --- a/src/Codec/Compression/LZ4/Conduit.hsc +++ b/src/Codec/Compression/LZ4/Conduit.hsc @@ -1,6 +1,5 @@ {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiWayIf #-} -{-# LANGUAGE PartialTypeSignatures #-} {-# LANGUAGE QuasiQuotes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TemplateHaskell #-} @@ -69,12 +68,15 @@ module Codec.Compression.LZ4.Conduit , lz4DefaultPreferences , compress + , compressMultiFrame , compressYieldImmediately , compressWithOutBufferSize + , compressWithOutBufferSizeMultiFrame , decompress , bsChunksOf + , ignoreFlush -- * Internals , Lz4FrameCompressionContext(..) @@ -90,19 +92,16 @@ module Codec.Compression.LZ4.Conduit ) where import UnliftIO.Exception (throwString, bracket) +import UnliftIO.IORef (newIORef, readIORef, modifyIORef') +import Conduit import Control.Monad (foldM, when) -import Control.Monad.IO.Unlift (MonadUnliftIO) -import Control.Monad.IO.Class (liftIO) -import Control.Monad.Trans.Resource (MonadResource) import Data.Bits (testBit) import Data.ByteString (ByteString, packCStringLen) import Data.ByteString.Unsafe (unsafePackCString, unsafeUseAsCStringLen) import qualified Data.ByteString as BS import qualified Data.ByteString.Char8 as BS8 import qualified Data.ByteString.Lazy as BSL -import Data.Conduit import qualified Data.Conduit.Binary as CB -import Data.Monoid ((<>)) import Foreign.C.Types (CChar, CSize) import Foreign.ForeignPtr (ForeignPtr, addForeignPtrFinalizer, mallocForeignPtr, mallocForeignPtrBytes, finalizeForeignPtr, withForeignPtr) import Foreign.Marshal.Alloc (alloca, allocaBytes, malloc, free) @@ -333,10 +332,30 @@ lz4fCompressEnd (ScopedLz4FrameCompressionContext ctx) footerBuf footerBufLen = -- an arbitrarily large amount of input data, as long as the destination -- buffer is large enough. - +-- | +-- Compresss the incoming stream of ByteStrings and put it into a single LZ4 frame. compress :: (MonadUnliftIO m, MonadResource m) => ConduitT ByteString ByteString m () compress = compressWithOutBufferSize 0 +-- | Compress the incoming stream of ByteStrings and mark the end of a LZ4 frame with a @Conduit.Flush@. +-- Returns a list of byte offsets that point to the start of each LZ4 frame header. +-- +-- Multiple LZ4 frames are useful whenever you want to chunk a ByteString into parts that are individually decompressable. +compressMultiFrame :: (MonadUnliftIO m, MonadResource m) => ConduitT (Flush ByteString) ByteString m [Int] +{-# INLINE compressMultiFrame #-} +compressMultiFrame = do + counterRef <- newIORef 0 + offsetsRef <- newIORef [0] + compressWithOutBufferSizeMultiFrame 0 .| awaitForever (\case + Flush -> do + count <- readIORef counterRef + modifyIORef' offsetsRef (count:) + Chunk bs -> do + modifyIORef' counterRef (BS.length bs +) + yield bs) + offsets <- readIORef offsetsRef + return (reverse . tail $ offsets) + withLz4CtxAndPrefsConduit :: (MonadUnliftIO m, MonadResource m) @@ -455,9 +474,24 @@ bsChunksOf chunkSize bs -- Setting `bufferSize = 0` is the legitimate way to set the output buffer -- size to be the minimum required to compress 16 KB inputs and is still a -- fast default. + compressWithOutBufferSize :: forall m . (MonadUnliftIO m, MonadResource m) => CSize -> ConduitT ByteString ByteString m () -compressWithOutBufferSize bufferSize = +compressWithOutBufferSize bufferSize = Conduit.mapC Chunk .| compressWithOutBufferSizeMultiFrame bufferSize .| ignoreFlush + +ignoreFlush :: Monad m => ConduitT (Flush a) a m () +ignoreFlush = awaitForever $ \case + Chunk a -> yield a + Flush -> return () + +data FrameState = EndStream CSize | NewFrame CSize + +-- | Compress the incoming stream of ByteStrings and delineate the end of a LZ4 frame with a @Conduit.Flush@. +compressWithOutBufferSizeMultiFrame :: forall m . (MonadUnliftIO m, MonadResource m) => CSize -> ConduitT (Flush ByteString) (Flush ByteString) m () +compressWithOutBufferSizeMultiFrame bufferSize = withLz4CtxAndPrefsConduit $ \(ctx, prefs) -> do + -- From the LZ4 manual: + -- "A successful call to LZ4F_compressEnd() makes cctx available again for another compression task."" + -- Therefore we only need a single compression context (ctx) even though we use it for multiple LZ4 frames. -- We split any incoming ByteString into chunks of this size, so that -- we can pass this size to `lz4fCompressBound` once and reuse a buffer @@ -469,41 +503,44 @@ compressWithOutBufferSize bufferSize = outBuf <- liftIO $ mallocForeignPtrBytes (fromIntegral outBufferSize) let withOutBuf f = liftIO $ withForeignPtr outBuf f + let yieldOutBuf outBufLen = do outBs <- withOutBuf $ \buf -> packCStringLen (buf, fromIntegral outBufLen) - yield outBs + yield (Chunk outBs) - headerSize <- withOutBuf $ \buf -> lz4fCompressBegin ctx prefs buf outBufferSize + let writeHeader = withOutBuf $ \buf -> lz4fCompressBegin ctx prefs buf outBufferSize let writeFooterAndYield remainingCapacity = do - let offset = fromIntegral $ outBufferSize - remainingCapacity - footerWritten <- withOutBuf $ \buf -> lz4fCompressEnd ctx (buf `plusPtr` offset) remainingCapacity - let outBufLen = outBufferSize - remainingCapacity + footerWritten - yieldOutBuf outBufLen + let writeFooterToBuffer capacity = do + let offset = fromIntegral $ outBufferSize - capacity + footerWritten <- withOutBuf $ \buf -> lz4fCompressEnd ctx (buf `plusPtr` offset) capacity + let outBufLen = outBufferSize - capacity + footerWritten + yieldOutBuf outBufLen + yield Flush -- denote the end of a frame. + + -- Passing srcSize==0 provides bound for LZ4F_compressEnd(), + -- see docs of LZ4F_compressBound() for that. + footerSize <- liftIO $ lz4fCompressBound 0 prefs + + if remainingCapacity >= footerSize + then do + writeFooterToBuffer remainingCapacity + else do + -- Footer doesn't fit: Yield buffer, put footer into now-free buffer + yieldOutBuf (outBufferSize - remainingCapacity) + writeFooterToBuffer outBufferSize let loop remainingCapacity = do await >>= \case - Nothing -> do - -- Done, write footer. - - -- Passing srcSize==0 provides bound for LZ4F_compressEnd(), - -- see docs of LZ4F_compressBound() for that. - footerSize <- liftIO $ lz4fCompressBound 0 prefs - - if remainingCapacity >= footerSize - then do - writeFooterAndYield remainingCapacity - else do - -- Footer doesn't fit: Yield buffer, put footer into now-free buffer - yieldOutBuf (outBufferSize - remainingCapacity) - writeFooterAndYield outBufferSize + Nothing -> return $ EndStream remainingCapacity + Just (Flush) -> return $ NewFrame remainingCapacity - Just bs -> do + Just (Chunk bs) -> do let bss = bsChunksOf (fromIntegral bsInChunkSize) bs newRemainingCapacity <- foldM compressSingleBs remainingCapacity bss loop newRemainingCapacity - compressSingleBs :: CSize -> ByteString -> ConduitM i ByteString m CSize + compressSingleBs :: CSize -> ByteString -> ConduitM i (Flush ByteString) m CSize compressSingleBs remainingCapacity bs | remainingCapacity < compressBound = do -- Not enough space in outBuf to guarantee that the next call @@ -514,7 +551,7 @@ compressWithOutBufferSize bufferSize = | otherwise = do compressSingleBsFitting remainingCapacity bs - compressSingleBsFitting :: CSize -> ByteString -> ConduitM i ByteString m CSize + compressSingleBsFitting :: CSize -> ByteString -> ConduitM i (Flush ByteString) m CSize compressSingleBsFitting remainingCapacity bs = do when (remainingCapacity < compressBound) $ error "precondition violated" @@ -531,7 +568,16 @@ compressWithOutBufferSize bufferSize = let newRemainingCapacity = remainingCapacity - written return newRemainingCapacity - loop (outBufferSize - headerSize) + let writeFrame = do + headerSize <- writeHeader + status <- loop (outBufferSize - headerSize) + case status of + EndStream capacity -> writeFooterAndYield capacity + NewFrame capacity -> Conduit.peekC >>= \case -- Only create a new frame if there's still data left. + Nothing -> writeFooterAndYield capacity + Just _ -> writeFooterAndYield capacity >> writeFrame + + writeFrame @@ -595,7 +641,8 @@ lz4fDecompress (Lz4FrameDecompressionContext ctxForeignPtr) dstBuffer dstSizePtr return decompressSizeHint --- | TODO check why decompressSizeHint is always 4 +-- | Decompress a ByteString that uses the LZ4 frame format (both single and multiple frames). +-- TODO check why decompressSizeHint is always 4 decompress :: (MonadUnliftIO m, MonadResource m) => ConduitT ByteString ByteString m () decompress = do ctx <- liftIO lz4fCreateDecompressionContext @@ -659,7 +706,7 @@ decompress = do poke dstBufferSizePtr size peek dstBufferPtr - let loopSingleBs :: CSize -> ByteString -> _ + let loopSingleBs :: (MonadIO m) => CSize -> ByteString -> ConduitT i ByteString m CSize loopSingleBs decompressSizeHint bs = do (outBs, srcRead, newDecompressSizeHint) <- liftIO $ unsafeUseAsCStringLen bs $ \(srcBuffer, srcSize) -> do @@ -700,3 +747,8 @@ decompress = do -- Force resource release here to guarantee memory constantness -- of the conduit (and not rely on GC to do it "at some point in the future"). liftIO $ finalizeForeignPtr (unLz4FrameDecompressionContext ctx) + + -- | Check if there is another frame to decode. + peekC >>= \case + Nothing -> return () + Just _ -> decompress diff --git a/test/Main.hs b/test/Main.hs index e407cc8..56ccf05 100644 --- a/test/Main.hs +++ b/test/Main.hs @@ -1,22 +1,21 @@ {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE LambdaCase #-} module Main (main) where -import Codec.Compression.LZ4.Conduit (compress, decompress, bsChunksOf) -import Control.Monad (when) -import Control.Monad.IO.Unlift (MonadUnliftIO) -import Control.Monad.Trans.Resource (ResourceT, runResourceT) +import Codec.Compression.LZ4.Conduit (compress, compressMultiFrame, decompress, bsChunksOf) +import Conduit +import Control.Monad (when, void) import Data.ByteString (ByteString) import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BSL import qualified Data.ByteString.Lazy.Char8 as BSL8 -import Data.Conduit import Data.Conduit.Binary as CB import qualified Data.Conduit.List as CL import qualified Data.Conduit.Process as CP import Data.List (intersperse) import Test.Hspec -import Test.Hspec.QuickCheck (modifyMaxSize) +import Test.Hspec.QuickCheck (modifyMaxSize, prop) import qualified Test.QuickCheck as QC import qualified Test.QuickCheck.Monadic as QCM @@ -25,13 +24,22 @@ import qualified Test.QuickCheck.Monadic as QCM runCompressToLZ4 :: (MonadUnliftIO m) => ConduitT () ByteString (ResourceT m) () -> m ByteString runCompressToLZ4 source = runResourceT $ do - (_, result, _) <- CP.sourceCmdWithStreams "lz4 -d" (source .| compress) CL.consume CL.consume - return $ BS.concat result + (_, result, _) <- CP.sourceCmdWithStreams "lz4 -d" (source .| compress) foldC foldC + return result + +-- | Create a new LZ4 frame every len bytes. +createFramesOfCE :: (MonadIO m, Monad m) => Int -> ConduitT ByteString (Flush ByteString) m () +createFramesOfCE len = chunksOfCE len .| awaitForever (\x -> yield (Chunk x) >> yield Flush) + +runCompressMultiFrameToLZ4 :: (MonadUnliftIO m) => ConduitT () ByteString (ResourceT m) () -> m ByteString +runCompressMultiFrameToLZ4 source = runResourceT $ do + (_, result, _) <- CP.sourceCmdWithStreams "lz4 -d" (source .| createFramesOfCE 1000 .| void compressMultiFrame) foldC foldC + return result runLZ4ToDecompress :: (MonadUnliftIO m) => ConduitT () ByteString (ResourceT m) () -> m ByteString runLZ4ToDecompress source = runResourceT $ do - (_, result, _) <- CP.sourceCmdWithStreams "lz4 -c" source (decompress .| CL.consume) CL.consume - return $ BS.concat result + (_, result, _) <- CP.sourceCmdWithStreams "lz4 -c" source (decompress .| foldC) foldC + return result main :: IO () main = do @@ -74,6 +82,33 @@ main = do actual <- runCompressToLZ4 (CB.sourceLbs $ BSL.fromStrict bs) actual `shouldBe` bs + describe "Compression MultiFrame" $ do + it "compresses simple string" $ do + let string = "hellohellohellohello" + actual <- runCompressMultiFrameToLZ4 (yield string) + actual `shouldBe` string + + it "compresses 100000 integers" $ do + let strings = prepare $ map (BSL8.pack . show) [1..100000 :: Int] + actual <- runCompressMultiFrameToLZ4 (CL.sourceList strings) + actual `shouldBe` (BS.concat strings) + + it "compresses 100000 strings" $ do + let strings = prepare $ replicate 100000 "hello" + actual <- runCompressMultiFrameToLZ4 (CL.sourceList strings) + actual `shouldBe` (BS.concat strings) + + it "compresses 1MB ByteString" $ do + let bs = BS.replicate 100000 42 + actual <- runCompressToLZ4 (CB.sourceLbs $ BSL.fromStrict bs) + actual `shouldBe` bs + + it "compresses 5GiB ByteString" $ do -- more than 32-bit many Bytes + skipBigmemTest + let bs = BS.replicate (5 * 1024*1024*1024) 42 + actual <- runCompressMultiFrameToLZ4 (CB.sourceLbs $ BSL.fromStrict bs) + actual `shouldBe` bs + describe "Decompression" $ do it "decompresses simple string" $ do let string = "hellohellohellohello" @@ -107,3 +142,18 @@ main = do let bs = BSL.toChunks $ BSL8.pack string actual <- QCM.run (runConduitRes $ CL.sourceList bs .| compress .| decompress .| CL.consume) QCM.assert(BS.concat bs == BS.concat actual) + + describe "Identity MultiFrame" $ + modifyMaxSize (const 10000) $ prop "compress and decompress arbitrary strings" $ \string -> do + let bs = BS.pack string + actual <- runConduitRes $ yield bs .| createFramesOfCE 10 .| void compressMultiFrame .| decompress .| foldC + actual `shouldBe` bs + + describe "Slice a multiframe" $ + it "Decompress only the second frame." $ do + let bs = BS.pack $ replicate 10 0 ++ replicate 10 1 ++ replicate 10 2 + (offsets, cbs) <- runConduitRes $ yield bs .| createFramesOfCE 10 .| compressMultiFrame `fuseBoth` foldC + let mark = offsets !! 1 + mark2 = offsets !! 2 + actual <- runConduitRes $ yield cbs .| (dropCE mark >> takeCE (mark2 - mark) .| decompress .| foldC) + actual `shouldBe` (BS.pack $ replicate 10 1) From d3111c65e76280fa6583a3de7b1404e1746fce79 Mon Sep 17 00:00:00 2001 From: Xander van der Goot Date: Fri, 3 Jun 2022 17:10:26 +0200 Subject: [PATCH 2/4] compressMultiFrame: return total length. --- src/Codec/Compression/LZ4/Conduit.hsc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Codec/Compression/LZ4/Conduit.hsc b/src/Codec/Compression/LZ4/Conduit.hsc index 6e18d61..3d8fc67 100644 --- a/src/Codec/Compression/LZ4/Conduit.hsc +++ b/src/Codec/Compression/LZ4/Conduit.hsc @@ -338,7 +338,8 @@ compress :: (MonadUnliftIO m, MonadResource m) => ConduitT ByteString ByteString compress = compressWithOutBufferSize 0 -- | Compress the incoming stream of ByteStrings and mark the end of a LZ4 frame with a @Conduit.Flush@. --- Returns a list of byte offsets that point to the start of each LZ4 frame header. +-- Returns a list of byte offsets that point to the start of each LZ4 frame header, and the last entry is +-- the total length of the compressed ByteString. -- -- Multiple LZ4 frames are useful whenever you want to chunk a ByteString into parts that are individually decompressable. compressMultiFrame :: (MonadUnliftIO m, MonadResource m) => ConduitT (Flush ByteString) ByteString m [Int] @@ -354,7 +355,7 @@ compressMultiFrame = do modifyIORef' counterRef (BS.length bs +) yield bs) offsets <- readIORef offsetsRef - return (reverse . tail $ offsets) + return (reverse offsets) withLz4CtxAndPrefsConduit :: From 94520ebf8ad6121517f23be5ac40c72507555e19 Mon Sep 17 00:00:00 2001 From: Xander van der Goot Date: Fri, 3 Jun 2022 17:16:06 +0200 Subject: [PATCH 3/4] fixup! Add support for multiple LZ4 frames. --- src/Codec/Compression/LZ4/Conduit.hsc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Codec/Compression/LZ4/Conduit.hsc b/src/Codec/Compression/LZ4/Conduit.hsc index 3d8fc67..d9e55bd 100644 --- a/src/Codec/Compression/LZ4/Conduit.hsc +++ b/src/Codec/Compression/LZ4/Conduit.hsc @@ -749,7 +749,7 @@ decompress = do -- of the conduit (and not rely on GC to do it "at some point in the future"). liftIO $ finalizeForeignPtr (unLz4FrameDecompressionContext ctx) - -- | Check if there is another frame to decode. + -- Check if there is another frame to decode. peekC >>= \case Nothing -> return () Just _ -> decompress From 6ae42b8884492872474eebcc3bc2dfaa2b4f7ce5 Mon Sep 17 00:00:00 2001 From: Joris Burgers Date: Wed, 15 Nov 2023 16:04:33 +0100 Subject: [PATCH 4/4] Add `decompressChunks` go get original chunks back --- src/Codec/Compression/LZ4/Conduit.hsc | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/Codec/Compression/LZ4/Conduit.hsc b/src/Codec/Compression/LZ4/Conduit.hsc index d9e55bd..4b36a36 100644 --- a/src/Codec/Compression/LZ4/Conduit.hsc +++ b/src/Codec/Compression/LZ4/Conduit.hsc @@ -74,6 +74,7 @@ module Codec.Compression.LZ4.Conduit , compressWithOutBufferSizeMultiFrame , decompress + , decompressChunks , bsChunksOf , ignoreFlush @@ -641,11 +642,13 @@ lz4fDecompress (Lz4FrameDecompressionContext ctxForeignPtr) dstBuffer dstSizePtr } |] return decompressSizeHint +decompress :: (MonadUnliftIO m, MonadResource m) => ConduitT ByteString ByteString m () +decompress = decompressChunks .| ignoreFlush -- | Decompress a ByteString that uses the LZ4 frame format (both single and multiple frames). -- TODO check why decompressSizeHint is always 4 -decompress :: (MonadUnliftIO m, MonadResource m) => ConduitT ByteString ByteString m () -decompress = do +decompressChunks :: (MonadUnliftIO m, MonadResource m) => ConduitT ByteString (Flush ByteString) m () +decompressChunks = do ctx <- liftIO lz4fCreateDecompressionContext -- OK, now here it gets a bit ugly. @@ -707,7 +710,7 @@ decompress = do poke dstBufferSizePtr size peek dstBufferPtr - let loopSingleBs :: (MonadIO m) => CSize -> ByteString -> ConduitT i ByteString m CSize + let loopSingleBs :: (MonadIO m) => CSize -> ByteString -> ConduitT i (Flush ByteString) m CSize loopSingleBs decompressSizeHint bs = do (outBs, srcRead, newDecompressSizeHint) <- liftIO $ unsafeUseAsCStringLen bs $ \(srcBuffer, srcSize) -> do @@ -725,7 +728,10 @@ decompress = do outBs <- packCStringLen (dstBuffer, fromIntegral dstWritten) return (outBs, srcRead, newDecompressSizeHint) - yield outBs + yield (Chunk outBs) + -- When a frame is fully decoded, LZ4F_decompress returns 0 (no more data expected), + -- see https://github.com/lz4/lz4/blob/7cf0bb97b2a988cb17435780d19e145147dd9f70/lib/lz4frame.h#L324 + when (newDecompressSizeHint == 0) $ yield Flush let srcReadInt = fromIntegral srcRead if @@ -752,4 +758,4 @@ decompress = do -- Check if there is another frame to decode. peekC >>= \case Nothing -> return () - Just _ -> decompress + Just _ -> decompressChunks