Skip to content

Commit

Permalink
chore: Avoid forwarding method on ArrayDequeue in stream module.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 5, 2025
1 parent 477fd39 commit 2a91f28
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import pekko.stream.stage._
private val maxBuffer = inheritedAttributes.get[InputBuffer](InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")

private val buffer: util.Deque[In] = new util.ArrayDeque[In]()
private val buffer: util.ArrayDeque[In] = new util.ArrayDeque[In]()
private var acknowledgementReceived = false
private var completeReceived = false
private var completionSignalled = false
Expand Down Expand Up @@ -75,7 +75,7 @@ import pekko.stream.stage._
}

private def dequeueAndSend(): Unit = {
ref ! messageAdapter(self)(buffer.poll())
ref ! messageAdapter(self)(buffer.pollFirst())
}

private def finish(): Unit = {
Expand All @@ -85,7 +85,7 @@ import pekko.stream.stage._
}

def onPush(): Unit = {
buffer.offer(grab(in))
buffer.offerLast(grab(in))
if (acknowledgementReceived) {
dequeueAndSend()
acknowledgementReceived = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ private final case class SavedIslandData(
if (Debug) println(s"PUSH: $matValue => $matValueStack")

case Concat(first, next) =>
if (next ne EmptyTraversal) traversalStack.add(next)
if (next ne EmptyTraversal) traversalStack.addLast(next)
nextStep = first
case Pop =>
val popped = matValueStack.removeLast()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ import org.reactivestreams.Subscription
else if (currentLimit == 0) {
self ! Resume
} else {
shortCircuitBuffer.poll() match {
shortCircuitBuffer.pollFirst() match {
case b: BoundaryEvent => processEvent(b)
case Resume => finishShellRegistration()
case unexpected =>
Expand Down Expand Up @@ -845,7 +845,7 @@ import org.reactivestreams.Subscription
override def postStop(): Unit = {
if (shortCircuitBuffer ne null) {
while (!shortCircuitBuffer.isEmpty) {
shortCircuitBuffer.poll() match {
shortCircuitBuffer.pollFirst() match {
case b: BoundaryEvent =>
// signal to telemetry that this event won't be processed
b.cancel()
Expand Down

0 comments on commit 2a91f28

Please sign in to comment.