diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefBackpressureSinkStage.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefBackpressureSinkStage.scala index c5224843c5..47f3676638 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefBackpressureSinkStage.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/ActorRefBackpressureSinkStage.scala @@ -45,7 +45,7 @@ import pekko.stream.stage._ private val maxBuffer = inheritedAttributes.get[InputBuffer](InputBuffer(16, 16)).max require(maxBuffer > 0, "Buffer size must be greater than 0") - private val buffer: util.Deque[In] = new util.ArrayDeque[In]() + private val buffer: util.ArrayDeque[In] = new util.ArrayDeque[In]() private var acknowledgementReceived = false private var completeReceived = false private var completionSignalled = false @@ -75,7 +75,7 @@ import pekko.stream.stage._ } private def dequeueAndSend(): Unit = { - ref ! messageAdapter(self)(buffer.poll()) + ref ! messageAdapter(self)(buffer.pollFirst()) } private def finish(): Unit = { @@ -85,7 +85,7 @@ import pekko.stream.stage._ } def onPush(): Unit = { - buffer.offer(grab(in)) + buffer.offerLast(grab(in)) if (acknowledgementReceived) { dequeueAndSend() acknowledgementReceived = false diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala index c130aed607..fbff399df9 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/PhasedFusingActorMaterializer.scala @@ -524,7 +524,7 @@ private final case class SavedIslandData( if (Debug) println(s"PUSH: $matValue => $matValueStack") case Concat(first, next) => - if (next ne EmptyTraversal) traversalStack.add(next) + if (next ne EmptyTraversal) traversalStack.addLast(next) nextStep = first case Pop => val popped = matValueStack.removeLast() diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala index ad5f3b325a..d4f6bced23 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala @@ -796,7 +796,7 @@ import org.reactivestreams.Subscription else if (currentLimit == 0) { self ! Resume } else { - shortCircuitBuffer.poll() match { + shortCircuitBuffer.pollFirst() match { case b: BoundaryEvent => processEvent(b) case Resume => finishShellRegistration() case unexpected => @@ -845,7 +845,7 @@ import org.reactivestreams.Subscription override def postStop(): Unit = { if (shortCircuitBuffer ne null) { while (!shortCircuitBuffer.isEmpty) { - shortCircuitBuffer.poll() match { + shortCircuitBuffer.pollFirst() match { case b: BoundaryEvent => // signal to telemetry that this event won't be processed b.cancel()