Sources should report stream latency of stuck events #104

Merged
merged 2 commits into from
Jan 9, 2025
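This PR changes the low-level sources to emit Option[LowLevelEvents[...]], with None acting as a periodic keepalive, and adds a latencyConsumer callback to EventProcessingConfig so that latency keeps being reported even while a source is stuck on the same events. A minimal sketch of how a caller supplies the callback, adapted from the integration test in this diff (the Ref-based recording is only for illustration; a real app would feed its metrics instead):

```scala
import cats.effect.{IO, Ref}
import scala.concurrent.duration.FiniteDuration

import com.snowplowanalytics.snowplow.sources.EventProcessingConfig
import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing

// Collect every latency the source reports, so a test can assert on the maximum value.
def configWithLatencyTracking: IO[(Ref[IO, Vector[FiniteDuration]], EventProcessingConfig[IO])] =
  Ref[IO].of(Vector.empty[FiniteDuration]).map { reported =>
    (reported, EventProcessingConfig[IO](NoWindowing, latency => reported.update(_ :+ latency)))
  }
```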
@@ -76,7 +76,7 @@ object TestMetrics {
ref: Ref[IO, TestState],
emptyState: TestState,
config: Option[Metrics.StatsdConfig]
) extends Metrics[IO, TestState](ref, emptyState, config) {
) extends Metrics[IO, TestState](ref, IO.pure(emptyState), config) {
def count(c: Int) = ref.update(s => s.copy(counter = s.counter + c))
def time(t: FiniteDuration) = ref.update(s => s.copy(timer = s.timer + t))
}
@@ -24,8 +24,6 @@ import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing
import com.snowplowanalytics.snowplow.it.DockerPull
import com.snowplowanalytics.snowplow.it.kinesis._

import java.time.Instant

import Utils._

import org.specs2.specification.BeforeAll
@@ -60,23 +58,21 @@ class KinesisSourceSpec

for {
refProcessed <- Ref[IO].of[List[ReceivedEvents]](Nil)
t1 <- IO.realTimeInstant
_ <- putDataToKinesis(kinesisClient, testStream1Name, testPayload)
t2 <- IO.realTimeInstant
processingConfig = new EventProcessingConfig(NoWindowing)
refReportedLatencies <- Ref[IO].of(Vector.empty[FiniteDuration])
processingConfig = EventProcessingConfig(NoWindowing, tstamp => refReportedLatencies.update(_ :+ tstamp))
kinesisConfig = getKinesisSourceConfig(testStream1Name)
sourceAndAck <- KinesisSource.build[IO](kinesisConfig)
stream = sourceAndAck.stream(processingConfig, testProcessor(refProcessed))
fiber <- stream.compile.drain.start
_ <- IO.sleep(2.minutes)
processed <- refProcessed.get
reportedLatencies <- refReportedLatencies.get
_ <- fiber.cancel
} yield List(
processed must haveSize(1),
processed.head.events must beEqualTo(List(testPayload)),
processed.head.tstamp must beSome { tstamp: Instant =>
tstamp.toEpochMilli must beBetween(t1.toEpochMilli, t2.toEpochMilli)
}
reportedLatencies.max must beBetween(1.second, 2.minutes)
).reduce(_ and _)
}
}
@@ -24,14 +24,13 @@ import com.snowplowanalytics.snowplow.sinks.kinesis.KinesisSinkConfig
import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.UUID
import java.time.Instant
import scala.concurrent.duration.DurationLong

import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest

object Utils {

case class ReceivedEvents(events: List[String], tstamp: Option[Instant])
case class ReceivedEvents(events: List[String])

def putDataToKinesis(
client: KinesisAsyncClient,
@@ -81,7 +80,7 @@ object Utils {
.build()

val out =
ReceivedEvents(client.getRecords(request).get().records().asScala.toList.map(record => new String(record.data.asByteArray())), None)
ReceivedEvents(client.getRecords(request).get().records().asScala.toList.map(record => new String(record.data.asByteArray())))
out
}

)

def testProcessor(ref: Ref[IO, List[ReceivedEvents]]): EventProcessor[IO] =
_.evalMap { case TokenedEvents(events, token, tstamp) =>
_.evalMap { case TokenedEvents(events, token) =>
val parsed = events.map(byteBuffer => StandardCharsets.UTF_8.decode(byteBuffer).toString)
for {
_ <- ref.update(_ :+ ReceivedEvents(parsed.toList, tstamp))
_ <- ref.update(_ :+ ReceivedEvents(parsed.toList))
} yield token
}

@@ -19,8 +19,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
import scala.reflect._

import java.nio.ByteBuffer
import java.time.Instant
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.DurationLong

// kafka
import fs2.kafka._
@@ -48,11 +47,8 @@ object KafkaSource {
new LowLevelSource[F, KafkaCheckpoints[F]] {
def checkpointer: Checkpointer[F, KafkaCheckpoints[F]] = kafkaCheckpointer

def stream: Stream[F, Stream[F, LowLevelEvents[KafkaCheckpoints[F]]]] =
def stream: Stream[F, Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]]] =
kafkaStream(config, authHandlerClass)

def lastLiveness: F[FiniteDuration] =
Sync[F].realTime
}

case class OffsetAndCommit[F[_]](offset: Long, commit: F[Unit])
private def kafkaStream[F[_]: Async, T <: AzureAuthenticationCallbackHandler](
config: KafkaSourceConfig,
authHandlerClass: ClassTag[T]
): Stream[F, Stream[F, LowLevelEvents[KafkaCheckpoints[F]]]] =
): Stream[F, Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]]] =
KafkaConsumer
.stream(consumerSettings[F, T](config, authHandlerClass))
.evalTap(_.subscribeTo(config.topicName))

private def joinPartitions[F[_]: Async](
partitioned: PartitionedStreams[F]
): Stream[F, LowLevelEvents[KafkaCheckpoints[F]]] = {
): Stream[F, Option[LowLevelEvents[KafkaCheckpoints[F]]]] = {
val streams = partitioned.toSeq.map { case (topicPartition, stream) =>
stream.chunks
.flatMap { chunk =>
val ts = ccr.record.timestamp
ts.logAppendTime.orElse(ts.createTime).orElse(ts.unknownTime)
}
val earliestTimestamp = if (timestamps.isEmpty) None else Some(Instant.ofEpochMilli(timestamps.min))
Stream.emit(LowLevelEvents(events, ack, earliestTimestamp))
val earliestTimestamp = if (timestamps.isEmpty) None else Some(timestamps.min.millis)
Stream.emit(Some(LowLevelEvents(events, ack, earliestTimestamp)))
case None =>
Stream.empty
}
Stream
.emits(streams)
.parJoinUnbounded
.mergeHaltL(Stream.awakeDelay(10.seconds).map(_ => None).repeat) // keepalives
.onFinalize {
Logger[F].info(s"Stopping processing of partitions: $formatted")
}
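The partition streams now carry Option values: real batches are wrapped in Some, and the mergeHaltL line above mixes in a None every 10 seconds as a keepalive, so downstream latency reporting keeps ticking while a partition is quiet. A self-contained sketch of the same pattern (the one-second period and the stalling data stream are only for illustration):

```scala
import scala.concurrent.duration._
import cats.effect.{IO, IOApp}
import fs2.Stream

object KeepaliveSketch extends IOApp.Simple {
  // A data stream that produces three batches and then gets "stuck".
  val data: Stream[IO, Option[Int]] =
    Stream.emits(List(1, 2, 3)).map(Some(_)).covary[IO] ++ Stream.never[IO]

  // Periodic keepalives: downstream still sees activity while the source is stuck.
  val keepalives: Stream[IO, Option[Int]] =
    Stream.awakeDelay[IO](1.second).map(_ => None)

  def run: IO[Unit] =
    data
      .mergeHaltL(keepalives)                    // halts when the data stream halts
      .evalMap(batch => IO.println(s"received: $batch"))
      .interruptAfter(5.seconds)
      .compile
      .drain
}
```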
@@ -7,7 +7,7 @@
*/
package com.snowplowanalytics.snowplow.sources.kinesis

import cats.effect.{Async, Ref, Sync}
import cats.effect.{Async, Sync}
import cats.data.NonEmptyList
import cats.implicits._
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber

import java.util.concurrent.{CountDownLatch, SynchronousQueue}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.DurationLong
import scala.jdk.CollectionConverters._

object KinesisSource {

private implicit def logger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F]

def build[F[_]: Async](config: KinesisSourceConfig): F[SourceAndAck[F]] =
Ref.ofEffect(Sync[F].realTime).flatMap { liveness =>
LowLevelSource.toSourceAndAck {
new LowLevelSource[F, Map[String, Checkpointable]] {
def stream: Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] =
kinesisStream(config, liveness)
LowLevelSource.toSourceAndAck {
new LowLevelSource[F, Map[String, Checkpointable]] {
def stream: Stream[F, Stream[F, Option[LowLevelEvents[Map[String, Checkpointable]]]]] =
kinesisStream(config)

def checkpointer: KinesisCheckpointer[F] =
new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy)

def lastLiveness: F[FiniteDuration] =
liveness.get
}
def checkpointer: KinesisCheckpointer[F] =
new KinesisCheckpointer[F](config.checkpointThrottledBackoffPolicy)
}
}

// We enable fairness on the `SynchronousQueue` to ensure all Kinesis shards are sourced at an equal rate.
private val synchronousQueueFairness: Boolean = true

private def kinesisStream[F[_]: Async](
config: KinesisSourceConfig,
liveness: Ref[F, FiniteDuration]
): Stream[F, Stream[F, LowLevelEvents[Map[String, Checkpointable]]]] = {
config: KinesisSourceConfig
): Stream[F, Stream[F, Option[LowLevelEvents[Map[String, Checkpointable]]]]] = {
val actionQueue = new SynchronousQueue[KCLAction](synchronousQueueFairness)
for {
_ <- Stream.resource(KCLScheduler.populateQueue[F](config, actionQueue))
events <- Stream.emit(pullFromQueueAndEmit(actionQueue, liveness).stream).repeat
events <- Stream.emit(pullFromQueueAndEmit(actionQueue).stream).repeat
} yield events
}

private def pullFromQueueAndEmit[F[_]: Sync](
queue: SynchronousQueue[KCLAction],
liveness: Ref[F, FiniteDuration]
): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] =
Pull.eval(pullFromQueue(queue, liveness)).flatMap { case PullFromQueueResult(actions, hasShardEnd) =>
queue: SynchronousQueue[KCLAction]
): Pull[F, Option[LowLevelEvents[Map[String, Checkpointable]]], Unit] =
Pull.eval(pullFromQueue(queue)).flatMap { case PullFromQueueResult(actions, hasShardEnd) =>
val toEmit = actions.traverse {
case KCLAction.ProcessRecords(_, processRecordsInput) if processRecordsInput.records.asScala.isEmpty =>
Pull.done
Pull.output1(None)
case KCLAction.ProcessRecords(shardId, processRecordsInput) =>
Pull.output1(provideNextChunk(shardId, processRecordsInput)).covary[F]
Pull.output1(Some(provideNextChunk(shardId, processRecordsInput))).covary[F]
case KCLAction.ShardEnd(shardId, await, shardEndedInput) =>
handleShardEnd[F](shardId, await, shardEndedInput)
case KCLAction.KCLError(t) =>
}
Pull.eval(log).covaryOutput *> toEmit *> Pull.done
} else
toEmit *> pullFromQueueAndEmit(queue, liveness)
toEmit *> pullFromQueueAndEmit(queue)
}

private case class PullFromQueueResult(actions: NonEmptyList[KCLAction], hasShardEnd: Boolean)

private def pullFromQueue[F[_]: Sync](queue: SynchronousQueue[KCLAction], liveness: Ref[F, FiniteDuration]): F[PullFromQueueResult] =
private def pullFromQueue[F[_]: Sync](queue: SynchronousQueue[KCLAction]): F[PullFromQueueResult] =
resolveNextAction(queue)
.productL(updateLiveness(liveness))
.flatMap {
case shardEnd: KCLAction.ShardEnd =>
// If we reached the end of one shard, it is likely we reached the end of other shards too.
_ <- Sync[F].delay(queue.drainTo(ret))
} yield ret.asScala.toList

private def updateLiveness[F[_]: Sync](liveness: Ref[F, FiniteDuration]): F[Unit] =
Sync[F].realTime.flatMap(now => liveness.set(now))

private def provideNextChunk(shardId: String, input: ProcessRecordsInput) = {
val chunk = Chunk.javaList(input.records()).map(_.data())
val lastRecord = input.records.asScala.last // last is safe because we handled the empty case above
new ExtendedSequenceNumber(lastRecord.sequenceNumber, lastRecord.subSequenceNumber),
input.checkpointer
)
LowLevelEvents(chunk, Map[String, Checkpointable](shardId -> checkpointable), Some(firstRecord.approximateArrivalTimestamp))
LowLevelEvents(
chunk,
Map[String, Checkpointable](shardId -> checkpointable),
Some(firstRecord.approximateArrivalTimestamp.toEpochMilli.millis)
)
}

private def handleShardEnd[F[_]](
shardId: String,
await: CountDownLatch,
shardEndedInput: ShardEndedInput
): Pull[F, LowLevelEvents[Map[String, Checkpointable]], Unit] = {
): Pull[F, Option[LowLevelEvents[Map[String, Checkpointable]]], Unit] = {
val checkpointable = Checkpointable.ShardEnd(shardEndedInput.checkpointer, await)
val last = LowLevelEvents(Chunk.empty, Map[String, Checkpointable](shardId -> checkpointable), None)
Pull.output1(last)
Pull.output1(Some(last))
}

}
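Both sources now carry the batch timestamp as a FiniteDuration since the epoch (approximateArrivalTimestamp.toEpochMilli.millis above, and publish-time seconds plus nanos in the PubSub source below) rather than a java.time.Instant, which matches the representation returned by the effect system's real-time clock and makes the latency a plain subtraction. An illustrative sketch, not the library's internal code:

```scala
import scala.concurrent.duration._
import cats.effect.IO
import java.time.Instant

// Convert a record's arrival time to a duration since the epoch...
def toEpochDuration(arrival: Instant): FiniteDuration =
  arrival.toEpochMilli.millis

// ...so latency is just "now minus timestamp", both as FiniteDuration.
def latencyOf(earliestTimestamp: FiniteDuration): IO[FiniteDuration] =
  IO.realTime.map(now => now - earliestTimestamp)
```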
@@ -14,8 +14,6 @@ import fs2.{Chunk, Stream}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.time.Instant

// pubsub
import com.google.api.gax.core.{ExecutorProvider, FixedExecutorProvider}
import com.google.api.gax.grpc.ChannelPoolSettings
import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource}
import com.snowplowanalytics.snowplow.sources.pubsub.PubsubRetryOps.implicits._

import scala.concurrent.duration.{Duration, FiniteDuration}
import scala.concurrent.duration.{Duration, DurationLong, FiniteDuration}
import scala.jdk.CollectionConverters._

import java.util.concurrent.{ExecutorService, Executors}
@@ -64,17 +62,14 @@ object PubsubSource {
new LowLevelSource[F, Vector[Unique.Token]] {
def checkpointer: Checkpointer[F, Vector[Unique.Token]] = new PubsubCheckpointer(config.subscription, deferredResources)

def stream: Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] =
def stream: Stream[F, Stream[F, Option[LowLevelEvents[Vector[Unique.Token]]]]] =
pubsubStream(config, deferredResources)

def lastLiveness: F[FiniteDuration] =
Sync[F].realTime
}

private def pubsubStream[F[_]: Async](
config: PubsubSourceConfig,
deferredResources: Deferred[F, PubsubCheckpointer.Resources[F]]
): Stream[F, Stream[F, LowLevelEvents[Vector[Unique.Token]]]] =
): Stream[F, Stream[F, Option[LowLevelEvents[Vector[Unique.Token]]]]] =
for {
parallelPullCount <- Stream.eval(Sync[F].delay(chooseNumParallelPulls(config)))
stub <- Stream.resource(stubResource(config))
} yield Stream
.fixedRateStartImmediately(config.debounceRequests, dampen = true)
.parEvalMapUnordered(parallelPullCount)(_ => pullAndManageState(config, stub, refStates))
.unNone
.prefetchN(parallelPullCount)
.concurrently(extendDeadlines(config, stub, refStates))
.onFinalize(nackRefStatesForShutdown(config, stub, refStates))
@@ -124,10 +118,10 @@
}
}

private def earliestTimestampOfRecords(records: Vector[ReceivedMessage]): Instant = {
private def earliestTimestampOfRecords(records: Vector[ReceivedMessage]): FiniteDuration = {
val (tstampSeconds, tstampNanos) =
records.map(r => (r.getMessage.getPublishTime.getSeconds, r.getMessage.getPublishTime.getNanos)).min
Instant.ofEpochSecond(tstampSeconds, tstampNanos.toLong)
tstampSeconds.seconds + tstampNanos.toLong.nanos
}

/**
@@ -24,13 +24,14 @@ import java.nio.charset.StandardCharsets.UTF_8

abstract class Metrics[F[_]: Async, S <: Metrics.State](
ref: Ref[F, S],
emptyState: S,
initState: F[S],
config: Option[Metrics.StatsdConfig]
) {
def report: Stream[F, Nothing] =
Stream.resource(Metrics.makeReporters[F](config)).flatMap { reporters =>
def report = for {
state <- ref.getAndSet(emptyState)
nextState <- initState
state <- ref.getAndSet(nextState)
kv = state.toKVMetrics
_ <- reporters.traverse(_.report(kv))
} yield ()
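Metrics now takes an effectful initState: F[S] in place of the plain emptyState: S, so the state swapped in after each report can be computed at report time, for example seeded with the latest known latency instead of resetting it to zero. A stripped-down sketch of the same reset pattern (the state fields are illustrative, not the library's):

```scala
import cats.effect.{IO, Ref}
import scala.concurrent.duration._

final case class State(eventCount: Long, latency: FiniteDuration)

def reportOnce(
  ref: Ref[IO, State],
  initState: IO[State],            // effectful, as in this change
  publish: State => IO[Unit]
): IO[Unit] =
  for {
    next  <- initState             // e.g. State(0, lastKnownLatency)
    state <- ref.getAndSet(next)   // swap in the freshly computed reset state
    _     <- publish(state)        // report the snapshot we swapped out
  } yield ()
```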
@@ -25,8 +25,11 @@ import scala.concurrent.duration.FiniteDuration
* Whether to open a new [[EventProcessor]] to handle a timed window of events (e.g. for the
* transformer) or whether to feed events to a single [[EventProcessor]] in a continuous endless
* stream (e.g. Enrich)
*
* @param latencyConsumer
* common-streams apps should use the latencyConsumer to set the latency metric.
*/
case class EventProcessingConfig(windowing: EventProcessingConfig.Windowing)
case class EventProcessingConfig[F[_]](windowing: EventProcessingConfig.Windowing, latencyConsumer: FiniteDuration => F[Unit])

object EventProcessingConfig {
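Per the new scaladoc above, applications are expected to point latencyConsumer at their latency metric. A hedged sketch of that wiring, assuming an application-level metrics state with a latency field (all names here are illustrative); whether the app stores the last reported value or the maximum over a reporting window is its own choice:

```scala
import cats.effect.{IO, Ref}
import scala.concurrent.duration.FiniteDuration

import com.snowplowanalytics.snowplow.sources.EventProcessingConfig
import com.snowplowanalytics.snowplow.sources.EventProcessingConfig.NoWindowing

final case class AppMetricsState(latency: FiniteDuration)   // hypothetical app-level state

def processingConfig(metrics: Ref[IO, AppMetricsState]): EventProcessingConfig[IO] =
  EventProcessingConfig[IO](
    NoWindowing,
    latency => metrics.update(_.copy(latency = latency))    // set the latency metric
  )
```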
