Skip to content

Commit

Permalink
Final upgrades for 0.3.0 (#60)
Browse files Browse the repository at this point in the history
* Bump common-streams to 0.9.0

See snowplow-incubator/common-streams#99 for the relevant change

This library upgrade brings improvements to the Kinesis source, which
should help on vertically larger instances.

* Bump snowflake-ingest-sdk to 3.0.0
  • Loading branch information
istreeter authored Nov 26, 2024
1 parent 4a8e223 commit 8c92a37
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 20 deletions.
11 changes: 11 additions & 0 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@
"maxRecords": 1000
}

# -- Name of this KCL worker used in the dynamodb lease table
"workerIdentifier": ${HOSTNAME}

# -- Duration of shard leases. KCL workers must periodically refresh leases in the dynamodb table before this duration expires.
"leaseDuration": "10 seconds"

# -- Controls how to pick the max number of leases to steal at one time.
# -- E.g. If there are 4 available processors, and maxLeasesToStealAtOneTimeFactor = 2.0, then allow the KCL to steal up to 8 leases.
# -- Allows bigger instances to more quickly acquire the shard-leases they need to combat latency
"maxLeasesToStealAtOneTimeFactor": 2.0

}

"output": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,16 @@ class KinesisConfigSpec extends Specification with CatsEffect {
object KinesisConfigSpec {
private val minimalConfig = Config[KinesisSourceConfig, KinesisSinkConfig](
input = KinesisSourceConfig(
appName = "snowplow-snowflake-loader",
streamName = "snowplow-enriched-events",
workerIdentifier = "testWorkerId",
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds
appName = "snowplow-snowflake-loader",
streamName = "snowplow-enriched-events",
workerIdentifier = "testWorkerId",
initialPosition = KinesisSourceConfig.InitialPosition.Latest,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0)
),
output = Config.Output(
good = Config.Snowflake(
Expand Down Expand Up @@ -138,15 +139,16 @@ object KinesisConfigSpec {
*/
private val extendedConfig = Config[KinesisSourceConfig, KinesisSinkConfig](
input = KinesisSourceConfig(
appName = "snowplow-snowflake-loader",
streamName = "snowplow-enriched-events",
workerIdentifier = "testWorkerId",
initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds
appName = "snowplow-snowflake-loader",
streamName = "snowplow-enriched-events",
workerIdentifier = "testWorkerId",
initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon,
retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000),
customEndpoint = None,
dynamodbCustomEndpoint = None,
cloudwatchCustomEndpoint = None,
leaseDuration = 10.seconds,
maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0)
),
output = Config.Output(
good = Config.Snowflake(
Expand Down
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ object Dependencies {
val slf4j = "2.0.7"
val azureSdk = "1.9.1"
val sentry = "6.25.2"
val snowflake = "2.2.2"
val snowflake = "3.0.0"
val jaxb = "2.3.1"
val awsSdk2 = "2.25.16"
val netty = "4.1.100.Final" // Version override
Expand All @@ -35,7 +35,7 @@ object Dependencies {
val protobuf = "3.25.5" // Version override

// Snowplow
val streams = "0.8.1"
val streams = "0.9.0"

// tests
val specs2 = "4.20.0"
Expand Down

0 comments on commit 8c92a37

Please sign in to comment.