Skip to content

Commit

Permalink
chore: fuse handlers in CancellationBarrierGraphStage (#414)
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin authored Nov 28, 2024
1 parent c3544a8 commit 65d3683
Showing 1 changed file with 17 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,23 @@ class CancellationBarrierGraphStage[T] extends GraphStage[FlowShape[T, T]] {
override val shape: FlowShape[T, T] = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(
in,
new InHandler {
override def onPush(): Unit = emit(out, grab(in))
})

setHandler(
out,
new OutHandler {
override def onPull(): Unit = pull(in)

override def onDownstreamFinish(cause: Throwable): Unit = {
if (!hasBeenPulled(in))
new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = emit(out, grab(in))
override def onPull(): Unit = pull(in)

override def onDownstreamFinish(cause: Throwable): Unit = {
if (!hasBeenPulled(in))
pull(in)

setHandler(
in,
new InHandler {
override def onPush(): Unit = {
grab(in)
pull(in)

setHandler(
in,
new InHandler {
override def onPush(): Unit = {
grab(in)
pull(in)
}
})
}
})
}
})
}
setHandlers(in, out, this)
}
}

0 comments on commit 65d3683

Please sign in to comment.