
Operator Translation


Summary

OracleTable instances surface in Spark plans, embedded in DataSource v2 scans. They are provided by the Oracle Catalog (see catalog integration) and represent a table scan operation in the Spark plan.

We rewrite these plans to push SQL operators applied on top of the DataSource v2 scans into equivalent Oracle SQL applied on the table(s) represented by the OracleTable structures. The rewritten plans contain DataSource v2 operators with associated Oracle Plans that encapsulate the Oracle SQL to execute.

The DataSource v2 scan is processed by the Spark engine like any other DataSource v2 scan: see the read-path flow and write-path flow documents for how this interaction works.

Currently we perform the rewrite on Catalyst Optimized Plans. We rewrite Optimized Spark Plans so as to minimize behavior changes for any operation not involving Oracle data sets. The rewrite works on any Spark plan: parts of the plan that can be pushed down are converted into DataSource v2 scan operators in the Spark plan; other parts of the plan are untouched. Even how Spark plans are optimized is unchanged.
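For instance, one quick way to see the rewrite in action (a usage sketch, assuming a SparkSession `spark` whose current catalog is the Oracle catalog configured per the catalog integration page) is to explain a query against an Oracle table:

// Usage sketch, not project code: table names are the sparktest tables
// used in the examples below.
val df = spark.sql(
  """select c_long
    |from sparktest.unit_test
    |where c_int in (select c_int from sparktest.unit_test_partitioned)""".stripMargin)

// After the rewrite, the pushable operators collapse into a single
// DataSource v2 scan whose Oracle Plan carries the generated Oracle SQL.
println(df.queryExecution.optimizedPlan)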

Since we rewrite Optimized Spark Plans, the pushdown SQL generated has characteristics that match the physical algebra of the Spark SQL engine: for example, count distinct and sum distinct are evaluated using the Expand operator, which gets translated into a lateral join in Oracle SQL.

The basic unit of translated Oracle SQL is an OraQueryBlock, which represents a SQL query block with select, where, group-by and join clauses, some of which may be optional. The translation generates a DAG of OraQueryBlocks whose base nodes are table references to Oracle Catalog tables. By and large the translation proceeds in a straightforward way, with the caveats documented in the sub-sections below.
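As a mental model, the structure can be sketched in Scala as follows. This is a simplified illustration, not the project's actual classes:

sealed trait OraPlan
// Base node: a reference to an Oracle Catalog table.
case class OraTableRef(schema: String, table: String) extends OraPlan
// A query block; `from` may reference tables or other query blocks, forming a DAG.
case class OraQueryBlock(
    select: Seq[String],              // projected expressions, as Oracle SQL text
    from: Seq[OraPlan],               // inputs to the block
    where: Option[String] = None,     // optional filter clause
    groupBy: Seq[String] = Seq.empty  // optional group-by expressions
) extends OraPlan

// Example 1 from the next section, expressed as this structure:
val ex1 = OraQueryBlock(
  select = Seq("\"C_LONG\""),
  from = Seq(OraTableRef("SPARKTEST", "UNIT_TEST")),
  where = Some("""("C_INT", "C_LONG") in (select "C_INT", "C_LONG" from SPARKTEST.UNIT_TEST_PARTITIONED)""")
)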

Left Semi Joins

in subquery and exists correlated subquery predicates in Spark SQL get translated into a Left Semi-Join operation. We translate these into an in subquery when generating Oracle SQL.

Example 1

For the Spark SQL query:

select c_long
from sparktest.unit_test
where c_int in (select c_int
                from sparktest.unit_test_partitioned
                where c_long = sparktest.unit_test.c_long
                )

the Spark Plan is:

!Project [C_LONG#13L]
!+- Join LeftSemi, ((C_INT#12 = C_INT#29) AND (c_long#30L = C_LONG#13L))
!   :- RelationV2[C_INT#12, C_LONG#13L] SPARKTEST.UNIT_TEST
!   +- RelationV2[C_INT#29, C_LONG#30L] SPARKTEST.UNIT_TEST_PARTITIONED

This gets translated to the following Oracle SQL:

select "C_LONG"
from SPARKTEST.UNIT_TEST
where  ("C_INT", "C_LONG") in
   ( select "C_INT", "C_LONG" from SPARKTEST.UNIT_TEST_PARTITIONED )

Example 2

For the Spark SQL query:

with ssales as
(select ss_item_sk
from store_sales
where ss_customer_sk = 8
),
ssales_other as
(select ss_item_sk
from store_sales
where ss_customer_sk = 10
)
select ss_item_sk
from ssales
where exists (select ssales_other.ss_item_sk
              from ssales_other
              where ssales_other.ss_item_sk = ssales.ss_item_sk
              )

the Spark Plan is:

!Join LeftSemi, (ss_item_sk#183 = SS_ITEM_SK#160)
!:- Project [SS_ITEM_SK#160]
!:  +- Filter (isnotnull(SS_CUSTOMER_SK#161) AND (SS_CUSTOMER_SK#161 = 8.000000000000000000))
!:     +- RelationV2[SS_ITEM_SK#160, SS_CUSTOMER_SK#161] TPCDS.STORE_SALES
!+- RelationV2[SS_ITEM_SK#183] TPCDS.STORE_SALES

This gets translated to the following Oracle SQL:

select "SS_ITEM_SK"
from TPCDS.STORE_SALES
where (("SS_CUSTOMER_SK" IS NOT NULL AND ("SS_CUSTOMER_SK" = ?)) AND  "SS_ITEM_SK" in ( select "SS_ITEM_SK"
from TPCDS.STORE_SALES
where ("SS_CUSTOMER_SK" IS NOT NULL AND ("SS_CUSTOMER_SK" = ?)) ))

Left Anti Joins (not in)

not in subquery predicates in Spark SQL get translated into a Left Anti-Join operation whose join condition includes an 'is null' check on the equality. Spark generates a join condition that accounts for the left or right column being null. So for the query:

select c_long
from sparktest.unit_test
where c_int not in (select C_CURRENT_CDEMO_SK from customer
                    where c_long = C_CUSTOMER_SK
                   )

The LeftAnti join condition is:

  +- Join LeftAnti, (((cast(C_INT#11 as decimal(38,18)) = C_CURRENT_CDEMO_SK#20)
                          OR
                     isnull((cast(C_INT#11 as decimal(38,18)) = C_CURRENT_CDEMO_SK#20))
                   )
                    AND
                    (cast(cast(C_LONG#12L as decimal(20,0)) as decimal(38,18)) = C_CUSTOMER_SK#18))

The join condition for 'c_int' and 'c_current_cdemo_sk' includes the 'isnull(..)' check so that:

  • sparktest.unit_test rows with a null c_int are filtered out of the join result (null not in (...) is not true), and
  • all sparktest.unit_test rows are filtered out if the right subquery returns any null values.

If the extra join condition matches this pattern, we translate the LeftAnti join into a 'not in subquery' in Oracle SQL. For example, for a similar not in query on the sparktest tables the generated Oracle SQL is:

select "sparkora_0"."C_LONG"
from SPARKTEST.UNIT_TEST "sparkora_0"
where  "C_INT" NOT IN ( select "C_INT"
                        from SPARKTEST.UNIT_TEST_PARTITIONED
                         where ("sparkora_0"."C_LONG" = "C_LONG")
                      )
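The shape of that condition check can be sketched with Catalyst expressions. This is an illustrative sketch, not the actual rewrite rule, which handles more cases:

import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, IsNull, Or}

// Does a LeftAnti join conjunct have the shape
//   (left = right) OR isnull(left = right)
// that Spark emits for a 'not in' subquery predicate?
def isNotInConjunct(e: Expression): Boolean = e match {
  case Or(eq: EqualTo, IsNull(eq2: EqualTo)) => eq.semanticEquals(eq2)
  case _ => false
}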

Left Anti Joins (not exists)

For the Spark SQL query:

select c_long
from sparktest.unit_test
where not exists (select c_int
                from sparktest.unit_test_partitioned
                where c_long = sparktest.unit_test.c_long and
                      c_int = sparktest.unit_test.c_int
                )

the Spark Plan is:

!Project [C_LONG#187L]
!+- Join LeftAnti, ((c_long#204L = C_LONG#187L) AND (c_int#203 = C_INT#186))
!   :- RelationV2[C_INT#186, C_LONG#187L] SPARKTEST.UNIT_TEST
!   +- RelationV2[C_INT#203, C_LONG#204L] SPARKTEST.UNIT_TEST_PARTITIONED

This gets translated to the following Oracle SQL:

select "sparkora_0"."C_LONG"
from SPARKTEST.UNIT_TEST "sparkora_0"
where not exists ( select 1
                   from SPARKTEST.UNIT_TEST_PARTITIONED
                   where (("sparkora_0"."C_LONG" = "C_LONG") AND ("sparkora_0"."C_INT" = "C_INT"))
                  )

Distinct and Cube/Rollup

Spark evaluates count distinct, sum distinct and cube/rollup calculations by expanding (and tagging) input rows into multiple rows. Each class of rows is used for a different aggregation.

Count/Sum Distinct

For example, for the following distinct calculations:

select c_int as ci, c_long as cl,
       sum(distinct c_decimal_scale_8) + count(distinct c_decimal_scale_5)
from sparktest.unit_test
group by c_int + c_long, c_int, c_long

The optimized Spark Plan is:

Aggregate [(CAST(oracle.sparktest.unit_test.`c_int` AS BIGINT) + oracle.sparktest.unit_test.`c_long`)#34L, c_int#12, c_long#13L], [c_int#12 AS ci#0, c_long#13L AS cl#1L, CheckOverflow((promote_precision(cast(sum(if ((gid#33 = 2)) oracle.sparktest.unit_test.`c_decimal_scale_8`#36 else null) as decimal(36,8))) + promote_precision(cast(cast(count(if ((gid#33 = 1)) oracle.sparktest.unit_test.`c_decimal_scale_5`#35 else null) as decimal(20,0)) as decimal(36,8)))), DecimalType(36,8), true) AS (CAST(sum(DISTINCT c_decimal_scale_8) AS DECIMAL(36,8)) + CAST(CAST(count(DISTINCT c_decimal_scale_5) AS DECIMAL(20,0)) AS DECIMAL(36,8)))#21]
+- Aggregate [(CAST(oracle.sparktest.unit_test.`c_int` AS BIGINT) + oracle.sparktest.unit_test.`c_long`)#34L, c_int#12, c_long#13L, oracle.sparktest.unit_test.`c_decimal_scale_5`#35, oracle.sparktest.unit_test.`c_decimal_scale_8`#36, gid#33], [(CAST(oracle.sparktest.unit_test.`c_int` AS BIGINT) + oracle.sparktest.unit_test.`c_long`)#34L, c_int#12, c_long#13L, oracle.sparktest.unit_test.`c_decimal_scale_5`#35, oracle.sparktest.unit_test.`c_decimal_scale_8`#36, gid#33]
   +- Expand [ArrayBuffer((cast(c_int#12 as bigint) + c_long#13L), c_int#12, c_long#13L, c_decimal_scale_5#15, null, 1), ArrayBuffer((cast(c_int#12 as bigint) + c_long#13L), c_int#12, c_long#13L, null, c_decimal_scale_8#16, 2)], [(CAST(oracle.sparktest.unit_test.`c_int` AS BIGINT) + oracle.sparktest.unit_test.`c_long`)#34L, c_int#12, c_long#13L, oracle.sparktest.unit_test.`c_decimal_scale_5`#35, oracle.sparktest.unit_test.`c_decimal_scale_8`#36, gid#33]
      +- RelationV2[C_INT#12, C_LONG#13L, C_DECIMAL_SCALE_5#15, C_DECIMAL_SCALE_8#16] SPARKTEST.UNIT_TEST
  • Each input row is expanded to 2 rows
    • The first row is tagged with gid = 1; the other is tagged with gid = 2
    • The gid=1 row has c_decimal_scale_8 = null
    • The gid=2 row has c_decimal_scale_5 = null
    • The rows also calculate the grouping expressions; in this example they are c_int + c_long, c_int, c_long
    • The rows also output any columns needed in aggregations; in this case they are c_decimal_scale_8, c_decimal_scale_5
  • The first Aggregate operation is for making the values distinct
    • It groups the incoming rows by c_int + c_long, c_int, c_long, c_decimal_scale_5, c_decimal_scale_8, gid.
  • The second Aggregate operation calculates the aggregates:
  Group By: c_int + c_long, c_int, c_long
  Aggregates:
    // gid=2 rows used to calculate sum distinct
    sum(if (gid = 2) c_decimal_scale_8 else null)
    // gid=1 rows used to calculate count distinct
    count(if (gid = 1) c_decimal_scale_5 else null)
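To make the mechanics concrete, here is the same two-phase computation emulated on plain Scala collections (illustrative only, with made-up data; this is not how the engine executes it):

// Each input row: the grouping key (c_int + c_long, c_int, c_long) plus the two decimals.
case class In(key: (Long, Int, Long), d5: BigDecimal, d8: BigDecimal)
val input = Seq(In((3L, 1, 2L), 10, 100), In((3L, 1, 2L), 10, 100), In((3L, 1, 2L), 20, 100))

// Expand: each row becomes two tagged rows (gid=1 keeps d5, gid=2 keeps d8).
val expanded = input.flatMap { r =>
  Seq((r.key, Option(r.d5), Option.empty[BigDecimal], 1),
      (r.key, Option.empty[BigDecimal], Option(r.d8), 2))
}

// First Aggregate: grouping by all columns including gid de-duplicates the values.
val deduped = expanded.distinct

// Second Aggregate: group by the key; each aggregate only looks at its gid class.
val result = deduped.groupBy(_._1).map { case (key, rows) =>
  val sumDistinct = rows.collect { case (_, _, Some(d8), 2) => d8 }.sum
  val countDistinct = rows.collect { case (_, Some(_), _, 1) => 1 }.size
  key -> (sumDistinct + countDistinct)
}
// result: Map((3,1,2) -> 102): sum(distinct d8) = 100, count(distinct d5) = 2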

When translating to Oracle SQL, we use Oracle's lateral inline view. For the above example the generated Oracle SQL is (showing SQL without Aggregate pushdown):

  select "(CAST(oracle.sparktest.unit_test.`c_int` AS BIGINT) + oracle.sparktest.unit_test.`c_long`)",
       "C_INT", "C_LONG",
       "oracle.sparktest.unit_test.`c_decimal_scale_5`",
       "oracle.sparktest.unit_test.`c_decimal_scale_8`",
       "gid"
from SPARKTEST.UNIT_TEST   ,
     lateral (
        select ("C_INT" + "C_LONG") "(CAST(oracle.sparktest.unit_test.`c_int` AS BIGINT) + oracle.sparktest.unit_test.`c_long`)",
               "C_DECIMAL_SCALE_5" "oracle.sparktest.unit_test.`c_decimal_scale_5`",
               null "oracle.sparktest.unit_test.`c_decimal_scale_8`",
               1 "gid"
        from dual
        union all
        select ("C_INT" + "C_LONG"), null, "C_DECIMAL_SCALE_8", 2 from dual
     )

Cube/Rollup

For example, for the following rollup calculation:

select i_category, d_year, d_qoy, d_moy, s_store_id,
       sum(ss_sales_price*ss_quantity) sumsales
from store_sales, date_dim, store, item
where ss_sold_date_sk = d_date_sk
  and ss_item_sk = i_item_sk
  and ss_store_sk = s_store_sk
  and d_month_seq between 1200 and 1200+11
group by rollup(i_category, d_year, d_qoy, d_moy, s_store_id)

The optimized Spark plan is:

Aggregate [i_category#110, d_year#111, d_qoy#112, d_moy#113, s_store_id#114, spark_grouping_id#109L], [i_category#110, d_year#111, d_qoy#112, d_moy#113, s_store_id#114, sum(CheckOverflow((promote_precision(ss_sales_price#14)promote_precision(ss_quantity#11)), DecimalType(38,6), true)) AS sumsales#0]
+- Expand [List(SS_QUANTITY#11, SS_SALES_PRICE#14, i_category#93, d_year#30, d_qoy#34, d_moy#32, s_store_id#53, 0), List(SS_QUANTITY#11, SS_SALES_PRICE#14, i_category#93, d_year#30, d_qoy#34, d_moy#32, null, 1), List(SS_QUANTITY#11, SS_SALES_PRICE#14, i_category#93, d_year#30, d_qoy#34, null, null, 3), List(SS_QUANTITY#11, SS_SALES_PRICE#14, i_category#93, d_year#30, null, null, null, 7), List(SS_QUANTITY#11, SS_SALES_PRICE#14, i_category#93, null, null, null, null, 15), List(SS_QUANTITY#11, SS_SALES_PRICE#14, null, null, null, null, null, 31)], [SS_QUANTITY#11, SS_SALES_PRICE#14, i_category#110, d_year#111, d_qoy#112, d_moy#113, s_store_id#114, spark_grouping_id#109L]
   +- Project [SS_QUANTITY#11, SS_SALES_PRICE#14, i_category#93, d_year#30, d_qoy#34, d_moy#32, s_store_id#53]
      +- Join Inner, (ss_item_sk#3 = i_item_sk#81)
         :- Project [SS_ITEM_SK#3, SS_QUANTITY#11, SS_SALES_PRICE#14, D_YEAR#30, D_MOY#32, D_QOY#34, S_STORE_ID#53]
         :  +- Join Inner, (ss_store_sk#8 = s_store_sk#52)
         :     :- Project [SS_ITEM_SK#3, SS_STORE_SK#8, SS_QUANTITY#11, SS_SALES_PRICE#14, D_YEAR#30, D_MOY#32, D_QOY#34]
         :     :  +- Join Inner, (ss_sold_date_sk#1 = d_date_sk#24)
         :     :     :- Project [SS_SOLD_DATE_SK#1, SS_ITEM_SK#3, SS_STORE_SK#8, SS_QUANTITY#11, SS_SALES_PRICE#14]
         :     :     :  +- Filter isnotnull(SS_STORE_SK#8)
         :     :     :     +- RelationV2[SS_ITEM_SK#3, SS_STORE_SK#8, SS_QUANTITY#11, SS_SALES_PRICE#14, SS_SOLD_DATE_SK#1] TPCDS.STORE_SALES
         :     :     +- Project [D_DATE_SK#24, D_YEAR#30, D_MOY#32, D_QOY#34]
         :     :        +- Filter ((isnotnull(D_MONTH_SEQ#27) AND (D_MONTH_SEQ#27 >= 1200.000000000000000000)) AND (D_MONTH_SEQ#27 <= 1211.000000000000000000))
         :     :           +- RelationV2[D_DATE_SK#24, D_MONTH_SEQ#27, D_YEAR#30, D_MOY#32, D_QOY#34] TPCDS.DATE_DIM
         :     +- RelationV2[S_STORE_SK#52, S_STORE_ID#53] TPCDS.STORE
         +- RelationV2[I_ITEM_SK#81, I_CATEGORY#93] TPCDS.ITEM
  • Each input row (after all joins are done) is expanded once per rollup grouping set; in this case there are six grouping sets, so each row is expanded six times.
  • Each expanded row is assigned a spark_grouping_id value. This is an integer value whose bit string identifies the grouping set that the row belongs to.
    • A 0 bit means the column is part of the grouping set; a 1 bit means it has been nulled out.
  • So spark_grouping_id=1 (binary 00001) implies the grouping set (i_category, d_year, d_qoy, d_moy)
  • spark_grouping_id=7 (binary 00111) implies the grouping set (i_category, d_year)
  • The aggregation is done on the grouping columns plus the spark_grouping_id column.
    • So the aggregation operation is:
      • Group by: i_category, d_year, d_qoy, d_moy, s_store_id, spark_grouping_id
      • Aggregations: sum(ss_sales_price*ss_quantity)
  • The output includes the spark_grouping_id column, which can be used to identify the grouping set of each output row.
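The bit-string decoding can be sketched as follows (illustrative, assuming the rollup column order above):

// For rollup(i_category, d_year, d_qoy, d_moy, s_store_id): a 0 bit means the
// column participates in the grouping set, a 1 bit means it has been nulled out.
val cols = Seq("i_category", "d_year", "d_qoy", "d_moy", "s_store_id")

def groupingSet(gid: Long): Seq[String] =
  cols.zipWithIndex.collect {
    // the last column corresponds to the lowest-order bit
    case (c, i) if ((gid >> (cols.size - 1 - i)) & 1L) == 0L => c
  }

groupingSet(1L)  // Seq(i_category, d_year, d_qoy, d_moy)
groupingSet(7L)  // Seq(i_category, d_year)
groupingSet(31L) // Seq() -- the grand-total row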

When translating to Oracle SQL, we use Oracle's lateral inline view. For the above example the generated Oracle SQL is (showing SQL without Aggregate pushdown):

  SELECT "SS_QUANTITY",
       "SS_SALES_PRICE",
       "i_category",
       "d_year",
       "d_qoy",
       "d_moy",
       "s_store_id",
       "spark_grouping_id"
FROM TPCDS.STORE_SALES
JOIN TPCDS.DATE_DIM ON ("SS_SOLD_DATE_SK" = "D_DATE_SK")
JOIN TPCDS.STORE ON ("SS_STORE_SK" = "S_STORE_SK")
JOIN TPCDS.ITEM ON ("SS_ITEM_SK" = "I_ITEM_SK") , LATERAL
  (SELECT "I_CATEGORY" "i_category",
          "D_YEAR" "d_year",
          "D_QOY" "d_qoy",
          "D_MOY" "d_moy",
          "S_STORE_ID" "s_store_id",
          0 "spark_grouping_id"
   FROM dual
   UNION ALL SELECT "I_CATEGORY",
                    "D_YEAR",
                    "D_QOY",
                    "D_MOY",
                    NULL,
                    1
   FROM dual
   UNION ALL SELECT "I_CATEGORY",
                    "D_YEAR",
                    "D_QOY",
                    NULL,
                    NULL,
                    3
   FROM dual
   UNION ALL SELECT "I_CATEGORY",
                    "D_YEAR",
                    NULL,
                    NULL,
                    NULL,
                    7
   FROM dual
   UNION ALL SELECT "I_CATEGORY",
                    NULL,
                    NULL,
                    NULL,
                    NULL,
                    15
   FROM dual
   UNION ALL SELECT NULL,
                    NULL,
                    NULL,
                    NULL,
                    NULL,
                    31
   FROM dual)
WHERE (("SS_STORE_SK" IS NOT NULL
        AND "SS_SOLD_DATE_SK" IS NOT NULL)
       AND (("D_MONTH_SEQ" IS NOT NULL
             AND ("D_MONTH_SEQ" >= 1200.000000000000000000))
            AND ("D_MONTH_SEQ" <= 1211.000000000000000000)))

Names in generated Oracle SQL

Ensure 'correct' column names are used in the generated Oracle SQL. This entails three things:

  • Where possible, use the catalog name of a column instead of the case-insensitive name used in Spark.
  • When there are multiple AttributeReferences with the same name, apply de-dup strategies: qualifying names or generating new names.
  • If the Spark name exceeds 30 characters, apply truncation.

See the fix names logic for details.
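As a rough illustration of the truncation rule (the actual behavior is in the fix names logic above):

// Oracle identifiers are limited to 30 characters here; longer Spark names are
// truncated, with a generated suffix to keep the result unique. Illustrative only.
def shorten(name: String, id: Int): String =
  if (name.length <= 30) name
  else {
    val suffix = s"_$id"
    name.take(30 - suffix.length) + suffix
  }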

Column Name case:

In Spark SQL the default and preferred behavior is case insensitive resolution of column names. The Spark documentation says it is 'highly discouraged to turn on case sensitive mode.' This means that if you have a column defined with the mixed-case name 'aA', in Spark SQL it is acceptable to refer to it as aa or AA, and the optimized Logical Plan will carry an AttributeReference with whatever name (aa or AA) was used in the SQL statement.

In Oracle SQL, by contrast, unquoted names are treated as uppercase names. So unquoted names such as aa or AA used in a SQL statement resolve to the column 'AA'.

So when generating Oracle SQL from a Spark Optimized Logical Plan we need to walk down the Spark plan to find the Catalog column that an AttributeReference refers to, and use that column's name.
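A sketch of that resolution, with a hypothetical catalogColumns lookup standing in for the scanned OracleTable's schema:

// Resolve a Spark attribute name (matched case-insensitively) to the exact
// catalog column name, then quote it so Oracle preserves the case.
// `catalogColumns` is a stand-in for the scanned OracleTable's column names.
def oracleColumnRef(sparkName: String, catalogColumns: Seq[String]): String = {
  val catalogName = catalogColumns
    .find(_.equalsIgnoreCase(sparkName))
    .getOrElse(sparkName) // fall back to the Spark name if there is no match
  "\"" + catalogName + "\""
}

oracleColumnRef("aa", Seq("aA", "C_INT")) // "aA" (quoted), not "AA"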

Column Qualification:

In Catalyst Optimized Plans, columns are identified by ExprId, so it is possible to end up with operator shapes that contain multiple columns with the same name. When generating Oracle SQL from these plans we need to disambiguate the duplicate names.

We operate on the Oracle Plan output by the translation and apply the following disambiguation logic.

  • If there are multiple attributes with the same name among the inputs of an OraQueryBlock, we disambiguate them by qualifying each with the input it comes from. Inputs to the query block are assigned qualifiers.
  • If the output of an Oracle Plan has multiple attributes with the same name, we generate new aliases for the duplicate attributes (as sketched below).
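A minimal sketch of the two strategies (helper names are made up; the real logic is part of the plan translation):

// Inputs to a query block get positional qualifiers: sparkora_0, sparkora_1, ...
def inputQualifiers(numInputs: Int): Seq[String] =
  (0 until numInputs).map(i => s"sparkora_$i")

// Duplicate output names get fresh aliases of the form <name>_<n>_sparkora.
def dedupOutputNames(names: Seq[String]): Seq[String] = {
  val dups = names.diff(names.distinct).toSet // names occurring more than once
  val counts = scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)
  names.map { n =>
    if (!dups(n)) n
    else { counts(n) += 1; s"${n}_${counts(n)}_sparkora" }
  }
}

dedupOutputNames(Seq("C_INT", "C_INT", "C_INT", "C_INT"))
// Seq(C_INT_1_sparkora, C_INT_2_sparkora, C_INT_3_sparkora, C_INT_4_sparkora)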

Example:

In the following query, C_INT is a column in all four table references, and the optimized Spark plan contains an AttributeReference named C_INT for each of them. The disambiguated Oracle SQL is listed below: C_INT references in the top query block are qualified, and the projections in the top select are given new aliases.

// SQL
select a.c_int, b.c_int, c.c_int, d.c_int
from sparktest.unit_test a,
     sparktest.unit_test_partitioned b,
     sparktest.unit_test c,
     sparktest.unit_test_partitioned d
where a.c_int = b.c_int and b.c_int = c.c_int and c.c_int = d.c_int

// Spark optimized plan without Oracle pushdown
!Join Inner, (c_int#46 = c_int#63)
!:- Join Inner, (c_int#27 = c_int#46)
!:  :- Join Inner, (c_int#10 = c_int#27)
!:  :  :- Filter isnotnull(C_INT#10)
!:  :  :  +- RelationV2[C_INT#10] SPARKTEST.UNIT_TEST
!:  :  +- Filter isnotnull(C_INT#27)
!:  :     +- RelationV2[C_INT#27] SPARKTEST.UNIT_TEST_PARTITIONED
!:  +- Filter isnotnull(C_INT#46)
!:     +- RelationV2[C_INT#46] SPARKTEST.UNIT_TEST
!+- Filter isnotnull(C_INT#63)
!   +- RelationV2[C_INT#63] SPARKTEST.UNIT_TEST_PARTITIONED

// oracle-sql generated
SELECT "sparkora_0"."C_INT" AS "C_INT_1_sparkora",
       "sparkora_1"."C_INT" AS "C_INT_2_sparkora",
       "sparkora_2"."C_INT" AS "C_INT_3_sparkora",
       "sparkora_3"."C_INT" AS "C_INT_4_sparkora"
FROM SPARKTEST.UNIT_TEST "sparkora_0"
JOIN SPARKTEST.UNIT_TEST_PARTITIONED "sparkora_1"
         ON ("sparkora_0"."C_INT" = "sparkora_1"."C_INT")
JOIN SPARKTEST.UNIT_TEST "sparkora_2"
         ON ("sparkora_1"."C_INT" = "sparkora_2"."C_INT")
JOIN SPARKTEST.UNIT_TEST_PARTITIONED "sparkora_3"
         ON ("sparkora_2"."C_INT" = "sparkora_3"."C_INT")
WHERE ((("sparkora_0"."C_INT" IS NOT NULL
         AND "sparkora_1"."C_INT" IS NOT NULL)
        AND "sparkora_2"."C_INT" IS NOT NULL)
       AND "sparkora_3"."C_INT" IS NOT NULL)