Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for multiple LZ4 frames #5

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 96 additions & 37 deletions src/Codec/Compression/LZ4/Conduit.hsc
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE PartialTypeSignatures #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
Expand Down Expand Up @@ -69,12 +68,16 @@ module Codec.Compression.LZ4.Conduit
, lz4DefaultPreferences

, compress
, compressMultiFrame
, compressYieldImmediately
, compressWithOutBufferSize
, compressWithOutBufferSizeMultiFrame

, decompress
, decompressChunks

, bsChunksOf
, ignoreFlush

-- * Internals
, Lz4FrameCompressionContext(..)
Expand All @@ -90,19 +93,16 @@ module Codec.Compression.LZ4.Conduit
) where

import UnliftIO.Exception (throwString, bracket)
import UnliftIO.IORef (newIORef, readIORef, modifyIORef')
import Conduit
import Control.Monad (foldM, when)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Resource (MonadResource)
import Data.Bits (testBit)
import Data.ByteString (ByteString, packCStringLen)
import Data.ByteString.Unsafe (unsafePackCString, unsafeUseAsCStringLen)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BS8
import qualified Data.ByteString.Lazy as BSL
import Data.Conduit
import qualified Data.Conduit.Binary as CB
import Data.Monoid ((<>))
import Foreign.C.Types (CChar, CSize)
import Foreign.ForeignPtr (ForeignPtr, addForeignPtrFinalizer, mallocForeignPtr, mallocForeignPtrBytes, finalizeForeignPtr, withForeignPtr)
import Foreign.Marshal.Alloc (alloca, allocaBytes, malloc, free)
Expand Down Expand Up @@ -333,10 +333,31 @@ lz4fCompressEnd (ScopedLz4FrameCompressionContext ctx) footerBuf footerBufLen =
-- an arbitrarily large amount of input data, as long as the destination
-- buffer is large enough.


-- |
-- Compresses the incoming stream of ByteStrings and puts it into a single LZ4 frame.
--
-- Equivalent to @compressWithOutBufferSize 0@.
compress :: (MonadUnliftIO m, MonadResource m) => ConduitT ByteString ByteString m ()
compress = compressWithOutBufferSize 0

-- | Compress a stream of 'Flush'-delimited ByteStrings into consecutive LZ4
-- frames, where each @Conduit.Flush@ marks the end of a frame.
--
-- The result is a list of byte offsets into the compressed output. It starts
-- with @0@ and gains one entry (the number of compressed bytes yielded so
-- far) every time the underlying compressor signals the end of a frame, so
-- the entries point at the start of each LZ4 frame header and the last entry
-- is the total length of the compressed output.
--
-- Multiple LZ4 frames are useful whenever a ByteString should be chunked into
-- parts that can be decompressed individually.
compressMultiFrame :: (MonadUnliftIO m, MonadResource m) => ConduitT (Flush ByteString) ByteString m [Int]
{-# INLINE compressMultiFrame #-}
compressMultiFrame = do
  -- State: (compressed bytes yielded so far, frame offsets in reverse order).
  progressRef <- newIORef (0, [0])
  compressWithOutBufferSizeMultiFrame 0 .| awaitForever (recordProgress progressRef)
  reverse . snd <$> readIORef progressRef
  where
    recordProgress ref = \case
      -- End of a frame: remember how many bytes have been emitted up to here.
      Flush ->
        modifyIORef' ref (\(total, marks) -> (total, total : marks))
      -- Compressed payload: account for its size, then pass it downstream.
      Chunk bs -> do
        modifyIORef' ref $ \(total, marks) ->
          let total' = total + BS.length bs
          in total' `seq` (total', marks)
        yield bs


withLz4CtxAndPrefsConduit ::
(MonadUnliftIO m, MonadResource m)
Expand Down Expand Up @@ -455,9 +476,24 @@ bsChunksOf chunkSize bs
-- Setting `bufferSize = 0` is the legitimate way to set the output buffer
-- size to be the minimum required to compress 16 KB inputs and is still a
-- fast default.

compressWithOutBufferSize :: forall m . (MonadUnliftIO m, MonadResource m) => CSize -> ConduitT ByteString ByteString m ()
compressWithOutBufferSize bufferSize =
compressWithOutBufferSize bufferSize = Conduit.mapC Chunk .| compressWithOutBufferSizeMultiFrame bufferSize .| ignoreFlush

-- | Pass the payload of every 'Chunk' downstream and drop every 'Flush'.
ignoreFlush :: Monad m => ConduitT (Flush a) a m ()
ignoreFlush = awaitForever forward
  where
    forward (Chunk payload) = yield payload
    forward Flush = pure ()

-- | Why the per-frame compression loop stopped, paired with the remaining
-- capacity of the output buffer at that point.
--
-- 'EndStream' is produced when upstream has no more input; 'NewFrame' is
-- produced when upstream sent a @Flush@.
data FrameState = EndStream CSize | NewFrame CSize

-- | Compress the incoming stream of ByteStrings and delineate the end of a LZ4 frame with a @Conduit.Flush@.
compressWithOutBufferSizeMultiFrame :: forall m . (MonadUnliftIO m, MonadResource m) => CSize -> ConduitT (Flush ByteString) (Flush ByteString) m ()
compressWithOutBufferSizeMultiFrame bufferSize =
withLz4CtxAndPrefsConduit $ \(ctx, prefs) -> do
-- From the LZ4 manual:
-- "A successful call to LZ4F_compressEnd() makes cctx available again for another compression task."
-- Therefore we only need a single compression context (ctx) even though we use it for multiple LZ4 frames.

-- We split any incoming ByteString into chunks of this size, so that
-- we can pass this size to `lz4fCompressBound` once and reuse a buffer
Expand All @@ -469,41 +505,44 @@ compressWithOutBufferSize bufferSize =

outBuf <- liftIO $ mallocForeignPtrBytes (fromIntegral outBufferSize)
let withOutBuf f = liftIO $ withForeignPtr outBuf f

let yieldOutBuf outBufLen = do
outBs <- withOutBuf $ \buf -> packCStringLen (buf, fromIntegral outBufLen)
yield outBs
yield (Chunk outBs)

headerSize <- withOutBuf $ \buf -> lz4fCompressBegin ctx prefs buf outBufferSize
let writeHeader = withOutBuf $ \buf -> lz4fCompressBegin ctx prefs buf outBufferSize

let writeFooterAndYield remainingCapacity = do
let offset = fromIntegral $ outBufferSize - remainingCapacity
footerWritten <- withOutBuf $ \buf -> lz4fCompressEnd ctx (buf `plusPtr` offset) remainingCapacity
let outBufLen = outBufferSize - remainingCapacity + footerWritten
yieldOutBuf outBufLen
let writeFooterToBuffer capacity = do
let offset = fromIntegral $ outBufferSize - capacity
footerWritten <- withOutBuf $ \buf -> lz4fCompressEnd ctx (buf `plusPtr` offset) capacity
let outBufLen = outBufferSize - capacity + footerWritten
yieldOutBuf outBufLen
yield Flush -- denote the end of a frame.

-- Passing srcSize==0 provides bound for LZ4F_compressEnd(),
-- see docs of LZ4F_compressBound() for that.
footerSize <- liftIO $ lz4fCompressBound 0 prefs

if remainingCapacity >= footerSize
then do
writeFooterToBuffer remainingCapacity
else do
-- Footer doesn't fit: Yield buffer, put footer into now-free buffer
yieldOutBuf (outBufferSize - remainingCapacity)
writeFooterToBuffer outBufferSize

let loop remainingCapacity = do
await >>= \case
Nothing -> do
-- Done, write footer.

-- Passing srcSize==0 provides bound for LZ4F_compressEnd(),
-- see docs of LZ4F_compressBound() for that.
footerSize <- liftIO $ lz4fCompressBound 0 prefs
Nothing -> return $ EndStream remainingCapacity
Just (Flush) -> return $ NewFrame remainingCapacity

if remainingCapacity >= footerSize
then do
writeFooterAndYield remainingCapacity
else do
-- Footer doesn't fit: Yield buffer, put footer into now-free buffer
yieldOutBuf (outBufferSize - remainingCapacity)
writeFooterAndYield outBufferSize

Just bs -> do
Just (Chunk bs) -> do
let bss = bsChunksOf (fromIntegral bsInChunkSize) bs
newRemainingCapacity <- foldM compressSingleBs remainingCapacity bss
loop newRemainingCapacity

compressSingleBs :: CSize -> ByteString -> ConduitM i ByteString m CSize
compressSingleBs :: CSize -> ByteString -> ConduitM i (Flush ByteString) m CSize
compressSingleBs remainingCapacity bs
| remainingCapacity < compressBound = do
-- Not enough space in outBuf to guarantee that the next call
Expand All @@ -514,7 +553,7 @@ compressWithOutBufferSize bufferSize =
| otherwise = do
compressSingleBsFitting remainingCapacity bs

compressSingleBsFitting :: CSize -> ByteString -> ConduitM i ByteString m CSize
compressSingleBsFitting :: CSize -> ByteString -> ConduitM i (Flush ByteString) m CSize
compressSingleBsFitting remainingCapacity bs = do
when (remainingCapacity < compressBound) $ error "precondition violated"

Expand All @@ -531,7 +570,16 @@ compressWithOutBufferSize bufferSize =
let newRemainingCapacity = remainingCapacity - written
return newRemainingCapacity

loop (outBufferSize - headerSize)
let writeFrame = do
headerSize <- writeHeader
status <- loop (outBufferSize - headerSize)
case status of
EndStream capacity -> writeFooterAndYield capacity
NewFrame capacity -> Conduit.peekC >>= \case -- Only create a new frame if there's still data left.
Nothing -> writeFooterAndYield capacity
Just _ -> writeFooterAndYield capacity >> writeFrame

writeFrame



Expand Down Expand Up @@ -594,10 +642,13 @@ lz4fDecompress (Lz4FrameDecompressionContext ctxForeignPtr) dstBuffer dstSizePtr
} |]
return decompressSizeHint


