From a03fc8135e5d1c4624ede2c1ca18a967739c37ea Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Thu, 17 Mar 2022 11:26:01 +0000 Subject: [PATCH 1/2] Add evalFlatten methods --- core/shared/src/main/scala/fs2/Stream.scala | 24 +++++++++++++++---- .../io/net/unixsocket/UnixSocketsSuite.scala | 2 +- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index c150b8fa7a..5a81c6062a 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -4073,10 +4073,7 @@ object Stream extends StreamLowPriority { } def outcomeJoiner: F[Unit] = - outcomes.stream - .evalMap(identity) - .compile - .drain + outcomes.stream.evalFlatten.compile.drain .guaranteeCase { case Outcome.Succeeded(_) => stop(None) >> output.close.void @@ -4117,6 +4114,25 @@ object Stream extends StreamLowPriority { parJoin(Int.MaxValue) } + /** Provides syntax for a stream of `F` effects. */ + implicit class StreamFOps[F[_], O](private val self: Stream[F, F[O]]) { + + /** Sequences the inner effects into the stream. */ + def evalFlatten: Stream[F, O] = + self.evalMap(identity) + + /** Evaluates up to `maxConcurrent` inner effects concurrently, emitting + * the results in order. + */ + def parEvalFlatten( + maxConcurrent: Int + )(implicit F: Concurrent[F]) = self.parEvalMap(maxConcurrent)(identity) + + /** Evaluates all inner effects concurrently, emitting the results in order. + */ + def parEvalFlattenUnbounded(implicit F: Concurrent[F]) = self.parEvalMapUnbounded(identity) + } + /** Provides syntax for pure streams. */ implicit final class PureOps[O](private val self: Stream[Pure, O]) extends AnyVal { diff --git a/io/shared/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala b/io/shared/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala index 09fe206d5b..3042e99335 100644 --- a/io/shared/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala +++ b/io/shared/src/test/scala/fs2/io/net/unixsocket/UnixSocketsSuite.scala @@ -47,7 +47,7 @@ class UnixSocketsSuite extends Fs2Suite with UnixSocketsSuitePlatform { val clients = (0 until 100).map(b => client(Chunk.singleton(b.toByte))) - (Stream.sleep_[IO](1.second) ++ Stream.emits(clients).evalMap(identity)) + (Stream.sleep_[IO](1.second) ++ Stream.emits(clients).evalFlatten) .concurrently(server) .compile .drain From 0c898d1ff80616748acc13fe61af502457d0fd19 Mon Sep 17 00:00:00 2001 From: Ben Plommer Date: Sun, 27 Mar 2022 13:54:00 +0100 Subject: [PATCH 2/2] Add type annotation, remove parEvalFlattenUnbounded --- core/shared/src/main/scala/fs2/Stream.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 5a81c6062a..49f6d19754 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -4126,11 +4126,7 @@ object Stream extends StreamLowPriority { */ def parEvalFlatten( maxConcurrent: Int - )(implicit F: Concurrent[F]) = self.parEvalMap(maxConcurrent)(identity) - - /** Evaluates all inner effects concurrently, emitting the results in order. - */ - def parEvalFlattenUnbounded(implicit F: Concurrent[F]) = self.parEvalMapUnbounded(identity) + )(implicit F: Concurrent[F]): Stream[F, O] = self.parEvalMap(maxConcurrent)(identity) } /** Provides syntax for pure streams. */