diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index f2e92c8863..7ca32b3abf 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -1663,4 +1663,20 @@ public void mustBeAbleToConvertToJavaInJava() { org.apache.pekko.stream.scaladsl.Flow.apply(); Flow javaFlow = scalaFlow.asJava(); } + + @Test + public void zipWithIndex() { + final List input = Arrays.asList(1, 2, 3); + final List> expected = + Arrays.asList(new Pair<>(1, 0L), new Pair<>(2, 1L), new Pair<>(3, 2L)); + + final List> result = + Source.from(input) + .via(Flow.of(Integer.class).zipWithIndex()) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .join(); + + assertEquals(expected, result); + } } diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 8464688043..4fd1735e8f 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -1504,4 +1504,12 @@ public void flattenOptionalOptional() throws Exception { .get(3, TimeUnit.SECONDS); Assert.assertEquals(Arrays.asList(2, 4, 6, 8, 10), resultList); } + + @Test + public void zipWithIndex() { + final List> resultList = + Source.range(1, 3).zipWithIndex().runWith(Sink.seq(), system).toCompletableFuture().join(); + assertEquals( + Arrays.asList(Pair.create(1, 0L), Pair.create(2, 1L), Pair.create(3, 2L)), resultList); + } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala index 74bba55d0a..9ce904425a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala @@ -13,14 +13,14 @@ package org.apache.pekko.stream.impl +import java.util.Spliterator +import java.util.function.Consumer + import org.apache.pekko import pekko.annotation.InternalApi import pekko.stream._ import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } -import java.util.Spliterator -import java.util.function.Consumer - /** INTERNAL API */ @InternalApi private[stream] final class JavaStreamSource[T, S <: java.util.stream.BaseStream[T, S]]( open: () => java.util.stream.BaseStream[T, S]) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala index 1b23fc96c9..504a264432 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala @@ -26,6 +26,7 @@ import scala.util.control.{ NoStackTrace, NonFatal } import scala.util.control.Exception.Catcher import org.apache.pekko +import org.apache.pekko.japi.Pair import pekko.actor.{ ActorRef, Terminated } import pekko.annotation.InternalApi import pekko.event._ @@ -77,6 +78,48 @@ import pekko.util.ccompat._ } } +/** + * INTERNAL API + */ +@InternalApi private[pekko] object ZipWithIndex extends GraphStage[FlowShape[Any, (Any, Long)]] { + val in = Inlet[Any]("ZipWithIndex.in") + val out = Outlet[(Any, Long)]("ZipWithIndex.out") + override val shape = FlowShape(in, out) + override def initialAttributes: Attributes = DefaultAttributes.zipWithIndex + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private var index = 0L + override def onPush(): Unit = { + push(out, (grab(in), index)) + index += 1 + } + + override def onPull(): Unit = pull(in) + setHandlers(in, out, this) + } +} + +/** + * INTERNAL API + */ +@InternalApi private[pekko] object ZipWithIndexJava extends GraphStage[FlowShape[Any, Pair[Any, Long]]] { + val in = Inlet[Any]("ZipWithIndex.in") + val out = Outlet[Pair[Any, Long]]("ZipWithIndex.out") + override val shape = FlowShape(in, out) + override def initialAttributes: Attributes = DefaultAttributes.zipWithIndex + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private var index = 0L + override def onPush(): Unit = { + push(out, new Pair(grab(in), index)) + index += 1 + } + + override def onPull(): Unit = pull(in) + setHandlers(in, out, this) + } +} + /** * INTERNAL API */ diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 91b0385e49..c2f8cf581e 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -26,6 +26,7 @@ import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag import org.apache.pekko +import org.apache.pekko.stream.impl.fusing.ZipWithIndexJava import pekko.Done import pekko.NotUsed import pekko.actor.ActorRef @@ -3691,7 +3692,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * '''Cancels when''' downstream cancels */ def zipWithIndex: Flow[In, Pair[Out, java.lang.Long], Mat] = - new Flow(delegate.zipWithIndex.map { case (elem, index) => Pair[Out, java.lang.Long](elem, index) }) + via(ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]]) /** * If the first element has not passed through this operator before the provided timeout, the stream is failed diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index bcd289b785..563ac47274 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -35,7 +35,7 @@ import pekko.japi.{ function, JavaPartialFunction, Pair } import pekko.japi.function.Creator import pekko.stream._ import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava } -import pekko.stream.impl.fusing.ArraySource +import pekko.stream.impl.fusing.{ ArraySource, ZipWithIndexJava } import pekko.util.{ unused, _ } import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ @@ -2173,7 +2173,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * '''Cancels when''' downstream cancels */ def zipWithIndex: javadsl.Source[Pair[Out @uncheckedVariance, java.lang.Long], Mat] = - new Source(delegate.zipWithIndex.map { case (elem, index) => Pair[Out, java.lang.Long](elem, index) }) + new Source(delegate.via( + ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]])) /** * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 20f73c4f01..8e96be733f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -29,6 +29,7 @@ import pekko.annotation.ApiMayChange import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import pekko.japi.{ function, Pair } import pekko.stream._ +import pekko.stream.impl.fusing.ZipWithIndexJava import pekko.util.ConstantFun import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ @@ -2272,7 +2273,8 @@ class SubFlow[In, Out, Mat]( * '''Cancels when''' downstream cancels */ def zipWithIndex: SubFlow[In, pekko.japi.Pair[Out @uncheckedVariance, java.lang.Long], Mat] = - new SubFlow(delegate.zipWithIndex.map { case (elem, index) => pekko.japi.Pair[Out, java.lang.Long](elem, index) }) + new SubFlow(delegate.via( + ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]])) /** * If the first element has not passed through this operator before the provided timeout, the stream is failed diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 9ac6f9c714..b9ad87a782 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -29,6 +29,7 @@ import pekko.annotation.ApiMayChange import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import pekko.japi.{ function, Pair } import pekko.stream._ +import pekko.stream.impl.fusing.ZipWithIndexJava import pekko.util.ConstantFun import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ @@ -2246,8 +2247,9 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ - def zipWithIndex: javadsl.SubSource[pekko.japi.Pair[Out @uncheckedVariance, Long], Mat] = - new SubSource(delegate.zipWithIndex.map { case (elem, index) => pekko.japi.Pair(elem, index) }) + def zipWithIndex: javadsl.SubSource[pekko.japi.Pair[Out @uncheckedVariance, java.lang.Long], Mat] = + new SubSource(delegate.via( + ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]])) /** * If the first element has not passed through this operator before the provided timeout, the stream is failed diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 9c66727d51..e2db2e8daf 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -3304,16 +3304,7 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def zipWithIndex: Repr[(Out, Long)] = { - statefulMapConcat[(Out, Long)] { () => - var index: Long = 0L - elem => { - val zipped = (elem, index) - index += 1 - immutable.Iterable[(Out, Long)](zipped) - } - } - } + def zipWithIndex: Repr[(Out, Long)] = via(ZipWithIndex.asInstanceOf[Graph[FlowShape[Out, (Out, Long)], NotUsed]]) /** * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala index 88e9e392c6..91a72fc90a 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala @@ -16,8 +16,8 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance import org.apache.pekko -import pekko.annotation.ApiMayChange import pekko.NotUsed +import pekko.annotation.ApiMayChange import pekko.japi.Pair import pekko.stream._