diff --git a/config/config.aws.reference.hocon b/config/config.aws.reference.hocon
index 92a523be..88d4e481 100644
--- a/config/config.aws.reference.hocon
+++ b/config/config.aws.reference.hocon
@@ -127,6 +127,12 @@
 #     "icebergTableProperties": {
 #       "write.metadata.metrics.column.event_id": "count"
 #     }
+#
+#     # -- Any valid Iceberg write option
+#     # -- This can be blank in most setups because the loader already sets sensible defaults.
+#     "icebergWriteOptions": {
+#       "write-format": "parquet"
+#     }
 #   }
 
     "bad": {
@@ -181,11 +187,6 @@
       # -- E.g. to change credentials provider
       "fs.s3a.aws.credentials.provider": "com.amazonaws.auth.InstanceProfileCredentialsProvider"
     }
-
-    # -- Controls how many spark tasks run in parallel during writing the events to cloud storage.
-    # -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing.
-    # -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency.
-    "writerParallelismFraction": 0.5
   }
 
   # Retry configuration for lake operation failures
diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon
index 14b35900..578afc53 100644
--- a/config/config.azure.reference.hocon
+++ b/config/config.azure.reference.hocon
@@ -94,6 +94,12 @@
 #     "icebergTableProperties": {
 #       "write.metadata.metrics.column.event_id": "count"
 #     }
+#
+#     # -- Any valid Iceberg write option
+#     # -- This can be blank in most setups because the loader already sets sensible defaults.
+#     "icebergWriteOptions": {
+#       "write-format": "parquet"
+#     }
 #   }
 
     "bad": {
@@ -145,11 +151,6 @@
       # -- E.g. to enable the spark ui for debugging:
       "spark.ui.enabled": true
     }
-
-    # -- Controls how many spark tasks run in parallel during writing the events to cloud storage.
-    # -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing.
-    # -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency.
-    "writerParallelismFraction": 0.5
   }
 
   # Retry configuration for lake operation failures
diff --git a/config/config.gcp.reference.hocon b/config/config.gcp.reference.hocon
index 439674ec..3ab98bf7 100644
--- a/config/config.gcp.reference.hocon
+++ b/config/config.gcp.reference.hocon
@@ -116,6 +116,12 @@
 #     "icebergTableProperties": {
 #       "write.metadata.metrics.column.event_id": "count"
 #     }
+#
+#     # -- Any valid Iceberg write option
+#     # -- This can be blank in most setups because the loader already sets sensible defaults.
+#     "icebergWriteOptions": {
+#       "write-format": "parquet"
+#     }
 #   }
 
     "bad": {
@@ -160,11 +166,6 @@
      # -- E.g. to enable the spark ui for debugging:
      "spark.ui.enabled": true
    }
-
-   # -- Controls how many spark tasks run in parallel during writing the events to cloud storage.
-   # -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing.
-   # -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency.
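Illustration (not part of the patch): the new `icebergWriteOptions` map is handed straight to Spark's DataFrameWriter, so any option understood by Iceberg's Spark integration can be supplied, and user keys are merged over the defaults shipped in reference.conf (standard HOCON object merging). A minimal Scala sketch of that pass-through, using a hypothetical `appendToLake` helper:

    import org.apache.spark.sql.DataFrame

    // Defaults come from reference.conf; anything the user sets is merged on top.
    val icebergWriteOptions: Map[String, String] = Map(
      "merge-schema"      -> "true",    // let the write add any missing columns
      "check-ordering"    -> "false",   // don't insist on identical column order
      "distribution-mode" -> "none",    // skip the pre-write shuffle
      "write-format"      -> "parquet"  // the extra option shown in the reference configs
    )

    // Hypothetical helper: mirrors what IcebergWriter.write does with the map.
    def appendToLake(df: DataFrame, fqTable: String): Unit =
      df.write
        .format("iceberg")
        .mode("append")
        .options(icebergWriteOptions) // the whole map becomes write options
        .saveAsTable(fqTable)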
- "writerParallelismFraction": 0.5 } # Retry configuration for lake operation failures diff --git a/modules/core/src/main/resources/fairscheduler.xml b/modules/core/src/main/resources/fairscheduler.xml new file mode 100644 index 00000000..3247490d --- /dev/null +++ b/modules/core/src/main/resources/fairscheduler.xml @@ -0,0 +1,8 @@ + + + + FIFO + 1000 + 1 + + diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index 02f29ad7..37395bf9 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -41,6 +41,12 @@ "write.metadata.metrics.column.true_tstamp": "full" } + "icebergWriteOptions": { + "merge-schema": "true" + "check-ordering": "false" + "distribution-mode": "none" + } + "hudiTableProperties": { "hoodie.table.name": "events" "hoodie.table.keygenerator.class": "org.apache.hudi.keygen.TimestampBasedKeyGenerator" @@ -121,9 +127,10 @@ "spark.sql.parquet.datetimeRebaseModeInWrite": "CORRECTED" "spark.memory.storageFraction": "0" "spark.databricks.delta.autoCompact.enabled": "false" + "spark.scheduler.mode": "FAIR" + "spark.sql.adaptive.enabled": "false" # False gives better performance on the type of shuffle done by Lake Loader } "gcpUserAgent": ${gcpUserAgent} - "writerParallelismFraction": 0.5 } "retries": { diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala index da4a9f3d..0ea62b26 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala @@ -74,7 +74,8 @@ object Config { table: String, catalog: IcebergCatalog, location: URI, - icebergTableProperties: Map[String, String] + icebergTableProperties: Map[String, String], + icebergWriteOptions: Map[String, String] ) extends Target sealed trait IcebergCatalog @@ -100,8 +101,7 @@ object Config { case class Spark( taskRetries: Int, conf: Map[String, String], - gcpUserAgent: GcpUserAgent, - writerParallelismFraction: BigDecimal + gcpUserAgent: GcpUserAgent ) case class Metrics( diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala index 7bfb796b..3d63488d 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala @@ -81,10 +81,9 @@ object LakeWriter { } for { session <- SparkUtils.session[F](config, w, target.location) - writerParallelism = chooseWriterParallelism(config) - mutex1 <- Resource.eval(Mutex[F]) - mutex2 <- Resource.eval(Mutex[F]) - } yield impl(session, w, writerParallelism, mutex1, mutex2) + writerParallelism = chooseWriterParallelism() + mutex <- Resource.eval(Mutex[F]) + } yield impl(session, w, writerParallelism, mutex) } def withHandledErrors[F[_]: Async]( @@ -129,22 +128,14 @@ object LakeWriter { /** * Implementation of the LakeWriter * - * The mutexes are needed because we allow overlapping windows. They prevent two different windows + * The mutex is needed because we allow overlapping windows. They prevent two different windows * from trying to run the same expensive operation at the same time. 
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala
index 7bfb796b..3d63488d 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala
@@ -81,10 +81,9 @@
     }
     for {
       session <- SparkUtils.session[F](config, w, target.location)
-      writerParallelism = chooseWriterParallelism(config)
-      mutex1 <- Resource.eval(Mutex[F])
-      mutex2 <- Resource.eval(Mutex[F])
-    } yield impl(session, w, writerParallelism, mutex1, mutex2)
+      writerParallelism = chooseWriterParallelism()
+      mutex <- Resource.eval(Mutex[F])
+    } yield impl(session, w, writerParallelism, mutex)
   }
 
   def withHandledErrors[F[_]: Async](
@@ -129,22 +128,14 @@
   /**
    * Implementation of the LakeWriter
    *
-   * The mutexes are needed because we allow overlapping windows. They prevent two different windows
+   * The mutex is needed because we allow overlapping windows. It prevents two different windows
    * from trying to run the same expensive operation at the same time.
-   *
-   * @param mutextForWriting
-   *   Makes sure there is only ever one spark job trying to write events to the lake. This is a
-   *   IO-intensive task.
-   * @param mutexForUnioning
-   *   Makes sure there is only ever one spark job trying to union smaller DataFrames into a larger
-   *   DataFrame, immediately before writing to the lake. This is a cpu-intensive task.
    */
   private def impl[F[_]: Sync](
     spark: SparkSession,
     w: Writer,
     writerParallelism: Int,
-    mutexForWriting: Mutex[F],
-    mutexForUnioning: Mutex[F]
+    mutex: Mutex[F]
   ): LakeWriter[F] = new LakeWriter[F] {
     def createTable: F[Unit] =
       w.prepareTable(spark)
@@ -164,10 +155,8 @@
     def commit(viewName: String): F[Unit] =
       for {
-        df <- mutexForUnioning.lock.surround {
-                SparkUtils.prepareFinalDataFrame(spark, viewName, writerParallelism)
-              }
-        _ <- mutexForWriting.lock
+        df <- SparkUtils.prepareFinalDataFrame(spark, viewName, writerParallelism)
+        _ <- mutex.lock
               .surround {
                 w.write(df)
               }
       }
 
   /**
-   * Converts `writerParallelismFraction` into a suggested number of threads
+   * Allow spark to parallelize over _most_ of the available processors for writing to the lake,
+   * because this speeds up how quickly we can sink a batch.
    *
-   * For bigger instances (more cores) we want more parallelism in the writer. This avoids a
-   * situation where writing tasks exceed the length of a window, which causes an unbalanced use of
-   * cpu.
+   * But leave 1 processor always available, so that we are never blocked when trying to save one of
+   * the intermediate dataframes.
    */
-  private def chooseWriterParallelism(config: Config.Spark): Int =
-    (Runtime.getRuntime.availableProcessors * config.writerParallelismFraction)
-      .setScale(0, BigDecimal.RoundingMode.UP)
-      .toInt
+  private def chooseWriterParallelism(): Int =
+    (Runtime.getRuntime.availableProcessors - 1).max(1)
 }
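For comparison (not part of the patch): on an 8-core box the removed fraction-based rule gave ceil(8 × 0.5) = 4 writer tasks, while the new rule gives 8 − 1 = 7, always keeping one core free for the small local append jobs. A sketch of both calculations, with hypothetical helper names:

    // Old rule, driven by the now-removed writerParallelismFraction setting:
    def oldWriterParallelism(cores: Int, fraction: BigDecimal): Int =
      (cores * fraction).setScale(0, BigDecimal.RoundingMode.UP).toInt

    // New rule: use most of the cores, but never the last one, and never zero.
    def newWriterParallelism(cores: Int): Int =
      (cores - 1).max(1)

    // oldWriterParallelism(8, BigDecimal("0.5")) == 4
    // newWriterParallelism(8)                    == 7
    // newWriterParallelism(1)                    == 1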
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala
index c38422c4..1567dc6c 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala
@@ -81,11 +81,6 @@
     earliestCollectorTstamp: Option[Instant]
   )
 
-  private case class Transformed(
-    events: List[Row],
-    schema: StructType
-  )
-
   private def eventProcessor[F[_]: Async: RegistryLookup](
     env: Environment[F],
     deferredTableExists: F[Unit]
@@ -126,13 +121,12 @@
       .through(handleParseFailures(env, badProcessor))
       .through(BatchUp.noTimeout(env.inMemBatchBytes))
       .through(transformBatch(env, badProcessor, ref))
-      .through(sinkTransformedBatch(env, ref))
 
   private def transformBatch[F[_]: RegistryLookup: Async](
     env: Environment[F],
     badProcessor: BadRowProcessor,
     ref: Ref[F, WindowState]
-  ): Pipe[F, Batched, Transformed] =
+  ): Pipe[F, Batched, Nothing] =
     _.parEvalMapUnordered(env.cpuParallelism) { case Batched(events, entities, _, earliestCollectorTstamp) =>
       for {
         _ <- Logger[F].debug(s"Processing batch of size ${events.size}")
@@ -141,30 +135,29 @@
         _ <- rememberColumnNames(ref, nonAtomicFields.fields)
         (bad, rows) <- transformToSpark[F](badProcessor, events, nonAtomicFields)
         _ <- sendFailedEvents(env, badProcessor, bad)
-        _ <- ref.update { s =>
-               val updatedCollectorTstamp = chooseEarliestTstamp(earliestCollectorTstamp, s.earliestCollectorTstamp)
-               s.copy(numEvents = s.numEvents + rows.size, earliestCollectorTstamp = updatedCollectorTstamp)
-             }
-      } yield Transformed(rows, SparkSchema.forBatch(nonAtomicFields.fields, env.respectIgluNullability))
-    }
+        windowState <- ref.updateAndGet { s =>
+                         val updatedCollectorTstamp = chooseEarliestTstamp(earliestCollectorTstamp, s.earliestCollectorTstamp)
+                         s.copy(numEvents = s.numEvents + rows.size, earliestCollectorTstamp = updatedCollectorTstamp)
+                       }
+        _ <- sinkTransformedBatch(env, windowState, rows, SparkSchema.forBatch(nonAtomicFields.fields, env.respectIgluNullability))
+      } yield ()
+    }.drain
 
   private def sinkTransformedBatch[F[_]: Sync](
     env: Environment[F],
-    ref: Ref[F, WindowState]
-  ): Pipe[F, Transformed, Nothing] =
-    _.evalMap { case Transformed(rows, schema) =>
-      NonEmptyList.fromList(rows) match {
-        case Some(nel) =>
-          for {
-            windowState <- ref.get
-            _ <- env.lakeWriter.localAppendRows(windowState.viewName, nel, schema)
-            _ <- Logger[F].debug(s"Finished processing batch of size ${rows.size}")
-          } yield ()
-        case None =>
-          Logger[F].debug(s"An in-memory batch yielded zero good events. Nothing will be saved to local disk.")
-      }
-
-    }.drain
+    windowState: WindowState,
+    rows: List[Row],
+    schema: StructType
+  ): F[Unit] =
+    NonEmptyList.fromList(rows) match {
+      case Some(nel) =>
+        for {
+          _ <- env.lakeWriter.localAppendRows(windowState.viewName, nel, schema)
+          _ <- Logger[F].debug(s"Finished processing batch of size ${rows.size}")
+        } yield ()
+      case None =>
+        Logger[F].debug(s"An in-memory batch yielded zero good events. Nothing will be saved to local disk.")
+    }
 
   private def rememberTokens[F[_]: Functor](ref: Ref[F, WindowState]): Pipe[F, TokenedEvents, Chunk[ByteBuffer]] =
     _.evalMap { case TokenedEvents(events, token) =>
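Illustration (not part of the patch): folding the sink into `transformBatch` leans on `Ref#updateAndGet`, which applies the update and returns the state that already includes this batch, instead of an `update` in one pipeline stage and a separate `get` later. A small cats-effect sketch of that primitive, with hypothetical names:

    import cats.effect.{IO, Ref}

    final case class WindowCounter(numEvents: Long)

    // Atomically add this batch's size and observe the updated state in one step.
    def recordBatch(ref: Ref[IO, WindowCounter], batchSize: Int): IO[WindowCounter] =
      ref.updateAndGet(s => s.copy(numEvents = s.numEvents + batchSize))

    // Usage: create the counter, then record a batch of 42 rows.
    val example: IO[WindowCounter] =
      Ref.of[IO, WindowCounter](WindowCounter(0L)).flatMap(ref => recordBatch(ref, 42))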
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala
index 52321b82..e362ba64 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala
@@ -18,7 +18,7 @@ import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.functions.current_timestamp
+import org.apache.spark.sql.functions.{col, current_timestamp}
 import org.apache.spark.sql.types.StructType
 
 import com.snowplowanalytics.snowplow.lakes.Config
@@ -70,7 +70,11 @@
     for {
       _ <- Logger[F].debug(s"Initializing local DataFrame with name $viewName")
       _ <- Sync[F].blocking {
-             spark.emptyDataFrame.createTempView(viewName)
+             try {
+               spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
+               spark.emptyDataFrame.createTempView(viewName)
+             } finally
+               spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
           }
     } yield ()
 
@@ -83,12 +87,16 @@
     for {
       _ <- Logger[F].debug(s"Saving batch of ${rows.size} events to local DataFrame $viewName")
       _ <- Sync[F].blocking {
-             spark
-               .createDataFrame(rows.toList.asJava, schema)
-               .coalesce(1)
-               .localCheckpoint()
-               .unionByName(spark.table(viewName), allowMissingColumns = true)
-               .createOrReplaceTempView(viewName)
+             try {
+               spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
+               spark
+                 .createDataFrame(rows.toList.asJava, schema)
+                 .coalesce(1)
+                 .localCheckpoint()
+                 .unionByName(spark.table(viewName), allowMissingColumns = true)
+                 .createOrReplaceTempView(viewName)
+             } finally
+               spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
           }
     } yield ()
 
@@ -97,17 +105,21 @@
     viewName: String,
     writerParallelism: Int
   ): F[DataFrame] =
-    Sync[F].blocking {
+    Sync[F].delay {
       spark
         .table(viewName)
+        .repartitionByRange(writerParallelism, col("event_name"), col("event_id"))
+        .sortWithinPartitions("event_name")
         .withColumn("load_tstamp", current_timestamp())
-        .coalesce(writerParallelism)
-        .localCheckpoint()
     }
 
   def dropView[F[_]: Sync](spark: SparkSession, viewName: String): F[Unit] =
     Logger[F].info(s"Removing Spark data frame $viewName from local disk...") >>
       Sync[F].blocking {
-        spark.catalog.dropTempView(viewName)
+        try {
+          spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
+          spark.catalog.dropTempView(viewName)
+        } finally
+          spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
       }.void
 }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala
index ea8a4cd2..066a71a1 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala
@@ -61,8 +61,7 @@ class IcebergWriter(config: Config.Iceberg) extends Writer {
     df.write
       .format("iceberg")
       .mode("append")
-      .option("merge-schema", true)
-      .option("check-ordering", false)
+      .options(config.icebergWriteOptions)
      .saveAsTable(fqTable)
   }
 
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index a25ec2cf..c088c981 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -15,12 +15,12 @@ object Dependencies {
 
   object Spark {
     // A version of Spark which is compatible with the current version of Iceberg and Delta
-    val forIcebergDelta      = "3.5.3"
+    val forIcebergDelta      = "3.5.4"
     val forIcebergDeltaMinor = "3.5"
 
     // Hudi can use a different version of Spark because we bundle a separate Docker image
     // This version of Spark must be compatible with the current version of Hudi
-    val forHudi      = "3.5.3"
+    val forHudi      = "3.5.4"
     val forHudiMinor = "3.5"
   }
 
@@ -35,7 +35,7 @@
     val delta        = "3.2.1"
     val hudi         = "0.15.0"
     val hudiAws      = "1.0.0-beta2"
-    val iceberg      = "1.6.1"
+    val iceberg      = "1.7.1"
     val hadoop       = "3.4.1"
     val gcsConnector = "hadoop3-2.2.25"
     val hive         = "3.1.3"
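Illustration (not part of the patch): the reworked `prepareFinalDataFrame` shapes the final write by range-partitioning into exactly `writerParallelism` partitions on `event_name`/`event_id` and sorting within each partition, instead of coalescing and checkpointing. A hedged sketch of the same Spark calls on a stand-alone DataFrame, with a hypothetical `shapeForWrite` helper:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, current_timestamp}

    // One output partition per writer task; rows with the same event_name end up
    // clustered and ordered together, which tends to compress well in Parquet.
    def shapeForWrite(df: DataFrame, writerParallelism: Int): DataFrame =
      df.repartitionByRange(writerParallelism, col("event_name"), col("event_id"))
        .sortWithinPartitions("event_name")
        .withColumn("load_tstamp", current_timestamp())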