-- | TODO check why decompressSizeHint is always 4
decompress :: (MonadUnliftIO m, MonadResource m) => ConduitT ByteString ByteString m ()
decompress = do
decompress = decompressChunks .| ignoreFlush

-- | Decompress a ByteString that uses the LZ4 frame format (both single and multiple frames).
-- TODO check why decompressSizeHint is always 4
decompressChunks :: (MonadUnliftIO m, MonadResource m) => ConduitT ByteString (Flush ByteString) m ()
decompressChunks = do
ctx <- liftIO lz4fCreateDecompressionContext

-- OK, now here it gets a bit ugly.
Expand Down Expand Up @@ -659,7 +710,7 @@ decompress = do
poke dstBufferSizePtr size
peek dstBufferPtr

let loopSingleBs :: CSize -> ByteString -> _
let loopSingleBs :: (MonadIO m) => CSize -> ByteString -> ConduitT i (Flush ByteString) m CSize
loopSingleBs decompressSizeHint bs = do
(outBs, srcRead, newDecompressSizeHint) <- liftIO $
unsafeUseAsCStringLen bs $ \(srcBuffer, srcSize) -> do
Expand All @@ -677,7 +728,10 @@ decompress = do
outBs <- packCStringLen (dstBuffer, fromIntegral dstWritten)
return (outBs, srcRead, newDecompressSizeHint)

