diff --git a/conduit/ChangeLog.md b/conduit/ChangeLog.md index 1f7cce1aa..33d498863 100644 --- a/conduit/ChangeLog.md +++ b/conduit/ChangeLog.md @@ -1,5 +1,9 @@ # ChangeLog for conduit +## 1.3.6 + +* Avoid dropping upstream items in `mergeSource` [#513](https://github.com/snoyberg/conduit/pull/513) + ## 1.3.5 * Add `groupOn` diff --git a/conduit/src/Data/Conduit/Internal/Conduit.hs b/conduit/src/Data/Conduit/Internal/Conduit.hs index 9139fa353..8384e7d3e 100644 --- a/conduit/src/Data/Conduit/Internal/Conduit.hs +++ b/conduit/src/Data/Conduit/Internal/Conduit.hs @@ -613,7 +613,7 @@ mergeSource = loop . sealConduitT go a = do (src1, mi) <- lift $ src0 $$++ await case mi of - Nothing -> return () + Nothing -> leftover a Just i -> yield (i, a) >> loop src1 diff --git a/conduit/test/main.hs b/conduit/test/main.hs index c7860c622..2d33ea38e 100644 --- a/conduit/test/main.hs +++ b/conduit/test/main.hs @@ -32,7 +32,7 @@ import Control.Monad.Trans.Class (lift) import Control.Monad.Trans.Writer (execWriter, tell, runWriterT) import Control.Monad.Trans.State (evalStateT, get, put) import qualified Control.Monad.Writer as W -import Control.Applicative (pure, (<$>), (<*>)) +import Control.Applicative (pure, (<$>), (<*>), liftA2) import qualified Control.Monad.Catch as Catch import Data.Functor.Identity (Identity,runIdentity) import Control.Monad (forever, void) @@ -704,6 +704,12 @@ main = hspec $ do withShortAlphaIndex = CI.mergeSource (CL.sourceList ["A", "B", "C"]) output <- runConduit $ src .| withShortAlphaIndex .| CL.consume output `shouldBe` [("A", 1), ("B", 2), ("C", 3)] + it "does not drop upstream items" $ do + let num = CL.sourceList [1 .. 10 :: Int] + let chr = CL.sourceList ['a' .. 'c'] + (output, remainder) <- runConduit $ num .| liftA2 (,) (CI.mergeSource chr .| CL.consume) CL.consume + output `shouldBe` [('a', 1), ('b', 2), ('c', 3)] + remainder `shouldBe` [4 .. 10] describe "passthroughSink" $ do it "works" $ do