Operator Translation
- Summary
- Left Semi Joins
- Left Anti Joins (not in)
- Left Anti Joins (not exists)
- Distinct and Cube/Rollup
- Names in generated Oracle sql
OracleTable structures surface in Spark Plans embedded in DataSource v2 scans. These are provided by the Oracle Catalog (see catalog integration) and represent a table scan operation in the Spark Plan.
We rewrite these plans to push SQL operators applied on top of the DataSource v2 scans into equivalent Oracle SQL applied on the table(s) represented by the OracleTable structures. The rewritten plans contain DataSource v2 operators with associated Oracle Plans that encapsulate the Oracle SQL to execute.
The DataSource v2 operator is processed by the Spark engine just like any DataSource v2 scan: see the read-path flow and write path flow documents for how this interaction works.
Currently we perform the rewrite on Catalyst Optimized Plans. We rewrite Optimized Spark Plans so as to minimize behavior changes on Spark execution for any operation not involving Oracle datasets. The current rewrite works on 'any' Spark Plan: parts of the Plan that can be pushed down are converted into DataSource v2 scan operators in the Spark Plan. Other parts of the Plan are untouched. Even how Spark Plans are optimized is unchanged.
Since we rewrite Optimized Spark Plans, the pushdown SQL generated has characteristics that match the physical algebra of the Spark SQL engine: for example, count distinct and sum distinct are evaluated using the Expand operator, which gets translated into a lateral join in Oracle SQL.
The basic unit of translated Oracle SQL is an OraQueryBlock, which represents a SQL query block with select, where, group-by and join clauses, some of which may be optional. The translation generates a DAG of OraQueryBlock nodes; the base nodes are table references of Oracle Catalog tables.
By and large the translation proceeds in a straightforward way, with the caveats that are documented in the sub-sections below. All tpcds query translations can be found here ...
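The query-block structure described above can be sketched as a small tree that renders itself to SQL. This is only an illustration: `TableRef`, `QueryBlock` and `to_sql` are hypothetical names, not the project's OraQueryBlock API, and joins are left out to keep the sketch short.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Union


@dataclass
class TableRef:
    """Leaf node: a reference to an Oracle catalog table."""
    name: str

    def to_sql(self) -> str:
        return self.name


@dataclass
class QueryBlock:
    """Hypothetical stand-in for a query block: select, optional where/group-by."""
    source: Union["QueryBlock", TableRef]
    select: List[str]
    where: Optional[str] = None
    group_by: List[str] = field(default_factory=list)

    def to_sql(self) -> str:
        # A nested query block becomes an inline view; a table is used directly.
        src = self.source.to_sql()
        if isinstance(self.source, QueryBlock):
            src = f"( {src} )"
        parts = [f"select {', '.join(self.select)}", f"from {src}"]
        if self.where:
            parts.append(f"where {self.where}")
        if self.group_by:
            parts.append(f"group by {', '.join(self.group_by)}")
        return "\n".join(parts)


inner = QueryBlock(TableRef("SPARKTEST.UNIT_TEST"), ['"C_INT"', '"C_LONG"'],
                   where='"C_INT" IS NOT NULL')
outer = QueryBlock(inner, ['"C_INT"', "count(*)"], group_by=['"C_INT"'])
print(outer.to_sql())
```

Each block only knows its own clauses and its input, so pushing another operator down amounts to either filling in a clause of the current block or wrapping it in a new one.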
Left Semi Joins
in subquery and exists correlated subquery predicates in Spark SQL get translated into a Left Semi-Join operation. We translate these into an in subquery when generating Oracle SQL.
For the Spark SQL query:
select c_long
from sparktest.unit_test
where c_int in (select c_int
from sparktest.unit_test_partitioned
where c_long = sparktest.unit_test.c_long
)
the Spark Plan is:
Project [C_LONG#13L]
+- Join LeftSemi, ((C_INT#12 = C_INT#29) AND (c_long#30L = C_LONG#13L))
   :- RelationV2[C_INT#12, C_LONG#13L] SPARKTEST.UNIT_TEST
   +- RelationV2[C_INT#29, C_LONG#30L] SPARKTEST.UNIT_TEST_PARTITIONED
This gets translated to the following Oracle SQL:
select "C_LONG"
from SPARKTEST.UNIT_TEST
where ("C_INT", "C_LONG") in
( select "C_INT", "C_LONG" from SPARKTEST.UNIT_TEST_PARTITIONED )
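A minimal sketch of this rewrite, assuming the join condition is a conjunction of column equalities: the left-side columns form the row on the left of `in`, and the right-side columns form the subquery's select list. The function `semi_join_to_in` and its parameters are hypothetical, not part of the project.

```python
from typing import List, Tuple


def semi_join_to_in(left_table: str, projection: List[str],
                    right_table: str,
                    equalities: List[Tuple[str, str]]) -> str:
    """Render a LeftSemi join with equi-conditions as an IN subquery.

    `equalities` holds (left_column, right_column) pairs taken from the
    join condition; this sketch handles equality conjuncts only.
    """
    def q(col: str) -> str:
        return f'"{col}"'

    left_cols = ", ".join(q(left) for left, _ in equalities)
    right_cols = ", ".join(q(right) for _, right in equalities)
    # A single column needs no row-constructor parentheses.
    lhs = f"({left_cols})" if len(equalities) > 1 else left_cols
    return (f"select {', '.join(q(c) for c in projection)}\n"
            f"from {left_table}\n"
            f"where {lhs} in\n"
            f"( select {right_cols} from {right_table} )")


sql = semi_join_to_in("SPARKTEST.UNIT_TEST", ["C_LONG"],
                      "SPARKTEST.UNIT_TEST_PARTITIONED",
                      [("C_INT", "C_INT"), ("C_LONG", "C_LONG")])
print(sql)
```

For the inputs shown, the printed text has the same shape as the Oracle SQL above.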
For the Spark SQL query:
with ssales as
(select ss_item_sk
from store_sales
where ss_customer_sk = 8
),
ssales_other as
(select ss_item_sk
from store_sales
where ss_customer_sk = 10
)
select ss_item_sk
from ssales
where exists (select ssales_other.ss_item_sk
from ssales_other
where ssales_other.ss_item_sk = ssales.ss_item_sk
)
the Spark Plan is:
Join LeftSemi, (ss_item_sk#183 = SS_ITEM_SK#160)
:- Project [SS_ITEM_SK#160]
:  +- Filter (isnotnull(SS_CUSTOMER_SK#161) AND (SS_CUSTOMER_SK#161 = 8.000000000000000000))
:     +- RelationV2[SS_ITEM_SK#160, SS_CUSTOMER_SK#161] TPCDS.STORE_SALES
+- RelationV2[SS_ITEM_SK#183] TPCDS.STORE_SALES
This gets translated to the following Oracle SQL:
select "SS_ITEM_SK"
from TPCDS.STORE_SALES
where (("SS_CUSTOMER_SK" IS NOT NULL AND ("SS_CUSTOMER_SK" = ?)) AND "SS_ITEM_SK" in ( select "SS_ITEM_SK"
from TPCDS.STORE_SALES
where ("SS_CUSTOMER_SK" IS NOT NULL AND ("SS_CUSTOMER_SK" = ?)) ))
Left Anti Joins (not in)
not in subquery predicates in Spark SQL get translated into a Left Anti-Join operation with an 'is null' equality conjunction. Spark generates a Join condition which allows for the equality check of the left and right columns to be null.
So for the query:
select c_long
from sparktest.unit_test
where c_int not in (select C_CURRENT_CDEMO_SK from customer
where c_long = C_CUSTOMER_SK
)
The LeftAnti join condition is:
+- Join LeftAnti, (((cast(C_INT#11 as decimal(38,18)) = C_CURRENT_CDEMO_SK#20)
OR
isnull((cast(C_INT#11 as decimal(38,18)) = C_CURRENT_CDEMO_SK#20))
)
AND
(cast(cast(C_LONG#12L as decimal(20,0)) as decimal(38,18)) = C_CUSTOMER_SK#18))
The join condition for 'c_int' and 'c_current_cdemo_sk' includes the 'isnull(..)' check so that a comparison that evaluates to null counts as a match. Under the anti-join this preserves the null semantics of 'not in':
- a sparktest.unit_test row with a null c_int is not output when its correlated subquery returns at least one row.
- a sparktest.unit_test row is not output if its correlated subquery returns a null C_CURRENT_CDEMO_SK.
If the extra join condition matches this pattern then we translate the LeftAnti join into a 'not in subquery' in Oracle SQL.
select "sparkora_0"."C_LONG"
from SPARKTEST.UNIT_TEST "sparkora_0"
where "C_INT" NOT IN ( select "C_INT"
from SPARKTEST.UNIT_TEST_PARTITIONED
where ("sparkora_0"."C_LONG" = "C_LONG")
)
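To see why an anti-join on `(eq OR isnull(eq))` behaves like `not in`, the two can be compared under SQL's three-valued logic. The sketch below uses `None` for NULL and ignores the correlated condition; all function names are made up for illustration.

```python
from typing import List, Optional


def eq3(a: Optional[int], b: Optional[int]) -> Optional[bool]:
    """SQL equality under three-valued logic: None stands for NULL/unknown."""
    if a is None or b is None:
        return None
    return a == b


def not_in(value: Optional[int], subquery: List[Optional[int]]) -> bool:
    """`value NOT IN (subquery)` keeps the row only if the predicate is TRUE."""
    results = [eq3(value, s) for s in subquery]
    # NOT IN is TRUE only when every comparison is definitely FALSE.
    return all(r is False for r in results)


def null_aware_anti_join(value: Optional[int], right: List[Optional[int]]) -> bool:
    """LeftAnti keeps a left row when no right row satisfies (eq OR isnull(eq))."""
    def matches(r: Optional[int]) -> bool:
        e = eq3(value, r)
        return e is True or e is None
    return not any(matches(r) for r in right)


subquery = [1, None]
# A null on the right side means no left row survives, in both formulations.
print(not_in(2, subquery), null_aware_anti_join(2, subquery))  # False False
```

Both functions agree for every combination of null and non-null inputs, which is what makes the rewrite back to `NOT IN` safe when the join condition has this shape.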
Left Anti Joins (not exists)
For the Spark SQL query:
select c_long
from sparktest.unit_test
where not exists (select c_int
from sparktest.unit_test_partitioned
where c_long = sparktest.unit_test.c_long and
c_int = sparktest.unit_test.c_int
)
the Spark Plan is:
Project [C_LONG#187L]
+- Join LeftAnti, ((c_long#204L = C_LONG#187L) AND (c_int#203 = C_INT#186))
   :- RelationV2[C_INT#186, C_LONG#187L] SPARKTEST.UNIT_TEST
   +- RelationV2[C_INT#203, C_LONG#204L] SPARKTEST.UNIT_TEST_PARTITIONED
This gets translated to the following Oracle SQL:
select "sparkora_0"."C_LONG"
from SPARKTEST.UNIT_TEST "sparkora_0"
where not exists ( select 1
from SPARKTEST.UNIT_TEST_PARTITIONED
where (("sparkora_0"."C_LONG" = "C_LONG") AND ("sparkora_0"."C_INT" = "C_INT"))
)
Distinct and Cube/Rollup
Spark evaluates count distinct, sum distinct and cube/rollup calculations by expanding (and tagging) input rows into multiple rows. Each class of rows is used for a different aggregation.
For example for the following distinct calculations:
select c_int as ci, c_long as cl,
sum(distinct c_decimal_scale_8) + count(distinct c_decimal_scale_5)
from sparktest.unit_test
group by c_int + c_long, c_int, c_long
The optimized Spark Plan is:
Aggregate [(CAST(oracle.sparktest.unit_test.`c_int` AS BIGINT) + oracle.sparktest.unit_test.`c_long`)#34L, c_int#12, c_long#13L], [c_int#12 AS ci#0, c_long#13L AS cl#1L, CheckOverflow((promote_precision(cast(sum(if ((gid#33 = 2)) oracle.sparktest.unit_test.`c_decimal_scale_8`#36 else null) as decimal(36,8))) + promote_precision(cast(cast(count(if ((gid#33 = 1)) oracle.sparktest.unit_test.`c_decimal_scale_5`#35 else null) as decimal(20,0)) as decimal(36,8)))), DecimalType(36,8), true) AS (CAST(sum(DISTINCT c_decimal_scale_8) AS DECIMAL(36,8)) + CAST(CAST(count(DISTINCT c_decimal_scale_5) AS DECIMAL(20,0)) AS DECIMAL(36,8)))#21]
+- Aggregate [(CAST(oracle.sparktest.unit_test.`c_int` AS BIGINT) + oracle.sparktest.unit_test.`c_long`)#34L, c_int#12, c_long#13L, oracle.sparktest.unit_test.`c_decimal_scale_5`#35, oracle.sparktest.unit_test.`c_decimal_scale_8`#36, gid#33], [(CAST(oracle.sparktest.unit_test.`c_int` AS BIGINT) + oracle.sparktest.unit_test.`c_long`)#34L, c_int#12, c_long#13L, oracle.sparktest.unit_test.`c_decimal_scale_5`#35, oracle.sparktest.unit_test.`c_decimal_scale_8`#36, gid#33]
+- Expand [ArrayBuffer((cast(c_int#12 as bigint) + c_long#13L), c_int#12, c_long#13L, c_decimal_scale_5#15, null, 1), ArrayBuffer((cast(c_int#12 as bigint) + c_long#13L), c_int#12, c_long#13L, null, c_decimal_scale_8#16, 2)], [(CAST(oracle.sparktest.unit_test.`c_int` AS BIGINT) + oracle.sparktest.unit_test.`c_long`)#34L, c_int#12, c_long#13L, oracle.sparktest.unit_test.`c_decimal_scale_5`#35, oracle.sparktest.unit_test.`c_decimal_scale_8`#36, gid#33]
+- RelationV2[C_INT#12, C_LONG#13L, C_DECIMAL_SCALE_5#15, C_DECIMAL_SCALE_8#16] SPARKTEST.UNIT_TEST
- each input row is expanded to 2 rows
  - 1 row is tagged with gid = 1; the other is tagged with gid = 2
  - the gid=1 row has c_decimal_scale_8 = null
  - the gid=2 row has c_decimal_scale_5 = null
  - the rows also calculate the grouping expressions; in this example they are c_int + c_long, c_int, c_long
  - the rows also output any columns needed in aggregations; in this case they are c_decimal_scale_8, c_decimal_scale_5
- The first Aggregate operation is for making the values distinct
  - it groups the incoming rows by c_int + c_long, c_int, c_long, c_decimal_scale_5, c_decimal_scale_8, gid
- The second Aggregate operation calculates the aggregates:
Group By: c_int + c_long, c_int, c_long
Aggregates:
// gid=2 rows used to calculate sum distinct
sum(if (gid = 2) c_decimal_scale_8 else null)
// gid=1 rows used to calculate count distinct
count(if (gid = 1) c_decimal_scale_5 else null)
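The expand, de-duplicate, aggregate sequence can be simulated on a handful of rows. The sample data below is made up, and the code is an illustrative sketch of the technique, not how Spark or the translator implements it.

```python
from collections import defaultdict

# Rows of (c_int, c_long, c_decimal_scale_5, c_decimal_scale_8); made-up sample data.
rows = [
    (1, 10, 5, 8),
    (1, 10, 5, 8),   # duplicate values must be counted/summed once
    (1, 10, 6, 8),
    (2, 20, 7, 4),
]


def expand(row):
    """Emit one tagged copy of the row per distinct aggregate."""
    c_int, c_long, d5, d8 = row
    key = (c_int + c_long, c_int, c_long)
    yield key + (d5, None, 1)    # gid = 1 feeds count(distinct c_decimal_scale_5)
    yield key + (None, d8, 2)    # gid = 2 feeds sum(distinct c_decimal_scale_8)


# First Aggregate: grouping by every column (including gid) removes duplicates.
distinct_rows = {e for row in rows for e in expand(row)}

# Second Aggregate: group by the original keys, aggregating per gid.
result = defaultdict(lambda: {"sum_d8": 0, "count_d5": 0})
for g0, c_int, c_long, d5, d8, gid in distinct_rows:
    acc = result[(g0, c_int, c_long)]
    if gid == 2 and d8 is not None:
        acc["sum_d8"] += d8
    if gid == 1 and d5 is not None:
        acc["count_d5"] += 1

totals = {k: v["sum_d8"] + v["count_d5"] for k, v in result.items()}
print(totals)
```

Because each gid class carries nulls in the columns it does not use, a single pass over the de-duplicated rows can feed both distinct aggregates.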
When translating to Oracle SQL, we use Oracle's lateral inline view. For the above example the Oracle SQL generated is (showing SQL without Aggregate pushdown):
select "(CAST(oracle.sparktest.unit_test.`c_int` AS BIGINT) + oracle.sparktest.unit_test.`c_long`)",
"C_INT", "C_LONG",
"oracle.sparktest.unit_test.`c_decimal_scale_5`",
"oracle.sparktest.unit_test.`c_decimal_scale_8`",
"gid"
from SPARKTEST.UNIT_TEST ,
lateral (
select ("C_INT" + "C_LONG") "(CAST(oracle.sparktest.unit_test.`c_int` AS BIGINT) + oracle.sparktest.unit_test.`c_long`)",
"C_DECIMAL_SCALE_5" "oracle.sparktest.unit_test.`c_decimal_scale_5`",
null "oracle.sparktest.unit_test.`c_decimal_scale_8`",
1 "gid"
from dual
union all
select ("C_INT" + "C_LONG"), null, "C_DECIMAL_SCALE_8", 2 from dual
)
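The shape of this lateral view follows mechanically from the Expand projections: one `select ... from dual` branch per projection, joined with `union all`. A rough sketch, with a hypothetical helper `lateral_view` and shortened aliases chosen only for readability:

```python
from typing import List, Optional, Sequence


def lateral_view(projections: Sequence[Sequence[Optional[str]]],
                 aliases: List[str]) -> str:
    """Render Expand projections as a `lateral ( ... union all ... )` inline view.

    Each projection is a list of SQL expressions (None means NULL). Only the
    first branch carries column aliases, as in the generated SQL above.
    """
    branches = []
    for i, proj in enumerate(projections):
        exprs = ["null" if e is None else e for e in proj]
        if i == 0:
            exprs = [f'{e} "{a}"' for e, a in zip(exprs, aliases)]
        branches.append(f"select {', '.join(exprs)} from dual")
    return "lateral (\n  " + "\n  union all\n  ".join(branches) + "\n)"


sql = lateral_view(
    [
        ['("C_INT" + "C_LONG")', '"C_DECIMAL_SCALE_5"', None, "1"],
        ['("C_INT" + "C_LONG")', None, '"C_DECIMAL_SCALE_8"', "2"],
    ],
    aliases=["grp", "d5", "d8", "gid"],
)
print(sql)
```

The branches select from dual but refer to columns of the outer table, which is what the lateral keyword permits.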
For example for the following rollup calculations:
select i_category
,d_year
,d_qoy
,d_moy
,s_store_id
,sum(ss_sales_price*ss_quantity) sumsales
from store_sales
,date_dim
,store
,item
where ss_sold_date_sk=d_date_sk
and ss_item_sk=i_item_sk
and ss_store_sk = s_store_sk
and d_month_seq between 1200 and 1200+11
group by rollup(i_category, d_year, d_qoy, d_moy,s_store_id)
The optimized Spark Plan is:
Aggregate [i_category#110, d_year#111, d_qoy#112, d_moy#113, s_store_id#114, spark_grouping_id#109L], [i_category#110, d_year#111, d_qoy#112, d_moy#113, s_store_id#114, sum(CheckOverflow((promote_precision(ss_sales_price#14) * promote_precision(ss_quantity#11)), DecimalType(38,6), true)) AS sumsales#0]
+- Expand [List(SS_QUANTITY#11, SS_SALES_PRICE#14, i_category#93, d_year#30, d_qoy#34, d_moy#32, s_store_id#53, 0), List(SS_QUANTITY#11, SS_SALES_PRICE#14, i_category#93, d_year#30, d_qoy#34, d_moy#32, null, 1), List(SS_QUANTITY#11, SS_SALES_PRICE#14, i_category#93, d_year#30, d_qoy#34, null, null, 3), List(SS_QUANTITY#11, SS_SALES_PRICE#14, i_category#93, d_year#30, null, null, null, 7), List(SS_QUANTITY#11, SS_SALES_PRICE#14, i_category#93, null, null, null, null, 15), List(SS_QUANTITY#11, SS_SALES_PRICE#14, null, null, null, null, null, 31)], [SS_QUANTITY#11, SS_SALES_PRICE#14, i_category#110, d_year#111, d_qoy#112, d_moy#113, s_store_id#114, spark_grouping_id#109L]
+- Project [SS_QUANTITY#11, SS_SALES_PRICE#14, i_category#93, d_year#30, d_qoy#34, d_moy#32, s_store_id#53]
+- Join Inner, (ss_item_sk#3 = i_item_sk#81)
:- Project [SS_ITEM_SK#3, SS_QUANTITY#11, SS_SALES_PRICE#14, D_YEAR#30, D_MOY#32, D_QOY#34, S_STORE_ID#53]
: +- Join Inner, (ss_store_sk#8 = s_store_sk#52)
: :- Project [SS_ITEM_SK#3, SS_STORE_SK#8, SS_QUANTITY#11, SS_SALES_PRICE#14, D_YEAR#30, D_MOY#32, D_QOY#34]
: : +- Join Inner, (ss_sold_date_sk#1 = d_date_sk#24)
: : :- Project [SS_SOLD_DATE_SK#1, SS_ITEM_SK#3, SS_STORE_SK#8, SS_QUANTITY#11, SS_SALES_PRICE#14]
: : : +- Filter isnotnull(SS_STORE_SK#8)
: : : +- RelationV2[SS_ITEM_SK#3, SS_STORE_SK#8, SS_QUANTITY#11, SS_SALES_PRICE#14, SS_SOLD_DATE_SK#1] TPCDS.STORE_SALES
: : +- Project [D_DATE_SK#24, D_YEAR#30, D_MOY#32, D_QOY#34]
: : +- Filter ((isnotnull(D_MONTH_SEQ#27) AND (D_MONTH_SEQ#27 >= 1200.000000000000000000)) AND (D_MONTH_SEQ#27 <= 1211.000000000000000000))
: : +- RelationV2[D_DATE_SK#24, D_MONTH_SEQ#27, D_YEAR#30, D_MOY#32, D_QOY#34] TPCDS.DATE_DIM
: +- RelationV2[S_STORE_SK#52, S_STORE_ID#53] TPCDS.STORE
+- RelationV2[I_ITEM_SK#81, I_CATEGORY#93] TPCDS.ITEM
- each input row (after all joins are done) is expanded once for each rollup grouping set; a rollup over 5 columns has 6 grouping sets (including the grand total), so each row is expanded 6 times, matching the 6 projections in the Expand operator above.
- each expanded row is assigned a spark_grouping_id value. This is an integer value whose bit string identifies the grouping set that this row belongs to.
  - So spark_grouping_id=1 implies this is the grouping set (i_cat, year, qoy, moy)
    - note: a 0 bit implies column inclusion in the grouping set
  - spark_grouping_id=7 implies this is the grouping set (i_cat, year)
- The aggregation is done on the columns plus the spark_grouping_id column.
  - So the aggregation operation is:
    - Group by: i_category, d_year, d_qoy, d_moy, s_store_id, spark_grouping_id
    - Aggregations: sum(ss_sales_price*ss_quantity)
- The output includes the spark_grouping_id column that can be used to identify the grouping set of the output row.
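The grouping id values in the Expand operator (0, 1, 3, 7, 15, 31) can be derived and decoded with a little bit arithmetic. The helper names below are hypothetical; the bit layout assumed is the one visible in the plan, with the first rollup column in the most significant bit.

```python
from typing import List


def rollup_grouping_ids(num_columns: int) -> List[int]:
    """Grouping ids for rollup(c1..cn): trailing columns are nulled out one by one."""
    # Dropping the last k columns sets the k low-order bits: 0, 1, 3, 7, ...
    return [(1 << k) - 1 for k in range(num_columns + 1)]


def grouping_set(grouping_id: int, columns: List[str]) -> List[str]:
    """Columns included in the grouping set; a 0 bit means the column is included."""
    n = len(columns)
    # The first column maps to the most significant bit.
    return [c for i, c in enumerate(columns)
            if ((grouping_id >> (n - 1 - i)) & 1) == 0]


cols = ["i_category", "d_year", "d_qoy", "d_moy", "s_store_id"]
print(rollup_grouping_ids(len(cols)))
for gid in rollup_grouping_ids(len(cols)):
    print(gid, grouping_set(gid, cols))
```

A rollup over five columns therefore produces six ids, one per Expand projection, with 31 standing for the grand-total row where every grouping column is null.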
When translating to Oracle SQL, we use Oracle's lateral inline view. For the above example the Oracle SQL generated is (showing SQL without Aggregate pushdown):
SELECT "SS_QUANTITY",
"SS_SALES_PRICE",
"i_category",
"d_year",
"d_qoy",
"d_moy",
"s_store_id",
"spark_grouping_id"
FROM TPCDS.STORE_SALES
JOIN TPCDS.DATE_DIM ON ("SS_SOLD_DATE_SK" = "D_DATE_SK")
JOIN TPCDS.STORE ON ("SS_STORE_SK" = "S_STORE_SK")
JOIN TPCDS.ITEM ON ("SS_ITEM_SK" = "I_ITEM_SK") , LATERAL
(SELECT "I_CATEGORY" "i_category",
"D_YEAR" "d_year",
"D_QOY" "d_qoy",
"D_MOY" "d_moy",
"S_STORE_ID" "s_store_id",
0 "spark_grouping_id"
FROM dual
UNION ALL SELECT "I_CATEGORY",
"D_YEAR",
"D_QOY",
"D_MOY",
NULL,
1
FROM dual
UNION ALL SELECT "I_CATEGORY",
"D_YEAR",
"D_QOY",
NULL,
NULL,
3
FROM dual
UNION ALL SELECT "I_CATEGORY",
"D_YEAR",
NULL,
NULL,
NULL,
7
FROM dual
UNION ALL SELECT "I_CATEGORY",
NULL,
NULL,
NULL,
NULL,
15
FROM dual
UNION ALL SELECT NULL,
NULL,
NULL,
NULL,
NULL,
31
FROM dual)
WHERE (("SS_STORE_SK" IS NOT NULL
AND "SS_SOLD_DATE_SK" IS NOT NULL)
AND (("D_MONTH_SEQ" IS NOT NULL
AND ("D_MONTH_SEQ" >= 1200.000000000000000000))
AND ("D_MONTH_SEQ" <= 1211.000000000000000000)))
Names in generated Oracle sql
We ensure 'correct' column names are used in Oracle SQL. This entails 3 things:
- Where possible, use the catalog name of a column instead of the case-insensitive name used in Spark.
- When there are multiple AttributeReferences with the same name, apply de-dup strategies of qualifying names or generating new names.
- If a Spark name exceeds 30 characters, apply truncation.
Column Name case:
In Spark SQL the default and preferred behavior is case-insensitive resolution of column names. The Spark documentation says it is 'highly discouraged to turn on case sensitive mode.' This means that if you have a column defined with the mixed-case name 'aA', in Spark SQL it is ok to refer to it as aa or AA... Further, the optimized Logical Plan will carry an AttributeReference with the name (aa or AA..) used in the SQL statement.
In Oracle SQL, by contrast, unquoted names are treated as upper-case names. So unquoted names such as aa or AA .. used in a SQL statement are resolved to mean the column 'AA'.
So when generating Oracle SQL from a Spark Optimized Logical Plan we need to walk down the Spark Plan to find the Catalog column that an AttributeReference is for and use that name.
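The name lookup can be pictured as a case-insensitive map from the Spark reference to the exact catalog name, which is then quoted. This is a simplified sketch: `build_resolver` and `oracle_column` are hypothetical helpers, and it assumes no two catalog columns differ only by case.

```python
from typing import Dict, List


def build_resolver(catalog_columns: List[str]) -> Dict[str, str]:
    """Map each lower-cased name to the exact name stored in the catalog."""
    return {c.lower(): c for c in catalog_columns}


def oracle_column(spark_name: str, resolver: Dict[str, str]) -> str:
    """Return the quoted catalog name for a case-insensitive Spark reference."""
    catalog_name = resolver[spark_name.lower()]
    # Quoting preserves the exact case; an unquoted name would be upper-cased by Oracle.
    return f'"{catalog_name}"'


resolver = build_resolver(["aA", "C_INT"])
print(oracle_column("AA", resolver))
print(oracle_column("c_int", resolver))
```

Emitting the quoted catalog name means a column created as 'aA' is still found, whichever spelling the Spark query used.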
Column Qualification:
In Catalyst Optimized Plans columns are identified by ExprId, so it is possible to end up with Operator shapes that contain multiple columns with the same name. When generating Oracle SQL from these plans we need to disambiguate duplicate names.
We operate on the Oracle Plan that is output from the translation and apply the following disambiguation logic:
- If there are multiple attributes with the same name among the inputs of an OraQueryBlock, we disambiguate these attributes by qualifying each by the input it is from. Inputs to the QueryBlock are assigned qualifiers.
- If the output of an Oracle Plan has multiple attributes with the same name, we generate new aliases for the duplicate attributes.
Example:
In the following query C_INT is a column in 4 table references. The optimized Spark plan contains the AttributeReference C_INT for all of them. The disambiguated Oracle SQL is also listed below: C_INT references in the top query block are qualified, and in the top select the projections have new aliases associated.
// SQL
select a.c_int, b.c_int, c.c_int, d.c_int
from sparktest.unit_test a,
sparktest.unit_test_partitioned b,
sparktest.unit_test c,
sparktest.unit_test_partitioned d
where a.c_int = b.c_int and b.c_int = c.c_int and c.c_int = d.c_int
// Spark optimized plan without oracle pushdown
Join Inner, (c_int#46 = c_int#63)
:- Join Inner, (c_int#27 = c_int#46)
:  :- Join Inner, (c_int#10 = c_int#27)
:  :  :- Filter isnotnull(C_INT#10)
:  :  :  +- RelationV2[C_INT#10] SPARKTEST.UNIT_TEST
:  :  +- Filter isnotnull(C_INT#27)
:  :     +- RelationV2[C_INT#27] SPARKTEST.UNIT_TEST_PARTITIONED
:  +- Filter isnotnull(C_INT#46)
:     +- RelationV2[C_INT#46] SPARKTEST.UNIT_TEST
+- Filter isnotnull(C_INT#63)
   +- RelationV2[C_INT#63] SPARKTEST.UNIT_TEST_PARTITIONED
// oracle-sql generated
SELECT "sparkora_0"."C_INT" AS "C_INT_1_sparkora",
"sparkora_1"."C_INT" AS "C_INT_2_sparkora",
"sparkora_2"."C_INT" AS "C_INT_3_sparkora",
"sparkora_3"."C_INT" AS "C_INT_4_sparkora"
FROM SPARKTEST.UNIT_TEST "sparkora_0"
JOIN SPARKTEST.UNIT_TEST_PARTITIONED "sparkora_1"
ON ("sparkora_0"."C_INT" = "sparkora_1"."C_INT")
JOIN SPARKTEST.UNIT_TEST "sparkora_2"
ON ("sparkora_1"."C_INT" = "sparkora_2"."C_INT")
JOIN SPARKTEST.UNIT_TEST_PARTITIONED "sparkora_3"
ON ("sparkora_2"."C_INT" = "sparkora_3"."C_INT")
WHERE ((("sparkora_0"."C_INT" IS NOT NULL
AND "sparkora_1"."C_INT" IS NOT NULL)
AND "sparkora_2"."C_INT" IS NOT NULL)
AND "sparkora_3"."C_INT" IS NOT NULL)
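The two disambiguation steps can be sketched in a few lines: each input gets a qualifier, and any name that occurs more than once is qualified and given a fresh output alias. The `sparkora_N` qualifier and `NAME_k_sparkora` alias patterns are copied from the generated SQL above; the function `disambiguate` itself is a hypothetical illustration, not the project's implementation.

```python
from collections import Counter
from typing import List


def disambiguate(inputs: List[List[str]]) -> List[str]:
    """Return select-list entries for the columns of all inputs of a query block."""
    counts = Counter(col for cols in inputs for col in cols)
    seen: Counter = Counter()
    select_list = []
    for i, cols in enumerate(inputs):
        qualifier = f"sparkora_{i}"          # one qualifier per input
        for col in cols:
            if counts[col] > 1:
                seen[col] += 1
                # Duplicate name: qualify the reference and give the output a new alias.
                select_list.append(
                    f'"{qualifier}"."{col}" AS "{col}_{seen[col]}_sparkora"')
            else:
                select_list.append(f'"{col}"')
    return select_list


# Four table references that each expose C_INT, as in the example query.
for entry in disambiguate([["C_INT"], ["C_INT"], ["C_INT"], ["C_INT"]]):
    print(entry)
```

Names that are already unique are left unqualified in this sketch, so the rewrite only touches columns that would otherwise be ambiguous.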