common-streams 0.10.0
Common streams 0.10.0 brings significant changes to the Kinesis and
Pubsub sources:

- PubSub source completely re-written to be a wrapper around UnaryPull
  snowplow-incubator/common-streams#101
- Kinesis source is more efficient when the stream is re-sharded
  snowplow-incubator/common-streams#102
- Kinesis source better tuned for larger deployments
  snowplow-incubator/common-streams#99

And improvements to latency metrics:
- Sources should report stream latency of stuck events
  snowplow-incubator/common-streams#104
istreeter committed Jan 3, 2025
1 parent cb3176e commit 5ca2239
Showing 8 changed files with 50 additions and 16 deletions.
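
The last commit-message item corresponds to a new `currentStreamLatency` method on `SourceAndAck`, which the lake loader's `Metrics` now uses to seed its initial latency state (see the Metrics changes below). A minimal sketch of that wiring, assuming cats is on the classpath; the trait is trimmed to the one relevant method, and the `LatencySketch.initialLatency` helper is invented here purely for illustration:

import cats.Functor
import cats.syntax.functor._
import scala.concurrent.duration.{Duration, FiniteDuration}

// Trimmed-down view of the common-streams trait: only the method relevant here.
trait SourceAndAck[F[_]] {
  // Stream latency of the oldest unprocessed message, if one is currently stuck.
  def currentStreamLatency: F[Option[FiniteDuration]]
}

object LatencySketch {
  // Illustrative helper (not part of this commit): seed an initial latency value
  // from the source, mirroring what State.initialize does in the Metrics change below.
  def initialLatency[F[_]: Functor](sourceAndAck: SourceAndAck[F]): F[Duration] =
    sourceAndAck.currentStreamLatency.map(_.getOrElse(Duration.Zero))
}
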
11 changes: 11 additions & 0 deletions config/config.aws.reference.hocon
@@ -39,6 +39,17 @@

# -- Duration of shard leases. KCL workers must periodically refresh leases in the dynamodb table before this duration expires.
"leaseDuration": "10 seconds"

# -- Controls how to pick the max number of leases to steal at one time.
# -- E.g. If there are 4 available processors, and maxLeasesToStealAtOneTimeFactor = 2.0, then allow the KCL to steal up to 8 leases.
# -- Allows bigger instances to more quickly acquire the shard-leases they need to combat latency
"maxLeasesToStealAtOneTimeFactor": 2.0

# -- Configures how to backoff and retry in case of DynamoDB provisioned throughput limits
"checkpointThrottledBackoffPolicy": {
"minBackoff": "100 millis"
"maxBackoff": "1 second"
}
}

"output": {
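
As an aside, the lease-stealing rule described in the `maxLeasesToStealAtOneTimeFactor` comment above is a simple multiplication. A hypothetical sketch (the rule is taken from the comment; the names and rounding here are illustrative, not KCL or common-streams code):

object LeaseStealingSketch {
  // Hypothetical illustration of the documented rule: the KCL may steal up to
  // (available processors x maxLeasesToStealAtOneTimeFactor) leases at one time.
  val maxLeasesToStealAtOneTimeFactor: Double = 2.0
  val availableProcessors: Int = Runtime.getRuntime.availableProcessors()
  // Rounding is illustrative; the exact rounding used by the library is not shown in this diff.
  val maxLeasesToSteal: Int = (availableProcessors * maxLeasesToStealAtOneTimeFactor).toInt
  // e.g. 4 processors * 2.0 => up to 8 leases stolen at once, so a bigger instance
  // acquires the shard-leases it needs more quickly.
}
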
25 changes: 16 additions & 9 deletions config/config.gcp.reference.hocon
@@ -14,15 +14,22 @@
# -- The number of threads is equal to this factor multiplied by the number of available cpu cores
"parallelPullFactor": 0.5

# -- How many bytes can be buffered by the loader app before blocking the pubsub client library
# -- from fetching more events.
# -- This is a balance between memory usage vs how efficiently the app can operate. The default value works well.
"bufferMaxBytes": 10000000

# -- Sets min/max boundaries on the value by which an ack deadline is extended.
# -- The actual value used is guided by runtime statistics collected by the pubsub client library.
"minDurationPerAckExtension": "60 seconds"
"maxDurationPerAckExtension": "600 seconds"
# -- Pubsub ack deadlines are extended for this duration when needed.
# -- A sensible value is double the size of the "windowing" config parameter, but no higher than 10 minutes.
"durationPerAckExtension": "600 seconds"

# -- Controls when ack deadlines are re-extended, for a message that is close to exceeding its ack deadline.
# -- For example, if `durationPerAckExtension` is `600 seconds` and `minRemainingAckDeadline` is `0.1` then the Source
# -- will wait until there are `60 seconds` left of the remaining deadline, before re-extending the message deadline.
"minRemainingAckDeadline": 0.1

# -- How many pubsub messages to pull from the server in a single request.
"maxMessagesPerPull": 1000

# -- Adds an artificial delay between consecutive requests to pubsub for more messages.
# -- Under some circumstances, this was found to slightly alleviate a problem in which pubsub might re-deliver
# -- the same messages multiple times.
"debounceRequests": "100 millis"
}

"output": {
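
To make the interplay of `durationPerAckExtension` and `minRemainingAckDeadline` concrete, here is a small sketch following the worked example in the config comments above (the threshold rule comes from those comments; the function itself is illustrative, not the PubSub source's actual implementation):

import scala.concurrent.duration._

object AckExtensionSketch {
  // Re-extend a message's ack deadline once the remaining deadline drops below
  // durationPerAckExtension * minRemainingAckDeadline.
  def shouldReExtend(
    remainingDeadline: FiniteDuration,
    durationPerAckExtension: FiniteDuration,
    minRemainingAckDeadline: Double
  ): Boolean =
    remainingDeadline <= durationPerAckExtension * minRemainingAckDeadline

  // With the defaults above: 600 seconds * 0.1 = 60 seconds remaining triggers a re-extension.
  val example: Boolean = shouldReExtend(59.seconds, 600.seconds, 0.1)
}
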
@@ -78,7 +78,7 @@ object Environment {
windowing <- Resource.eval(EventProcessingConfig.TimedWindows.build(config.main.windowing, config.main.numEagerWindows))
lakeWriter <- LakeWriter.build(config.main.spark, config.main.output.good)
lakeWriterWrapped = LakeWriter.withHandledErrors(lakeWriter, appHealth, config.main.retries, destinationSetupErrorCheck)
metrics <- Resource.eval(Metrics.build(config.main.monitoring.metrics))
metrics <- Resource.eval(Metrics.build(config.main.monitoring.metrics, sourceAndAck))
cpuParallelism = chooseCpuParallelism(config.main)
} yield Environment(
appInfo = appInfo,
@@ -10,13 +10,15 @@

package com.snowplowanalytics.snowplow.lakes

import cats.Functor
import cats.effect.Async
import cats.effect.kernel.Ref
import cats.implicits._
import fs2.Stream

import scala.concurrent.duration.{Duration, FiniteDuration}

import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics}

trait Metrics[F[_]] {
@@ -32,8 +34,8 @@

object Metrics {

def build[F[_]: Async](config: Config.Metrics): F[Metrics[F]] =
Ref[F].of(State.empty).map(impl(config, _))
def build[F[_]: Async](config: Config.Metrics, sourceAndAck: SourceAndAck[F]): F[Metrics[F]] =
Ref.ofEffect(State.initialize(sourceAndAck)).map(impl(config, _, sourceAndAck))

private case class State(
received: Int,
@@ -53,11 +55,18 @@
}

private object State {
def empty: State = State(0, 0, 0, Duration.Zero, None, None)
def initialize[F[_]: Functor](sourceAndAck: SourceAndAck[F]): F[State] =
sourceAndAck.currentStreamLatency.map { latency =>
State(0, 0, 0, latency.getOrElse(Duration.Zero), None, None)
}
}

private def impl[F[_]: Async](config: Config.Metrics, ref: Ref[F, State]): Metrics[F] =
new CommonMetrics[F, State](ref, State.empty, config.statsd) with Metrics[F] {
private def impl[F[_]: Async](
config: Config.Metrics,
ref: Ref[F, State],
sourceAndAck: SourceAndAck[F]
): Metrics[F] =
new CommonMetrics[F, State](ref, State.initialize(sourceAndAck), config.statsd) with Metrics[F] {
def addReceived(count: Int): F[Unit] =
ref.update(s => s.copy(received = s.received + count))
def addBad(count: Int): F[Unit] =
@@ -135,6 +135,9 @@ object MockEnvironment {

def isHealthy(maxAllowedProcessingLatency: FiniteDuration): IO[SourceAndAck.HealthStatus] =
IO.pure(SourceAndAck.Healthy)

def currentStreamLatency: IO[Option[FiniteDuration]] =
IO.pure(None)
}

private def testSink(ref: Ref[IO, Vector[Action]]): Sink[IO] = Sink[IO] { batch =>
@@ -70,6 +70,9 @@ object TestSparkEnvironment {

def isHealthy(maxAllowedProcessingLatency: FiniteDuration): IO[SourceAndAck.HealthStatus] =
IO.pure(SourceAndAck.Healthy)

def currentStreamLatency: IO[Option[FiniteDuration]] =
IO.pure(None)
}

private def testHttpClient: Client[IO] = Client[IO] { _ =>
1 change: 1 addition & 0 deletions modules/gcp/src/main/resources/application.conf
@@ -10,6 +10,7 @@
"input": ${snowplow.defaults.sources.pubsub}
"input": {
"gcpUserAgent": ${gcpUserAgent}
"durationPerAckExtension": "600 seconds"
}
"output": {
"bad": ${snowplow.defaults.sinks.pubsub}
2 changes: 1 addition & 1 deletion project/Dependencies.scala
@@ -49,7 +49,7 @@ object Dependencies {
val awsRegistry = "1.1.20"

// Snowplow
val streams = "0.8.2-M1"
val streams = "0.10.0-M2"
val igluClient = "4.0.0"

// Transitive overrides
