diff --git a/config/config.aws.reference.hocon b/config/config.aws.reference.hocon index f719ccb2..89bac7e1 100644 --- a/config/config.aws.reference.hocon +++ b/config/config.aws.reference.hocon @@ -34,10 +34,6 @@ "maxRecords": 1000 } - # -- The number of batches of events which are pre-fetched from kinesis. - # -- Increasing this above 1 is not known to improve performance. - "bufferSize": 3 - # -- Name of this KCL worker used in the dynamodb lease table "workerIdentifier": ${HOSTNAME} @@ -219,6 +215,13 @@ # -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct. "respectIgluNullability": true + # -- Configuration of internal http client used for iglu resolver, alerts and telemetry + "http": { + "client": { + "maxConnectionsPerServer": 4 + } + } + "monitoring": { "metrics": { diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon index 41f76fe1..0564a942 100644 --- a/config/config.azure.reference.hocon +++ b/config/config.azure.reference.hocon @@ -190,6 +190,13 @@ # -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct. "respectIgluNullability": true + # -- Configuration of internal http client used for iglu resolver, alerts and telemetry + "http": { + "client": { + "maxConnectionsPerServer": 4 + } + } + "monitoring": { "metrics": { diff --git a/config/config.gcp.reference.hocon b/config/config.gcp.reference.hocon index ac99c61b..457558e9 100644 --- a/config/config.gcp.reference.hocon +++ b/config/config.gcp.reference.hocon @@ -198,6 +198,13 @@ # -- Set this to false if you use a query engine that dislikes non-nullable nested fields of a nullable struct. "respectIgluNullability": true + # -- Configuration of internal http client used for iglu resolver, alerts and telemetry + "http": { + "client": { + "maxConnectionsPerServer": 4 + } + } + "monitoring": { "metrics": { diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index 9444fe9d..635877be 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -137,6 +137,10 @@ } } + "http": { + "client": ${snowplow.defaults.http.client} + } + "skipSchemas": [] "respectIgluNullability": true "exitOnMissingIgluSchema": true diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala index c85c5d26..7dda8d4a 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala @@ -22,7 +22,7 @@ import scala.concurrent.duration.FiniteDuration import com.snowplowanalytics.iglu.client.resolver.Resolver.ResolverConfig import com.snowplowanalytics.iglu.core.SchemaCriterion -import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, Metrics => CommonMetrics, Retrying, Telemetry, Webhook} +import com.snowplowanalytics.snowplow.runtime.{AcceptedLicense, HttpClient, Metrics => CommonMetrics, Retrying, Telemetry, Webhook} import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDecoder import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._ @@ -40,7 +40,8 @@ case class Config[+Source, +Sink]( skipSchemas: List[SchemaCriterion], respectIgluNullability: Boolean, exitOnMissingIgluSchema: Boolean, - retries: Config.Retries + retries: Config.Retries, + http: Config.Http ) object Config { @@ -129,6 +130,8 @@ object Config { transientErrors: Retrying.Config.ForTransient ) + case class Http(client: HttpClient.Config) + implicit def decoder[Source: Decoder, Sink: Decoder]: Decoder[Config[Source, Sink]] = { implicit val configuration = Configuration.default.withDiscriminator("type") implicit val sinkWithMaxSize = for { @@ -151,6 +154,7 @@ object Config { implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe] implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring] implicit val retriesDecoder = deriveConfiguredDecoder[Retries] + implicit val httpDecoder = deriveConfiguredDecoder[Http] // TODO add specific lake-loader docs for license implicit val licenseDecoder = diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala index 71ed501d..1f553d33 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Environment.scala @@ -12,9 +12,7 @@ package com.snowplowanalytics.snowplow.lakes import cats.implicits._ import cats.effect.{Async, Resource, Sync} -import cats.effect.unsafe.implicits.global import org.http4s.client.Client -import org.http4s.blaze.client.BlazeClientBuilder import io.sentry.Sentry import com.snowplowanalytics.iglu.client.resolver.Resolver @@ -22,7 +20,7 @@ import com.snowplowanalytics.iglu.core.SchemaCriterion import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, SourceAndAck} import com.snowplowanalytics.snowplow.sinks.Sink import com.snowplowanalytics.snowplow.lakes.processing.LakeWriter -import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, Webhook} +import com.snowplowanalytics.snowplow.runtime.{AppHealth, AppInfo, HealthProbe, HttpClient, Webhook} /** * Resources and runtime-derived configuration needed for processing events @@ -72,7 +70,7 @@ object Environment { sourceReporter = sourceAndAck.isHealthy(config.main.monitoring.healthProbe.unhealthyLatency).map(_.showIfUnhealthy) appHealth <- Resource.eval(AppHealth.init[F, Alert, RuntimeService](List(sourceReporter))) resolver <- mkResolver[F](config.iglu) - httpClient <- BlazeClientBuilder[F].withExecutionContext(global.compute).resource + httpClient <- HttpClient.resource[F](config.main.http.client) _ <- HealthProbe.resource(config.main.monitoring.healthProbe.port, appHealth) _ <- Webhook.resource(config.main.monitoring.webhook, appInfo, httpClient, appHealth) badSink <- diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 123a0913..c7502404 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -50,8 +50,8 @@ object Dependencies { val awsRegistry = "1.1.20" // Snowplow - val streams = "0.8.0-M2" - val igluClient = "3.0.0" + val streams = "0.8.0-M4" + val igluClient = "3.2.0" // Transitive overrides val protobuf = "3.25.1" @@ -71,7 +71,6 @@ object Dependencies { } - val blazeClient = "org.http4s" %% "http4s-blaze-client" % V.http4s val decline = "com.monovore" %% "decline-effect" % V.decline val circeGenericExtra = "io.circe" %% "circe-generic-extras" % V.circe val betterMonadicFor = "com.olegpy" %% "better-monadic-for" % V.betterMonadicFor @@ -153,7 +152,6 @@ object Dependencies { Spark.sqlForIcebergDelta % Provided, iceberg % Provided, igluClientHttp4s, - blazeClient, decline, sentry, circeGenericExtra, @@ -170,7 +168,7 @@ object Dependencies { awsS3, awsGlue, awsS3Transfer % Runtime, - awsSts % Runtime, + awsSts, hadoopClient ) ++ commonRuntimeDependencies