From c5ea7d7927f47db1ab930ee52a453ee279313f98 Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Mon, 30 Dec 2024 07:30:51 +0000
Subject: [PATCH] Repartition by event name before writing to the lake

Previously, our Iceberg writer was using the [hash write distribution mode][1]
because that is the default for Iceberg. In this mode, Spark repartitions the
dataframe immediately before writing to the lake.

After this commit, we explicitly repartition the dataframe as part of the
existing Spark task that prepares the final dataframe. This means we can change
the Iceberg write distribution mode to `none`. We partition by the combination
`event_name + event_id`: the former because it matches the lake partitioning,
and the latter because it acts as a salt that keeps the partitions equally
sized (see the illustrative sketch after the patch below).

Overall this seems to improve the time taken to write a window of events to
Iceberg. It fixes a problem we found, in which the write phase could get too
slow under high load (Iceberg only): specifically, a write was taking longer
than the loader's "window", which caused periods of low CPU usage while the
loader's processing phase waited for the write phase to catch up.

Note: this improvement will not help Snowplow users who have changed the
partition key to something different from our default. We might want to make a
follow-up change in which the loader auto-discovers the lake's partition key.
For example, some users might want to partition by `app_id` instead of
`event_name`.

[1]: https://iceberg.apache.org/docs/1.7.1/spark-writes/#writing-distribution-modes
---
 config/config.aws.reference.hocon             | 11 +++--
 config/config.azure.reference.hocon           | 11 +++--
 config/config.gcp.reference.hocon             | 11 +++--
 .../core/src/main/resources/fairscheduler.xml |  8 +++
 .../core/src/main/resources/reference.conf    |  9 +++-
 .../Config.scala                              |  6 +--
 .../processing/LakeWriter.scala               | 39 +++++----------
 .../processing/Processing.scala               | 49 ++++++++-----------
 .../processing/SparkUtils.scala               | 36 +++++++++-----
 .../tables/IcebergWriter.scala                |  3 +-
 project/Dependencies.scala                    |  6 +--
 11 files changed, 99 insertions(+), 90 deletions(-)
 create mode 100644 modules/core/src/main/resources/fairscheduler.xml

diff --git a/config/config.aws.reference.hocon b/config/config.aws.reference.hocon
index 92a523be..88d4e481 100644
--- a/config/config.aws.reference.hocon
+++ b/config/config.aws.reference.hocon
@@ -127,6 +127,12 @@
 #     "icebergTableProperties": {
 #       "write.metadata.metrics.column.event_id": "count"
 #     }
+#
+#     # -- Any valid Iceberg write option
+#     # -- This can be blank in most setups because the loader already sets sensible defaults.
+#     "icebergWriteOptions": {
+#       "write-format": "parquet"
+#     }
 #   }

     "bad": {
@@ -181,11 +187,6 @@
       # -- E.g. to change credentials provider
       "fs.s3a.aws.credentials.provider": "com.amazonaws.auth.InstanceProfileCredentialsProvider"
     }
-
-    # -- Controls how many spark tasks run in parallel during writing the events to cloud storage.
-    # -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing.
-    # -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency.
- "writerParallelismFraction": 0.5 } # Retry configuration for lake operation failures diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon index 14b35900..578afc53 100644 --- a/config/config.azure.reference.hocon +++ b/config/config.azure.reference.hocon @@ -94,6 +94,12 @@ # "icebergTableProperties": { # "write.metadata.metrics.column.event_id": "count" # } +# +# # -- Any valid Iceberg write option +# # -- This can be blank in most setups because the loader already sets sensible defaults. +# "icebergWriteOptions": { +# "write-format": "parquet" +# } # } "bad": { @@ -145,11 +151,6 @@ # -- E.g. to enable the spark ui for debugging: "spark.ui.enabled": true } - - # -- Controls how many spark tasks run in parallel during writing the events to cloud storage. - # -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing. - # -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency. - "writerParallelismFraction": 0.5 } # Retry configuration for lake operation failures diff --git a/config/config.gcp.reference.hocon b/config/config.gcp.reference.hocon index 439674ec..3ab98bf7 100644 --- a/config/config.gcp.reference.hocon +++ b/config/config.gcp.reference.hocon @@ -116,6 +116,12 @@ # "icebergTableProperties": { # "write.metadata.metrics.column.event_id": "count" # } +# +# # -- Any valid Iceberg write option +# # -- This can be blank in most setups because the loader already sets sensible defaults. +# "icebergWriteOptions": { +# "write-format": "parquet" +# } # } "bad": { @@ -160,11 +166,6 @@ # -- E.g. to enable the spark ui for debugging: "spark.ui.enabled": true } - - # -- Controls how many spark tasks run in parallel during writing the events to cloud storage. - # -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing. - # -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency. 
- "writerParallelismFraction": 0.5 } # Retry configuration for lake operation failures diff --git a/modules/core/src/main/resources/fairscheduler.xml b/modules/core/src/main/resources/fairscheduler.xml new file mode 100644 index 00000000..3247490d --- /dev/null +++ b/modules/core/src/main/resources/fairscheduler.xml @@ -0,0 +1,8 @@ + + + + FIFO + 1000 + 1 + + diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index 02f29ad7..37395bf9 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -41,6 +41,12 @@ "write.metadata.metrics.column.true_tstamp": "full" } + "icebergWriteOptions": { + "merge-schema": "true" + "check-ordering": "false" + "distribution-mode": "none" + } + "hudiTableProperties": { "hoodie.table.name": "events" "hoodie.table.keygenerator.class": "org.apache.hudi.keygen.TimestampBasedKeyGenerator" @@ -121,9 +127,10 @@ "spark.sql.parquet.datetimeRebaseModeInWrite": "CORRECTED" "spark.memory.storageFraction": "0" "spark.databricks.delta.autoCompact.enabled": "false" + "spark.scheduler.mode": "FAIR" + "spark.sql.adaptive.enabled": "false" # False gives better performance on the type of shuffle done by Lake Loader } "gcpUserAgent": ${gcpUserAgent} - "writerParallelismFraction": 0.5 } "retries": { diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala index da4a9f3d..0ea62b26 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala @@ -74,7 +74,8 @@ object Config { table: String, catalog: IcebergCatalog, location: URI, - icebergTableProperties: Map[String, String] + icebergTableProperties: Map[String, String], + icebergWriteOptions: Map[String, String] ) extends Target sealed trait IcebergCatalog @@ -100,8 +101,7 @@ object Config { case class Spark( taskRetries: Int, conf: Map[String, String], - gcpUserAgent: GcpUserAgent, - writerParallelismFraction: BigDecimal + gcpUserAgent: GcpUserAgent ) case class Metrics( diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala index 7bfb796b..3d63488d 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala @@ -81,10 +81,9 @@ object LakeWriter { } for { session <- SparkUtils.session[F](config, w, target.location) - writerParallelism = chooseWriterParallelism(config) - mutex1 <- Resource.eval(Mutex[F]) - mutex2 <- Resource.eval(Mutex[F]) - } yield impl(session, w, writerParallelism, mutex1, mutex2) + writerParallelism = chooseWriterParallelism() + mutex <- Resource.eval(Mutex[F]) + } yield impl(session, w, writerParallelism, mutex) } def withHandledErrors[F[_]: Async]( @@ -129,22 +128,14 @@ object LakeWriter { /** * Implementation of the LakeWriter * - * The mutexes are needed because we allow overlapping windows. They prevent two different windows + * The mutex is needed because we allow overlapping windows. They prevent two different windows * from trying to run the same expensive operation at the same time. 
- * - * @param mutextForWriting - * Makes sure there is only ever one spark job trying to write events to the lake. This is a - * IO-intensive task. - * @param mutexForUnioning - * Makes sure there is only ever one spark job trying to union smaller DataFrames into a larger - * DataFrame, immediately before writing to the lake. This is a cpu-intensive task. */ private def impl[F[_]: Sync]( spark: SparkSession, w: Writer, writerParallelism: Int, - mutexForWriting: Mutex[F], - mutexForUnioning: Mutex[F] + mutex: Mutex[F] ): LakeWriter[F] = new LakeWriter[F] { def createTable: F[Unit] = w.prepareTable(spark) @@ -164,10 +155,8 @@ object LakeWriter { def commit(viewName: String): F[Unit] = for { - df <- mutexForUnioning.lock.surround { - SparkUtils.prepareFinalDataFrame(spark, viewName, writerParallelism) - } - _ <- mutexForWriting.lock + df <- SparkUtils.prepareFinalDataFrame(spark, viewName, writerParallelism) + _ <- mutex.lock .surround { w.write(df) } @@ -175,14 +164,12 @@ object LakeWriter { } /** - * Converts `writerParallelismFraction` into a suggested number of threads + * Allow spark to parallelize over _most_ of the available processors for writing to the lake, + * because this speeds up how quickly we can sink a batch. * - * For bigger instances (more cores) we want more parallelism in the writer. This avoids a - * situation where writing tasks exceed the length of a window, which causes an unbalanced use of - * cpu. + * But leave 1 processor always available, so that we are never blocked when trying to save one of + * the intermediate dataframes. */ - private def chooseWriterParallelism(config: Config.Spark): Int = - (Runtime.getRuntime.availableProcessors * config.writerParallelismFraction) - .setScale(0, BigDecimal.RoundingMode.UP) - .toInt + private def chooseWriterParallelism(): Int = + (Runtime.getRuntime.availableProcessors - 1).max(1) } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala index c38422c4..1567dc6c 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala @@ -81,11 +81,6 @@ object Processing { earliestCollectorTstamp: Option[Instant] ) - private case class Transformed( - events: List[Row], - schema: StructType - ) - private def eventProcessor[F[_]: Async: RegistryLookup]( env: Environment[F], deferredTableExists: F[Unit] @@ -126,13 +121,12 @@ object Processing { .through(handleParseFailures(env, badProcessor)) .through(BatchUp.noTimeout(env.inMemBatchBytes)) .through(transformBatch(env, badProcessor, ref)) - .through(sinkTransformedBatch(env, ref)) private def transformBatch[F[_]: RegistryLookup: Async]( env: Environment[F], badProcessor: BadRowProcessor, ref: Ref[F, WindowState] - ): Pipe[F, Batched, Transformed] = + ): Pipe[F, Batched, Nothing] = _.parEvalMapUnordered(env.cpuParallelism) { case Batched(events, entities, _, earliestCollectorTstamp) => for { _ <- Logger[F].debug(s"Processing batch of size ${events.size}") @@ -141,30 +135,29 @@ object Processing { _ <- rememberColumnNames(ref, nonAtomicFields.fields) (bad, rows) <- transformToSpark[F](badProcessor, events, nonAtomicFields) _ <- sendFailedEvents(env, badProcessor, bad) - _ <- ref.update { s => - val updatedCollectorTstamp = chooseEarliestTstamp(earliestCollectorTstamp, s.earliestCollectorTstamp) - 
s.copy(numEvents = s.numEvents + rows.size, earliestCollectorTstamp = updatedCollectorTstamp) - } - } yield Transformed(rows, SparkSchema.forBatch(nonAtomicFields.fields, env.respectIgluNullability)) - } + windowState <- ref.updateAndGet { s => + val updatedCollectorTstamp = chooseEarliestTstamp(earliestCollectorTstamp, s.earliestCollectorTstamp) + s.copy(numEvents = s.numEvents + rows.size, earliestCollectorTstamp = updatedCollectorTstamp) + } + _ <- sinkTransformedBatch(env, windowState, rows, SparkSchema.forBatch(nonAtomicFields.fields, env.respectIgluNullability)) + } yield () + }.drain private def sinkTransformedBatch[F[_]: Sync]( env: Environment[F], - ref: Ref[F, WindowState] - ): Pipe[F, Transformed, Nothing] = - _.evalMap { case Transformed(rows, schema) => - NonEmptyList.fromList(rows) match { - case Some(nel) => - for { - windowState <- ref.get - _ <- env.lakeWriter.localAppendRows(windowState.viewName, nel, schema) - _ <- Logger[F].debug(s"Finished processing batch of size ${rows.size}") - } yield () - case None => - Logger[F].debug(s"An in-memory batch yielded zero good events. Nothing will be saved to local disk.") - } - - }.drain + windowState: WindowState, + rows: List[Row], + schema: StructType + ): F[Unit] = + NonEmptyList.fromList(rows) match { + case Some(nel) => + for { + _ <- env.lakeWriter.localAppendRows(windowState.viewName, nel, schema) + _ <- Logger[F].debug(s"Finished processing batch of size ${rows.size}") + } yield () + case None => + Logger[F].debug(s"An in-memory batch yielded zero good events. Nothing will be saved to local disk.") + } private def rememberTokens[F[_]: Functor](ref: Ref[F, WindowState]): Pipe[F, TokenedEvents, Chunk[ByteBuffer]] = _.evalMap { case TokenedEvents(events, token) => diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala index 52321b82..e362ba64 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala @@ -18,7 +18,7 @@ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import org.apache.spark.sql.{DataFrame, Row, SparkSession} -import org.apache.spark.sql.functions.current_timestamp +import org.apache.spark.sql.functions.{col, current_timestamp} import org.apache.spark.sql.types.StructType import com.snowplowanalytics.snowplow.lakes.Config @@ -70,7 +70,11 @@ private[processing] object SparkUtils { for { _ <- Logger[F].debug(s"Initializing local DataFrame with name $viewName") _ <- Sync[F].blocking { - spark.emptyDataFrame.createTempView(viewName) + try { + spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1") + spark.emptyDataFrame.createTempView(viewName) + } finally + spark.sparkContext.setLocalProperty("spark.scheduler.pool", null) } } yield () @@ -83,12 +87,16 @@ private[processing] object SparkUtils { for { _ <- Logger[F].debug(s"Saving batch of ${rows.size} events to local DataFrame $viewName") _ <- Sync[F].blocking { - spark - .createDataFrame(rows.toList.asJava, schema) - .coalesce(1) - .localCheckpoint() - .unionByName(spark.table(viewName), allowMissingColumns = true) - .createOrReplaceTempView(viewName) + try { + spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1") + spark + .createDataFrame(rows.toList.asJava, schema) + .coalesce(1) + .localCheckpoint() + 
.unionByName(spark.table(viewName), allowMissingColumns = true) + .createOrReplaceTempView(viewName) + } finally + spark.sparkContext.setLocalProperty("spark.scheduler.pool", null) } } yield () @@ -97,17 +105,21 @@ private[processing] object SparkUtils { viewName: String, writerParallelism: Int ): F[DataFrame] = - Sync[F].blocking { + Sync[F].delay { spark .table(viewName) + .repartitionByRange(writerParallelism, col("event_name"), col("event_id")) + .sortWithinPartitions("event_name") .withColumn("load_tstamp", current_timestamp()) - .coalesce(writerParallelism) - .localCheckpoint() } def dropView[F[_]: Sync](spark: SparkSession, viewName: String): F[Unit] = Logger[F].info(s"Removing Spark data frame $viewName from local disk...") >> Sync[F].blocking { - spark.catalog.dropTempView(viewName) + try { + spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1") + spark.catalog.dropTempView(viewName) + } finally + spark.sparkContext.setLocalProperty("spark.scheduler.pool", null) }.void } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala index ea8a4cd2..066a71a1 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala @@ -61,8 +61,7 @@ class IcebergWriter(config: Config.Iceberg) extends Writer { df.write .format("iceberg") .mode("append") - .option("merge-schema", true) - .option("check-ordering", false) + .options(config.icebergWriteOptions) .saveAsTable(fqTable) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index a25ec2cf..c088c981 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -15,12 +15,12 @@ object Dependencies { object Spark { // A version of Spark which is compatible with the current version of Iceberg and Delta - val forIcebergDelta = "3.5.3" + val forIcebergDelta = "3.5.4" val forIcebergDeltaMinor = "3.5" // Hudi can use a different version of Spark because we bundle a separate Docker image // This version of Spark must be compatible with the current version of Hudi - val forHudi = "3.5.3" + val forHudi = "3.5.4" val forHudiMinor = "3.5" } @@ -35,7 +35,7 @@ object Dependencies { val delta = "3.2.1" val hudi = "0.15.0" val hudiAws = "1.0.0-beta2" - val iceberg = "1.6.1" + val iceberg = "1.7.1" val hadoop = "3.4.1" val gcsConnector = "hadoop3-2.2.25" val hive = "3.1.3"
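
To make the repartitioning idea easier to see outside the loader's codebase, here is a minimal, self-contained sketch of the approach this patch takes. It is not the loader's actual code: `RepartitionSketch`, `prepareForWrite` and `appendToLake` are illustrative names, and it assumes a dataframe of enriched events with `event_name` and `event_id` columns plus an existing Iceberg table partitioned by `event_name` (the loader's default).

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object RepartitionSketch {

  // Mirrors the patch's chooseWriterParallelism: use most cores for the write,
  // but keep one free so small local Spark jobs are never starved.
  def writerParallelism: Int =
    (Runtime.getRuntime.availableProcessors - 1).max(1)

  // Range-partition by (event_name, event_id): event_name matches the lake's
  // partition column, and event_id acts as a salt so that one very common
  // event_name is still spread over several roughly equal Spark partitions.
  def prepareForWrite(events: DataFrame): DataFrame =
    events
      .repartitionByRange(writerParallelism, col("event_name"), col("event_id"))
      .sortWithinPartitions("event_name")

  // Append with distribution-mode=none, so Iceberg does not add its own hash
  // shuffle on top of the repartition we have already done.
  def appendToLake(df: DataFrame, fqTable: String): Unit =
    df.write
      .format("iceberg")
      .mode("append")
      .option("distribution-mode", "none")
      .saveAsTable(fqTable)
}
```

Sorting within partitions by `event_name` means each task's rows arrive grouped by partition value, which is what lets the `none` distribution mode work with Iceberg's default clustered writer while keeping each task writing to only a handful of lake partitions at a time.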
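The other moving part in this patch is the FAIR scheduler: `reference.conf` now sets `spark.scheduler.mode` to `FAIR`, the new `fairscheduler.xml` defines a high-weight pool, and `SparkUtils` wraps its small local-DataFrame jobs (create view, checkpoint-and-union, drop view) in that pool via the `spark.scheduler.pool` property, so they are not queued behind a long-running write job. Below is a hedged sketch of that wrapping pattern; `inPool` and the `dropView` example are hypothetical helpers distilled from the repeated try/finally blocks in the patch, and they assume the session runs in FAIR mode with a `fairscheduler.xml` on the classpath (or referenced via `spark.scheduler.allocation.file`) that declares the pool name being used.

```scala
import cats.effect.Sync
import org.apache.spark.sql.SparkSession

object SchedulerPoolSketch {

  // Run a blocking Spark action inside a named fair-scheduler pool, then reset
  // the thread-local property so later jobs go back to the default pool.
  def inPool[F[_]: Sync, A](spark: SparkSession, pool: String)(action: => A): F[A] =
    Sync[F].blocking {
      try {
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
        action
      } finally
        spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
    }

  // Example usage: drop a temporary view from the high-priority pool, matching
  // the "pool1" name that SparkUtils sets in this patch.
  def dropView[F[_]: Sync](spark: SparkSession, viewName: String): F[Boolean] =
    inPool(spark, "pool1") {
      spark.catalog.dropTempView(viewName)
    }
}
```

Because `spark.scheduler.pool` is a thread-local property, the try/finally reset matters: without it, a later job submitted from the same thread would silently inherit the high-priority pool.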