diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala index 48f6a286ac..70ff3e38b2 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala @@ -57,36 +57,44 @@ object SourceWithContext { SourceWithContext.fromTuples(Source.fromGraph(GraphDSL.createGraph(source, viaFlow)(combine) { implicit b => (s, viaF) => import GraphDSL.Implicits._ - val broadcast = b.add(Broadcast[(Option[SOut], Ctx)](2)) - val merge = b.add(Merge[(Option[FOut], Ctx)](2)) - val unzip = b.add(Unzip[SOut, Ctx]()) - val zipper = b.add(Zip[FOut, Ctx]()) - - val filterAvailable = Flow[(Option[SOut], Ctx)].collect { - case (Some(f), ctx) => (f, ctx) - } - - val filterUnavailable = Flow[(Option[SOut], Ctx)].collect { - case (None, ctx) => (Option.empty[FOut], ctx) + case class IndexedCtx(idx: Long, ctx: Ctx) + val partition = b.add(Partition[(Option[SOut], IndexedCtx)](2, + { + case (None, _) => 0 + case (Some(_), _) => 1 + })) + + val sequence = Flow[(Option[SOut], Ctx)].zipWithIndex + .map { + case ((opt, ctx), idx) => (opt, IndexedCtx(idx, ctx)) + } + + val unzip = b.add(Unzip[Option[SOut], IndexedCtx]()) + val zipper = b.add(Zip[FOut, IndexedCtx]()) + val mergeSequence = b.add(MergeSequence[(Option[FOut], IndexedCtx)](2)(_._2.idx)) + val unwrapSome = b.add(Flow[Option[SOut]].map { + case Some(elem) => elem + case _ => throw new IllegalStateException("Only expects Some") + }) + val unwrap = Flow[(Option[FOut], IndexedCtx)].map { + case (opt, indexedCtx) => (opt, indexedCtx.ctx) } - val mapIntoOption = Flow[(FOut, Ctx)].map { - case (f, ctx) => (Some(f), ctx) + val mapIntoOption = Flow[(FOut, IndexedCtx)].map { + case (elem, indexedCtx) => (Some(elem), indexedCtx) } - s ~> broadcast.in - - broadcast.out(0) ~> filterAvailable ~> unzip.in - - unzip.out0 ~> viaF ~> zipper.in0 - unzip.out1 ~> zipper.in1 - - zipper.out ~> mapIntoOption ~> merge.in(0) - - broadcast.out(1) ~> filterUnavailable ~> merge.in(1) + //format: off + s ~> sequence ~> partition.in + partition.out(0).asInstanceOf[Outlet[(Option[FOut], IndexedCtx)]] ~> mergeSequence.in(0) + partition.out(1) ~> unzip.in + unzip.out0 ~> unwrapSome ~> viaF ~> zipper.in0 + unzip.out1 ~> zipper.in1 + zipper.out ~> mapIntoOption ~> mergeSequence.in(1) - SourceShape(merge.out) + //format: on + SourceShape((mergeSequence.out ~> unwrap).outlet) })) }