diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/EventProcessingConfig.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/EventProcessingConfig.scala index 758fd5e..0ca4f6b 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/EventProcessingConfig.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/EventProcessingConfig.scala @@ -50,6 +50,11 @@ object EventProcessingConfig { * Controls how many windows are allowed to start eagerly ahead of an earlier window that is * still being finalized. For example, if numEagerWindows=2 then window 42 is allowed to start * while windows 40 and 41 are still finalizing. + * + * The `firstWindowScaling` lies in the range 0.25 to 0.5. This range comes from experience with + * the lake loader: 1. Helps the app quickly reach a stable cpu usage; 2. Avoids a problem in + * which the loader pulls in more events in the first window than what it can possibly sink within + * the second window. */ case class TimedWindows( duration: FiniteDuration, @@ -61,7 +66,7 @@ object EventProcessingConfig { def build[F[_]: Sync](duration: FiniteDuration, numEagerWindows: Int): F[TimedWindows] = for { random <- Random.scalaUtilRandom - factor <- random.nextDouble + factor <- random.betweenDouble(0.25, 0.5) } yield TimedWindows(duration, factor, numEagerWindows) } diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala index 7daa059..3ebc13a 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSource.scala @@ -283,7 +283,11 @@ private[sources] object LowLevelSource { * by flushing all pending events to S3 and then sending the SQS message. */ private def timedWindows[F[_]: Async, A](config: EventProcessingConfig.TimedWindows): Pipe[F, A, Stream[F, A]] = { - def go(timedPull: Pull.Timed[F, A], current: Option[Queue[F, Option[A]]]): Pull[F, Stream[F, A], Unit] = + def go( + timedPull: Pull.Timed[F, A], + current: Option[Queue[F, Option[A]]], + nextDuration: FiniteDuration + ): Pull[F, Stream[F, A], Unit] = timedPull.uncons.attempt.flatMap { case Right(None) => current match { @@ -292,8 +296,8 @@ private[sources] object LowLevelSource { } case Right(Some((Left(_), next))) => current match { - case None => go(next, None) - case Some(q) => Pull.eval(q.offer(None)) >> go(next, None) + case None => go(next, None, nextDuration) + case Some(q) => Pull.eval(q.offer(None)) >> go(next, None, nextDuration) } case Right(Some((Right(chunk), next))) => current match { @@ -302,11 +306,11 @@ private[sources] object LowLevelSource { q <- Pull.eval(Queue.synchronous[F, Option[A]]) _ <- Pull.output1(Stream.fromQueueNoneTerminated(q)) _ <- Pull.eval(chunk.traverse(a => q.offer(Some(a)))) - _ <- Pull.eval(Logger[F].info(s"Opening new window with duration ${config.duration}")) >> next.timeout(config.duration) - } yield go(next, Some(q)) + _ <- Pull.eval(Logger[F].info(s"Opening new window with duration $nextDuration")) >> next.timeout(nextDuration) + } yield go(next, Some(q), (nextDuration * 2).min(config.duration)) pull.flatten case Some(q) => - Pull.eval(chunk.traverse(a => q.offer(Some(a)))) >> go(next, Some(q)) + Pull.eval(chunk.traverse(a => q.offer(Some(a)))) >> go(next, Some(q), nextDuration) } case Left(throwable) => current match { @@ -324,7 +328,7 @@ private[sources] object LowLevelSource { _ <- timedPull.timeout(timeout) q <- Pull.eval(Queue.synchronous[F, Option[A]]) _ <- Pull.output1(Stream.fromQueueNoneTerminated(q)) - } yield go(timedPull, Some(q)) + } yield go(timedPull, Some(q), (2 * timeout).min(config.duration)) pull.flatten } .stream @@ -338,6 +342,6 @@ private[sources] object LowLevelSource { * time. All instances in the group should end windows at slightly different times, so that * downstream gets a more steady flow of completed batches. */ - private def timeoutForFirstWindow(config: EventProcessingConfig.TimedWindows) = + private def timeoutForFirstWindow(config: EventProcessingConfig.TimedWindows): FiniteDuration = (config.duration.toMillis * config.firstWindowScaling).toLong.milliseconds } diff --git a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala index 00f3ac8..4b6961c 100644 --- a/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala +++ b/modules/streams-core/src/test/scala/com.snowplowanalytics.snowplow/sources/internal/LowLevelSourceSpec.scala @@ -476,7 +476,7 @@ class LowLevelSourceSpec extends Specification with CatsEffect { sourceAndAck <- LowLevelSource.toSourceAndAck(testLowLevelSource(refActions, testConfig)) processor = windowedProcessor(refActions, testConfig) fiber <- sourceAndAck.stream(config, processor).compile.drain.start - _ <- IO.sleep(91.seconds) + _ <- IO.sleep(131.seconds) _ <- fiber.cancel result <- refActions.get } yield result must beEqualTo( @@ -490,15 +490,21 @@ class LowLevelSourceSpec extends Specification with CatsEffect { Action.ProcessorReceivedEvents("1970-01-01T00:00:22Z", List("5", "6")), Action.ProcessorReceivedEvents("1970-01-01T00:00:33Z", List("7", "8")), Action.ProcessorReceivedEvents("1970-01-01T00:00:44Z", List("9", "10")), + Action.ProcessorReachedEndOfWindow("1970-01-01T00:00:52Z"), + Action.Checkpointed(List("5", "6", "7", "8", "9", "10")), + Action.ProcessorStartedWindow("1970-01-01T00:00:55Z"), Action.ProcessorReceivedEvents("1970-01-01T00:00:55Z", List("11", "12")), Action.ProcessorReceivedEvents("1970-01-01T00:01:06Z", List("13", "14")), Action.ProcessorReceivedEvents("1970-01-01T00:01:17Z", List("15", "16")), - Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:22Z"), - Action.Checkpointed(List("5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16")), - Action.ProcessorStartedWindow("1970-01-01T00:01:28Z"), Action.ProcessorReceivedEvents("1970-01-01T00:01:28Z", List("17", "18")), - Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:31Z"), - Action.Checkpointed(List("17", "18")) + Action.ProcessorReceivedEvents("1970-01-01T00:01:39Z", List("19", "20")), + Action.ProcessorReceivedEvents("1970-01-01T00:01:50Z", List("21", "22")), + Action.ProcessorReachedEndOfWindow("1970-01-01T00:01:55Z"), + Action.Checkpointed(List("11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22")), + Action.ProcessorStartedWindow("1970-01-01T00:02:01Z"), + Action.ProcessorReceivedEvents("1970-01-01T00:02:01Z", List("23", "24")), + Action.ProcessorReachedEndOfWindow("1970-01-01T00:02:11Z"), + Action.Checkpointed(List("23", "24")) ) )