Skip to content

Commit

Permalink
Merge pull request #3280 from tothpeti/feature/add-channel-sendclose
Browse files Browse the repository at this point in the history
Add `Channel#closeWithElement`
  • Loading branch information
armanbilge authored Sep 12, 2023
2 parents e9e484b + f5c159a commit 8a4221e
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 5 deletions.
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
// Private internal method: #3274
ProblemFilters.exclude[DirectMissingMethodProblem](
"fs2.Chunk.platformIterable"
),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"fs2.concurrent.Channel.closeWithElement"
)
)

Expand Down
22 changes: 17 additions & 5 deletions core/shared/src/main/scala/fs2/concurrent/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ sealed trait Channel[F[_], A] {
*/
def close: F[Either[Channel.Closed, Unit]]

/** Gracefully closes this channel with a final element. This method will never block.
*
* No-op if the channel is closed, see [[close]] for further info.
*/
def closeWithElement(a: A): F[Either[Channel.Closed, Unit]]

/** Returns true if this channel is closed */
def isClosed: F[Boolean]

Expand Down Expand Up @@ -151,7 +157,7 @@ object Channel {
.drain
}

def send(a: A) =
def sendImpl(a: A, close: Boolean) =
F.deferred[Unit].flatMap { producer =>
state.flatModifyFull { case (poll, state) =>
state match {
Expand All @@ -161,18 +167,24 @@ object Channel {
case State(values, size, waiting, producers, closed @ false) =>
if (size < capacity)
(
State(a :: values, size + 1, None, producers, false),
notifyStream(waiting).as(rightUnit)
State(a :: values, size + 1, None, producers, close),
signalClosure.whenA(close) *> notifyStream(waiting).as(rightUnit)
)
else
(
State(values, size, None, (a, producer) :: producers, false),
notifyStream(waiting).as(rightUnit) <* waitOnBound(producer, poll)
State(values, size, None, (a, producer) :: producers, close),
signalClosure.whenA(close) *>
notifyStream(waiting).as(rightUnit) <*
waitOnBound(producer, poll).unlessA(close)
)
}
}
}

def send(a: A) = sendImpl(a, false)

def closeWithElement(a: A) = sendImpl(a, true)

def trySend(a: A) =
state.flatModify {
case s @ State(_, _, _, _, closed @ true) =>
Expand Down
72 changes: 72 additions & 0 deletions core/shared/src/test/scala/fs2/concurrent/ChannelSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -251,4 +251,76 @@ class ChannelSuite extends Fs2Suite {

TestControl.executeEmbed(test)
}

def checkIfCloseWithElementClosing(channel: IO[Channel[IO, Int]]) =
channel.flatMap { ch =>
ch.closeWithElement(0) *> ch.isClosed.assert
}

test("closeWithElement closes right after sending the last element in bounded(1) case") {
val channel = Channel.bounded[IO, Int](1)

checkIfCloseWithElementClosing(channel)
}

test("closeWithElement closes right after sending the last element in bounded(2) case") {
val channel = Channel.bounded[IO, Int](2)

checkIfCloseWithElementClosing(channel)
}

test("closeWithElement closes right after sending the last element in unbounded case") {
val channel = Channel.unbounded[IO, Int]

checkIfCloseWithElementClosing(channel)
}

test("closeWithElement closes right after sending the last element in synchronous case") {
val channel = Channel.synchronous[IO, Int]

checkIfCloseWithElementClosing(channel)
}

def racingSendOperations(channel: IO[Channel[IO, Int]]) = {
val expectedFirstCase = ((Right(()), Right(())), List(0, 1))
val expectedSecondCase = ((Left(Channel.Closed), Right(())), List(1))

channel
.flatMap { ch =>
ch.send(0).both(ch.closeWithElement(1)).parProduct(ch.stream.compile.toList)
}
.map {
case obtained @ ((Right(()), Right(())), List(0, 1)) =>
assertEquals(obtained, expectedFirstCase)
case obtained @ ((Left(Channel.Closed), Right(())), List(1)) =>
assertEquals(obtained, expectedSecondCase)
case e => fail(s"An unknown test result: $e")
}
.replicateA_(if (isJVM) 1000 else 1)
}

test("racing send and closeWithElement should work in bounded(1) case") {
val channel = Channel.bounded[IO, Int](1)

racingSendOperations(channel)
}

test("racing send and closeWithElement should work in bounded(2) case") {
val channel = Channel.bounded[IO, Int](2)

racingSendOperations(channel)
}

test("racing send and closeWithElement should work in unbounded case") {
val channel = Channel.unbounded[IO, Int]

racingSendOperations(channel)
}

test("racing send and closeWithElement should work in synchronous case") {
val channel = Channel.synchronous[IO, Int]

racingSendOperations(channel)
}

}

0 comments on commit 8a4221e

Please sign in to comment.