yield outBs
yield (Chunk outBs)
-- When a frame is fully decoded, LZ4F_decompress returns 0 (no more data expected),
-- see https://github.com/lz4/lz4/blob/7cf0bb97b2a988cb17435780d19e145147dd9f70/lib/lz4frame.h#L324
when (newDecompressSizeHint == 0) $ yield Flush

let srcReadInt = fromIntegral srcRead
if
Expand All @@ -700,3 +754,8 @@ decompress = do
-- Force resource release here to guarantee memory constantness
-- of the conduit (and not rely on GC to do it "at some point in the future").
liftIO $ finalizeForeignPtr (unLz4FrameDecompressionContext ctx)

-- Check if there is another frame to decode.
peekC >>= \case
Nothing -> return ()
Just _ -> decompressChunks
70 changes: 60 additions & 10 deletions test/Main.hs
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE LambdaCase #-}

module Main (main) where

import Codec.Compression.LZ4.Conduit (compress, decompress, bsChunksOf)
import Control.Monad (when)
import Control.Monad.IO.Unlift (MonadUnliftIO)
import Control.Monad.Trans.Resource (ResourceT, runResourceT)
import Codec.Compression.LZ4.Conduit (compress, compressMultiFrame, decompress, bsChunksOf)
import Conduit
import Control.Monad (when, void)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import qualified Data.ByteString.Lazy.Char8 as BSL8
import Data.Conduit
import Data.Conduit.Binary as CB
import qualified Data.Conduit.List as CL
import qualified Data.Conduit.Process as CP
import Data.List (intersperse)
import Test.Hspec
import Test.Hspec.QuickCheck (modifyMaxSize)
import Test.Hspec.QuickCheck (modifyMaxSize, prop)
import qualified Test.QuickCheck as QC
import qualified Test.QuickCheck.Monadic as QCM

Expand All @@ -25,13 +24,22 @@ import qualified Test.QuickCheck.Monadic as QCM
runCompressToLZ4 :: (MonadUnliftIO m) => ConduitT () ByteString (ResourceT m) () -> m ByteString
runCompressToLZ4 source = runResourceT $ do

(_, result, _) <- CP.sourceCmdWithStreams "lz4 -d" (source .| compress) CL.consume CL.consume
return $ BS.concat result
(_, result, _) <- CP.sourceCmdWithStreams "lz4 -d" (source .| compress) foldC foldC
return result

