
Physical Plans (Java operators VS Scala operators VS raw SQL queries) #15

arnaudframmery opened this issue Jan 19, 2022 · 1 comment


arnaudframmery commented Jan 19, 2022

We can look at the physical plans generated by the transformations applied to our dataframes and see that the result differs quite a lot depending on the operators we use. This may explain the performance gap we noticed during the benchmarks.
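For reference, a minimal sketch (in Java) of how these formatted plans can be printed; the class name, master, relative CSV path and read options are assumptions about how the project loads the data:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;

public class ExplainSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
    // read options and path are assumptions about how the benchmark loads the CSV
    Dataset<Row> df = spark.read()
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("target/classes/csv/US_accidents_Dec20_updated.csv");
    // each plan in this issue is printed with explain("formatted") (Spark 3.0+),
    // here shown on the Column-based variant of the filter
    df.filter(col("Severity").equalTo(4)).explain("formatted");
  }
}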

These query plans are from a filter performing the operation Severity = 4

With Java Operators

(1) Scan csv 
Output [47]: [ID#16, Severity#17, ...]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/arnau/par-student-spark-atoti/target/classes/csv/US_accidents_Dec20_updated.csv]

(2) Filter [codegen id : 1]
Input [47]: [ID#16, Severity#17, ...]
Condition : io.atoti.spark.condition.EqualCondition$$Lambda$2640/[email protected]

=> We can see that the condition is not understood by Spark, so no optimization (such as filter pushdown) is available
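This opaque condition is the expected behaviour when the predicate reaches Spark as an arbitrary Java lambda. A minimal sketch of the pattern, continuing from the df of the sketch above and using org.apache.spark.api.java.function.FilterFunction (the project's real implementation lives in io.atoti.spark.condition.EqualCondition):

// The lambda is a black box for Catalyst: Spark can only execute it row by row,
// which is why the plan prints the lambda's class name and nothing is pushed down.
Dataset<Row> filtered = df.filter(
    (FilterFunction<Row>) row -> row.getInt(row.fieldIndex("Severity")) == 4);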

With Scala Operators

(1) Scan csv 
Output [47]: [ID#16, Severity#17, ...]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/arnau/par-student-spark-atoti/target/classes/csv/US_accidents_Dec20_updated.csv]
PushedFilters: [IsNotNull(Severity), EqualTo(Severity,4)]

(2) Filter [codegen id : 1]
Input [47]: [ID#16, Severity#17, ...]
Condition : (isnotnull(Severity#17) AND (Severity#17 = 4))

=> We can notice that the condition is clearly understood by Spark and can therefore be pushed down to the scan (PushedFilters)
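Assuming that "Scala operators" refers to the Column-based DataFrame API (which can also be called from Java), a sketch of the equivalent filter:

// Catalyst understands this expression, adds the implicit IsNotNull check
// and pushes both predicates down to the CSV scan (see PushedFilters above).
Dataset<Row> filtered = df.filter(col("Severity").equalTo(4));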

With a raw SQL query

(1) Scan csv 
Output [47]: [ID#16, Severity#17, ...]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/arnau/par-student-spark-atoti/target/classes/csv/US_accidents_Dec20_updated.csv]
PushedFilters: [IsNotNull(Severity), EqualTo(Severity,4)]

(2) Filter [codegen id : 1]
Input [47]: [ID#16, Severity#17, ...]
Condition : (isnotnull(Severity#17) AND (Severity#17 = 4))

=> We get exactly the same result as with the Scala operators
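For completeness, a sketch of the raw SQL variant, assuming the dataframe is registered as a temporary view (the view name accidents is an assumption):

df.createOrReplaceTempView("accidents");
// the SQL parser yields the same logical plan as the Column expression,
// hence the identical physical plan
Dataset<Row> filtered = spark.sql("SELECT * FROM accidents WHERE Severity = 4");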

@arnaudframmery

These query plans are from a group by where we count the number of rows for each level of Severity

With Java Operators

(1) Scan csv 
Output [47]: [ID#16, Severity#17, ...]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/arnau/par-student-spark-atoti/target/classes/csv/US_accidents_Dec20_updated.csv]

(2) Filter
Input [47]: [ID#16, Severity#17, ...]
Condition : io.atoti.spark.condition.TrueCondition$$Lambda$2647/[email protected]

(3) Project
Output [1]: [Severity#17]
Input [47]: [ID#16, Severity#17, ...]

(4) HashAggregate
Input [1]: [Severity#17]
Keys [1]: [Severity#17]
Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#163L]
Results [2]: [Severity#17, count#164L]

(5) Exchange
Input [2]: [Severity#17, count#164L]
Arguments: hashpartitioning(Severity#17, 200), ENSURE_REQUIREMENTS, [id=#41]

(6) HashAggregate
Input [2]: [Severity#17, count#164L]
Keys [1]: [Severity#17]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#110L]
Results [2]: [Severity#17, count(1)#110L AS severity_count#111L]

(7) AdaptiveSparkPlan
Output [2]: [Severity#17, severity_count#111L]
Arguments: isFinalPlan=false

=> The Filter step is present because of the implementation of our AggregateQuery() API, which can take a condition. By default, if the user gives no condition, a True condition is generated. We can see here that this True condition generates extra steps.
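A minimal sketch of the pattern that would produce steps (2) and (3), assuming the default TrueCondition ends up as an always-true Java lambda (the actual implementation is in io.atoti.spark.condition.TrueCondition):

// even an always-true lambda is opaque to Catalyst, so the Filter and the
// Project feeding it cannot be optimized away
Dataset<Row> kept = df.filter((FilterFunction<Row>) row -> true);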

With Scala Operators

(1) Scan csv 
Output [1]: [Severity#17]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/arnau/par-student-spark-atoti/target/classes/csv/US_accidents_Dec20_updated.csv]
ReadSchema: struct<Severity:int>

(2) HashAggregate
Input [1]: [Severity#17]
Keys [1]: [Severity#17]
Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#163L]
Results [2]: [Severity#17, count#164L]

(3) Exchange
Input [2]: [Severity#17, count#164L]
Arguments: hashpartitioning(Severity#17, 200), ENSURE_REQUIREMENTS, [id=#33]

(4) HashAggregate
Input [2]: [Severity#17, count#164L]
Keys [1]: [Severity#17]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#110L]
Results [2]: [Severity#17, count(1)#110L AS severity_count#111L]

(5) AdaptiveSparkPlan
Output [2]: [Severity#17, severity_count#111L]
Arguments: isFinalPlan=false

=> We can see that Spark was able to optimize the query by removing the useless filter and by pruning the scan down to the Severity column (ReadSchema)
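Assuming again that "Scala operators" means the Column-based API, a sketch of the aggregation, using the col, count and lit helpers from org.apache.spark.sql.functions:

// no opaque lambda anywhere in the pipeline: Spark prunes the scan down to the
// Severity column (ReadSchema above) and drops the useless filter entirely
Dataset<Row> counts = df
    .groupBy(col("Severity"))
    .agg(count(lit(1)).alias("severity_count"));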

With a raw SQL query

(1) Scan csv 
Output [1]: [Severity#17]
Batched: false
Location: InMemoryFileIndex [file:/C:/Users/arnau/par-student-spark-atoti/target/classes/csv/US_accidents_Dec20_updated.csv]
ReadSchema: struct<Severity:int>

(2) HashAggregate
Input [1]: [Severity#17]
Keys [1]: [Severity#17]
Functions [1]: [partial_count(1)]
Aggregate Attributes [1]: [count#114L]
Results [2]: [Severity#17, count#115L]

(3) Exchange
Input [2]: [Severity#17, count#115L]
Arguments: hashpartitioning(Severity#17, 200), ENSURE_REQUIREMENTS, [id=#33]

(4) HashAggregate
Input [2]: [Severity#17, count#115L]
Keys [1]: [Severity#17]
Functions [1]: [count(1)]
Aggregate Attributes [1]: [count(1)#111L]
Results [2]: [Severity#17, count(1)#111L AS severity_count#110L]

(5) AdaptiveSparkPlan
Output [2]: [Severity#17, severity_count#110L]
Arguments: isFinalPlan=false

=> We can notice that it is exactly the same physical plan as the one obtained with the Scala operators
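And the raw SQL equivalent, assuming the same accidents temporary view as above:

// same logical plan as the Column-based version, hence the identical physical plan
Dataset<Row> counts = spark.sql(
    "SELECT Severity, COUNT(*) AS severity_count FROM accidents GROUP BY Severity");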
