Use ListOfList as primary data structure for streaming apps (close #31)
istreeter committed Nov 23, 2023
1 parent fc251ae commit 0570c9f
Showing 9 changed files with 306 additions and 43 deletions.
@@ -12,7 +12,6 @@ import cats.effect.kernel.Resource
import cats.implicits._
import cats.Monad
import com.snowplowanalytics.snowplow.sinks.{Sink, Sinkable}
import fs2.Chunk
import fs2.kafka._

import java.util.UUID
@@ -32,7 +31,7 @@ object KafkaSink {

private def fromFs2Producer[F[_]: Monad](config: KafkaSinkConfig, producer: KafkaProducer[F, String, Array[Byte]]): Sink[F] =
Sink { batch =>
val records = Chunk.from(batch.map(toProducerRecord(config, _)))
val records = batch.copyToChunk.map(toProducerRecord(config, _))
producer.produce(records).flatten.void
}

@@ -20,7 +20,7 @@ import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain
import software.amazon.awssdk.regions.Region

import com.snowplowanalytics.snowplow.it.kinesis._
import com.snowplowanalytics.snowplow.sinks.{Sink, Sinkable}
import com.snowplowanalytics.snowplow.sinks.{ListOfList, Sink, Sinkable}

import Utils._

@@ -44,7 +44,7 @@ class KinesisSinkSpec extends CatsResource[IO, (Region, LocalStackContainer, Sin

def e1 = withResource { case (region, localstack, testSink) =>
val testPayload = "test-payload"
val testInput = List(Sinkable(testPayload.getBytes(), Some("myPk"), Map(("", ""))))
val testInput = ListOfList.ofItems(Sinkable(testPayload.getBytes(), Some("myPk"), Map(("", ""))))

for {
kinesisClient <- getKinesisClient(localstack.getEndpoint, region)
@@ -8,7 +8,7 @@
package com.snowplowanalytics.snowplow.sinks.kinesis

import cats.implicits._
import cats.{Applicative, Monoid, Parallel}
import cats.{Applicative, Parallel}
import cats.effect.{Async, Resource, Sync}
import cats.effect.kernel.Ref

@@ -30,7 +30,7 @@ import java.nio.charset.StandardCharsets.UTF_8

import scala.jdk.CollectionConverters._

import com.snowplowanalytics.snowplow.sinks.{Sink, Sinkable}
import com.snowplowanalytics.snowplow.sinks.{ListOfList, Sink, Sinkable}

object KinesisSink {

@@ -70,7 +70,7 @@ object KinesisSink {
* big as possible while respecting the record limit and the size limit.
*/
private[kinesis] def group[A](
records: List[A],
records: ListOfList[A],
recordLimit: Int,
sizeLimit: Int,
getRecordSize: A => Int
@@ -112,8 +112,8 @@ object KinesisSink {
kinesis.putRecords(putRecordsRequest)
}

private def toKinesisRecords(records: List[Sinkable]): List[PutRecordsRequestEntry] =
records.map { r =>
private def toKinesisRecords(records: ListOfList[Sinkable]): ListOfList[PutRecordsRequestEntry] =
records.mapUnordered { r =>
val data = SdkBytes.fromByteArrayUnsafe(r.bytes)
val prre = PutRecordsRequestEntry
.builder()
@@ -133,24 +133,13 @@ object KinesisSink {
* A message to help with logging
*/
private case class TryBatchResult(
nextBatchAttempt: Vector[PutRecordsRequestEntry],
nextBatchAttempt: List[PutRecordsRequestEntry],
hadNonThrottleErrors: Boolean,
exampleInternalError: Option[String]
)

private object TryBatchResult {

implicit private def tryBatchResultMonoid: Monoid[TryBatchResult] =
new Monoid[TryBatchResult] {
override val empty: TryBatchResult = TryBatchResult(Vector.empty, false, None)
override def combine(x: TryBatchResult, y: TryBatchResult): TryBatchResult =
TryBatchResult(
x.nextBatchAttempt ++ y.nextBatchAttempt,
x.hadNonThrottleErrors || y.hadNonThrottleErrors,
x.exampleInternalError.orElse(y.exampleInternalError)
)
}

/**
* The build method creates a TryBatchResult, which:
*
@@ -164,21 +153,21 @@ object KinesisSink {
if (prr.failedRecordCount().toInt =!= 0)
records
.zip(prr.records().asScala)
.foldMap { case (orig, recordResult) =>
.foldLeft(TryBatchResult(Nil, false, None)) { case (acc, (orig, recordResult)) =>
Option(recordResult.errorCode()) match {
// If the record had no error, treat as success
case None =>
TryBatchResult(Vector.empty, false, None)
case None => acc
// If it had a throughput exception, mark that and provide the original
case Some("ProvisionedThroughputExceededException") =>
TryBatchResult(Vector(orig), false, None)
acc.copy(nextBatchAttempt = orig :: acc.nextBatchAttempt)
// If any other error, record that we hit a non-throttle error, capture an example error message, and provide the original
case Some(_) =>
TryBatchResult(Vector(orig), true, Option(recordResult.errorMessage()))
TryBatchResult(orig :: acc.nextBatchAttempt, true, acc.exampleInternalError.orElse(Option(recordResult.errorMessage())))

}
}
else
TryBatchResult(Vector.empty, false, None)
TryBatchResult(Nil, false, None)
}

/**
@@ -192,7 +181,7 @@
streamName: String,
kinesis: KinesisClient,
records: List[PutRecordsRequestEntry]
): F[Vector[PutRecordsRequestEntry]] =
): F[List[PutRecordsRequestEntry]] =
Logger[F].debug(s"Writing ${records.size} records to ${streamName}") *>
Sync[F]
.blocking(putRecords(kinesis, streamName, records))
@@ -211,23 +200,23 @@
requestLimits: RequestLimits,
kinesis: KinesisClient,
streamName: String,
records: List[Sinkable]
records: ListOfList[Sinkable]
): F[Unit] = {
val policyForThrottling = Retries.fibonacci[F](throttlingErrorsPolicy)

// First, tryWriteToKinesis - the AWS SDK will handle retries. If there are still failures after that, it will:
// - return messages for retries if we only hit throttling
// - raise an error if we still have non-throttle failures after the SDK has carried out retries
def runAndCaptureFailures(ref: Ref[F, List[PutRecordsRequestEntry]]): F[List[PutRecordsRequestEntry]] =
def runAndCaptureFailures(ref: Ref[F, ListOfList[PutRecordsRequestEntry]]): F[ListOfList[PutRecordsRequestEntry]] =
for {
records <- ref.get
failures <- group(records, requestLimits.recordLimit, requestLimits.bytesLimit, getRecordSize)
.parTraverse(g => tryWriteToKinesis(streamName, kinesis, g))
flattened = failures.flatten
_ <- ref.set(flattened)
} yield flattened
listOfList = ListOfList.of(failures)
_ <- ref.set(listOfList)
} yield listOfList
for {
ref <- Ref.of[F, List[PutRecordsRequestEntry]](toKinesisRecords(records))
ref <- Ref.of[F, ListOfList[PutRecordsRequestEntry]](toKinesisRecords(records))
failures <- runAndCaptureFailures(ref)
.retryingOnFailures(
policy = policyForThrottling,
@@ -268,7 +257,7 @@
}

private def failureMessageForThrottling(
records: List[PutRecordsRequestEntry],
records: ListOfList[PutRecordsRequestEntry],
streamName: String
): String =
s"Exceeded Kinesis provisioned throughput: ${records.size} records failed writing to $streamName."
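Earlier in this file, `group` now takes a `ListOfList[A]` and packs it into batches that respect both a record-count limit and a byte-size limit; its body is collapsed in this view. The following is only a rough, hypothetical sketch of that kind of greedy grouping, written over a plain `List` for brevity rather than the code in this commit:

```
// Hypothetical sketch only: fill each batch until adding the next record
// would exceed either the record limit or the size limit.
def groupSketch[A](
  records: List[A],
  recordLimit: Int,
  sizeLimit: Int,
  getRecordSize: A => Int
): List[List[A]] = {
  // Accumulator: (finished batches, current batch in reverse order, record count, byte size)
  val (done, current, _, _) =
    records.foldLeft((List.empty[List[A]], List.empty[A], 0, 0)) {
      case ((finished, building, count, size), a) =>
        val aSize = getRecordSize(a)
        if (building.nonEmpty && (count >= recordLimit || size + aSize > sizeLimit))
          (building.reverse :: finished, List(a), 1, aSize) // close the current batch, start a new one
        else
          (finished, a :: building, count + 1, size + aSize) // keep filling the current batch
    }
  val all = if (current.isEmpty) done else current.reverse :: done
  all.reverse
}
```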
@@ -17,7 +17,7 @@ import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.UnsafeSnowplowOps
import com.google.pubsub.v1.{ProjectTopicName, PubsubMessage}
import com.snowplowanalytics.snowplow.pubsub.FutureInterop
import com.snowplowanalytics.snowplow.sinks.{Sink, Sinkable}
import com.snowplowanalytics.snowplow.sinks.{ListOfList, Sink, Sinkable}
import org.threeten.bp.{Duration => ThreetenDuration}

import scala.jdk.CollectionConverters._
@@ -31,8 +31,8 @@ object PubsubSink {
Sink(sinkBatch[F](p, _))
}

private def sinkBatch[F[_]: Async](publisher: Publisher, batch: List[Sinkable]): F[Unit] =
Foldable[List]
private def sinkBatch[F[_]: Async](publisher: Publisher, batch: ListOfList[Sinkable]): F[Unit] =
Foldable[ListOfList]
.foldM(batch, List.empty[ApiFuture[String]]) { case (futures, Sinkable(bytes, _, attributes)) =>
for {
uuid <- Async[F].delay(UUID.randomUUID)
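The sink folds over the batch with the new `Foldable[ListOfList]` instance, threading an accumulator through the effect type. Below is a simplified, hypothetical sketch of that same foldM shape, using `IO` and plain `String` message ids in place of the real `ApiFuture`s:

```
import cats.Foldable
import cats.effect.IO
import com.snowplowanalytics.snowplow.sinks.ListOfList

// Hypothetical: publish every element of the batch and collect one id per message,
// mirroring how the Pub/Sub sink accumulates ApiFutures with foldM
def publishAll(batch: ListOfList[String], publish: String => IO[String]): IO[List[String]] =
  Foldable[ListOfList].foldM(batch, List.empty[String]) { (ids, msg) =>
    publish(msg).map(id => id :: ids)
  }
```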
@@ -0,0 +1,149 @@
/*
* Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Snowplow Community License Version 1.0,
* and you may not use this file except in compliance with the Snowplow Community License Version 1.0.
* You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0
*/
package com.snowplowanalytics.snowplow.sinks

import fs2.Chunk
import cats.{Eval, Foldable, Monad, Monoid}
import scala.collection.compat._

/**
* A data structure that is efficient for most Snowplow streaming apps
*
* This is implemented as a `List[List[A]]`, but the inner Lists are hidden from the developer,
* to force us into using only efficient methods.
*
* A `ListOfList` has these features:
*
* - **Fast prepend** when building bigger batches from smaller batches e.g.
* `batchesOfEvents.prepend(anotherBatch)`.
* - **Fast folding** e.g. Foldable[ListOfList].foldMap(batches)(event => ???)
*
* It is ideal for situations where:
*
* - We don't care about order of events within a batch
* - We want to minimize how often we copy data structures
* - We don't need fast lookup by index
* - We want to batch up small batches into large batches of events
*
* It is deliberately missing a few features, to force us into efficient usage patterns:
*
* - No `.size` or `.length` methods. In Snowplow apps we manage batch size by other means.
* - No `.traverse`. Instead we can use:
*
* ```
* Foldable[ListOfList].traverseUnordered(listOfList)(a => IO { ??? })
* ```
*/
class ListOfList[+A](private val value: List[List[A]]) extends AnyVal {

def isEmpty: Boolean = value.forall(_.isEmpty)

/** Fast prepend a batch to the beginning of this ListOfList */
def prepend[B >: A](elems: List[B]): ListOfList[B] =
ListOfList.of(elems :: value)

/**
* Apply a transformation function `f` to every element in the ListOfList
*
* The resulting `ListOfList` does not have the same order as the input List. This is helpful in
* Snowplow apps where order of events within batches is not important.
*/
def mapUnordered[B](f: A => B): ListOfList[B] =
ListOfList.of {
List {
value.foldLeft(List.empty[B]) { case (bs, list) =>
list.foldLeft(bs) { case (bs, a) =>
f(a) :: bs
}
}
}
}

/**
* An `Iterable` which is a lightweight wrapper over the underlying `ListOfList`.
*
* This is efficient because it does not do a copy of the data structure
*/
def asIterable: Iterable[A] =
Iterable.from(value.foldLeft[Iterator[A]](Iterator.empty)(_ ++ _))

/**
* Converts the ListOfList to a `fs2.Chunk`.
*
* This does an inefficient copy of the underlying data, and so should only be used when a 3rd
* party library requires a `Chunk`.
*/
def copyToChunk: Chunk[A] =
Chunk.from(value).flatMap(Chunk.from(_))

/**
* Converts the ListOfList to an IndexedSeq
*
* This does an inefficient copy of the underlying data, and so should only be used when we need
* to fast lookup by index, for a range of indexes.
*/
def copyToIndexedSeq: IndexedSeq[A] =
asIterable.toIndexedSeq
}

object ListOfList {

def ofItems[A](elems: A*): ListOfList[A] =
new ListOfList(List(List(elems: _*)))

def ofLists[A](elems: List[A]*): ListOfList[A] =
new ListOfList(List(elems: _*))

def of[A](value: List[List[A]]): ListOfList[A] =
new ListOfList(value)

val empty: ListOfList[Nothing] = new ListOfList(Nil)

/** Inspired by the cats Foldable instance for List */
implicit def listOfListFoldable: Foldable[ListOfList] = new Foldable[ListOfList] {

override def toIterable[A](fa: ListOfList[A]): Iterable[A] =
fa.asIterable

def foldLeft[A, B](fa: ListOfList[A], b: B)(f: (B, A) => B): B =
fa.value.foldLeft(b) { case (acc, list) =>
list.foldLeft(acc)(f)
}

def foldRight[A, B](fa: ListOfList[A], lb: Eval[B])(f: (A, Eval[B]) => Eval[B]): Eval[B] = {
def loop(as: List[List[A]]): Eval[B] =
as match {
case Nil => lb
case Nil :: rest => loop(rest)
case (h :: t) :: rest => f(h, Eval.defer(loop(t :: rest)))
}
Eval.defer(loop(fa.value))
}

override def foldMap[A, B](fa: ListOfList[A])(f: A => B)(implicit B: Monoid[B]): B =
B.combineAll(toIterable(fa).map(f))

override def foldM[G[_], A, B](fa: ListOfList[A], z: B)(f: (B, A) => G[B])(implicit G: Monad[G]): G[B] = {
def step(in: (List[A], List[List[A]], B)): G[Either[(List[A], List[List[A]], B), B]] =
in match {
case (Nil, Nil, b) => G.pure(Right(b))
case (Nil, h :: t, b) => step((h, t, b))
case (h :: t, rest, b) =>
G.map(f(b, h)) { bnext =>
Left((t, rest, bnext))
}
}

fa.value match {
case Nil => G.pure(z)
case h :: t => G.tailRecM((h, t, z))(step)
}
}

}
}
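To make the new data structure concrete, here is a small, hypothetical usage sketch (not part of this commit); the event values are illustrative only:

```
import cats.Foldable
import cats.implicits._
import com.snowplowanalytics.snowplow.sinks.ListOfList

object ListOfListSketch {
  // Two small batches arriving from upstream
  val batch1: List[String] = List("event-1", "event-2")
  val batch2: List[String] = List("event-3")

  // Fast prepend: the existing lists are reused, nothing is copied
  val combined: ListOfList[String] = ListOfList.ofLists(batch1).prepend(batch2)

  // mapUnordered transforms every element, without preserving the order of events
  val upperCased: ListOfList[String] = combined.mapUnordered(_.toUpperCase)

  // Folding goes through the Foldable instance defined above
  val totalChars: Int = Foldable[ListOfList].foldMap(combined)(_.length)
}
```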
@@ -17,17 +17,17 @@ trait Sink[F[_]] {
/**
* Writes a batch of events to the external sink, handling partition keys and message attributes
*/
def sink(batch: List[Sinkable]): F[Unit]
def sink(batch: ListOfList[Sinkable]): F[Unit]

/** Writes a batch of events to the sink using an empty partition key and attributes */
def sinkSimple(batch: List[Array[Byte]]): F[Unit] =
sink(batch.map(Sinkable(_, None, Map.empty)))
def sinkSimple(batch: ListOfList[Array[Byte]]): F[Unit] =
sink(batch.mapUnordered(Sinkable(_, None, Map.empty)))

}

object Sink {

def apply[F[_]](f: List[Sinkable] => F[Unit]): Sink[F] = new Sink[F] {
def sink(batch: List[Sinkable]): F[Unit] = f(batch)
def apply[F[_]](f: ListOfList[Sinkable] => F[Unit]): Sink[F] = new Sink[F] {
def sink(batch: ListOfList[Sinkable]): F[Unit] = f(batch)
}
}
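As a hypothetical illustration of the updated trait (not part of this commit), a toy sink can be built from `Sink.apply` and handed a batch built with `ListOfList`:

```
import cats.effect.IO
import com.snowplowanalytics.snowplow.sinks.{ListOfList, Sink, Sinkable}

object SinkSketch {
  // A toy sink that only reports how many bytes it was asked to write
  val countingSink: Sink[IO] =
    Sink[IO] { batch: ListOfList[Sinkable] =>
      val bytes = batch.asIterable.map(_.bytes.length).sum
      IO.println(s"would write $bytes bytes")
    }

  // Callers now hand the sink a ListOfList rather than a List
  val batch: ListOfList[Array[Byte]] =
    ListOfList.ofItems("a".getBytes, "b".getBytes)

  val program: IO[Unit] = countingSink.sinkSimple(batch)
}
```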