Skip to content

Commit

Permalink
Ensure first window is always small (#106)
Browse files Browse the repository at this point in the history
This change affects windowing apps e.g. Lake Loader.

Before this change, the first window was a random size, to avoid write
conflicts between several loaders finishing their windows at similar
times. After this change, the first window is still random, but sized in
the ranged 25% to 50% of the regular window size. This was shown in Lake
Loader to give a smoother starting cpu profile, and it prevents the
loader from pulling in too many events into the first window.
  • Loading branch information
istreeter authored Jan 13, 2025
1 parent 0d42df5 commit 385ae5a
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ object EventProcessingConfig {
* Controls how many windows are allowed to start eagerly ahead of an earlier window that is
* still being finalized. For example, if numEagerWindows=2 then window 42 is allowed to start
* while windows 40 and 41 are still finalizing.
*
* The `firstWindowScaling` lies in the range 0.25 to 0.5. This range comes from experience with
* the lake loader: 1. Helps the app quickly reach a stable cpu usage; 2. Avoids a problem in
* which the loader pulls in more events in the first window than what it can possibly sink within
* the second window.
*/
case class TimedWindows(
duration: FiniteDuration,
Expand All @@ -61,7 +66,7 @@ object EventProcessingConfig {
def build[F[_]: Sync](duration: FiniteDuration, numEagerWindows: Int): F[TimedWindows] =
for {
random <- Random.scalaUtilRandom
factor <- random.nextDouble
factor <- random.betweenDouble(0.25, 0.5)
} yield TimedWindows(duration, factor, numEagerWindows)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,11 @@ private[sources] object LowLevelSource {
* by flushing all pending events to S3 and then sending the SQS message.
*/
private def timedWindows[F[_]: Async, A](config: EventProcessingConfig.TimedWindows): Pipe[F, A, Stream[F, A]] = {
def go(timedPull: Pull.Timed[F, A], current: Option[Queue[F, Option[A]]]): Pull[F, Stream[F, A], Unit] =
def go(
timedPull: Pull.Timed[F, A],
current: Option[Queue[F, Option[A]]],
nextDuration: FiniteDuration
): Pull[F, Stream[F, A], Unit] =
timedPull.uncons.attempt.flatMap {
case Right(None) =>
current match {
Expand All @@ -292,8 +296,8 @@ private[sources] object LowLevelSource {
}
case Right(Some((Left(_), next))) =>
current match {
case None => go(next, None)
case Some(q) => Pull.eval(q.offer(None)) >> go(next, None)
case None => go(next, None, nextDuration)
case Some(q) => Pull.eval(q.offer(None)) >> go(next, None, nextDuration)
}
case Right(Some((Right(chunk), next))) =>
current match {
Expand All @@ -302,11 +306,11 @@ private[sources] object LowLevelSource {
q <- Pull.eval(Queue.synchronous[F, Option[A]])
_ <- Pull.output1(Stream.fromQueueNoneTerminated(q))
_ <- Pull.eval(chunk.traverse(a => q.offer(Some(a))))
_ <- Pull.eval(Logger[F].info(s"Opening new window with duration ${config.duration}")) >> next.timeout(config.duration)
} yield go(next, Some(q))
_ <- Pull.eval(Logger[F].info(s"Opening new window with duration $nextDuration")) >> next.timeout(nextDuration)
} yield go(next, Some(q), (nextDuration * 2).min(config.duration))
pull.flatten
case Some(q) =>
Pull.eval(chunk.traverse(a => q.offer(Some(a)))) >> go(next, Some(q))
Pull.eval(chunk.traverse(a => q.offer(Some(a)))) >> go(next, Some(q), nextDuration)
}
case Left(throwable) =>
current match {
Expand All @@ -324,7 +328,7 @@ private[sources] object LowLevelSource {
_ <- timedPull.timeout(timeout)
q <- Pull.eval(Queue.synchronous[F, Option[A]])
_ <- Pull.output1(Stream.fromQueueNoneTerminated(q))
} yield go(timedPull, Some(q))
} yield go(timedPull, Some(q), (2 * timeout).min(config.duration))
pull.flatten
}
.stream
Expand All @@ -338,6 +342,6 @@ private[sources] object LowLevelSource {
* time. All instances in the group should end windows at slightly different times, so that
* downstream gets a more steady flow of completed batches.
*/
private def timeoutForFirstWindow(config: EventProcessingConfig.TimedWindows) =
private def timeoutForFirstWindow(config: EventProcessingConfig.TimedWindows): FiniteDuration =
(config.duration.toMillis * config.firstWindowScaling).toLong.milliseconds
}
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig))
processor = windowedProcessor(refActions, testConfig)
fiber <- sourceAndAck.stream(config, processor).compile.drain.start
_ <- IO.sleep(91.seconds)
_ <- IO.sleep(131.seconds)
_ <- fiber.cancel
result <- refActions.get
} yield result must beEqualTo(
Expand All @@ -490,15 +490,21 @@ class LowLevelSourceSpec extends Specification with CatsEffect {
Action.ProcessorReceivedEvents("1970-01-01T00:00:22Z", List("5", "6")),
Action.ProcessorReceivedEvents("1970-01-01T00:00:33Z", List("7", "8")),
Action.ProcessorReceivedEvents("1970-01-01T00:00:44Z", List("9", "10")),
Action.ProcessorReachedEndOfWindow("1970-01-01T00:00:52Z"),
Action.Checkpointed(List("5", "6", "7", "8", "9", "10")),
Action.ProcessorStartedWindow("1970-01-01T00:00:55Z"),
Action.ProcessorReceivedEvents("1970-01-01T00:00:55Z", List("11", "12")),
Action.ProcessorReceivedEvents("1970-01-01T00:01:06Z", List("13", "14")),
Action.ProcessorReceivedEvents("1970-01-01T00:01:17Z", List("15", "16")),
Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:22Z"),
Action.Checkpointed(List("5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16")),
Action.ProcessorStartedWindow("1970-01-01T00:01:28Z"),
Action.ProcessorReceivedEvents("1970-01-01T00:01:28Z", List("17", "18")),
Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:31Z"),
Action.Checkpointed(List("17", "18"))
Action.ProcessorReceivedEvents("1970-01-01T00:01:39Z", List("19", "20")),
Action.ProcessorReceivedEvents("1970-01-01T00:01:50Z", List("21", "22")),
Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:55Z"),
Action.Checkpointed(List("11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22")),
Action.ProcessorStartedWindow("1970-01-01T00:02:01Z"),
Action.ProcessorReceivedEvents("1970-01-01T00:02:01Z", List("23", "24")),
Action.ProcessorReachedEndOfWindow("1970-01-01T00:02:11Z"),
Action.Checkpointed(List("23", "24"))
)
)

Expand Down

0 comments on commit 385ae5a

Please sign in to comment.