Skip to content

Commit

Permalink
BatchUp should be compatible with ListOfList (close #32)
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Nov 23, 2023
1 parent 0570c9f commit bb2367c
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/
package com.snowplowanalytics.snowplow.runtime.processing

import cats.{Foldable, Semigroup}
import cats.Foldable
import cats.implicits._
import cats.effect.{Async, Sync}
import fs2.{Chunk, Pipe, Pull, Stream}
Expand All @@ -24,12 +24,14 @@ object BatchUp {
* @note
* typically "weight" means "number of bytes". But it could also be any other measurement.
*/
trait Batchable[A] extends Semigroup[A] {
trait Batchable[A, B] {
def weightOf(a: A): Long
def single(a: A): B
def combine(b: B, a: A): B
}

object Batchable {
def apply[A](implicit batchable: Batchable[A]): Batchable[A] = batchable
def apply[A, B](implicit batchable: Batchable[A, B]): Batchable[A, B] = batchable
}

/**
Expand All @@ -49,27 +51,27 @@ object BatchUp {
* @return
* A FS2 Pipe which combines small `A`s to bigger `A`s.
*/
def withTimeout[F[_]: Async, A: Batchable](maxWeight: Long, maxDelay: FiniteDuration): Pipe[F, A, A] = {
def withTimeout[F[_]: Async, A, B: Batchable[A, *]](maxWeight: Long, maxDelay: FiniteDuration): Pipe[F, A, B] = {

def go(timedPull: Pull.Timed[F, A], wasPending: Option[A]): Pull[F, A, Unit] =
def go(timedPull: Pull.Timed[F, A], wasPending: PendingBatch[B]): Pull[F, B, Unit] =
timedPull.uncons.flatMap {
case None =>
// Upstream finished cleanly. Emit whatever is pending and we're done.
Pull.outputOption1[F, A](wasPending) *> Pull.done
Pull.outputOption1[F, B](wasPending.value) *> Pull.done
case Some((Left(_), next)) =>
// Timer timed-out. Emit whatever is pending.
Pull.outputOption1[F, A](wasPending) *> go(next, None)
Pull.outputOption1[F, B](wasPending.value) *> go(next, PendingBatch.empty)
case Some((Right(chunk), next)) =>
// Upstream emitted something to us. We might already have a pending element.
val result = combineByWeight(maxWeight, Chunk.fromOption(wasPending) ++ chunk)
Pull.output[F, A](Chunk.from(result.toEmit)) *>
val result: CombineByWeightResult[B] = combineByWeight[A, B](maxWeight, wasPending, chunk)
Pull.output[F, B](Chunk.from(result.toEmit)) *>
handleTimerReset(wasPending, result, next, maxDelay) *>
go(next, result.notAtSize)
go(next, PendingBatch(result.doNotEmitYet, result.pendingWeight))
}

in =>
in.pull.timed { timedPull =>
go(timedPull, None)
go(timedPull, PendingBatch.empty)
}.stream
}

Expand All @@ -93,62 +95,80 @@ object BatchUp {
* @return
* A FS2 Pipe which combines small `A`s to bigger `A`s.
*/
def noTimeout[F[_]: Sync, A: Batchable](maxWeight: Long): Pipe[F, A, A] = {
def go(stream: Stream[F, A], unflushed: Option[A]): Pull[F, A, Unit] =
def noTimeout[F[_]: Sync, A, B: Batchable[A, *]](maxWeight: Long): Pipe[F, A, B] = {
def go(stream: Stream[F, A], unflushed: PendingBatch[B]): Pull[F, B, Unit] =
stream.pull.uncons.flatMap {
case None =>
// Upstream finished cleanly. Emit whatever is pending and we're done.
Pull.outputOption1[F, A](unflushed) *> Pull.done
Pull.outputOption1[F, B](unflushed.value) *> Pull.done
case Some((chunk, next)) =>
val CombineByWeightResult(notAtSize, toEmit) = combineByWeight(maxWeight, Chunk.fromOption(unflushed) ++ chunk)
Pull.output[F, A](Chunk.from(toEmit)) *>
go(next, notAtSize)
val CombineByWeightResult(doNotEmitYet, pendingWeight, toEmit) = combineByWeight(maxWeight, unflushed, chunk)
Pull.output[F, B](Chunk.from(toEmit)) *>
go(next, PendingBatch(doNotEmitYet, pendingWeight))
}

in => go(in, None).stream
in => go(in, PendingBatch.empty).stream
}

private case class PendingBatch[B](value: Option[B], weight: Long)

private object PendingBatch {
def empty[B]: PendingBatch[B] = PendingBatch(None, 0L)
}

/**
* The result of combining a chunk of `A`s, while not exceeding total weight.
*
* @param notAtSize
* Optionally an `A` that does not yet exceed the maximum allowed size. We should not emit this
* `A` but instead wait in case we can combine it with other `A`s later.
* Optionally a batch `B` that does not yet exceed the maximum allowed size. We should not emit
* this `B` but instead wait in case we can combine it with other `A`s later.
* @param toEmit
* The combined `A`s which meet size requirements. These should be emitted downstream because we
* The combined `B`s which meet size requirements. These should be emitted downstream because we
* cannot combine them with anything more.
*/
private case class CombineByWeightResult[A](notAtSize: Option[A], toEmit: Vector[A])
private case class CombineByWeightResult[B](
doNotEmitYet: Option[B],
pendingWeight: Long,
toEmit: Vector[B]
)

/**
* Combine a chunk of `A`s, while not exceeding the max allowed weight
*
* @param maxWeight
* the maximum allowed weight (e.g. max allowed number of bytes)
* @param notAtSize
* the maximum allowed weight (e.g. max allowed number of bytes)
* @param chunk
* the `A`s we need to combine into larger `A`s.
* @return
* The result of combining `A`s
*/
private def combineByWeight[A: Batchable](maxWeight: Long, chunk: Chunk[A]): CombineByWeightResult[A] =
Foldable[Chunk].foldLeft(chunk, CombineByWeightResult[A](None, Vector.empty)) {
case (CombineByWeightResult(None, toEmit), next) =>
if (Batchable[A].weightOf(next) >= maxWeight)
CombineByWeightResult(None, toEmit :+ next)
private def combineByWeight[A, B](
maxWeight: Long,
notAtSize: PendingBatch[B],
chunk: Chunk[A]
)(implicit B: Batchable[A, B]
): CombineByWeightResult[B] =
Foldable[Chunk].foldLeft(chunk, CombineByWeightResult[B](notAtSize.value, notAtSize.weight, Vector.empty)) {
case (CombineByWeightResult(None, _, toEmit), next) =>
val nextWeight = B.weightOf(next)
if (nextWeight >= maxWeight)
CombineByWeightResult(None, 0L, toEmit :+ B.single(next))
else
CombineByWeightResult(Some(next), toEmit)
case (CombineByWeightResult(Some(notAtSize), toEmit), next) =>
val nextWeight = Batchable[A].weightOf(next)
CombineByWeightResult(Some(B.single(next)), nextWeight, toEmit)
case (CombineByWeightResult(Some(notAtSize), notAtSizeWeight, toEmit), next) =>
val nextWeight = B.weightOf(next)
if (nextWeight >= maxWeight)
CombineByWeightResult(None, toEmit :+ notAtSize :+ next)
CombineByWeightResult(None, 0L, toEmit :+ notAtSize :+ B.single(next))
else {
val notAtSizeWeight = Batchable[A].weightOf(notAtSize)
if (nextWeight + notAtSizeWeight > maxWeight)
CombineByWeightResult(Some(next), toEmit :+ notAtSize)
else if (nextWeight + notAtSizeWeight === maxWeight)
CombineByWeightResult(None, toEmit :+ (notAtSize |+| next))
val combinedWeight = nextWeight + notAtSizeWeight
if (combinedWeight > maxWeight)
CombineByWeightResult(Some(B.single(next)), nextWeight, toEmit :+ notAtSize)
else if (combinedWeight === maxWeight)
CombineByWeightResult(None, 0L, toEmit :+ B.combine(notAtSize, next))
else
CombineByWeightResult(Some(notAtSize |+| next), toEmit)
CombineByWeightResult(Some(B.combine(notAtSize, next)), combinedWeight, toEmit)
}
}

Expand All @@ -166,16 +186,16 @@ object BatchUp {
* @param maxDelay
* value to use for a new timeout, if needed
*/
private def handleTimerReset[F[_], A](
wasPending: Option[A],
result: CombineByWeightResult[A],
private def handleTimerReset[F[_], A, B](
wasPending: PendingBatch[B],
result: CombineByWeightResult[B],
timedPull: Pull.Timed[F, A],
maxDelay: FiniteDuration
): Pull[F, Nothing, Unit] =
if (result.notAtSize.isEmpty) {
if (result.doNotEmitYet.isEmpty) {
// We're emitting everything so cancel any existing timeout
timedPull.timeout(Duration.Zero)
} else if (result.toEmit.nonEmpty || wasPending.isEmpty) {
} else if (result.toEmit.nonEmpty || wasPending.value.isEmpty) {
// There is no existing timeout on the pending element, so start a new timeout
timedPull.timeout(maxDelay)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ class BatchUpSpec extends Specification with CatsEffect {
def is = s2"""
The BatchUp pipe without timeout should:
Combine strings, respecting max size $noTimeout1
Combine strings, respecting max size, for Stream comprising random-sized Chunks $noTimeout2
Wait until stream ends before emitting an under-sized element $noTimeout3
Combine strings, respecting max size for elements of different sizes $noTimeout2
Combine strings, respecting max size, for Stream comprising random-sized Chunks $noTimeout3
Wait until stream ends before emitting an under-sized element $noTimeout4
The BatchUp pipe with timeout should:
Combine strings, respecting max size $timeout1
Combine strings, respecting max size, for Stream comprising random-sized Chunks $timeout2
Expand All @@ -33,7 +34,7 @@ class BatchUpSpec extends Specification with CatsEffect {

def noTimeout1 = {
val input = Stream("a", "b", "c", "d", "e", "f", "g", "h")
val pipe = BatchUp.noTimeout[IO, String](3)
val pipe = BatchUp.noTimeout[IO, String, String](3)
val expected = List("abc", "def", "gh")

for {
Expand All @@ -42,18 +43,28 @@ class BatchUpSpec extends Specification with CatsEffect {
}

def noTimeout2 = {
val input = Stream("a", "bb", "ccc", "dd", "eeeee", "f", "gggggggggg")
val pipe = BatchUp.noTimeout[IO, String, String](5)
val expected = List("abb", "cccdd", "eeeee", "f", "gggggggggg")

for {
result <- input.through(pipe).compile.toList
} yield result must beEqualTo(expected)
}

def noTimeout3 = {
val input = Stream("a", "b", "c", "d", "e", "f", "g", "h").rechunkRandomly(0.0, 2.0)
val pipe = BatchUp.noTimeout[IO, String](3)
val pipe = BatchUp.noTimeout[IO, String, String](3)
val expected = List("abc", "def", "gh")

for {
result <- input.through(pipe).compile.toList
} yield result must beEqualTo(expected)
}

def noTimeout3 = {
def noTimeout4 = {
val input = Stream("a", "b", "c", "d") ++ Stream.sleep_[IO](5.minutes)
val pipe = BatchUp.noTimeout[IO, String](3)
val pipe = BatchUp.noTimeout[IO, String, String](3)

val test = input.through(pipe).evalMap { str =>
// emit the string + the time the string was emitted by the pipe under test
Expand All @@ -71,7 +82,7 @@ class BatchUpSpec extends Specification with CatsEffect {

def timeout1 = {
val input = Stream("a", "b", "c", "d", "e", "f", "g", "h")
val pipe = BatchUp.withTimeout[IO, String](3, 1.second)
val pipe = BatchUp.withTimeout[IO, String, String](3, 1.second)
val expected = List("abc", "def", "gh")

for {
Expand All @@ -81,7 +92,7 @@ class BatchUpSpec extends Specification with CatsEffect {

def timeout2 = {
val input = Stream("a", "b", "c", "d", "e", "f", "g", "h").rechunkRandomly(0.0, 2.0)
val pipe = BatchUp.withTimeout[IO, String](3, 1.second)
val pipe = BatchUp.withTimeout[IO, String, String](3, 1.second)
val expected = List("abc", "def", "gh")

for {
Expand All @@ -91,7 +102,7 @@ class BatchUpSpec extends Specification with CatsEffect {

def timeout3 = {
val input = Stream("a", "b") ++ Stream.sleep_[IO](5.minutes) ++ Stream("c", "d", "e", "f")
val pipe = BatchUp.withTimeout[IO, String](3, 1.second)
val pipe = BatchUp.withTimeout[IO, String, String](3, 1.second)

val test = input.through(pipe).evalMap { str =>
// emit the string + the time the string was emitted by the pipe under test
Expand All @@ -109,7 +120,7 @@ class BatchUpSpec extends Specification with CatsEffect {

def timeout4 = {
val input = Stream("a", "b") ++ Stream.sleep_[IO](5.seconds) ++ Stream("c", "d")
val pipe = BatchUp.withTimeout[IO, String](3, 10.seconds)
val pipe = BatchUp.withTimeout[IO, String, String](3, 10.seconds)

val test = input.through(pipe).evalMap { str =>
// emit the string + the time the string was emitted by the pipe under test
Expand All @@ -127,7 +138,7 @@ class BatchUpSpec extends Specification with CatsEffect {

def timeout5 = {
val input = Stream("a", "b", "c", "d") ++ Stream.sleep_[IO](10.seconds)
val pipe = BatchUp.withTimeout[IO, String](3, 60.seconds)
val pipe = BatchUp.withTimeout[IO, String, String](3, 60.seconds)

val test = input.through(pipe).evalMap { str =>
// emit the string + the time the string was emitted by the pipe under test
Expand All @@ -147,9 +158,10 @@ class BatchUpSpec extends Specification with CatsEffect {

object BatchUpSpec {

implicit val stringBatchable: BatchUp.Batchable[String] = new BatchUp.Batchable[String] {
def combine(x: String, y: String): String = s"$x$y"
def weightOf(x: String): Long = x.length.toLong
implicit val stringBatchable: BatchUp.Batchable[String, String] = new BatchUp.Batchable[String, String] {
def single(a: String): String = a
def combine(b: String, a: String): String = s"$b$a"
def weightOf(a: String): Long = a.length.toLong
}

}
1 change: 1 addition & 0 deletions project/BuildSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ object BuildSettings {
scalacOptions += "-Wconf:origin=scala.collection.compat.*:s",
Test / fork := true,
addCompilerPlugin(Dependencies.betterMonadicFor),
addCompilerPlugin(Dependencies.kindProjector),
ThisBuild / autoAPIMappings := true,
ThisBuild / dynverVTagPrefix := false, // Otherwise git tags required to have v-prefix
ThisBuild / dynverSeparator := "-", // to be compatible with docker
Expand Down
2 changes: 2 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ object Dependencies {
val circeExtra = "0.14.3"
val circeConfig = "0.10.0"
val betterMonadicFor = "0.3.1"
val kindProjector = "0.13.2"
val collectionCompat = "2.11.0"
val refined = "0.11.0"

Expand Down Expand Up @@ -64,6 +65,7 @@ object Dependencies {
val circeGenericExtra = "io.circe" %% "circe-generic-extras" % V.circeExtra
val circeLiteral = "io.circe" %% "circe-literal" % V.circe
val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % V.betterMonadicFor
val kindProjector = "org.typelevel" %% "kind-projector" % V.kindProjector cross CrossVersion.full
val collectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % V.collectionCompat
val refined = "eu.timepit" %% "refined" % V.refined

Expand Down

0 comments on commit bb2367c

Please sign in to comment.