diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 12168acbc1..1759ff9f6c 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -346,6 +346,19 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, go(Nil, false, this).stream } + /** Buffers the chunks of this stream through a queue, and returns a new stream that consumes + * chunks from that queue. The new stream terminates after both this stream terminates and all + * chunks are emitted. Errors from this stream are propagated to the new stream. + * + * If the supplied queue is not empty, any residing chunks will be emitted to the new stream. + */ + def bufferThrough[F2[x] >: F[x]: Concurrent, O2 >: O]( + queue: Queue[F2, Option[Chunk[O2]]] + ): Stream[F2, O2] = + Stream + .fromQueueNoneTerminatedChunk(queue) + .concurrently(this.enqueueNoneTerminatedChunks(queue)) + /** Emits only elements that are distinct from their immediate predecessors, * using natural equality for comparison. * @@ -2121,9 +2134,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, n: Int ): Stream[F2, O] = Stream.eval(Queue.bounded[F2, Option[Chunk[O]]](n)).flatMap { queue => - Stream - .fromQueueNoneTerminatedChunk(queue) - .concurrently(enqueueNoneTerminatedChunks(queue)) + this.bufferThrough(queue) } /** Rechunks the stream such that output chunks are within `[inputChunk.size * minFactor, inputChunk.size * maxFactor]`.