diff --git a/config/config.aws.reference.hocon b/config/config.aws.reference.hocon
index 85da83e4..92a523be 100644
--- a/config/config.aws.reference.hocon
+++ b/config/config.aws.reference.hocon
@@ -39,6 +39,17 @@
 
     # -- Duration of shard leases. KCL workers must periodically refresh leases in the dynamodb table before this duration expires.
     "leaseDuration": "10 seconds"
+
+    # -- Controls how to pick the max number of leases to steal at one time.
+    # -- E.g. if there are 4 available processors and maxLeasesToStealAtOneTimeFactor = 2.0, then allow the KCL to steal up to 8 leases.
+    # -- Allows bigger instances to more quickly acquire the shard leases they need to combat latency.
+    "maxLeasesToStealAtOneTimeFactor": 2.0
+
+    # -- Configures how to back off and retry in case of DynamoDB provisioned throughput limits.
+    "checkpointThrottledBackoffPolicy": {
+      "minBackoff": "100 millis"
+      "maxBackoff": "1 second"
+    }
   }
 
   "output": {
diff --git a/config/config.gcp.reference.hocon b/config/config.gcp.reference.hocon
index 8d8658cc..439674ec 100644
--- a/config/config.gcp.reference.hocon
+++ b/config/config.gcp.reference.hocon
@@ -14,15 +14,22 @@
     # -- The number of threads is equal to this factor multiplied by the number of availble cpu cores
     "parallelPullFactor": 0.5
 
-    # -- How many bytes can be buffered by the loader app before blocking the pubsub client library
-    # -- from fetching more events.
-    # -- This is a balance between memory usage vs how efficiently the app can operate. The default value works well.
-    "bufferMaxBytes": 10000000
-
-    # -- Sets min/max boundaries on the value by which an ack deadline is extended.
-    # -- The actual value used is guided by runtime statistics collected by the pubsub client library.
-    "minDurationPerAckExtension": "60 seconds"
-    "maxDurationPerAckExtension": "600 seconds"
+    # -- Pubsub ack deadlines are extended for this duration when needed.
+    # -- A sensible value is double the size of the "windowing" config parameter, but no higher than 10 minutes.
+    "durationPerAckExtension": "600 seconds"
+
+    # -- Controls when ack deadlines are re-extended for a message that is close to exceeding its ack deadline.
+    # -- For example, if `durationPerAckExtension` is `600 seconds` and `minRemainingAckDeadline` is `0.1`, then the Source
+    # -- will wait until only `60 seconds` of the deadline remain before re-extending the message deadline.
+    "minRemainingAckDeadline": 0.1
+
+    # -- How many pubsub messages to pull from the server in a single request.
+    "maxMessagesPerPull": 1000
+
+    # -- Adds an artificial delay between consecutive requests to pubsub for more messages.
+    # -- Under some circumstances, this was found to slightly alleviate a problem in which pubsub might re-deliver
+    # -- the same messages multiple times.
+    "debounceRequests": "100 millis"
   }
 
   "output": {
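A quick illustration of the arithmetic behind the two headline settings above. This is a standalone sketch with invented names, not code from the loader; in particular, the exact rounding the KCL applies to the lease-steal factor is an assumption here:

```scala
import scala.concurrent.duration._

// Standalone sketch of the new config arithmetic; all names are invented
// for illustration and do not appear in the loader codebase.
object ConfigArithmeticSketch extends App {

  // AWS: with 4 available processors and maxLeasesToStealAtOneTimeFactor = 2.0,
  // the KCL may steal up to 8 leases at one time (rounding up is an assumption).
  val availableProcessors             = 4
  val maxLeasesToStealAtOneTimeFactor = 2.0
  val maxLeasesToSteal = math.ceil(availableProcessors * maxLeasesToStealAtOneTimeFactor).toInt
  println(s"KCL may steal up to $maxLeasesToSteal leases at one time")

  // GCP: with durationPerAckExtension = 600 seconds and minRemainingAckDeadline = 0.1,
  // a message's deadline is re-extended once 600 * 0.1 = 60 seconds remain.
  val durationPerAckExtension = 600.seconds
  val minRemainingAckDeadline = 0.1
  val reExtendThreshold       = durationPerAckExtension * minRemainingAckDeadline
  println(s"Deadlines are re-extended when $reExtendThreshold of the deadline remain")
}
```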
+ "debounceRequests": "100 millis" } "output": { diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala index 5283e891..55b9120d 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala @@ -78,7 +78,7 @@ object Environment { windowing <- Resource.eval(EventProcessingConfig.TimedWindows.build(config.main.windowing, config.main.numEagerWindows)) lakeWriter <- LakeWriter.build(config.main.spark, config.main.output.good) lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth, config.main.retries, destinationSetupErrorCheck) - metrics <- Resource.eval(Metrics.build(config.main.monitoring.metrics)) + metrics <- Resource.eval(Metrics.build(config.main.monitoring.metrics, sourceAndAck)) cpuParallelism = chooseCpuParallelism(config.main) } yield Environment( appInfo = appInfo, diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Metrics.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Metrics.scala index 5d9c9104..93ac5de9 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Metrics.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Metrics.scala @@ -10,6 +10,7 @@ package com.snowplowanalytics.snowplow.lakes +import cats.Functor import cats.effect.Async import cats.effect.kernel.Ref import cats.implicits._ @@ -17,6 +18,7 @@ import fs2.Stream import scala.concurrent.duration.{Duration, FiniteDuration} +import com.snowplowanalytics.snowplow.sources.SourceAndAck import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics} trait Metrics[F[_]] { @@ -32,8 +34,8 @@ trait Metrics[F[_]] { object Metrics { - def build[F[_]: Async](config: Config.Metrics): F[Metrics[F]] = - Ref[F].of(State.empty).map(impl(config, _)) + def build[F[_]: Async](config: Config.Metrics, sourceAndAck: SourceAndAck[F]): F[Metrics[F]] = + Ref.ofEffect(State.initialize(sourceAndAck)).map(impl(config, _, sourceAndAck)) private case class State( received: Int, @@ -53,11 +55,18 @@ object Metrics { } private object State { - def empty: State = State(0, 0, 0, Duration.Zero, None, None) + def initialize[F[_]: Functor](sourceAndAck: SourceAndAck[F]): F[State] = + sourceAndAck.currentStreamLatency.map { latency => + State(0, 0, 0, latency.getOrElse(Duration.Zero), None, None) + } } - private def impl[F[_]: Async](config: Config.Metrics, ref: Ref[F, State]): Metrics[F] = - new CommonMetrics[F, State](ref, State.empty, config.statsd) with Metrics[F] { + private def impl[F[_]: Async]( + config: Config.Metrics, + ref: Ref[F, State], + sourceAndAck: SourceAndAck[F] + ): Metrics[F] = + new CommonMetrics[F, State](ref, State.initialize(sourceAndAck), config.statsd) with Metrics[F] { def addReceived(count: Int): F[Unit] = ref.update(s => s.copy(received = s.received + count)) def addBad(count: Int): F[Unit] = diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala index b7ab183e..f29a6c98 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala @@ -135,6 +135,9 @@ object MockEnvironment { def 
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala
index b7ab183e..f29a6c98 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala
@@ -135,6 +135,9 @@ object MockEnvironment {
 
       def isHealthy(maxAllowedProcessingLatency: FiniteDuration): IO[SourceAndAck.HealthStatus] =
         IO.pure(SourceAndAck.Healthy)
+
+      def currentStreamLatency: IO[Option[FiniteDuration]] =
+        IO.pure(None)
     }
 
   private def testSink(ref: Ref[IO, Vector[Action]]): Sink[IO] = Sink[IO] { batch =>
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala
index 7279b449..7d1101d9 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala
@@ -70,6 +70,9 @@ object TestSparkEnvironment {
 
       def isHealthy(maxAllowedProcessingLatency: FiniteDuration): IO[SourceAndAck.HealthStatus] =
         IO.pure(SourceAndAck.Healthy)
+
+      def currentStreamLatency: IO[Option[FiniteDuration]] =
+        IO.pure(None)
     }
 
   private def testHttpClient: Client[IO] = Client[IO] { _ =>
diff --git a/modules/gcp/src/main/resources/application.conf b/modules/gcp/src/main/resources/application.conf
index 95907252..dfb84592 100644
--- a/modules/gcp/src/main/resources/application.conf
+++ b/modules/gcp/src/main/resources/application.conf
@@ -10,6 +10,7 @@
   "input": ${snowplow.defaults.sources.pubsub}
   "input": {
     "gcpUserAgent": ${gcpUserAgent}
+    "durationPerAckExtension": "600 seconds"
   }
   "output": {
     "bad": ${snowplow.defaults.sinks.pubsub}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 0eefe15e..f22a9119 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -49,7 +49,7 @@ object Dependencies {
     val awsRegistry = "1.1.20"
 
     // Snowplow
-    val streams = "0.8.2-M1"
+    val streams = "0.10.0-M2"
     val igluClient = "4.0.0"
 
     // Transitive overrides
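Both test environments gain the same stub because `SourceAndAck`, from the streams library bumped above, now requires a `currentStreamLatency` member. A reduced, illustrative sketch of the shape those stubs implement; this is a simplification with placeholder types, not the real trait from the streams library:

```scala
import cats.effect.IO
import scala.concurrent.duration.FiniteDuration

object SourceAndAckShapeSketch {
  // Placeholder for SourceAndAck.HealthStatus from the streams library.
  sealed trait HealthStatus
  case object Healthy extends HealthStatus

  // Reduced view of the members the test stubs implement; the real trait is
  // polymorphic in F and also exposes the event stream itself.
  trait SourceAndAckView {
    def isHealthy(maxAllowedProcessingLatency: FiniteDuration): IO[HealthStatus]
    def currentStreamLatency: IO[Option[FiniteDuration]]
  }

  // A stub in the style of MockEnvironment / TestSparkEnvironment above:
  val stub: SourceAndAckView = new SourceAndAckView {
    def isHealthy(maxAllowedProcessingLatency: FiniteDuration): IO[HealthStatus] =
      IO.pure(Healthy)
    def currentStreamLatency: IO[Option[FiniteDuration]] =
      IO.pure(None)
  }
}
```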