From bb2367c4b78c32e37cbb15a4624dc0f8a827401c Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Wed, 22 Nov 2023 19:11:41 +0000 Subject: [PATCH] BatchUp should be compatible with ListOfList (close #32) --- .../snowplow/runtime/processing/BatchUp.scala | 104 +++++++++++------- .../runtime/processing/BatchUpSpec.scala | 40 ++++--- project/BuildSettings.scala | 1 + project/Dependencies.scala | 2 + 4 files changed, 91 insertions(+), 56 deletions(-) diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/processing/BatchUp.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/processing/BatchUp.scala index 78ad46f..9121203 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/processing/BatchUp.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/processing/BatchUp.scala @@ -7,7 +7,7 @@ */ package com.snowplowanalytics.snowplow.runtime.processing -import cats.{Foldable, Semigroup} +import cats.Foldable import cats.implicits._ import cats.effect.{Async, Sync} import fs2.{Chunk, Pipe, Pull, Stream} @@ -24,12 +24,14 @@ object BatchUp { * @note * typically "weight" means "number of bytes". But it could also be any other measurement. */ - trait Batchable[A] extends Semigroup[A] { + trait Batchable[A, B] { def weightOf(a: A): Long + def single(a: A): B + def combine(b: B, a: A): B } object Batchable { - def apply[A](implicit batchable: Batchable[A]): Batchable[A] = batchable + def apply[A, B](implicit batchable: Batchable[A, B]): Batchable[A, B] = batchable } /** @@ -49,27 +51,27 @@ object BatchUp { * @return * A FS2 Pipe which combines small `A`s to bigger `A`s. 
*/ - def withTimeout[F[_]: Async, A: Batchable](maxWeight: Long, maxDelay: FiniteDuration): Pipe[F, A, A] = { + def withTimeout[F[_]: Async, A, B: Batchable[A, *]](maxWeight: Long, maxDelay: FiniteDuration): Pipe[F, A, B] = { - def go(timedPull: Pull.Timed[F, A], wasPending: Option[A]): Pull[F, A, Unit] = + def go(timedPull: Pull.Timed[F, A], wasPending: PendingBatch[B]): Pull[F, B, Unit] = timedPull.uncons.flatMap { case None => // Upstream finished cleanly. Emit whatever is pending and we're done. - Pull.outputOption1[F, A](wasPending) *> Pull.done + Pull.outputOption1[F, B](wasPending.value) *> Pull.done case Some((Left(_), next)) => // Timer timed-out. Emit whatever is pending. - Pull.outputOption1[F, A](wasPending) *> go(next, None) + Pull.outputOption1[F, B](wasPending.value) *> go(next, PendingBatch.empty) case Some((Right(chunk), next)) => // Upstream emitted something to us. We might already have a pending element. - val result = combineByWeight(maxWeight, Chunk.fromOption(wasPending) ++ chunk) - Pull.output[F, A](Chunk.from(result.toEmit)) *> + val result: CombineByWeightResult[B] = combineByWeight[A, B](maxWeight, wasPending, chunk) + Pull.output[F, B](Chunk.from(result.toEmit)) *> handleTimerReset(wasPending, result, next, maxDelay) *> - go(next, result.notAtSize) + go(next, PendingBatch(result.doNotEmitYet, result.pendingWeight)) } in => in.pull.timed { timedPull => - go(timedPull, None) + go(timedPull, PendingBatch.empty) }.stream } @@ -93,62 +95,80 @@ object BatchUp { * @return * A FS2 Pipe which combines small `A`s to bigger `A`s. */ - def noTimeout[F[_]: Sync, A: Batchable](maxWeight: Long): Pipe[F, A, A] = { - def go(stream: Stream[F, A], unflushed: Option[A]): Pull[F, A, Unit] = + def noTimeout[F[_]: Sync, A, B: Batchable[A, *]](maxWeight: Long): Pipe[F, A, B] = { + def go(stream: Stream[F, A], unflushed: PendingBatch[B]): Pull[F, B, Unit] = stream.pull.uncons.flatMap { case None => // Upstream finished cleanly. 
Emit whatever is pending and we're done. - Pull.outputOption1[F, A](unflushed) *> Pull.done + Pull.outputOption1[F, B](unflushed.value) *> Pull.done case Some((chunk, next)) => - val CombineByWeightResult(notAtSize, toEmit) = combineByWeight(maxWeight, Chunk.fromOption(unflushed) ++ chunk) - Pull.output[F, A](Chunk.from(toEmit)) *> - go(next, notAtSize) + val CombineByWeightResult(doNotEmitYet, pendingWeight, toEmit) = combineByWeight(maxWeight, unflushed, chunk) + Pull.output[F, B](Chunk.from(toEmit)) *> + go(next, PendingBatch(doNotEmitYet, pendingWeight)) } - in => go(in, None).stream + in => go(in, PendingBatch.empty).stream + } + + private case class PendingBatch[B](value: Option[B], weight: Long) + + private object PendingBatch { + def empty[B]: PendingBatch[B] = PendingBatch(None, 0L) } /** * The result of combining a chunk of `A`s, while not exceeding total weight. * * @param notAtSize - * Optionally an `A` that does not yet exceed the maximum allowed size. We should not emit this - * `A` but instead wait in case we can combine it with other `A`s later. + * Optionally a batch `B` that does not yet exceed the maximum allowed size. We should not emit + * this `B` but instead wait in case we can combine it with other `A`s later. * @param toEmit - * The combined `A`s which meet size requirements. These should be emitted downstream because we + * The combined `B`s which meet size requirements. These should be emitted downstream because we * cannot combine them with anything more. */ - private case class CombineByWeightResult[A](notAtSize: Option[A], toEmit: Vector[A]) + private case class CombineByWeightResult[B]( + doNotEmitYet: Option[B], + pendingWeight: Long, + toEmit: Vector[B] + ) /** * Combine a chunk of `A`s, while not exceeding the max allowed weight * * @param maxWeight * the maximum allowed weight (e.g. max allowed number of bytes) + * @param notAtSize + * the batch still pending from earlier chunks, whose weight counts towards the limit (e.g. 
max allowed number of bytes) * @param chunk * the `A`s we need to combine into larger `A`s. * @return * The result of combining `A`s */ - private def combineByWeight[A: Batchable](maxWeight: Long, chunk: Chunk[A]): CombineByWeightResult[A] = - Foldable[Chunk].foldLeft(chunk, CombineByWeightResult[A](None, Vector.empty)) { - case (CombineByWeightResult(None, toEmit), next) => - if (Batchable[A].weightOf(next) >= maxWeight) - CombineByWeightResult(None, toEmit :+ next) + private def combineByWeight[A, B]( + maxWeight: Long, + notAtSize: PendingBatch[B], + chunk: Chunk[A] + )(implicit B: Batchable[A, B] + ): CombineByWeightResult[B] = + Foldable[Chunk].foldLeft(chunk, CombineByWeightResult[B](notAtSize.value, notAtSize.weight, Vector.empty)) { + case (CombineByWeightResult(None, _, toEmit), next) => + val nextWeight = B.weightOf(next) + if (nextWeight >= maxWeight) + CombineByWeightResult(None, 0L, toEmit :+ B.single(next)) else - CombineByWeightResult(Some(next), toEmit) - case (CombineByWeightResult(Some(notAtSize), toEmit), next) => - val nextWeight = Batchable[A].weightOf(next) + CombineByWeightResult(Some(B.single(next)), nextWeight, toEmit) + case (CombineByWeightResult(Some(notAtSize), notAtSizeWeight, toEmit), next) => + val nextWeight = B.weightOf(next) if (nextWeight >= maxWeight) - CombineByWeightResult(None, toEmit :+ notAtSize :+ next) + CombineByWeightResult(None, 0L, toEmit :+ notAtSize :+ B.single(next)) else { - val notAtSizeWeight = Batchable[A].weightOf(notAtSize) - if (nextWeight + notAtSizeWeight > maxWeight) - CombineByWeightResult(Some(next), toEmit :+ notAtSize) - else if (nextWeight + notAtSizeWeight === maxWeight) - CombineByWeightResult(None, toEmit :+ (notAtSize |+| next)) + val combinedWeight = nextWeight + notAtSizeWeight + if (combinedWeight > maxWeight) + CombineByWeightResult(Some(B.single(next)), nextWeight, toEmit :+ notAtSize) + else if (combinedWeight === maxWeight) + CombineByWeightResult(None, 0L, toEmit :+ B.combine(notAtSize, 
next)) else - CombineByWeightResult(Some(notAtSize |+| next), toEmit) + CombineByWeightResult(Some(B.combine(notAtSize, next)), combinedWeight, toEmit) } } @@ -166,16 +186,16 @@ object BatchUp { * @param maxDelay * value to use for a new timeout, if needed */ - private def handleTimerReset[F[_], A]( - wasPending: Option[A], - result: CombineByWeightResult[A], + private def handleTimerReset[F[_], A, B]( + wasPending: PendingBatch[B], + result: CombineByWeightResult[B], timedPull: Pull.Timed[F, A], maxDelay: FiniteDuration ): Pull[F, Nothing, Unit] = - if (result.notAtSize.isEmpty) { + if (result.doNotEmitYet.isEmpty) { // We're emitting everything so cancel any existing timeout timedPull.timeout(Duration.Zero) - } else if (result.toEmit.nonEmpty || wasPending.isEmpty) { + } else if (result.toEmit.nonEmpty || wasPending.value.isEmpty) { // There is no existing timeout on the pending element, so start a new timeout timedPull.timeout(maxDelay) } else { diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/processing/BatchUpSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/processing/BatchUpSpec.scala index a47b416..c1cbebe 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/processing/BatchUpSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/processing/BatchUpSpec.scala @@ -21,8 +21,9 @@ class BatchUpSpec extends Specification with CatsEffect { def is = s2""" The BatchUp pipe without timeout should: Combine strings, respecting max size $noTimeout1 - Combine strings, respecting max size, for Stream comprising random-sized Chunks $noTimeout2 - Wait until stream ends before emitting an under-sized element $noTimeout3 + Combine strings, respecting max size for elements of different sizes $noTimeout2 + Combine strings, respecting max size, for Stream comprising random-sized Chunks $noTimeout3 + Wait until stream ends 
before emitting an under-sized element $noTimeout4 The BatchUp pipe with timeout should: Combine strings, respecting max size $timeout1 Combine strings, respecting max size, for Stream comprising random-sized Chunks $timeout2 @@ -33,7 +34,7 @@ class BatchUpSpec extends Specification with CatsEffect { def noTimeout1 = { val input = Stream("a", "b", "c", "d", "e", "f", "g", "h") - val pipe = BatchUp.noTimeout[IO, String](3) + val pipe = BatchUp.noTimeout[IO, String, String](3) val expected = List("abc", "def", "gh") for { @@ -42,8 +43,18 @@ class BatchUpSpec extends Specification with CatsEffect { } def noTimeout2 = { + val input = Stream("a", "bb", "ccc", "dd", "eeeee", "f", "gggggggggg") + val pipe = BatchUp.noTimeout[IO, String, String](5) + val expected = List("abb", "cccdd", "eeeee", "f", "gggggggggg") + + for { + result <- input.through(pipe).compile.toList + } yield result must beEqualTo(expected) + } + + def noTimeout3 = { val input = Stream("a", "b", "c", "d", "e", "f", "g", "h").rechunkRandomly(0.0, 2.0) - val pipe = BatchUp.noTimeout[IO, String](3) + val pipe = BatchUp.noTimeout[IO, String, String](3) val expected = List("abc", "def", "gh") for { @@ -51,9 +62,9 @@ class BatchUpSpec extends Specification with CatsEffect { } yield result must beEqualTo(expected) } - def noTimeout3 = { + def noTimeout4 = { val input = Stream("a", "b", "c", "d") ++ Stream.sleep_[IO](5.minutes) - val pipe = BatchUp.noTimeout[IO, String](3) + val pipe = BatchUp.noTimeout[IO, String, String](3) val test = input.through(pipe).evalMap { str => // emit the string + the time the string was emitted by the pipe under test @@ -71,7 +82,7 @@ class BatchUpSpec extends Specification with CatsEffect { def timeout1 = { val input = Stream("a", "b", "c", "d", "e", "f", "g", "h") - val pipe = BatchUp.withTimeout[IO, String](3, 1.second) + val pipe = BatchUp.withTimeout[IO, String, String](3, 1.second) val expected = List("abc", "def", "gh") for { @@ -81,7 +92,7 @@ class BatchUpSpec extends 
Specification with CatsEffect { def timeout2 = { val input = Stream("a", "b", "c", "d", "e", "f", "g", "h").rechunkRandomly(0.0, 2.0) - val pipe = BatchUp.withTimeout[IO, String](3, 1.second) + val pipe = BatchUp.withTimeout[IO, String, String](3, 1.second) val expected = List("abc", "def", "gh") for { @@ -91,7 +102,7 @@ class BatchUpSpec extends Specification with CatsEffect { def timeout3 = { val input = Stream("a", "b") ++ Stream.sleep_[IO](5.minutes) ++ Stream("c", "d", "e", "f") - val pipe = BatchUp.withTimeout[IO, String](3, 1.second) + val pipe = BatchUp.withTimeout[IO, String, String](3, 1.second) val test = input.through(pipe).evalMap { str => // emit the string + the time the string was emitted by the pipe under test @@ -109,7 +120,7 @@ class BatchUpSpec extends Specification with CatsEffect { def timeout4 = { val input = Stream("a", "b") ++ Stream.sleep_[IO](5.seconds) ++ Stream("c", "d") - val pipe = BatchUp.withTimeout[IO, String](3, 10.seconds) + val pipe = BatchUp.withTimeout[IO, String, String](3, 10.seconds) val test = input.through(pipe).evalMap { str => // emit the string + the time the string was emitted by the pipe under test @@ -127,7 +138,7 @@ class BatchUpSpec extends Specification with CatsEffect { def timeout5 = { val input = Stream("a", "b", "c", "d") ++ Stream.sleep_[IO](10.seconds) - val pipe = BatchUp.withTimeout[IO, String](3, 60.seconds) + val pipe = BatchUp.withTimeout[IO, String, String](3, 60.seconds) val test = input.through(pipe).evalMap { str => // emit the string + the time the string was emitted by the pipe under test @@ -147,9 +158,10 @@ class BatchUpSpec extends Specification with CatsEffect { object BatchUpSpec { - implicit val stringBatchable: BatchUp.Batchable[String] = new BatchUp.Batchable[String] { - def combine(x: String, y: String): String = s"$x$y" - def weightOf(x: String): Long = x.length.toLong + implicit val stringBatchable: BatchUp.Batchable[String, String] = new BatchUp.Batchable[String, String] { + def 
single(a: String): String = a + def combine(b: String, a: String): String = s"$b$a" + def weightOf(a: String): Long = a.length.toLong } } diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index 69a83f4..8f1cfc4 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -43,6 +43,7 @@ object BuildSettings { scalacOptions += "-Wconf:origin=scala.collection.compat.*:s", Test / fork := true, addCompilerPlugin(Dependencies.betterMonadicFor), + addCompilerPlugin(Dependencies.kindProjector), ThisBuild / autoAPIMappings := true, ThisBuild / dynverVTagPrefix := false, // Otherwise git tags required to have v-prefix ThisBuild / dynverSeparator := "-", // to be compatible with docker diff --git a/project/Dependencies.scala b/project/Dependencies.scala index f985a8b..19df29a 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -22,6 +22,7 @@ object Dependencies { val circeExtra = "0.14.3" val circeConfig = "0.10.0" val betterMonadicFor = "0.3.1" + val kindProjector = "0.13.2" val collectionCompat = "2.11.0" val refined = "0.11.0" @@ -64,6 +65,7 @@ object Dependencies { val circeGenericExtra = "io.circe" %% "circe-generic-extras" % V.circeExtra val circeLiteral = "io.circe" %% "circe-literal" % V.circe val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % V.betterMonadicFor + val kindProjector = "org.typelevel" %% "kind-projector" % V.kindProjector cross CrossVersion.full val collectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % V.collectionCompat val refined = "eu.timepit" %% "refined" % V.refined