From 35ab6c7040ae71483bbb6d656a732e7536518707 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Mon, 25 Nov 2024 13:39:55 +0000 Subject: [PATCH 1/2] Bump common-streams to 0.9.0 See snowplow-incubator/common-streams#99 for the relevant change This library upgrade brings improvements to the Kinesis source, which should help on vertically larger instances. --- config/config.kinesis.reference.hocon | 11 ++++++ .../snowflake/KinesisConfigSpec.scala | 38 ++++++++++--------- project/Dependencies.scala | 2 +- 3 files changed, 32 insertions(+), 19 deletions(-) diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon index c05033a..ba9d9e6 100644 --- a/config/config.kinesis.reference.hocon +++ b/config/config.kinesis.reference.hocon @@ -32,6 +32,17 @@ "maxRecords": 1000 } + # -- Name of this KCL worker used in the dynamodb lease table + "workerIdentifier": ${HOSTNAME} + + # -- Duration of shard leases. KCL workers must periodically refresh leases in the dynamodb table before this duration expires. + "leaseDuration": "10 seconds" + + # -- Controls how to pick the max number of leases to steal at one time. + # -- E.g. If there are 4 available processors, and maxLeasesToStealAtOneTimeFactor = 2.0, then allow the KCL to steal up to 8 leases. + # -- Allows bigger instances to more quickly acquire the shard-leases they need to combat latency + "maxLeasesToStealAtOneTimeFactor": 2.0 + } "output": { diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala index e666149..af18dc1 100644 --- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala @@ -61,15 +61,16 @@ class KinesisConfigSpec extends Specification with CatsEffect { object KinesisConfigSpec { private val minimalConfig = Config[KinesisSourceConfig, KinesisSinkConfig]( input = KinesisSourceConfig( - appName = "snowplow-snowflake-loader", - streamName = "snowplow-enriched-events", - workerIdentifier = "testWorkerId", - initialPosition = KinesisSourceConfig.InitialPosition.Latest, - retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), - customEndpoint = None, - dynamodbCustomEndpoint = None, - cloudwatchCustomEndpoint = None, - leaseDuration = 10.seconds + appName = "snowplow-snowflake-loader", + streamName = "snowplow-enriched-events", + workerIdentifier = "testWorkerId", + initialPosition = KinesisSourceConfig.InitialPosition.Latest, + retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), + customEndpoint = None, + dynamodbCustomEndpoint = None, + cloudwatchCustomEndpoint = None, + leaseDuration = 10.seconds, + maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0) ), output = Config.Output( good = Config.Snowflake( @@ -138,15 +139,16 @@ object KinesisConfigSpec { */ private val extendedConfig = Config[KinesisSourceConfig, KinesisSinkConfig]( input = KinesisSourceConfig( - appName = "snowplow-snowflake-loader", - streamName = "snowplow-enriched-events", - workerIdentifier = "testWorkerId", - initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon, - retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), - customEndpoint = None, - dynamodbCustomEndpoint = None, - cloudwatchCustomEndpoint = None, - leaseDuration = 10.seconds + appName = "snowplow-snowflake-loader", + streamName = "snowplow-enriched-events", + workerIdentifier = "testWorkerId", + initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon, + retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), + customEndpoint = None, + dynamodbCustomEndpoint = None, + cloudwatchCustomEndpoint = None, + leaseDuration = 10.seconds, + maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0) ), output = Config.Output( good = Config.Snowflake( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index a8f83c9..b10ead3 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -35,7 +35,7 @@ object Dependencies { val protobuf = "3.25.5" // Version override // Snowplow - val streams = "0.8.1" + val streams = "0.9.0" // tests val specs2 = "4.20.0" From 7d6e399394866314f56a8776d2b759234f21408b Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Tue, 26 Nov 2024 13:47:09 +0000 Subject: [PATCH 2/2] Bump snowflake-ingest-sdk to 3.0.0 --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b10ead3..3b133da 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -24,7 +24,7 @@ object Dependencies { val slf4j = "2.0.7" val azureSdk = "1.9.1" val sentry = "6.25.2" - val snowflake = "2.2.2" + val snowflake = "3.0.0" val jaxb = "2.3.1" val awsSdk2 = "2.25.16" val netty = "4.1.100.Final" // Version override