From 5e7dc9f73b40a70f459f5e9d0720962660d5e14d Mon Sep 17 00:00:00 2001 From: tothpeti Date: Tue, 22 Aug 2023 15:40:18 +0200 Subject: [PATCH 01/16] #3279 | Add sendAndClose implementation and test --- .../main/scala/fs2/concurrent/Channel.scala | 28 +++++++++++++++++++ .../scala/fs2/concurrent/ChannelSuite.scala | 14 ++++++++++ 2 files changed, 42 insertions(+) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 70b9cf7ddd..69665058ec 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -110,6 +110,12 @@ sealed trait Channel[F[_], A] { */ def close: F[Either[Channel.Closed, Unit]] + /** Sends an element through this channel, and closes it right after. + * + * No-op if the channel is closed, see [[close]] for further info. + */ + def sendAndClose(a: A): F[Either[Channel.Closed, Unit]] + /** Returns true if this channel is closed */ def isClosed: F[Boolean] @@ -188,6 +194,28 @@ object Channel { (s, rightFalse.pure[F]) } + def sendAndClose(a: A) = + F.deferred[Unit].flatMap { producer => + state.flatModifyFull { case (_, state) => + state match { + case s @ State(_, _, _, _, closed @ true) => + (s, Channel.closed[Unit].pure[F]) + + case State(values, size, waiting, producers, closed @ false) => + if (size < capacity) + ( + State(a :: values, size + 1, None, producers, false), + notifyStream(waiting).as(rightUnit) + ) + else + ( + State(values, size, None, (a, producer) :: producers, true), + notifyStream(waiting).as(rightUnit) <* signalClosure + ) + } + } + } + def close = state.flatModify { case s @ State(_, _, _, _, closed @ true) => diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index ce250c0e07..ff490d9d72 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -251,4 +251,18 @@ class ChannelSuite extends Fs2Suite { TestControl.executeEmbed(test) } + + test("sendAndClose closes right after sending the last element") { + val v = Vector(1, 2, 3, 4) + val capacity = 3 + val p = for { + chan <- Channel.bounded[IO, Int](capacity) + _ <- v.traverse(chan.sendAndClose) + isClosed <- chan.isClosed + res <- chan.stream.chunks.take(1).compile.lastOrError + } yield (res.toVector, isClosed) + + p._1F.assertEquals(v.take(capacity)) + p._2F.assertEquals(true) + } } From 64be9c8ff45d0badfdaa80f89b7b7d3315348a33 Mon Sep 17 00:00:00 2001 From: tothpeti Date: Tue, 22 Aug 2023 20:15:39 +0200 Subject: [PATCH 02/16] #3279 | Refactor --- .../main/scala/fs2/concurrent/Channel.scala | 33 ++++++------------- .../scala/fs2/concurrent/ChannelSuite.scala | 16 +++++++++ 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 69665058ec..3e574171e5 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -157,7 +157,7 @@ object Channel { .drain } - def send(a: A) = + def sendImpl(a: A, close: Boolean) = F.deferred[Unit].flatMap { producer => state.flatModifyFull { case (poll, state) => state match { @@ -170,6 +170,11 @@ object Channel { State(a :: values, size + 1, None, producers, false), notifyStream(waiting).as(rightUnit) ) + else if (close) + ( + State(values, size, None, producers, true), + notifyStream(waiting).as(rightUnit) <* signalClosure + ) else ( State(values, size, None, (a, producer) :: producers, false), @@ -179,6 +184,10 @@ object Channel { } } + def send(a: A) = sendImpl(a, false) + + def sendAndClose(a: A) = sendImpl(a, true) + def trySend(a: A) = state.flatModify { case s @ State(_, _, _, _, closed @ true) => @@ -194,28 +203,6 @@ object Channel { (s, rightFalse.pure[F]) } - def sendAndClose(a: A) = - F.deferred[Unit].flatMap { producer => - state.flatModifyFull { case (_, state) => - state match { - case s @ State(_, _, _, _, closed @ true) => - (s, Channel.closed[Unit].pure[F]) - - case State(values, size, waiting, producers, closed @ false) => - if (size < capacity) - ( - State(a :: values, size + 1, None, producers, false), - notifyStream(waiting).as(rightUnit) - ) - else - ( - State(values, size, None, (a, producer) :: producers, true), - notifyStream(waiting).as(rightUnit) <* signalClosure - ) - } - } - } - def close = state.flatModify { case s @ State(_, _, _, _, closed @ true) => diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index ff490d9d72..3028e38677 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -265,4 +265,20 @@ class ChannelSuite extends Fs2Suite { p._1F.assertEquals(v.take(capacity)) p._2F.assertEquals(true) } + + test("racing send and sendAndClose should work in bounded case") { + val test = Channel.bounded[IO, Int](2).flatMap { ch => + ch.send(0).both(ch.sendAndClose(1)) + } + test.assertEquals((Right(()), Right(()))) + } + + test("racing send and sendAndClose should work in unbounded case") { + val test = Channel.unbounded[IO, Int].flatMap { ch => + ch.send(0).both(ch.sendAndClose(1)) + } + + test.assertEquals((Right(()), Right(()))) + } + } From 4768c5193ef222b722e829e0e83c43949be4755a Mon Sep 17 00:00:00 2001 From: tothpeti Date: Thu, 24 Aug 2023 09:35:59 +0200 Subject: [PATCH 03/16] #3279 | Refactor --- .../src/main/scala/fs2/concurrent/Channel.scala | 11 ++++------- .../test/scala/fs2/concurrent/ChannelSuite.scala | 14 ++++---------- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 3e574171e5..ab5da918b4 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -168,17 +168,14 @@ object Channel { if (size < capacity) ( State(a :: values, size + 1, None, producers, false), - notifyStream(waiting).as(rightUnit) - ) - else if (close) - ( - State(values, size, None, producers, true), - notifyStream(waiting).as(rightUnit) <* signalClosure + signalClosure.whenA(close) *> notifyStream(waiting).as(rightUnit) ) else ( State(values, size, None, (a, producer) :: producers, false), - notifyStream(waiting).as(rightUnit) <* waitOnBound(producer, poll) + signalClosure.whenA(close) *> + notifyStream(waiting).as(rightUnit) <* + waitOnBound(producer, poll) ) } } diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index 3028e38677..a4e14f6cf1 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -253,17 +253,11 @@ class ChannelSuite extends Fs2Suite { } test("sendAndClose closes right after sending the last element") { - val v = Vector(1, 2, 3, 4) - val capacity = 3 - val p = for { - chan <- Channel.bounded[IO, Int](capacity) - _ <- v.traverse(chan.sendAndClose) - isClosed <- chan.isClosed - res <- chan.stream.chunks.take(1).compile.lastOrError - } yield (res.toVector, isClosed) + val result = Channel.bounded[IO, Int](1).flatMap { ch => + ch.sendAndClose(0) *> ch.isClosed + } - p._1F.assertEquals(v.take(capacity)) - p._2F.assertEquals(true) + result.assertEquals(true) } test("racing send and sendAndClose should work in bounded case") { From 68c7b289aaa6e8c8343a8e67c40cf55ea776c0f9 Mon Sep 17 00:00:00 2001 From: tothpeti Date: Thu, 24 Aug 2023 16:36:56 +0200 Subject: [PATCH 04/16] #3279 | Add test case --- .../src/test/scala/fs2/concurrent/ChannelSuite.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index a4e14f6cf1..bb657cc0bf 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -252,7 +252,7 @@ class ChannelSuite extends Fs2Suite { TestControl.executeEmbed(test) } - test("sendAndClose closes right after sending the last element") { + test("sendAndClose closes right after sending the last element (bounded case)") { val result = Channel.bounded[IO, Int](1).flatMap { ch => ch.sendAndClose(0) *> ch.isClosed } @@ -260,6 +260,14 @@ class ChannelSuite extends Fs2Suite { result.assertEquals(true) } + test("sendAndClose closes right after sending the last element (unbounded case)") { + val result = Channel.unbounded[IO, Int].flatMap { ch => + ch.sendAndClose(0) *> ch.isClosed + } + + result.assertEquals(true) + } + test("racing send and sendAndClose should work in bounded case") { val test = Channel.bounded[IO, Int](2).flatMap { ch => ch.send(0).both(ch.sendAndClose(1)) From 425144f99f5a771796322be87bd65799bbeddd1f Mon Sep 17 00:00:00 2001 From: tothpeti Date: Thu, 24 Aug 2023 20:12:59 +0200 Subject: [PATCH 05/16] #3279 | Add new test cases --- .../main/scala/fs2/concurrent/Channel.scala | 4 +-- .../scala/fs2/concurrent/ChannelSuite.scala | 26 +++++++++++++++---- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index ab5da918b4..bc6947ffc2 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -167,12 +167,12 @@ object Channel { case State(values, size, waiting, producers, closed @ false) => if (size < capacity) ( - State(a :: values, size + 1, None, producers, false), + State(a :: values, size + 1, None, producers, close), signalClosure.whenA(close) *> notifyStream(waiting).as(rightUnit) ) else ( - State(values, size, None, (a, producer) :: producers, false), + State(values, size, None, (a, producer) :: producers, close), signalClosure.whenA(close) *> notifyStream(waiting).as(rightUnit) <* waitOnBound(producer, poll) diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index bb657cc0bf..a365650b5c 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -268,19 +268,35 @@ class ChannelSuite extends Fs2Suite { result.assertEquals(true) } - test("racing send and sendAndClose should work in bounded case") { + test("racing send and sendAndClose should work in bounded(1) case") { + val test = Channel.bounded[IO, Int](1).flatMap { ch => + ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) + } + + test.assertEquals(((Right(()), Right(())), List(0, 1))) + } + + test("racing send and sendAndClose should work in bounded(2) case") { val test = Channel.bounded[IO, Int](2).flatMap { ch => - ch.send(0).both(ch.sendAndClose(1)) + ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) } - test.assertEquals((Right(()), Right(()))) + + test.assertEquals(((Right(()), Right(())), List(0, 1))) } test("racing send and sendAndClose should work in unbounded case") { val test = Channel.unbounded[IO, Int].flatMap { ch => - ch.send(0).both(ch.sendAndClose(1)) + ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) } - test.assertEquals((Right(()), Right(()))) + test.assertEquals(((Right(()), Right(())), List(0, 1))) } + test("racing send and sendAndClose should work in synchronous case") { + val test = Channel.synchronous[IO, Int].flatMap { ch => + ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) + } + + test.assertEquals(((Right(()), Right(())), List(0, 1))) + } } From 7d2a71eceb91cc970e4eec996b4c0c78d8a7b2d7 Mon Sep 17 00:00:00 2001 From: tothpeti Date: Thu, 24 Aug 2023 20:45:25 +0200 Subject: [PATCH 06/16] #3279 | Extend test cases --- .../scala/fs2/concurrent/ChannelSuite.scala | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index a365650b5c..36eb1a7838 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -273,7 +273,12 @@ class ChannelSuite extends Fs2Suite { ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) } + val test2 = Channel.bounded[IO, Int](1).flatMap { ch => + ch.sendAndClose(1).both(ch.send(0)).parProduct(ch.stream.compile.toList) + } + test.assertEquals(((Right(()), Right(())), List(0, 1))) + test2.assertEquals(((Right(()), Left(Channel.Closed)), List(1))) } test("racing send and sendAndClose should work in bounded(2) case") { @@ -281,7 +286,12 @@ class ChannelSuite extends Fs2Suite { ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) } + val test2 = Channel.bounded[IO, Int](2).flatMap { ch => + ch.sendAndClose(1).both(ch.send(0)).parProduct(ch.stream.compile.toList) + } + test.assertEquals(((Right(()), Right(())), List(0, 1))) + test2.assertEquals(((Right(()), Left(Channel.Closed)), List(1))) } test("racing send and sendAndClose should work in unbounded case") { @@ -289,7 +299,12 @@ class ChannelSuite extends Fs2Suite { ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) } + val test2 = Channel.unbounded[IO, Int].flatMap { ch => + ch.sendAndClose(1).both(ch.send(0)).parProduct(ch.stream.compile.toList) + } + test.assertEquals(((Right(()), Right(())), List(0, 1))) + test2.assertEquals(((Right(()), Left(Channel.Closed)), List(1))) } test("racing send and sendAndClose should work in synchronous case") { @@ -297,6 +312,12 @@ class ChannelSuite extends Fs2Suite { ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) } + val test2 = Channel.synchronous[IO, Int].flatMap { ch => + ch.sendAndClose(1).both(ch.send(0)).parProduct(ch.stream.compile.toList) + } + test.assertEquals(((Right(()), Right(())), List(0, 1))) + test2.assertEquals(((Right(()), Left(Channel.Closed)), List(1))) } + } From 6d023acdbc78307b5540dd60aec162bf08ba0e35 Mon Sep 17 00:00:00 2001 From: tothpeti Date: Sun, 3 Sep 2023 14:13:25 +0200 Subject: [PATCH 07/16] #3279 | Refactor --- .../scala/fs2/concurrent/ChannelSuite.scala | 64 ++++++++++++------- 1 file changed, 40 insertions(+), 24 deletions(-) diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index 36eb1a7838..c67e2cdfd0 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -269,55 +269,71 @@ class ChannelSuite extends Fs2Suite { } test("racing send and sendAndClose should work in bounded(1) case") { - val test = Channel.bounded[IO, Int](1).flatMap { ch => + val result = Channel.bounded[IO, Int](1).flatMap { ch => ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) } - val test2 = Channel.bounded[IO, Int](1).flatMap { ch => - ch.sendAndClose(1).both(ch.send(0)).parProduct(ch.stream.compile.toList) - } + val expectedFirstCase = ((Right(()), Right(())), List(0, 1)) + val expectedSecondCase = ((Right(()), Left(Channel.Closed)), List(1)) - test.assertEquals(((Right(()), Right(())), List(0, 1))) - test2.assertEquals(((Right(()), Left(Channel.Closed)), List(1))) + result.map { + case obtained @ ((Right(()), Right(())), List(0, 1)) => + assertEquals(obtained, expectedFirstCase) + case obtained @ ((Right(()), Left(Channel.Closed)), List(1)) => + assertEquals(obtained, expectedSecondCase) + case _ => fail("An unknown test result was received.") + } } test("racing send and sendAndClose should work in bounded(2) case") { - val test = Channel.bounded[IO, Int](2).flatMap { ch => + val result = Channel.bounded[IO, Int](2).flatMap { ch => ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) } - val test2 = Channel.bounded[IO, Int](2).flatMap { ch => - ch.sendAndClose(1).both(ch.send(0)).parProduct(ch.stream.compile.toList) - } + val expectedFirstCase = ((Right(()), Right(())), List(0, 1)) + val expectedSecondCase = ((Right(()), Left(Channel.Closed)), List(1)) - test.assertEquals(((Right(()), Right(())), List(0, 1))) - test2.assertEquals(((Right(()), Left(Channel.Closed)), List(1))) + result.map { + case obtained @ ((Right(()), Right(())), List(0, 1)) => + assertEquals(obtained, expectedFirstCase) + case obtained @ ((Right(()), Left(Channel.Closed)), List(1)) => + assertEquals(obtained, expectedSecondCase) + case _ => fail("An unknown test result was received.") + } } test("racing send and sendAndClose should work in unbounded case") { - val test = Channel.unbounded[IO, Int].flatMap { ch => + val result = Channel.unbounded[IO, Int].flatMap { ch => ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) } - val test2 = Channel.unbounded[IO, Int].flatMap { ch => - ch.sendAndClose(1).both(ch.send(0)).parProduct(ch.stream.compile.toList) - } + val expectedFirstCase = ((Right(()), Right(())), List(0, 1)) + val expectedSecondCase = ((Right(()), Left(Channel.Closed)), List(1)) - test.assertEquals(((Right(()), Right(())), List(0, 1))) - test2.assertEquals(((Right(()), Left(Channel.Closed)), List(1))) + result.map { + case obtained @ ((Right(()), Right(())), List(0, 1)) => + assertEquals(obtained, expectedFirstCase) + case obtained @ ((Right(()), Left(Channel.Closed)), List(1)) => + assertEquals(obtained, expectedSecondCase) + case _ => fail("An unknown test result was received.") + } } test("racing send and sendAndClose should work in synchronous case") { - val test = Channel.synchronous[IO, Int].flatMap { ch => + val result = Channel.synchronous[IO, Int].flatMap { ch => ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) } - val test2 = Channel.synchronous[IO, Int].flatMap { ch => - ch.sendAndClose(1).both(ch.send(0)).parProduct(ch.stream.compile.toList) - } + val expectedFirstCase = ((Right(()), Right(())), List(0, 1)) + val expectedSecondCase = ((Right(()), Left(Channel.Closed)), List(1)) - test.assertEquals(((Right(()), Right(())), List(0, 1))) - test2.assertEquals(((Right(()), Left(Channel.Closed)), List(1))) + result.map { + case obtained @ ((Right(()), Right(())), List(0, 1)) => + assertEquals(obtained, expectedFirstCase) + case obtained @ ((Right(()), Left(Channel.Closed)), List(1)) => + assertEquals(obtained, expectedSecondCase) + case _ => fail("An unknown test result was received.") + } } } From c316652828bc10c2a4f1913de29030c3b76f15e9 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Sun, 3 Sep 2023 08:14:24 -0700 Subject: [PATCH 08/16] Add MiMa filter --- build.sbt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/build.sbt b/build.sbt index a5ebde3c82..a71fc99eb5 100644 --- a/build.sbt +++ b/build.sbt @@ -215,6 +215,9 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq( ), ProblemFilters.exclude[DirectMissingMethodProblem]( "fs2.io.file.Watcher#DefaultWatcher.this" + ), + ProblemFilters.exclude[ReversedMissingMethodProblem]( + "fs2.concurrent.Channel.sendAndClose" ) ) From feda948a0dd8915eacbd653cfeef1a973a99861a Mon Sep 17 00:00:00 2001 From: tothpeti Date: Thu, 7 Sep 2023 20:15:24 +0200 Subject: [PATCH 09/16] Refactor --- .../main/scala/fs2/concurrent/Channel.scala | 1 + .../scala/fs2/concurrent/ChannelSuite.scala | 105 ++++++++---------- 2 files changed, 46 insertions(+), 60 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index bc6947ffc2..8d48101fd6 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -111,6 +111,7 @@ sealed trait Channel[F[_], A] { def close: F[Either[Channel.Closed, Unit]] /** Sends an element through this channel, and closes it right after. + * This operation is atomic so it is guaranteed that this will be the last element sent to the channel. * * No-op if the channel is closed, see [[close]] for further info. */ diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index c67e2cdfd0..74f7de3bd0 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -252,88 +252,73 @@ class ChannelSuite extends Fs2Suite { TestControl.executeEmbed(test) } - test("sendAndClose closes right after sending the last element (bounded case)") { - val result = Channel.bounded[IO, Int](1).flatMap { ch => - ch.sendAndClose(0) *> ch.isClosed - } + def checkIfSendAndCloseClosing(channel: IO[Channel[IO, Int]]) = + channel + .flatMap { ch => + ch.sendAndClose(0) *> ch.isClosed + } + .assertEquals(true) - result.assertEquals(true) + test("sendAndClose closes right after sending the last element in bounded(1) case") { + val channel = Channel.bounded[IO, Int](1) + + checkIfSendAndCloseClosing(channel).parReplicateA_(if (isJVM) 10000 else 1) } - test("sendAndClose closes right after sending the last element (unbounded case)") { - val result = Channel.unbounded[IO, Int].flatMap { ch => - ch.sendAndClose(0) *> ch.isClosed - } + test("sendAndClose closes right after sending the last element in bounded(2) case") { + val channel = Channel.bounded[IO, Int](2) - result.assertEquals(true) + checkIfSendAndCloseClosing(channel).parReplicateA_(if (isJVM) 10000 else 1) } - test("racing send and sendAndClose should work in bounded(1) case") { - val result = Channel.bounded[IO, Int](1).flatMap { ch => - ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) - } + test("sendAndClose closes right after sending the last element in unbounded case") { + val channel = Channel.unbounded[IO, Int] + + checkIfSendAndCloseClosing(channel).parReplicateA_(if (isJVM) 10000 else 1) + } + def racingSendOperations(channel: IO[Channel[IO, Int]]) = { val expectedFirstCase = ((Right(()), Right(())), List(0, 1)) val expectedSecondCase = ((Right(()), Left(Channel.Closed)), List(1)) + val expectedThirdCase = ((Left(Channel.Closed), Right(())), List(1)) - result.map { - case obtained @ ((Right(()), Right(())), List(0, 1)) => - assertEquals(obtained, expectedFirstCase) - case obtained @ ((Right(()), Left(Channel.Closed)), List(1)) => - assertEquals(obtained, expectedSecondCase) - case _ => fail("An unknown test result was received.") - } + channel + .flatMap { ch => + ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) + } + .map { + case obtained @ ((Right(()), Right(())), List(0, 1)) => + assertEquals(obtained, expectedFirstCase) + case obtained @ ((Right(()), Left(Channel.Closed)), List(1)) => + assertEquals(obtained, expectedSecondCase) + case obtained @ ((Left(Channel.Closed), Right(())), List(1)) => + assertEquals(obtained, expectedThirdCase) + case e => fail(s"An unknown test result: $e") + } } - test("racing send and sendAndClose should work in bounded(2) case") { - val result = Channel.bounded[IO, Int](2).flatMap { ch => - ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) - } + test("racing send and sendAndClose should work in bounded(1) case") { + val channel = Channel.bounded[IO, Int](1) - val expectedFirstCase = ((Right(()), Right(())), List(0, 1)) - val expectedSecondCase = ((Right(()), Left(Channel.Closed)), List(1)) + racingSendOperations(channel).parReplicateA_(if (isJVM) 10000 else 1) + } - result.map { - case obtained @ ((Right(()), Right(())), List(0, 1)) => - assertEquals(obtained, expectedFirstCase) - case obtained @ ((Right(()), Left(Channel.Closed)), List(1)) => - assertEquals(obtained, expectedSecondCase) - case _ => fail("An unknown test result was received.") - } + test("racing send and sendAndClose should work in bounded(2) case") { + val channel = Channel.bounded[IO, Int](2) + + racingSendOperations(channel).parReplicateA_(if (isJVM) 10000 else 1) } test("racing send and sendAndClose should work in unbounded case") { - val result = Channel.unbounded[IO, Int].flatMap { ch => - ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) - } + val channel = Channel.unbounded[IO, Int] - val expectedFirstCase = ((Right(()), Right(())), List(0, 1)) - val expectedSecondCase = ((Right(()), Left(Channel.Closed)), List(1)) - - result.map { - case obtained @ ((Right(()), Right(())), List(0, 1)) => - assertEquals(obtained, expectedFirstCase) - case obtained @ ((Right(()), Left(Channel.Closed)), List(1)) => - assertEquals(obtained, expectedSecondCase) - case _ => fail("An unknown test result was received.") - } + racingSendOperations(channel).parReplicateA_(if (isJVM) 10000 else 1) } test("racing send and sendAndClose should work in synchronous case") { - val result = Channel.synchronous[IO, Int].flatMap { ch => - ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) - } + val channel = Channel.synchronous[IO, Int] - val expectedFirstCase = ((Right(()), Right(())), List(0, 1)) - val expectedSecondCase = ((Right(()), Left(Channel.Closed)), List(1)) - - result.map { - case obtained @ ((Right(()), Right(())), List(0, 1)) => - assertEquals(obtained, expectedFirstCase) - case obtained @ ((Right(()), Left(Channel.Closed)), List(1)) => - assertEquals(obtained, expectedSecondCase) - case _ => fail("An unknown test result was received.") - } + racingSendOperations(channel).parReplicateA_(if (isJVM) 10000 else 1) } } From d2615238ea7422d4ba760ef2e1fd4219456848e5 Mon Sep 17 00:00:00 2001 From: tothpeti Date: Thu, 7 Sep 2023 21:17:33 +0200 Subject: [PATCH 10/16] Refactor --- core/shared/src/main/scala/fs2/concurrent/Channel.scala | 2 +- .../shared/src/test/scala/fs2/concurrent/ChannelSuite.scala | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 8d48101fd6..2859e3313c 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -176,7 +176,7 @@ object Channel { State(values, size, None, (a, producer) :: producers, close), signalClosure.whenA(close) *> notifyStream(waiting).as(rightUnit) <* - waitOnBound(producer, poll) + waitOnBound(producer, poll).whenA(!close) ) } } diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index 74f7de3bd0..cce98f9f56 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -277,6 +277,12 @@ class ChannelSuite extends Fs2Suite { checkIfSendAndCloseClosing(channel).parReplicateA_(if (isJVM) 10000 else 1) } + test("sendAndClose closes right after sending the last element in synchronous case") { + val channel = Channel.synchronous[IO, Int] + + checkIfSendAndCloseClosing(channel).parReplicateA_(if (isJVM) 10000 else 1) + } + def racingSendOperations(channel: IO[Channel[IO, Int]]) = { val expectedFirstCase = ((Right(()), Right(())), List(0, 1)) val expectedSecondCase = ((Right(()), Left(Channel.Closed)), List(1)) From e9f901d67a5599df2f84b0b14843f5193bb3b97f Mon Sep 17 00:00:00 2001 From: tothpeti Date: Fri, 8 Sep 2023 10:04:19 +0200 Subject: [PATCH 11/16] Refactor --- .../src/main/scala/fs2/concurrent/Channel.scala | 2 +- .../src/test/scala/fs2/concurrent/ChannelSuite.scala | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 2859e3313c..8d48101fd6 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -176,7 +176,7 @@ object Channel { State(values, size, None, (a, producer) :: producers, close), signalClosure.whenA(close) *> notifyStream(waiting).as(rightUnit) <* - waitOnBound(producer, poll).whenA(!close) + waitOnBound(producer, poll) ) } } diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index cce98f9f56..a31da91329 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -278,9 +278,17 @@ class ChannelSuite extends Fs2Suite { } test("sendAndClose closes right after sending the last element in synchronous case") { - val channel = Channel.synchronous[IO, Int] + val test = Channel + .synchronous[IO, Int] + .flatMap { ch => + ch.stream + .concurrently(Stream.emit(0).covary[IO].evalMap(ch.sendAndClose)) + .compile + .drain *> ch.isClosed + } + .assertEquals(true) - checkIfSendAndCloseClosing(channel).parReplicateA_(if (isJVM) 10000 else 1) + test.parReplicateA_(if (isJVM) 10000 else 1) } def racingSendOperations(channel: IO[Channel[IO, Int]]) = { From 8af9ff8b29319a7624bbc309e8b7b36ca85c8111 Mon Sep 17 00:00:00 2001 From: tothpeti Date: Fri, 8 Sep 2023 18:35:15 +0200 Subject: [PATCH 12/16] Refactor --- .../scala/fs2/concurrent/ChannelSuite.scala | 37 ++++++++----------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index a31da91329..43552ec83c 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -253,42 +253,34 @@ class ChannelSuite extends Fs2Suite { } def checkIfSendAndCloseClosing(channel: IO[Channel[IO, Int]]) = - channel - .flatMap { ch => - ch.sendAndClose(0) *> ch.isClosed - } - .assertEquals(true) + channel.flatMap { ch => + ch.sendAndClose(0) *> ch.isClosed.assert + } test("sendAndClose closes right after sending the last element in bounded(1) case") { val channel = Channel.bounded[IO, Int](1) - checkIfSendAndCloseClosing(channel).parReplicateA_(if (isJVM) 10000 else 1) + checkIfSendAndCloseClosing(channel) } test("sendAndClose closes right after sending the last element in bounded(2) case") { val channel = Channel.bounded[IO, Int](2) - checkIfSendAndCloseClosing(channel).parReplicateA_(if (isJVM) 10000 else 1) + checkIfSendAndCloseClosing(channel) } test("sendAndClose closes right after sending the last element in unbounded case") { val channel = Channel.unbounded[IO, Int] - checkIfSendAndCloseClosing(channel).parReplicateA_(if (isJVM) 10000 else 1) + checkIfSendAndCloseClosing(channel) } test("sendAndClose closes right after sending the last element in synchronous case") { - val test = Channel - .synchronous[IO, Int] - .flatMap { ch => - ch.stream - .concurrently(Stream.emit(0).covary[IO].evalMap(ch.sendAndClose)) - .compile - .drain *> ch.isClosed + Channel.synchronous[IO, Int].flatMap { ch => + ch.stream.compile.drain.background.surround { + ch.sendAndClose(0) *> ch.isClosed.assert } - .assertEquals(true) - - test.parReplicateA_(if (isJVM) 10000 else 1) + } } def racingSendOperations(channel: IO[Channel[IO, Int]]) = { @@ -309,30 +301,31 @@ class ChannelSuite extends Fs2Suite { assertEquals(obtained, expectedThirdCase) case e => fail(s"An unknown test result: $e") } + .replicateA_(if (isJVM) 1000 else 1) } test("racing send and sendAndClose should work in bounded(1) case") { val channel = Channel.bounded[IO, Int](1) - racingSendOperations(channel).parReplicateA_(if (isJVM) 10000 else 1) + racingSendOperations(channel) } test("racing send and sendAndClose should work in bounded(2) case") { val channel = Channel.bounded[IO, Int](2) - racingSendOperations(channel).parReplicateA_(if (isJVM) 10000 else 1) + racingSendOperations(channel) } test("racing send and sendAndClose should work in unbounded case") { val channel = Channel.unbounded[IO, Int] - racingSendOperations(channel).parReplicateA_(if (isJVM) 10000 else 1) + racingSendOperations(channel) } test("racing send and sendAndClose should work in synchronous case") { val channel = Channel.synchronous[IO, Int] - racingSendOperations(channel).parReplicateA_(if (isJVM) 10000 else 1) + racingSendOperations(channel) } } From 207fbedc33eed7332150538fd8b762bafc349e43 Mon Sep 17 00:00:00 2001 From: tothpeti Date: Mon, 11 Sep 2023 17:36:18 +0200 Subject: [PATCH 13/16] Refactor --- .../main/scala/fs2/concurrent/Channel.scala | 6 ++-- .../scala/fs2/concurrent/ChannelSuite.scala | 36 +++++++++---------- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index 8d48101fd6..c586624db9 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -115,7 +115,7 @@ sealed trait Channel[F[_], A] { * * No-op if the channel is closed, see [[close]] for further info. */ - def sendAndClose(a: A): F[Either[Channel.Closed, Unit]] + def closeWithElement(a: A): F[Either[Channel.Closed, Unit]] /** Returns true if this channel is closed */ def isClosed: F[Boolean] @@ -176,7 +176,7 @@ object Channel { State(values, size, None, (a, producer) :: producers, close), signalClosure.whenA(close) *> notifyStream(waiting).as(rightUnit) <* - waitOnBound(producer, poll) + waitOnBound(producer, poll).whenA(!close) ) } } @@ -184,7 +184,7 @@ object Channel { def send(a: A) = sendImpl(a, false) - def sendAndClose(a: A) = sendImpl(a, true) + def closeWithElement(a: A) = sendImpl(a, true) def trySend(a: A) = state.flatModify { diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index 43552ec83c..17ddfb17ce 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -252,35 +252,33 @@ class ChannelSuite extends Fs2Suite { TestControl.executeEmbed(test) } - def checkIfSendAndCloseClosing(channel: IO[Channel[IO, Int]]) = + def checkIfCloseWithElementClosing(channel: IO[Channel[IO, Int]]) = channel.flatMap { ch => - ch.sendAndClose(0) *> ch.isClosed.assert + ch.closeWithElement(0) *> ch.isClosed.assert } - test("sendAndClose closes right after sending the last element in bounded(1) case") { + test("closeWithElement closes right after sending the last element in bounded(1) case") { val channel = Channel.bounded[IO, Int](1) - checkIfSendAndCloseClosing(channel) + checkIfCloseWithElementClosing(channel) } - test("sendAndClose closes right after sending the last element in bounded(2) case") { + test("closeWithElement closes right after sending the last element in bounded(2) case") { val channel = Channel.bounded[IO, Int](2) - checkIfSendAndCloseClosing(channel) + checkIfCloseWithElementClosing(channel) } - test("sendAndClose closes right after sending the last element in unbounded case") { + test("closeWithElement closes right after sending the last element in unbounded case") { val channel = Channel.unbounded[IO, Int] - checkIfSendAndCloseClosing(channel) + checkIfCloseWithElementClosing(channel) } - test("sendAndClose closes right after sending the last element in synchronous case") { - Channel.synchronous[IO, Int].flatMap { ch => - ch.stream.compile.drain.background.surround { - ch.sendAndClose(0) *> ch.isClosed.assert - } - } + test("closeWithElement closes right after sending the last element in synchronous case") { + val channel = Channel.synchronous[IO, Int] + + checkIfCloseWithElementClosing(channel) } def racingSendOperations(channel: IO[Channel[IO, Int]]) = { @@ -290,7 +288,7 @@ class ChannelSuite extends Fs2Suite { channel .flatMap { ch => - ch.send(0).both(ch.sendAndClose(1)).parProduct(ch.stream.compile.toList) + ch.send(0).both(ch.closeWithElement(1)).parProduct(ch.stream.compile.toList) } .map { case obtained @ ((Right(()), Right(())), List(0, 1)) => @@ -304,25 +302,25 @@ class ChannelSuite extends Fs2Suite { .replicateA_(if (isJVM) 1000 else 1) } - test("racing send and sendAndClose should work in bounded(1) case") { + test("racing send and closeWithElement should work in bounded(1) case") { val channel = Channel.bounded[IO, Int](1) racingSendOperations(channel) } - test("racing send and sendAndClose should work in bounded(2) case") { + test("racing send and closeWithElement should work in bounded(2) case") { val channel = Channel.bounded[IO, Int](2) racingSendOperations(channel) } - test("racing send and sendAndClose should work in unbounded case") { + test("racing send and closeWithElement should work in unbounded case") { val channel = Channel.unbounded[IO, Int] racingSendOperations(channel) } - test("racing send and sendAndClose should work in synchronous case") { + test("racing send and closeWithElement should work in synchronous case") { val channel = Channel.synchronous[IO, Int] racingSendOperations(channel) From 2d586e5f2932520fa4086cbac88e48f159c01bdc Mon Sep 17 00:00:00 2001 From: tothpeti Date: Mon, 11 Sep 2023 17:44:46 +0200 Subject: [PATCH 14/16] Refactor --- core/shared/src/main/scala/fs2/concurrent/Channel.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/shared/src/main/scala/fs2/concurrent/Channel.scala b/core/shared/src/main/scala/fs2/concurrent/Channel.scala index c586624db9..df2f17b54a 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Channel.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Channel.scala @@ -110,8 +110,7 @@ sealed trait Channel[F[_], A] { */ def close: F[Either[Channel.Closed, Unit]] - /** Sends an element through this channel, and closes it right after. - * This operation is atomic so it is guaranteed that this will be the last element sent to the channel. + /** Gracefully closes this channel with a final element. This method will never block. * * No-op if the channel is closed, see [[close]] for further info. */ @@ -176,7 +175,7 @@ object Channel { State(values, size, None, (a, producer) :: producers, close), signalClosure.whenA(close) *> notifyStream(waiting).as(rightUnit) <* - waitOnBound(producer, poll).whenA(!close) + waitOnBound(producer, poll).unlessA(close) ) } } From eaeb13b6428168c9ec36ddd163bc114592371adc Mon Sep 17 00:00:00 2001 From: tothpeti Date: Mon, 11 Sep 2023 18:18:01 +0200 Subject: [PATCH 15/16] Fix ProblemFilter --- build.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sbt b/build.sbt index 898c00ec44..522c3c90a0 100644 --- a/build.sbt +++ b/build.sbt @@ -221,7 +221,7 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq( "fs2.Chunk.platformIterable" ), ProblemFilters.exclude[ReversedMissingMethodProblem]( - "fs2.concurrent.Channel.sendAndClose" + "fs2.concurrent.Channel.closeWithElement" ) ) From f5c159aaf7e8c1477c3e6bd4e769b133db890318 Mon Sep 17 00:00:00 2001 From: tothpeti Date: Mon, 11 Sep 2023 19:19:15 +0200 Subject: [PATCH 16/16] Refactor --- .../src/test/scala/fs2/concurrent/ChannelSuite.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala index 17ddfb17ce..6e15fffa5e 100644 --- a/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala +++ b/core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala @@ -283,8 +283,7 @@ class ChannelSuite extends Fs2Suite { def racingSendOperations(channel: IO[Channel[IO, Int]]) = { val expectedFirstCase = ((Right(()), Right(())), List(0, 1)) - val expectedSecondCase = ((Right(()), Left(Channel.Closed)), List(1)) - val expectedThirdCase = ((Left(Channel.Closed), Right(())), List(1)) + val expectedSecondCase = ((Left(Channel.Closed), Right(())), List(1)) channel .flatMap { ch => @@ -293,10 +292,8 @@ class ChannelSuite extends Fs2Suite { .map { case obtained @ ((Right(()), Right(())), List(0, 1)) => assertEquals(obtained, expectedFirstCase) - case obtained @ ((Right(()), Left(Channel.Closed)), List(1)) => - assertEquals(obtained, expectedSecondCase) case obtained @ ((Left(Channel.Closed), Right(())), List(1)) => - assertEquals(obtained, expectedThirdCase) + assertEquals(obtained, expectedSecondCase) case e => fail(s"An unknown test result: $e") } .replicateA_(if (isJVM) 1000 else 1)