diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/internal/CancellationBarrierGraphStage.scala b/runtime/src/main/scala/org/apache/pekko/grpc/internal/CancellationBarrierGraphStage.scala index a6cc7282..d59a98b8 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/internal/CancellationBarrierGraphStage.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/internal/CancellationBarrierGraphStage.scala @@ -30,31 +30,23 @@ class CancellationBarrierGraphStage[T] extends GraphStage[FlowShape[T, T]] { override val shape: FlowShape[T, T] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) { - setHandler( - in, - new InHandler { - override def onPush(): Unit = emit(out, grab(in)) - }) - - setHandler( - out, - new OutHandler { - override def onPull(): Unit = pull(in) - - override def onDownstreamFinish(cause: Throwable): Unit = { - if (!hasBeenPulled(in)) + new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = emit(out, grab(in)) + override def onPull(): Unit = pull(in) + + override def onDownstreamFinish(cause: Throwable): Unit = { + if (!hasBeenPulled(in)) + pull(in) + + setHandler( + in, + new InHandler { + override def onPush(): Unit = { + grab(in) pull(in) - - setHandler( - in, - new InHandler { - override def onPush(): Unit = { - grab(in) - pull(in) - } - }) - } - }) + } + }) + } + setHandlers(in, out, this) } }