# Spark to BigQuery Query Pushdown Usage Instructions and Technical Details
Query Pushdown is an advanced optimization technique in which the Spark transformations/actions (hereafter referred to as "Spark operations") used to read data into Spark from BigQuery are translated into SQL queries and pushed down to BigQuery by the open-source spark-bigquery-connector (hereafter referred to as the "connector"), improving read performance. With BigQuery as the data source for Spark, the connector can push large and complex Spark SQL queries to be processed in BigQuery, bringing the computation next to the data and reducing the I/O overhead. This capability combines the robust query processing of BigQuery with the computational capabilities of Spark and its ecosystem.
spark-bigquery-connector release 0.27.1 contains the full Query Pushdown functionality, available as a Preview feature. Query Pushdown can be used with both the Spark SQL and DataFrame APIs.
Please take a look at the documentation for the Connector to Spark Compatibility Matrix and the Connector to Dataproc Image Compatibility Matrix to determine which connector to use for your particular use case.
Note: Query pushdown is not enabled on the Spark session by default. It has to be explicitly enabled.
Note: Query Pushdown is enabled or disabled on the entire Spark session rather than on particular read queries. If you want to run some queries with Query Pushdown enabled and others with it disabled, you will need to explicitly enable/disable the functionality as shown in the next section. To validate whether Query Pushdown was used for a particular query, see the section "Validating/Debugging Query Pushdown".
## Enabling Query Pushdown from Python

To enable Query Pushdown from Python, you will need to create a Spark session with the correct jar and call BigQueryConnectorUtils.enablePushdownSession on the Spark session.
from pyspark.sql import SparkSession
from pyspark import SparkFiles

spark = SparkSession.builder \
    .appName('Pushdown demo') \
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.27.1.jar') \
    .config('spark.submit.pyFiles', 'gs://spark-lib/bigquery/spark-bigquery-support-0.27.1.zip') \
    .config('spark.files', 'gs://spark-lib/bigquery/spark-bigquery-support-0.27.1.zip') \
    .getOrCreate()

# Code to enable query pushdown start
# Extract the spark-bigquery-support zip file distributed via spark.files.
import zipfile
with zipfile.ZipFile(SparkFiles.get("spark-bigquery-support-0.27.1.zip")) as zf:
    zf.extractall()

# Declare google.cloud.spark.bigquery as a namespace package so it can be imported.
try:
    import pkg_resources
    pkg_resources.declare_namespace(__name__)
except ImportError:
    import pkgutil
    __path__ = pkgutil.extend_path(__path__, __name__)

# Enable pushdown for the Spark session.
from google.cloud.spark.bigquery import big_query_connector_utils
big_query_connector_utils.enablePushdownSession(spark)
# Code to enable query pushdown end
Note: If the above code gives a ModuleNotFoundError, you can enable pushdown for the Spark session directly through the JVM gateway by calling spark.sparkContext._jvm.com.google.cloud.spark.bigquery.BigQueryConnectorUtils.enablePushdownSession(spark._jsparkSession)
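A minimal sketch of this fallback, assuming the Spark session was created with the connector jar as shown above:

# Fallback: call the connector's BigQueryConnectorUtils directly through the
# Py4J JVM gateway when the spark-bigquery-support package cannot be imported.
spark.sparkContext._jvm.com.google.cloud.spark.bigquery \
    .BigQueryConnectorUtils.enablePushdownSession(spark._jsparkSession)

# The corresponding call to disable pushdown again for this session.
spark.sparkContext._jvm.com.google.cloud.spark.bigquery \
    .BigQueryConnectorUtils.disablePushdownSession(spark._jsparkSession)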
Note that this will enable query pushdown on the entire Spark session. If you want to disable pushdown, you will need to call BigQueryConnectorUtils.disablePushdownSession:
# Disable pushdown
from google.cloud.spark.bigquery import big_query_connector_utils
big_query_connector_utils.disablePushdownSession(spark)
Note that the materializationDataset option has to be passed in during the spark.read call for queries to be pushed down to BigQuery. This dataset holds the temporary tables generated by the SQL query that is run in BigQuery; these temporary tables are deleted after 24 hours.
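For example, a read that is eligible for pushdown might look like the following sketch (assuming pushdown is already enabled on the session; pushdown_temp_dataset is a placeholder for a dataset you control):

# 'materializationDataset' names the dataset that will hold the temporary
# tables created for the pushed-down SQL (placeholder name here).
words = spark.read.format("bigquery") \
    .option("materializationDataset", "pushdown_temp_dataset") \
    .load("bigquery-public-data.samples.shakespeare")

words.createOrReplaceTempView("words")
spark.sql(
    "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word"
).show()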
## Enabling Query Pushdown from Java/Scala

To enable Query Pushdown from Java/Scala, you will first need to include the connector as a dependency (see the connector documentation for instructions). Then, call com.google.cloud.spark.bigquery.BigQueryConnectorUtils.enablePushdownSession(spark)
to enable Query Pushdown for your Spark session from your project code.
If you want to disable Query Pushdown for a session on which it has already been enabled, call com.google.cloud.spark.bigquery.BigQueryConnectorUtils.disablePushdownSession(spark)
## Validating/Debugging Query Pushdown

If you experience any issues with Query Pushdown, you can call df.explain()
on the DataFrame. If you pass the argument true, you will get much more output, including the pre- and post-optimization phases.
You can be sure that the entire query has been pushed down if you only see Spark(Version)BigQueryPushdownPlan in the physical plan output.
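For example, from PySpark (assuming pushdown has been enabled on the session as described above, and reusing a placeholder materialization dataset name):

# Build a DataFrame with a filter that can be pushed down, then inspect its plan.
df = spark.read.format("bigquery") \
    .option("materializationDataset", "pushdown_temp_dataset") \
    .load("bigquery-public-data.samples.shakespeare") \
    .where("word_count > 100")

df.explain()      # physical plan only; look for Spark(Version)BigQueryPushdownPlan
df.explain(True)  # extended output including the pre- and post-optimization phases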
Note: If you are running jobs on Dataproc using the connector, the job logs will also contain information about any errors encountered during query pushdown.
## Supported Spark functions/operations

The following Spark functions/operations are supported in Query Pushdown.
Aggregation functions:
Spark Logical Plan Construct | Spark SQL function name |
---|---|
Average | avg |
Corr | corr |
CovPopulation | covar_pop |
CovSample | covar_samp |
Count | count |
Max | max |
Min | min |
StddevPop | stddev_pop |
StddevSamp | stddev_samp |
Sum | sum |
VariancePop | var_pop |
VarianceSamp | var_samp |
Boolean and comparison operators:
Spark Logical Plan Construct | Spark SQL function name |
---|---|
And | and |
Between | between |
Contains | contains |
EndsWith | endswith |
EqualNullSafe | <=> |
EqualTo | == |
GreaterThan | > |
GreaterThanOrEqual | >= |
In | in |
IsNull | isnull |
IsNotNull | isnotnull |
LessThan | < |
LessThanOrEqual | <= |
Not | ! |
Or | or |
StartsWith | startswith |
NotEqual | != |
NotGreaterThan | < |
NotGreaterThanOrEqual | <= |
NotLessThan | > |
NotLessThanOrEqual | >= |
Mathematical functions and operators:
Spark Logical Plan Construct | Spark SQL function name |
---|---|
'+' (addition) | + |
'-' (subtraction) | - |
'*' (multiplication) | * |
'/' (division) | / |
'-' (unary negation) | - |
Abs | abs |
Acos | acos |
Asin | asin |
Atan | atan |
CheckOverflow | |
Cos | cos |
Cosh | cosh |
Exp | exp |
Floor | floor |
Greatest | greatest |
Least | least |
Log | log |
Log10 | log10 |
Pi | pi |
Pow | pow |
PromotePrecision | |
Rand | rand |
Round | round |
Sin | sin |
Sinh | sinh |
Sqrt | sqrt |
Tan | tan |
Tanh | tanh |
IsNan | isnan |
SigNum | signum |
Miscellaneous operators:
Spark Logical Plan Construct | Spark SQL function name |
---|---|
Alias (AS expressions) | |
BitwiseAnd | bit_and, & |
BitwiseNot | ~ |
BitwiseOr | bit_or |
BitwiseXor | bit_xor, ^ |
CaseWhen | case |
Cast(child, t, _) | cast |
Coalesce | coalesce |
If | if |
IsNull | ifnull |
IsNotNull | isnotnull |
ScalarSubquery | |
ShiftLeft | shiftleft |
ShiftRight | shiftright |
SortOrder | |
UnscaledValue | |
String functions:
Spark Logical Plan Construct | Spark SQL function name |
---|---|
Ascii | ascii |
Concat(children) | concat |
Contains | contains |
EndsWith | endswith |
Length | length |
Like | like |
Lower | lower |
StartsWith | startswith |
StringLPad | lpad |
StringRPad | rpad |
StringTranslate | translate |
StringTrim | trim |
StringTrimLeft | ltrim |
StringTrimRight | rtrim |
Substring | substring, substr |
Upper | upper |
StringInstr | instr |
InitCap | initcap |
SoundEx | soundex |
RegExpExtract | regexp_extract |
RegExpReplace | regexp_replace |
FormatString | format_string |
FormatNumber | format_number |
Base64 | base64 |
UnBase64 | unbase64 |
Date functions:
Spark Logical Plan Construct | Spark SQL function name |
---|---|
DateAdd | date_add |
DateSub | date_sub |
Month | month |
Quarter | quarter |
DateTrunc | date_trunc |
Year | year |
Window functions:
Spark Logical Plan Construct | Spark SQL function name |
---|---|
DenseRank | dense_rank |
Rank | rank |
RowNumber | row_number |
PercentRank | percent_rank |
Relational operators and clauses:
Spark Logical Plan Construct | Spark SQL function name |
---|---|
Aggregate functions and group-by clauses | GROUP BY clause |
Distinct | DISTINCT clause |
Exists | EXISTS clause |
Filters | WHERE clause |
In/InSet | IN clause |
Having | HAVING clause |
Joins (Inner, LeftOuter, RightOuter, FullOuter, Cross, LeftSemi, LeftAnti) | JOIN clause |
Limits | LIMIT clause |
Projections | SELECT clause |
Sorts (ORDER BY) | ORDER BY and SORT BY clauses |
Union and Union All | UNION and UNION ALL clauses |
Window functions and Windowing clauses | WINDOW clause |
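As an illustration, a query built only from constructs listed in the tables above is eligible to be pushed down in its entirety. The following hypothetical PySpark sketch assumes pushdown is enabled on the session; the project, dataset, table, and column names are placeholders:

from pyspark.sql import functions as F

# Placeholder table and materialization dataset names; replace with your own.
orders = spark.read.format("bigquery") \
    .option("materializationDataset", "pushdown_temp_dataset") \
    .load("my-project.sales.orders")

# The filters, aggregations, ordering, and limit below all appear in the
# supported-operations tables above, so the whole plan can be translated to SQL.
summary = (orders
           .where(F.col("status").isNotNull() & (F.col("amount") > 100))
           .groupBy("customer_id")
           .agg(F.sum("amount").alias("total_amount"),
                F.count("order_id").alias("order_count"))
           .orderBy(F.col("total_amount").desc())
           .limit(10))
summary.show()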
## Limitations

- Because the translation from the Spark LogicalPlan to SQL requires an almost one-to-one mapping of Spark SQL operators to BigQuery expressions, not all Spark SQL operators can be pushed down. When pushdown fails, the connector falls back to the original execution plan, and the unsupported operations are performed in Spark.
- Query Pushdown guarantees ordered results only if there is an outer ORDER BY clause in the Spark SQL query and the total number of results fits in a single partition.
- Spark UDFs cannot be pushed down to BigQuery (see the sketch after this list).
- Query Pushdown is not bug-for-bug compatible with Spark, so results follow BigQuery's behavior (which follows the SQL standard).
- Full outer, right outer, and left outer joins return WrappedArray instead of null if the schema type is RECORD.
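For instance, a hypothetical query like the sketch below that applies a Python UDF cannot be translated to BigQuery SQL; the UDF transformation is performed in Spark instead (the materialization dataset name is a placeholder):

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# A Python UDF (hypothetical example); UDFs cannot be translated to BigQuery SQL,
# so this transformation runs in Spark rather than being pushed down.
normalize = F.udf(lambda w: w.strip().lower(), StringType())

df = spark.read.format("bigquery") \
    .option("materializationDataset", "pushdown_temp_dataset") \
    .load("bigquery-public-data.samples.shakespeare") \
    .select(normalize(F.col("word")).alias("word"), "word_count")
df.show()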
## How Query Pushdown works

To understand how Query Pushdown works, let's take a look at the typical flow of a Spark DataFrame query. The Catalyst optimizer (Spark's extensible query optimizer) performs a set of source-agnostic optimizations on the logical plan of a DataFrame. Since DataFrames are executed lazily, Spark can evaluate and optimize the relational operators applied to a DataFrame and only execute it when an action is invoked.
When an action is invoked, Spark's Catalyst optimizer first produces an optimized logical plan. The process then moves to the physical planning stage.
Let us consider a simple join query:
select ss_item_sk, ss_ticket_number, ss_customer_sk
from store_sales
join store_returns
on (sr_item_sk = ss_item_sk and sr_ticket_number = ss_ticket_number)
Without Pushdown, the above query is translated into the following physical plan:
== Physical Plan ==
*(2) Project [ss_item_sk#748L, ss_ticket_number#755L, ss_customer_sk#749L]
+- *(2) BroadcastHashJoin [ss_item_sk#748L, ss_ticket_number#755L], [sr_item_sk#794L, sr_ticket_number#801L], Inner, BuildRight
:- *(2) Scan com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation@69e8f7a5 [ss_item_sk#748L,ss_customer_sk#749L,ss_ticket_number#755L] PushedFilters: [*IsNotNull(ss_item_sk), *IsNotNull(ss_ticket_number)], ReadSchema: struct<ss_item_sk:bigint,ss_customer_sk:bigint,ss_ticket_number:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true], input[1, bigint, true]))
+- *(1) Scan com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation@738153d8 [sr_item_sk#794L,sr_ticket_number#801L] PushedFilters: [*IsNotNull(sr_item_sk), *IsNotNull(sr_ticket_number)], ReadSchema: struct<sr_item_sk:bigint,sr_ticket_number:bigint>
The physical plan needs to scan both tables before performing the join. This approach is typically not ideal for a more capable Spark data source such as BigQuery, since all of that table data has to be transferred over the network.
With Pushdown enabled, Catalyst instead inserts a BigQuery pushdown plan:
== Physical Plan ==
Spark24BigQueryPushdownPlan [SUBQUERY_14_COL_0#748L, SUBQUERY_14_COL_1#755L, SUBQUERY_14_COL_2#749L], PreScala213BigQueryRDD[4] at RDD at PreScala213BigQueryRDD.java:61
The generated SQL is pushed down and executed in BigQuery, and the result is sent back to the Spark executor nodes. This significantly reduces the amount of data that needs to be transferred to Spark over the network and improves response times.
In the connector, customers can read data into Spark from a BigQuery table by using the DataFrame APIs or Spark SQL. An example using Spark SQL is shown below:
val wordsDF = spark.read.bigquery("bigquery-public-data.samples.shakespeare").cache()
wordsDF.createOrReplaceTempView("words")
// Perform word count.
val wordCountDF = spark.sql("SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
wordCountDF.show()
When a read is issued, the connector performs the following steps on the Spark driver:
- Reads the corresponding table metadata using the BigQuery API.
- Creates a ReadSession using the BigQuery Storage Read API, which returns multiple streams over the table data that needs to be read.
- Distributes the streams across the Spark executors, which are responsible for actually reading the data and storing it in DataFrames.
Query Pushdown is an optimization of this process in which the connector code in the Spark driver translates the Spark operations into BigQuery Standard SQL and creates a temporary table in BigQuery, which is then read by the Spark executors. Thus, the data that the executors need to read is (potentially) significantly reduced. With Query Pushdown enabled, the driver instead performs the following steps:
- Reads the corresponding table metadata using the BigQuery API.
- Generates SQL by translating the optimized Spark LogicalPlan and creates a temporary table in BigQuery from the generated SQL.
- Creates a ReadSession over the temporary table created in Step 2.
- Distributes the streams across the Spark executors, which are responsible for actually reading the data and storing it in DataFrames.