Repartition by event name before writing to the lake
Previously, our Iceberg writer was using the [hash write distribution mode][1]
because that is the default for Iceberg. In this mode, Spark
repartitions the dataframe immediately before writing to the lake.

After this commit, we explicitly repartition the dataframe as part of
the existing spark task for preparing the final dataframe. This means we
can change the Iceberg write distribution mode to `none`.
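
The loader's built-in defaults now pin the distribution mode explicitly. As a
rough sketch, the relevant fragment of `reference.conf` (shown in full in the
diff below) looks like:

    "icebergWriteOptions": {
      "merge-schema": "true"
      "check-ordering": "false"
      "distribution-mode": "none"
    }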

We partition by the combination `event_name + event_id`: the former
because it matches the lake partitioning, and the latter because it adds
salt, which ensures equally sized partitions.
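
As a simplified sketch of what the prepare step now does (see
`SparkUtils.prepareFinalDataFrame` in the diff below; `writerParallelism` is
the number of parallel write tasks):

    spark
      .table(viewName)
      .repartitionByRange(writerParallelism, col("event_name"), col("event_id"))
      .sortWithinPartitions("event_name")
      .withColumn("load_tstamp", current_timestamp())
      .localCheckpoint()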

Overall this seems to improve the time taken to write a window of events
to Iceberg. It fixes a problem we found, in which the write phase could
become too slow under high load (Iceberg only): specifically, a write was
taking longer than the loader's "window", which caused periods of low CPU
usage while the loader's processing phase waited for the write phase to
catch up.

Note: this improvement will not help Snowplow users who have changed the
partition key to something different from our default. We might want to
make a follow-up change, in which the loader auto-discovers the lake's
partition key. For example, some users might want to partition by `app_id`
instead of `event_name`.

[1]: https://iceberg.apache.org/docs/1.7.1/spark-writes/#writing-distribution-modes
istreeter committed Jan 9, 2025
1 parent 8f31f53 commit c5ea7d7
Showing 11 changed files with 99 additions and 90 deletions.
11 changes: 6 additions & 5 deletions config/config.aws.reference.hocon
@@ -127,6 +127,12 @@
# "icebergTableProperties": {
# "write.metadata.metrics.column.event_id": "count"
# }
#
# # -- Any valid Iceberg write option
# # -- This can be blank in most setups because the loader already sets sensible defaults.
# "icebergWriteOptions": {
# "write-format": "parquet"
# }
# }

"bad": {
@@ -181,11 +187,6 @@
# -- E.g. to change credentials provider
"fs.s3a.aws.credentials.provider": "com.amazonaws.auth.InstanceProfileCredentialsProvider"
}

# -- Controls how many spark tasks run in parallel during writing the events to cloud storage.
# -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing.
# -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency.
"writerParallelismFraction": 0.5
}

# Retry configuration for lake operation failures
11 changes: 6 additions & 5 deletions config/config.azure.reference.hocon
@@ -94,6 +94,12 @@
# "icebergTableProperties": {
# "write.metadata.metrics.column.event_id": "count"
# }
#
# # -- Any valid Iceberg write option
# # -- This can be blank in most setups because the loader already sets sensible defaults.
# "icebergWriteOptions": {
# "write-format": "parquet"
# }
# }

"bad": {
@@ -145,11 +151,6 @@
# -- E.g. to enable the spark ui for debugging:
"spark.ui.enabled": true
}

# -- Controls how many spark tasks run in parallel during writing the events to cloud storage.
# -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing.
# -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency.
"writerParallelismFraction": 0.5
}

# Retry configuration for lake operation failures
11 changes: 6 additions & 5 deletions config/config.gcp.reference.hocon
@@ -116,6 +116,12 @@
# "icebergTableProperties": {
# "write.metadata.metrics.column.event_id": "count"
# }
#
# # -- Any valid Iceberg write option
# # -- This can be blank in most setups because the loader already sets sensible defaults.
# "icebergWriteOptions": {
# "write-format": "parquet"
# }
# }

"bad": {
@@ -160,11 +166,6 @@
# -- E.g. to enable the spark ui for debugging:
"spark.ui.enabled": true
}

# -- Controls how many spark tasks run in parallel during writing the events to cloud storage.
# -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing.
# -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency.
"writerParallelismFraction": 0.5
}

# Retry configuration for lake operation failures
8 changes: 8 additions & 0 deletions modules/core/src/main/resources/fairscheduler.xml
@@ -0,0 +1,8 @@
<?xml version="1.0"?>
<allocations>
<pool name="pool1">
<schedulingMode>FIFO</schedulingMode>
<weight>1000</weight>
<minShare>1</minShare>
</pool>
</allocations>
9 changes: 8 additions & 1 deletion modules/core/src/main/resources/reference.conf
@@ -41,6 +41,12 @@
"write.metadata.metrics.column.true_tstamp": "full"
}

"icebergWriteOptions": {
"merge-schema": "true"
"check-ordering": "false"
"distribution-mode": "none"
}

"hudiTableProperties": {
"hoodie.table.name": "events"
"hoodie.table.keygenerator.class": "org.apache.hudi.keygen.TimestampBasedKeyGenerator"
@@ -121,9 +127,10 @@
"spark.sql.parquet.datetimeRebaseModeInWrite": "CORRECTED"
"spark.memory.storageFraction": "0"
"spark.databricks.delta.autoCompact.enabled": "false"
"spark.scheduler.mode": "FAIR"
"spark.sql.adaptive.enabled": "false" # False gives better performance on the type of shuffle done by Lake Loader
}
"gcpUserAgent": ${gcpUserAgent}
"writerParallelismFraction": 0.5
}

"retries": {
@@ -74,7 +74,8 @@ object Config {
table: String,
catalog: IcebergCatalog,
location: URI,
icebergTableProperties: Map[String, String]
icebergTableProperties: Map[String, String],
icebergWriteOptions: Map[String, String]
) extends Target

sealed trait IcebergCatalog
@@ -100,8 +101,7 @@
case class Spark(
taskRetries: Int,
conf: Map[String, String],
gcpUserAgent: GcpUserAgent,
writerParallelismFraction: BigDecimal
gcpUserAgent: GcpUserAgent
)

case class Metrics(
@@ -81,10 +81,9 @@ object LakeWriter {
}
for {
session <- SparkUtils.session[F](config, w, target.location)
writerParallelism = chooseWriterParallelism(config)
mutex1 <- Resource.eval(Mutex[F])
mutex2 <- Resource.eval(Mutex[F])
} yield impl(session, w, writerParallelism, mutex1, mutex2)
writerParallelism = chooseWriterParallelism()
mutex <- Resource.eval(Mutex[F])
} yield impl(session, w, writerParallelism, mutex)
}

def withHandledErrors[F[_]: Async](
@@ -129,22 +128,14 @@
/**
* Implementation of the LakeWriter
*
* The mutexes are needed because we allow overlapping windows. They prevent two different windows
* The mutex is needed because we allow overlapping windows. They prevent two different windows
* from trying to run the same expensive operation at the same time.
*
* @param mutextForWriting
* Makes sure there is only ever one spark job trying to write events to the lake. This is a
* IO-intensive task.
* @param mutexForUnioning
* Makes sure there is only ever one spark job trying to union smaller DataFrames into a larger
* DataFrame, immediately before writing to the lake. This is a cpu-intensive task.
*/
private def impl[F[_]: Sync](
spark: SparkSession,
w: Writer,
writerParallelism: Int,
mutexForWriting: Mutex[F],
mutexForUnioning: Mutex[F]
mutex: Mutex[F]
): LakeWriter[F] = new LakeWriter[F] {
def createTable: F[Unit] =
w.prepareTable(spark)
@@ -164,25 +155,21 @@

def commit(viewName: String): F[Unit] =
for {
df <- mutexForUnioning.lock.surround {
SparkUtils.prepareFinalDataFrame(spark, viewName, writerParallelism)
}
_ <- mutexForWriting.lock
df <- SparkUtils.prepareFinalDataFrame(spark, viewName, writerParallelism)
_ <- mutex.lock
.surround {
w.write(df)
}
} yield ()
}

/**
* Converts `writerParallelismFraction` into a suggested number of threads
* Allow spark to parallelize over _most_ of the available processors for writing to the lake,
* because this speeds up how quickly we can sink a batch.
*
* For bigger instances (more cores) we want more parallelism in the writer. This avoids a
* situation where writing tasks exceed the length of a window, which causes an unbalanced use of
* cpu.
* But leave 1 processor always available, so that we are never blocked when trying to save one of
* the intermediate dataframes.
*/
private def chooseWriterParallelism(config: Config.Spark): Int =
(Runtime.getRuntime.availableProcessors * config.writerParallelismFraction)
.setScale(0, BigDecimal.RoundingMode.UP)
.toInt
private def chooseWriterParallelism(): Int =
(Runtime.getRuntime.availableProcessors - 1).max(1)
}
@@ -81,11 +81,6 @@ object Processing {
earliestCollectorTstamp: Option[Instant]
)

private case class Transformed(
events: List[Row],
schema: StructType
)

private def eventProcessor[F[_]: Async: RegistryLookup](
env: Environment[F],
deferredTableExists: F[Unit]
@@ -126,13 +121,12 @@
.through(handleParseFailures(env, badProcessor))
.through(BatchUp.noTimeout(env.inMemBatchBytes))
.through(transformBatch(env, badProcessor, ref))
.through(sinkTransformedBatch(env, ref))

private def transformBatch[F[_]: RegistryLookup: Async](
env: Environment[F],
badProcessor: BadRowProcessor,
ref: Ref[F, WindowState]
): Pipe[F, Batched, Transformed] =
): Pipe[F, Batched, Nothing] =
_.parEvalMapUnordered(env.cpuParallelism) { case Batched(events, entities, _, earliestCollectorTstamp) =>
for {
_ <- Logger[F].debug(s"Processing batch of size ${events.size}")
@@ -141,30 +135,29 @@
_ <- rememberColumnNames(ref, nonAtomicFields.fields)
(bad, rows) <- transformToSpark[F](badProcessor, events, nonAtomicFields)
_ <- sendFailedEvents(env, badProcessor, bad)
_ <- ref.update { s =>
val updatedCollectorTstamp = chooseEarliestTstamp(earliestCollectorTstamp, s.earliestCollectorTstamp)
s.copy(numEvents = s.numEvents + rows.size, earliestCollectorTstamp = updatedCollectorTstamp)
}
} yield Transformed(rows, SparkSchema.forBatch(nonAtomicFields.fields, env.respectIgluNullability))
}
windowState <- ref.updateAndGet { s =>
val updatedCollectorTstamp = chooseEarliestTstamp(earliestCollectorTstamp, s.earliestCollectorTstamp)
s.copy(numEvents = s.numEvents + rows.size, earliestCollectorTstamp = updatedCollectorTstamp)
}
_ <- sinkTransformedBatch(env, windowState, rows, SparkSchema.forBatch(nonAtomicFields.fields, env.respectIgluNullability))
} yield ()
}.drain

private def sinkTransformedBatch[F[_]: Sync](
env: Environment[F],
ref: Ref[F, WindowState]
): Pipe[F, Transformed, Nothing] =
_.evalMap { case Transformed(rows, schema) =>
NonEmptyList.fromList(rows) match {
case Some(nel) =>
for {
windowState <- ref.get
_ <- env.lakeWriter.localAppendRows(windowState.viewName, nel, schema)
_ <- Logger[F].debug(s"Finished processing batch of size ${rows.size}")
} yield ()
case None =>
Logger[F].debug(s"An in-memory batch yielded zero good events. Nothing will be saved to local disk.")
}

}.drain
windowState: WindowState,
rows: List[Row],
schema: StructType
): F[Unit] =
NonEmptyList.fromList(rows) match {
case Some(nel) =>
for {
_ <- env.lakeWriter.localAppendRows(windowState.viewName, nel, schema)
_ <- Logger[F].debug(s"Finished processing batch of size ${rows.size}")
} yield ()
case None =>
Logger[F].debug(s"An in-memory batch yielded zero good events. Nothing will be saved to local disk.")
}

private def rememberTokens[F[_]: Functor](ref: Ref[F, WindowState]): Pipe[F, TokenedEvents, Chunk[ByteBuffer]] =
_.evalMap { case TokenedEvents(events, token) =>
@@ -18,7 +18,7 @@ import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.functions.{col, current_timestamp}
import org.apache.spark.sql.types.StructType

import com.snowplowanalytics.snowplow.lakes.Config
@@ -70,7 +70,11 @@ private[processing] object SparkUtils {
for {
_ <- Logger[F].debug(s"Initializing local DataFrame with name $viewName")
_ <- Sync[F].blocking {
spark.emptyDataFrame.createTempView(viewName)
try {
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
spark.emptyDataFrame.createTempView(viewName)
} finally
spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
}
} yield ()

@@ -83,12 +87,16 @@
for {
_ <- Logger[F].debug(s"Saving batch of ${rows.size} events to local DataFrame $viewName")
_ <- Sync[F].blocking {
spark
.createDataFrame(rows.toList.asJava, schema)
.coalesce(1)
.localCheckpoint()
.unionByName(spark.table(viewName), allowMissingColumns = true)
.createOrReplaceTempView(viewName)
try {
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
spark
.createDataFrame(rows.toList.asJava, schema)
.coalesce(1)
.localCheckpoint()
.unionByName(spark.table(viewName), allowMissingColumns = true)
.createOrReplaceTempView(viewName)
} finally
spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
}
} yield ()

@@ -97,17 +105,21 @@
viewName: String,
writerParallelism: Int
): F[DataFrame] =
Sync[F].blocking {
Sync[F].delay {
spark
.table(viewName)
.repartitionByRange(writerParallelism, col("event_name"), col("event_id"))
.sortWithinPartitions("event_name")
.withColumn("load_tstamp", current_timestamp())
.coalesce(writerParallelism)
.localCheckpoint()
}

def dropView[F[_]: Sync](spark: SparkSession, viewName: String): F[Unit] =
Logger[F].info(s"Removing Spark data frame $viewName from local disk...") >>
Sync[F].blocking {
spark.catalog.dropTempView(viewName)
try {
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
spark.catalog.dropTempView(viewName)
} finally
spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
}.void
}
@@ -61,8 +61,7 @@ class IcebergWriter(config: Config.Iceberg) extends Writer {
df.write
.format("iceberg")
.mode("append")
.option("merge-schema", true)
.option("check-ordering", false)
.options(config.icebergWriteOptions)
.saveAsTable(fqTable)
}

6 changes: 3 additions & 3 deletions project/Dependencies.scala
@@ -15,12 +15,12 @@ object Dependencies {
object Spark {

// A version of Spark which is compatible with the current version of Iceberg and Delta
val forIcebergDelta = "3.5.3"
val forIcebergDelta = "3.5.4"
val forIcebergDeltaMinor = "3.5"

// Hudi can use a different version of Spark because we bundle a separate Docker image
// This version of Spark must be compatible with the current version of Hudi
val forHudi = "3.5.3"
val forHudi = "3.5.4"
val forHudiMinor = "3.5"
}

@@ -35,7 +35,7 @@
val delta = "3.2.1"
val hudi = "0.15.0"
val hudiAws = "1.0.0-beta2"
val iceberg = "1.6.1"
val iceberg = "1.7.1"
val hadoop = "3.4.1"
val gcsConnector = "hadoop3-2.2.25"
val hive = "3.1.3"