-- | Re-chunk the incoming ByteStrings into pieces of @len@ bytes and follow
-- every piece with a 'Flush', so that a downstream multi-frame compressor
-- starts a new LZ4 frame after each piece.
--
-- Only 'Monad' is required: neither 'chunksOfCE' nor 'awaitForever' performs IO.
createFramesOfCE :: Monad m => Int -> ConduitT ByteString (Flush ByteString) m ()
createFramesOfCE len = chunksOfCE len .| awaitForever emitFrame
  where
    -- One piece of payload, immediately followed by its end-of-frame marker.
    emitFrame piece = do
      yield (Chunk piece)
      yield Flush

-- | Compress @source@ as multiple LZ4 frames (a new frame every 1000 bytes),
-- pipe the result through the external @lz4 -d@ command and return what that
-- command wrote to its standard output.
runCompressMultiFrameToLZ4 :: (MonadUnliftIO m) => ConduitT () ByteString (ResourceT m) () -> m ByteString
runCompressMultiFrameToLZ4 source = runResourceT $ do
  -- The offsets returned by 'compressMultiFrame' are not needed here.
  let framedCompressed = source .| createFramesOfCE 1000 .| void compressMultiFrame
  (_exitCode, decompressed, _stderrOutput) <-
    CP.sourceCmdWithStreams "lz4 -d" framedCompressed foldC foldC
  return decompressed

runLZ4ToDecompress :: (MonadUnliftIO m) => ConduitT () ByteString (ResourceT m) () -> m ByteString
runLZ4ToDecompress source = runResourceT $ do
(_, result, _) <- CP.sourceCmdWithStreams "lz4 -c" source (decompress .| CL.consume) CL.consume
return $ BS.concat result
(_, result, _) <- CP.sourceCmdWithStreams "lz4 -c" source (decompress .| foldC) foldC
return result

main :: IO ()
main = do
Expand Down Expand Up @@ -74,6 +82,33 @@ main = do
actual <- runCompressToLZ4 (CB.sourceLbs $ BSL.fromStrict bs)
actual `shouldBe` bs

describe "Compression MultiFrame" $ do
it "compresses simple string" $ do
let string = "hellohellohellohello"
actual <- runCompressMultiFrameToLZ4 (yield string)
actual `shouldBe` string

it "compresses 100000 integers" $ do
let strings = prepare $ map (BSL8.pack . show) [1..100000 :: Int]
actual <- runCompressMultiFrameToLZ4 (CL.sourceList strings)
actual `shouldBe` (BS.concat strings)

it "compresses 100000 strings" $ do
let strings = prepare $ replicate 100000 "hello"
actual <- runCompressMultiFrameToLZ4 (CL.sourceList strings)
actual `shouldBe` (BS.concat strings)

it "compresses 1MB ByteString" $ do
  -- This case lives in the "Compression MultiFrame" group, so it must go
  -- through the multi-frame helper rather than the single-frame one.
  -- NOTE(review): 100000 bytes is roughly 100 kB rather than 1 MB — confirm the intended size.
  let bs = BS.replicate 100000 42
  actual <- runCompressMultiFrameToLZ4 (CB.sourceLbs $ BSL.fromStrict bs)
  actual `shouldBe` bs

it "compresses 5GiB ByteString" $ do -- more than 32-bit many Bytes
skipBigmemTest
let bs = BS.replicate (5 * 1024*1024*1024) 42
actual <- runCompressMultiFrameToLZ4 (CB.sourceLbs $ BSL.fromStrict bs)
actual `shouldBe` bs

describe "Decompression" $ do
it "decompresses simple string" $ do
let string = "hellohellohellohello"
Expand Down Expand Up @@ -107,3 +142,18 @@ main = do
let bs = BSL.toChunks $ BSL8.pack string
actual <- QCM.run (runConduitRes $ CL.sourceList bs .| compress .| decompress .| CL.consume)
QCM.assert(BS.concat bs == BS.concat actual)

describe "Identity MultiFrame" $
modifyMaxSize (const 10000) $ prop "compress and decompress arbitrary strings" $ \string -> do
let bs = BS.pack string
actual <- runConduitRes $ yield bs .| createFramesOfCE 10 .| void compressMultiFrame .| decompress .| foldC
actual `shouldBe` bs

describe "Slice a multiframe" $
it "Decompress only the second frame." $ do
let bs = BS.pack $ replicate 10 0 ++ replicate 10 1 ++ replicate 10 2
(offsets, cbs) <- runConduitRes $ yield bs .| createFramesOfCE 10 .| compressMultiFrame `fuseBoth` foldC
let mark = offsets !! 1
mark2 = offsets !! 2
actual <- runConduitRes $ yield cbs .| (dropCE mark >> takeCE (mark2 - mark) .| decompress .| foldC)
actual `shouldBe` (BS.pack $ replicate 10 1)