Performance Regression Caused by Schema Hash in Spark PartitionPruning with Wide Tables #11763

Open
2 of 3 tasks
wzx140 opened this issue Dec 12, 2024 · 1 comment · May be fixed by #11764
Labels: bug (Something isn't working)

wzx140 commented Dec 12, 2024

Apache Iceberg version

1.5.0

Query engine

Spark

Please describe the bug 🐞

Description:
In Spark’s optimization rule PartitionPruning, the method SparkBatchQueryScan#filterAttributes is called, which triggers the computation of Set<PartitionSpec> specs. During this computation, each file scan task's partition spec JSON string is parsed into a PartitionSpec. To avoid repeated parsing, a cache map keyed by (schema, jsonStr) -> PartitionSpec was added in org.apache.iceberg.PartitionSpecParser#fromJson.

However, every cache lookup hashes that key, and the key includes the full schema. For tables with a large number of files and columns, recomputing the schema hash on every lookup consumes significant CPU time on the driver (see the sketch below).
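To make the cost concrete, here is a minimal, self-contained sketch of a cache keyed by a (schema, json) pair, as the thread dump further down shows for the real code. Everything in it (SpecCacheSketch, SpecKey, parse) is an illustrative stand-in, not the actual Iceberg implementation; the point is only that the map must call hashCode() on the key before it can return a cached value, and that hash walks the whole schema on every lookup:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SpecCacheSketch {
  // Stand-in for the table schema: hashing this key touches every column name.
  record SpecKey(List<String> schemaColumns, String specJson) {}

  private static final Map<SpecKey, String> CACHE = new ConcurrentHashMap<>();

  static String fromJson(List<String> schemaColumns, String specJson) {
    // computeIfAbsent() evaluates key.hashCode() before anything else, so with
    // 1500+ columns the deep hash is paid on every call, cache hit or miss.
    return CACHE.computeIfAbsent(new SpecKey(schemaColumns, specJson), key -> parse(key.specJson()));
  }

  private static String parse(String specJson) {
    return specJson; // placeholder for the real JSON -> PartitionSpec parsing
  }
}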

Proposed Solution:
Avro's Schema mitigates this kind of cost by caching its hashCode so the expensive traversal runs only once. A similar memoization could be applied to Iceberg's schema types to remove the performance regression caused by repeated schema hash calculations (illustrated by the sketch below).
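As a minimal sketch of that memoization (class and field names here are illustrative, not the actual Iceberg types), the idea is the same lazy hash caching that Avro's Schema and java.lang.String use, and it is safe because the struct is immutable:

import java.util.List;

final class CachedHashStruct {
  private final List<String> fields; // stands in for Iceberg's nested field list
  private transient int hash;        // 0 means "not yet computed"

  CachedHashStruct(List<String> fields) {
    this.fields = List.copyOf(fields); // immutability makes caching the hash safe
  }

  @Override
  public int hashCode() {
    int h = hash;
    if (h == 0) {
      h = fields.hashCode();  // the expensive deep traversal, done at most once per instance
      if (h == 0) {
        h = 1;                // avoid recomputing forever if the real hash happens to be 0
      }
      hash = h;               // benign race: concurrent callers compute the same value
    }
    return h;
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof CachedHashStruct that && fields.equals(that.fields);
  }
}

With something like this on the schema types, only the first PartitionSpecParser#fromJson lookup for a given schema pays the full traversal; every later lookup reuses the cached hash.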

Reproduction Example:
I added a timer to org.apache.iceberg.spark.source.SparkPartitioningAwareScan#specs:

  protected Set<PartitionSpec> specs() {
    if (specs == null) {
      long ts = System.currentTimeMillis();
      // avoid calling equals/hashCode on specs as those methods are relatively expensive
      IntStream specIds = tasks().stream().mapToInt(task -> task.spec().specId()).distinct();
      this.specs = specIds.mapToObj(id -> table().specs().get(id)).collect(Collectors.toSet());
      LOG.warn("Scanned {} specs in {} ms", specs.size(), System.currentTimeMillis() - ts);
    }

    return specs;
  }

I then ran the following SQL query on a table with 900,000 files and 1500+ columns:

SELECT SUM(HASH(s.reqId + t.reqId)) 
FROM table s
JOIN table t
ON s.reqId = t.reqId and s.partition = 'part1' and t.partition = 'part1'

This query triggers the org.apache.spark.sql.execution.dynamicpruning.PartitionPruning optimization rule twice. Before any task executes, the driver spends approximately 150 seconds on pre-execution preparation, of which over 140 seconds are consumed computing PartitionSpecs.

Flame Graph: (attached image, not reproduced here)

Thread Dump:

[email protected]/java.util.Arrays.hashCode(Arrays.java:4499)
[email protected]/java.util.Objects.hash(Objects.java:133)
org.apache.iceberg.types.Types$NestedField.hashCode(Types.java:523)
[email protected]/java.util.Arrays.hashCode(Arrays.java:4499)
[email protected]/java.util.Objects.hash(Objects.java:133)
org.apache.iceberg.types.Types$ListType.hashCode(Types.java:763)
[email protected]/java.util.Arrays.hashCode(Arrays.java:4499)
[email protected]/java.util.Objects.hash(Objects.java:133)
org.apache.iceberg.types.Types$NestedField.hashCode(Types.java:523)
[email protected]/java.util.Arrays.hashCode(Arrays.java:4499)
org.apache.iceberg.types.Types$StructType.hashCode(Types.java:630)
[email protected]/java.util.Arrays.hashCode(Arrays.java:4499)
org.apache.iceberg.relocated.com.google.common.base.Objects.hashCode(Objects.java:79)
org.apache.iceberg.util.Pair.hashCode(Pair.java:117)
[email protected]/java.util.concurrent.ConcurrentHashMap.get(ConcurrentHashMap.java:936)
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2370)
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:86)
org.apache.iceberg.BaseContentScanTask.spec(BaseContentScanTask.java:71) => holding Monitor(org.apache.iceberg.BaseFileScanTask@520850087)
org.apache.iceberg.BaseFileScanTask.spec(BaseFileScanTask.java:34)
org.apache.iceberg.spark.source.SparkPartitioningAwareScan.lambda$specs$1(SparkPartitioningAwareScan.java:165)
org.apache.iceberg.spark.source.SparkPartitioningAwareScan$$Lambda$3617/0x00007f58ed482c28.applyAsInt(Unknown Source)
[email protected]/java.util.stream.ReferencePipeline$4$1.accept(ReferencePipeline.java:214)
[email protected]/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
[email protected]/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
[email protected]/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
[email protected]/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
[email protected]/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
[email protected]/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
org.apache.iceberg.spark.source.SparkPartitioningAwareScan.specs(SparkPartitioningAwareScan.java:166)
org.apache.iceberg.spark.source.SparkBatchQueryScan.filterAttributes(SparkBatchQueryScan.java:103)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.$anonfun$getFilterableTableScan$1(PartitionPruning.scala:82)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$$Lambda$3616/0x00007f58ed48c3b0.apply(Unknown Source)
app//scala.Option.flatMap(Option.scala:271)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$.getFilterableTableScan(PartitionPruning.scala:62)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.$anonfun$applyOrElse$1(PartitionPruning.scala:258)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.$anonfun$applyOrElse$1$adapted(PartitionPruning.scala:241)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1$$Lambda$3615/0x00007f58ed48bfd0.apply(Unknown Source)
app//scala.collection.immutable.List.foreach(List.scala:431)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.applyOrElse(PartitionPruning.scala:241)
app//org.apache.spark.sql.execution.dynamicpruning.PartitionPruning$$anonfun$prune$1.applyOrElse(PartitionPruning.scala:219)

Environment:
I reproduced this issue on Iceberg 1.5.0; it is expected to persist in the latest version as well.

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
wzx140 added the bug label on Dec 12, 2024
wzx140 linked a pull request (#11764) on Dec 12, 2024 that will close this issue

wzx140 commented Dec 13, 2024

Performance Comparison After Adding Cache

Metric                         | Before Adding Cache | After Adding Cache
-------------------------------|---------------------|-------------------
Pre-execution Preparation Time | 154s                | 18s
Scan Spec Time                 | 142s                | 5s

Pre-execution Preparation Time: the interval from the first table load to the start of the first stage execution.
Scan Spec Time: the time reported by the timer added to SparkPartitioningAwareScan#specs (see the snippet above).

Flame Graph
Before Adding Cache: https://drive.google.com/file/d/1o68Q6n1c-BD7xwfM7ETO6fjbC3jjrlOr/view?usp=drive_link
After Adding Cache: https://drive.google.com/file/d/1YnGnEZ06Es7xs4fGVIZgnjDFS3L4cSR2/view?usp=drive_link
