[Spark] Skip collecting commit stats to prevent computing Snapshot State #2718
Conversation
```scala
      txnId = Some(txnId))
    recordDeltaEvent(deltaLog, DeltaLogging.DELTA_COMMIT_STATS_OPTYPE, data = stats)

    if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COLLECT_COMMIT_STATS)) {
```
When false can't we record basic info that doesn't require expensive extra computation?
Hi @felipepessoto yeah that's a good idea. I will amend this PR to add your suggestion.
Hi @felipepessoto I amended the PR so now it only skips logging the expensive fields.
(I also removed the part of this PR which was about skipping calculating the row ID high watermark. That one is a slightly different issue, because it forces the computation of the old snapshot, not the new snapshot. I still think there is an issue there to be solved, but that can be tackled separately)
Does it make sense to you what I am trying to achieve here? Have I explained OK why it is a big problem to compute the SnapshotState during each transaction? There is an opportunity here to make Delta much much faster for streaming writes.
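To make the "skip only the expensive fields" idea concrete, here is a minimal, self-contained sketch; the names are illustrative and do not match Delta's actual CommitStats definition:

```scala
// Cheap stats are always recorded; the fields that need full snapshot state
// are only computed when the flag is set.
object CommitStatsSketch extends App {
  final case class CommitStats(
      numAddedFiles: Long,            // cheap: known from the actions just committed
      numFilesTotal: Option[Long],    // expensive: requires snapshot state
      sizeInBytesTotal: Option[Long]) // expensive: requires snapshot state

  // Stand-in for state reconstruction (reading the checkpoint and log segment).
  def expensiveSnapshotState(): (Long, Long) = {
    println("reconstructing snapshot state...") // slow against remote storage
    (12345L, 67890L)
  }

  def buildStats(numAddedFiles: Long, collectFullStats: Boolean): CommitStats =
    if (collectFullStats) {
      val (files, bytes) = expensiveSnapshotState()
      CommitStats(numAddedFiles, Some(files), Some(bytes))
    } else {
      CommitStats(numAddedFiles, None, None) // skip the expensive computation
    }

  println(buildStats(numAddedFiles = 3, collectFullStats = false))
}
```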
Makes sense to me. Thanks. We need some of the maintainers to review it.
@istreeter just curious about the CREATE TABLE scenario: have you measured the difference with your optimization? Creating an empty table takes several seconds; I hope this also improves it.
Hi @felipepessoto yes there is an improvement. I tested with a remote bucket in S3:
spark.sql("""CREATE TABLE delta.`s3a://<redacted>/empty` (id string) USING DELTA """)
Before this PR, the job takes 9.4 seconds and I can see from the logs the snapshot state is computed.
After this PR, the job takes 6.4 seconds, and it skips computing the snapshot state.
I repeated this test several times, and the timings were fairly consistent.
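For anyone wanting to reproduce a rough before/after comparison, a simple wall-clock wrapper is enough (illustrative only; this is not necessarily how the timings above were taken, and the bucket path stays redacted):

```scala
// Time a block of code and print the elapsed wall-clock seconds.
def timed[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(f"$label took ${(System.nanoTime() - start) / 1e9}%.1f s")
  result
}

timed("CREATE TABLE") {
  spark.sql("""CREATE TABLE delta.`s3a://<redacted>/empty` (id string) USING DELTA""")
}
```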
Can you provide more details on what the impact/performance degradation is when reading from the table?
Hi @talgo, you asked about reading from the table. This change has no impact whatsoever on reading, because it does not touch that part of the code. This is entirely about writing to the table. I can provide some more details about the impact on writing to the table. Here is some log output from a spark job which wrote 1278 new rows into a Delta Table.
From the timestamps on the left, you can see there is a 65 second gap between the lines. After this PR, we completely eliminate that 65 second gap. The overall time to write rows is made faster by 65 seconds. This change will have the most impact on streaming jobs that write into Delta, because those types of jobs need to do frequent commits.
@vkorukanti if you have a chance to review this. It looks like a great improvement, 137s vs 63s, a reduction of 54%: #2718 (comment). And it only impacts the stats logs.
@istreeter If I understand correctly, this change would shift the cost of state reconstruction from the writer who produces the post-commit snapshot, to a subsequent reader who consumes that snapshot on the same cluster. If no such reader exists (e.g. the table is write-mostly or the reader is on a different cluster), then the change is a net win. That's what the microbenchmark above was measuring: just writes, no read afterward. For an iterated workload that does write -> read -> ... -> write -> read, I would expect no change in the overall latency of a write -> read pair. Does that sound correct?
Hi @scovich yes that sounds correct, as you described it. If every write is always followed by a read on the same cluster, then there is no net change in the number of state reconstructions. This PR would just shift the state reconstruction to a different place. However, for any other workload pattern (in which not every write is followed by a read on the same cluster), there is a benefit from not reconstructing the state as part of the write. For example, this workload pattern would also benefit: write -> write -> read -> write -> write -> read -> write -> write -> read. At my company, we use a dedicated cluster just for writing. So the pattern is simply write -> write -> write -> write -> write etc. There should be no reason to ever do state reconstruction for that usage pattern.
Is the dedicated cluster writing repeatedly? Or is it ephemeral, with each cluster instance performing one write and then shutting down? Asking because even "blind" writes actually do involve reads. At a minimum, they must access the snapshot's protocol and metadata. If the workload uses app ids to track stream progress, then that triggers a full state reconstruction as well. If it's a merge or update instead of a blind insert, then there's also a full-blown query in the write path. So for write -> write on the same cluster, we have:
In such a case, this PR wouldn't change overall timing. It just changes whether the "post-commit snapshot" step or the "write" step pays the snapshot cost. Note: I 100% agree the optimization is helpful when it applies, and that we should probably incorporate it into Delta. I just worry it might not apply as often as we wish it did, in practice. Is the microbenchmark fully representative of the actual workload you hope to optimize?
If we have multiple writers, does computing the Snapshot on read also improve perf, by skipping loading a snapshot (after my write) that will be stale anyway because of the other writers?
Hi @ryan-johnson-databricks the cluster we use for writing is long-lived and not ephemeral. Once the cluster is up, and once the spark context is created, then we re-use that spark context and write a batch of rows in Delta every ~1 minute. Each write is an append; never a merge, never an update.
The logs I shared in this comment are real logs from a production workload. The logs were taken after the cluster had already been up and running for several hours. Imagine a repeating loop with those log lines repeated again and again. I am certain that eliminating the state reconstruction would make this workload faster. However, I believe this PR in its current state only goes half way to solving that problem! After this PR, Delta will avoid computing state for the post-commit snapshot. But currently, Delta also computes state for the pre-commit snapshot on this line:

```scala
val readRowIdHighWatermark =
  RowId.extractHighWatermark(snapshot).getOrElse(RowId.MISSING_HIGH_WATER_MARK)
```

So far I have not worked out how to avoid computing state on that line, although I believe it should be possible. For our workload, we are not interested in row tracking or watermarks. (And even if we were -- I still reckon it could be achieved without computing snapshot state). One final thought... it would be awesome if we could have a unit test that proves that we don't compute snapshot state during writes. Adding such a unit test was beyond my abilities!
My questions actually came from looking at those logs -- they ONLY show the commit path itself, and creation of the post-commit snapshot (251407). They do not include creation of the snapshot the transaction read from (251406). If the workload runs in a loop, then one minute later we'd get a second transaction that starts from 251407 (and triggers P&M at a minimum), and which tries to create 251408. Thus, the first transaction avoids the cost but the second transaction ends up paying it one minute later. Can you check what happens across 2-3 inserts instead of just the commit step of one insert in isolation? I expect that the 65 seconds which disappeared from that log snippet really just moved somewhere else.
Should be easy enough to check Snapshot.stateReconstructionTriggered after the operation you care about, at least for the really expensive computed state. It wouldn't cover protocol, metadata, or checkpoint size though, because those don't rely on state reconstruction in the first place. NOTE: In general, we can't avoid computing at least protocol and metadata, because they're required to interpret the snapshot correctly (table schema, table features, etc). But they're also much cheaper to compute than the full state reconstruction.
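A rough sketch of what such a check could look like in OptimisticTransactionSuite, assuming stateReconstructionTriggered is reachable from test code and that the new config gates the expensive stats (not verified against the actual suite):

```scala
test("append does not trigger state reconstruction when commit stats are skipped") {
  withSQLConf(DeltaSQLConf.DELTA_COLLECT_COMMIT_STATS.key -> "false") {
    withTempDir { dir =>
      val path = dir.getAbsolutePath
      spark.range(10).write.format("delta").save(path)
      spark.range(10).write.format("delta").mode("append").save(path)
      // The snapshot installed by the append commit should not have had its
      // expensive computed state (numOfFiles, sizeInBytes, ...) forced.
      val deltaLog = DeltaLog.forTable(spark, path)
      assert(!deltaLog.snapshot.stateReconstructionTriggered)
    }
  }
}
```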
That actually looks like a simple (performance) bug: AFAIK, the rowid high watermark only matters if the table supports rowids, but the extractHighWatermark method body lacks any such check.
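In other words, something along these lines around the call quoted above (RowId.isSupported is a guess at the helper name; the real check in the codebase may be spelled differently):

```scala
val readRowIdHighWatermark =
  if (RowId.isSupported(snapshot.protocol)) {
    // Row tracking supported: the spec requires knowing the current high watermark,
    // so the (expensive) snapshot state has to be loaded.
    RowId.extractHighWatermark(snapshot).getOrElse(RowId.MISSING_HIGH_WATER_MARK)
  } else {
    // Row tracking not supported: no need to touch the snapshot state at all.
    RowId.MISSING_HIGH_WATER_MARK
  }
```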
Sure, I ran a simple script like this:

```scala
(1 to 100).foreach { i =>
  spark.sql(s"INSERT INTO delta.`file:///tmp/test_delta` (v) values ($i)")
}
```

I attach here two versions of the stdout logs:
From the logs, you will see that before this PR, we get log lines like
The Delta spec here makes a distinction between when row tracking is supported vs when row tracking is enabled. I think ideally we want to avoid computing snapshot state whenever row tracking is not enabled. Whereas I think your suggested change would only avoid the computation if row tracking is not supported.
The spec does require keeping track of the high-water mark and setting base row ids / default row commit versions whenever the feature is supported, not only enabled, so as Ryan suggested, the simplest would be to skip the expensive snapshot recomputation if the feature isn't supported.
Ah, that makes sense now. I didn't know you had made additional changes to avoid triggering snapshot state on the read path, in addition to the write path changes in this PR.
I had to word my suggested change the way I did, in order to comply with the spec:
Writers can't assign row ids without knowing the current high watermark, so if the feature is "supported" in the table protocol then we have to pull the high watermark in order to commit. In practice, I don't think the distinction between "enabled" and "supported" should matter, because "supported" is anyway a transient state, used e.g. while a table is trying to enable rowids for the first time:
Update: ninja'd by @johanl-db
OK I read a bit more of the protocol, and now I understand the point you're making: the writer MUST calculate the high watermark if row tracking is supported by the table protocol.

That is OK for me, because the tables I write to do not have row tracking supported. (I do wonder if it is possible to implement a cheap way of checking the high watermark, similar to how there is a cheap way to check protocol and metadata. Then tables with row tracking enabled could also get the benefit of the change I'm making. But I won't pursue that idea in this current PR).

I have a question for the people following this thread.... I do believe this PR will make a big performance difference for many uses of Delta. But currently, users would only get that speed-up if they change the new configuration option away from its default. I only added the configuration option because I didn't want to upset someone who is depending on that logging.
That's a great question and I don't have a good answer. More configs are always annoying, but people also complain (loudly) if their workload breaks.
I think numFilesTotal and sizeInBytesTotal are important stats and we should keep the config.
@felipepessoto I do see your point that it's useful to track the growth of a table over time. But it seems that information just isn't naturally available inside the commit path. How would you feel if it were instead recorded from the place where the snapshot state is actually computed, something like:

```scala
val stats = SnapshotStateStats(
  numFilesTotal = _computedState.numOfFiles,
  sizeInBytesTotal = _computedState.sizeInBytes
)
recordDeltaEvent(deltaLog, "delta.snapshot.stats", data = stats)
```

Admittedly you would not get the stats for every transaction. But you would get them every time Delta does a post-commit checkpoint. And you would get them every time Spark reads a version of the table.
I also don’t have a good answer; it depends on how people use it, which is why I think we need the config flag. @sezruby any thoughts about it?
Some users might use it to monitor their workload. There's no good way to monitor a Delta table workload, so it would be better to keep the config. It's existing code, so there's no regression risk in keeping it. That said, since there is a way to recalculate the metrics when needed, we could remove it, I think.
If we keep the config option, and if it is set to true by default (i.e. the backwards-compatible behaviour) then we all know realistically most users will never change that config to something different. I just think that is a real lost opportunity!

I'll stress it again: I strongly believe this PR can make Delta workloads much faster for many users under many circumstances. I suspect, without proof, that Delta users will appreciate that performance gain more so than they would appreciate the logging.

I think the options available to us are:

I really think 3 is the best option. But you folks here have better knowledge than me of what Delta users might want.
I tend to agree that we should

Update: Fix inverted logic...
Thank you everyone for pointing me in the right direction :) I have made the following changes to the PR:
Very nice optimization!
UPDATE: bytesNew only counts added files; it doesn't take removed files into consideration.
@felipepessoto I think the only way to get If you think it's possible to cheaply do
@istreeter Thanks for working on this change. Could you please resolve the conflicts on the PR so that it can be merged?
@prakharjain09 Fixed!! It was an easy rebase. Nothing significant changed since it was approved.
Seems like some unit tests are failing with this change and need to be fixed.
Hi @prakharjain09 the unit tests now all pass 🎉 I pushed one extra commit which amends a couple of pre-existing unit tests. I think the fix is reasonable -- it just sets things up so the starting point is the same place that it was before this PR. I pushed it as an extra commit so it is obvious what I have changed.
Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)
Description
Before this PR, Delta computes a SnapshotState during every commit. Computing a SnapshotState is fairly slow and expensive, because it involves reading the entirety of a checkpoint, sidecars, and log segment.
For many types of commit, it should be unnecessary to compute the SnapshotState.
After this PR, a transaction can avoid computing the SnapshotState of a newly created snapshot. Skipping the computation is enabled via a spark configuration option
spark.databricks.delta.commitStats.collect=false
This change can have a big performance impact when writing into a Delta Table. Especially when the table comprises a large number of underlying data files.
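For illustration, a write-heavy job could opt out like this (the table path is a placeholder):

```scala
// Disable collection of the expensive commit stats for this session.
spark.conf.set("spark.databricks.delta.commitStats.collect", "false")

// Subsequent appends commit without forcing computation of the new SnapshotState.
spark.range(1000)
  .write
  .format("delta")
  .mode("append")
  .save("/tmp/events")
```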
How was this patch tested?
- Locally built delta-spark
- Ran a small spark job to insert rows into a delta table
- Inspected log4j output to see if snapshot state was computed
- Repeated again, this time setting spark.databricks.delta.commitStats.collect=false
Simple demo job that triggers computing SnapshotState, before this PR:
Does this PR introduce any user-facing changes?
Yes, after this PR the user can set spark config option
spark.databricks.delta.commitStats.collect=false
to avoid computing SnapshotState after a commit.