[WIP][KYUUBI #5367] Spark crashes with ClassCastException when resolving a join of masked tables #5468

Open · wants to merge 2 commits into base: branch-1.7
Conversation


@xza-m xza-m commented Oct 19, 2023

Why are the changes needed?

I don't know how to reproduce bug #5367. Please show me how to reproduce it. This is my test case.
This is my first time opening a pull request in this project. If there's something wrong, please point it out.
Thanks!

How was this patch tested?

  • Add some test cases that check the changes thoroughly including negative and positive cases if possible

  • Add screenshots for manual tests if appropriate

  • Run tests locally before making a pull request

Was this patch authored or co-authored using generative AI tooling?

NO

@cxzl25 cxzl25 changed the title [WIP][KYUUBI #5367]: Spark crashes with ClassCastException when resolving a join of … [WIP][KYUUBI #5367] Spark crashes with ClassCastException when resolving a join of masked tables Oct 19, 2023
@codecov-commenter commented Oct 19, 2023

Codecov Report

Merging #5468 (3b265df) into branch-1.7 (b933a54) will increase coverage by 0.00%.
The diff coverage is n/a.

@@              Coverage Diff              @@
##             branch-1.7    #5468   +/-   ##
=============================================
  Coverage         54.29%   54.30%           
  Complexity           13       13           
=============================================
  Files               571      571           
  Lines             31629    31629           
  Branches           4277     4277           
=============================================
+ Hits              17174    17176    +2     
+ Misses            12754    12750    -4     
- Partials           1701     1703    +2     

see 4 files with indirect coverage changes


@xza-m (Author) left a comment

code style format

@xza-m (Author) commented Oct 19, 2023

@yaooqinn Hello, can you provide some guidance on this?

@pan3793 (Member) commented Oct 19, 2023

cc @bowenliang123

@bowenliang123 (Contributor) commented

Hi @pilietis93, as the reporter, do you have any input on this PR's attempt to reproduce the bug reported in #5092?
Which Spark version did you encounter the bug in?

@pan3793 (Member) commented Oct 19, 2023

@xza-m Kent and I are out of office until next Monday, so replies may be delayed.

@xza-m (Author) commented Oct 20, 2023

> @xza-m Kent and I are out of office until next Monday, so replies may be delayed.

Received. I will wait for the reporter's reply.

@pan3793 pan3793 requested a review from yaooqinn October 25, 2023 04:00
@LoseYSelf commented Nov 29, 2023

I can provide some detailed information. Here is the test suite code:

val df0 = spark.table("default.src") // only col2 is masked, and it is not used
  .select($"col0", $"col1")

doAs("bob",
  df0.as("a")
    .join(
      right = df0.as("b"),
      joinExprs = $"a.col0" === $"b.col0" && $"a.col1" === $"b.col1",
      joinType = "left_outer").explain()) // crashes

The error happens in RuleApplyDataMaskingStage1, specifically in the applyDataMasking method.

newPlan is:
Join LeftOuter, ((col0#131 = col0#131) AND (col1#132 = col1#132))
:- SubqueryAlias a
:  +- Project [col0#98, col1#99]
:     +- SubqueryAlias testcat.default.src
:        +- Filter (key#93 < 20)
:           +- RowFilterMarker
:              +- DataMaskingStage0Marker RelationV2[key#93, value1#94, value2#95, value3#96, value4#97, col0#98, col1#99, value5#100] testcat.default.src
:                 +- Project [key#93, md5(cast(cast(value1#94 as string) as binary)) AS value1#116, regexp_replace(regexp_replace(regexp_replace(value2#95, [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1) AS value2#117, regexp_replace(regexp_replace(regexp_replace(value3#96, [A-Z], X, 5), [a-z], x, 5), [0-9], n, 5) AS value3#118, date_trunc(YEAR, value4#97, Some(America/Los_Angeles)) AS value4#119, col0#98, col1#99, concat(regexp_replace(regexp_replace(regexp_replace(left(value5#100, (length(value5#100) - 4)), [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1), right(value5#100, 4)) AS value5#120]
:                    +- RelationV2[key#93, value1#94, value2#95, value3#96, value4#97, col0#98, col1#99, value5#100] testcat.default.src
+- SubqueryAlias b
   +- Project [col0#131, col1#132]
      +- SubqueryAlias testcat.default.src
         +- Filter (key#126 < 20)
            +- RowFilterMarker
               +- DataMaskingStage0Marker RelationV2[key#93, value1#94, value2#95, value3#96, value4#97, col0#98, col1#99, value5#100] testcat.default.src
                  +- Project [key#126, md5(cast(cast(value1#127 as string) as binary)) AS value1#121, regexp_replace(regexp_replace(regexp_replace(value2#128, [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1) AS value2#122, regexp_replace(regexp_replace(regexp_replace(value3#129, [A-Z], X, 5), [a-z], x, 5), [0-9], n, 5) AS value3#123, date_trunc(YEAR, value4#130, Some(America/Los_Angeles)) AS value4#124, col0#131, col1#132, concat(regexp_replace(regexp_replace(regexp_replace(left(value5#133, (length(value5#133) - 4)), [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1), right(value5#133, 4)) AS value5#125]
                     +- RelationV2[key#126, value1#127, value2#128, value3#129, value4#130, col0#131, col1#132, value5#133] testcat.default.src

and the plan is:

Join LeftOuter, ((col0#98 = col0#131) AND (col1#99 = col1#132))
:- SubqueryAlias a
:  +- Project [col0#98, col1#99]
:     +- SubqueryAlias testcat.default.src
:        +- Filter (key#93 < 20)
:           +- RowFilterMarker
:              +- DataMaskingStage0Marker RelationV2[key#93, value1#94, value2#95, value3#96, value4#97, col0#98, col1#99, value5#100] testcat.default.src
:                 +- Project [key#93, md5(cast(cast(value1#94 as string) as binary)) AS value1#116, regexp_replace(regexp_replace(regexp_replace(value2#95, [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1) AS value2#117, regexp_replace(regexp_replace(regexp_replace(value3#96, [A-Z], X, 5), [a-z], x, 5), [0-9], n, 5) AS value3#118, date_trunc(YEAR, value4#97, Some(America/Los_Angeles)) AS value4#119, col0#98, col1#99, concat(regexp_replace(regexp_replace(regexp_replace(left(value5#100, (length(value5#100) - 4)), [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1), right(value5#100, 4)) AS value5#120]
:                    +- RelationV2[key#93, value1#94, value2#95, value3#96, value4#97, col0#98, col1#99, value5#100] testcat.default.src
+- SubqueryAlias b
   +- Project [col0#131, col1#132]
      +- SubqueryAlias testcat.default.src
         +- Filter (key#126 < 20)
            +- RowFilterMarker
               +- DataMaskingStage0Marker RelationV2[key#93, value1#94, value2#95, value3#96, value4#97, col0#98, col1#99, value5#100] testcat.default.src
                  +- Project [key#126, md5(cast(cast(value1#127 as string) as binary)) AS value1#121, regexp_replace(regexp_replace(regexp_replace(value2#128, [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1) AS value2#122, regexp_replace(regexp_replace(regexp_replace(value3#129, [A-Z], X, 5), [a-z], x, 5), [0-9], n, 5) AS value3#123, date_trunc(YEAR, value4#130, Some(America/Los_Angeles)) AS value4#124, col0#131, col1#132, concat(regexp_replace(regexp_replace(regexp_replace(left(value5#133, (length(value5#133) - 4)), [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1), right(value5#133, 4)) AS value5#125]
                     +- RelationV2[key#126, value1#127, value2#128, value3#129, value4#130, col0#131, col1#132, value5#133] testcat.default.src

The main difference is in SubqueryAlias b: the RelationV2 captured by DataMaskingStage0Marker is no longer the same as the RelationV2 under the Project.

It is changed by the Spark rule DeduplicateRelations.
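
For context, DeduplicateRelations is the analyzer rule that assigns fresh expression IDs to the right side of a self-join. A minimal standalone sketch (independent of Kyuubi; the table and column names are made up) shows the rewrite:

import org.apache.spark.sql.SparkSession

object DedupDemo extends App {
  val spark = SparkSession.builder().master("local[1]").getOrCreate()
  import spark.implicits._

  val df = spark.range(3).toDF("key")
  // Self-join: the analyzer must deduplicate the two sides, so the right
  // branch gets fresh attribute IDs (e.g. key#7L instead of key#2L).
  val joined = df.as("a").join(df.as("b"), $"a.key" === $"b.key")
  println(joined.queryExecution.analyzed.treeString)
  // Any node that captured the original relation by value (such as
  // DataMaskingStage0Marker) still holds the old IDs after this rewrite.
  spark.stop()
}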

Spark log (left: plan before the rule; right: after; lines marked with ! differ):
=== Applying Rule org.apache.spark.sql.catalyst.analysis.DeduplicateRelations ===
 'Join LeftOuter, (('a.col0 = 'b.col0) AND ('a.col1 = 'b.col1))                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               'Join LeftOuter, (('a.col0 = 'b.col0) AND ('a.col1 = 'b.col1))
 :- SubqueryAlias a                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           :- SubqueryAlias a
 :  +- Project [col0#98, col1#99]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             :  +- Project [col0#98, col1#99]
 :     +- SubqueryAlias testcat.default.src                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   :     +- SubqueryAlias testcat.default.src
 :        +- Filter (key#93 < 20)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                             :        +- Filter (key#93 < 20)
 :           +- RowFilterMarker                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               :           +- RowFilterMarker
 :              +- DataMaskingStage0Marker RelationV2[key#93, value1#94, value2#95, value3#96, value4#97, col0#98, col1#99, value5#100] testcat.default.src                                                                                                                                                                                                                                                                                                                                                                                                                                                   :              +- DataMaskingStage0Marker RelationV2[key#93, value1#94, value2#95, value3#96, value4#97, col0#98, col1#99, value5#100] testcat.default.src
 :                 +- Project [key#93, md5(cast(cast(value1#94 as string) as binary)) AS value1#116, regexp_replace(regexp_replace(regexp_replace(value2#95, [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1) AS value2#117, regexp_replace(regexp_replace(regexp_replace(value3#96, [A-Z], X, 5), [a-z], x, 5), [0-9], n, 5) AS value3#118, date_trunc(YEAR, value4#97, Some(America/Los_Angeles)) AS value4#119, col0#98, col1#99, concat(regexp_replace(regexp_replace(regexp_replace(left(value5#100, (length(value5#100) - 4)), [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1), right(value5#100, 4)) AS value5#120]   :                 +- Project [key#93, md5(cast(cast(value1#94 as string) as binary)) AS value1#116, regexp_replace(regexp_replace(regexp_replace(value2#95, [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1) AS value2#117, regexp_replace(regexp_replace(regexp_replace(value3#96, [A-Z], X, 5), [a-z], x, 5), [0-9], n, 5) AS value3#118, date_trunc(YEAR, value4#97, Some(America/Los_Angeles)) AS value4#119, col0#98, col1#99, concat(regexp_replace(regexp_replace(regexp_replace(left(value5#100, (length(value5#100) - 4)), [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1), right(value5#100, 4)) AS value5#120]
 :                    +- RelationV2[key#93, value1#94, value2#95, value3#96, value4#97, col0#98, col1#99, value5#100] testcat.default.src                                                                                                                                                                                                                                                                                                                                                                                                                                                                     :                    +- RelationV2[key#93, value1#94, value2#95, value3#96, value4#97, col0#98, col1#99, value5#100] testcat.default.src
 +- SubqueryAlias b                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           +- SubqueryAlias b
!   +- Project [col0#98, col1#99]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                +- Project [col0#131, col1#132]
       +- SubqueryAlias testcat.default.src                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                         +- SubqueryAlias testcat.default.src
!         +- Filter (key#93 < 20)                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      +- Filter (key#126 < 20)
             +- RowFilterMarker                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           +- RowFilterMarker
                +- DataMaskingStage0Marker RelationV2[key#93, value1#94, value2#95, value3#96, value4#97, col0#98, col1#99, value5#100] testcat.default.src                                                                                                                                                                                                                                                                                                                                                                                                                                                                  +- DataMaskingStage0Marker RelationV2[key#93, value1#94, value2#95, value3#96, value4#97, col0#98, col1#99, value5#100] testcat.default.src
!                  +- Project [key#93, md5(cast(cast(value1#94 as string) as binary)) AS value1#121, regexp_replace(regexp_replace(regexp_replace(value2#95, [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1) AS value2#122, regexp_replace(regexp_replace(regexp_replace(value3#96, [A-Z], X, 5), [a-z], x, 5), [0-9], n, 5) AS value3#123, date_trunc(YEAR, value4#97, Some(America/Los_Angeles)) AS value4#124, col0#98, col1#99, concat(regexp_replace(regexp_replace(regexp_replace(left(value5#100, (length(value5#100) - 4)), [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1), right(value5#100, 4)) AS value5#125]                     +- Project [key#126, md5(cast(cast(value1#127 as string) as binary)) AS value1#121, regexp_replace(regexp_replace(regexp_replace(value2#128, [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1) AS value2#122, regexp_replace(regexp_replace(regexp_replace(value3#129, [A-Z], X, 5), [a-z], x, 5), [0-9], n, 5) AS value3#123, date_trunc(YEAR, value4#130, Some(America/Los_Angeles)) AS value4#124, col0#131, col1#132, concat(regexp_replace(regexp_replace(regexp_replace(left(value5#133, (length(value5#133) - 4)), [A-Z], X, 1), [a-z], x, 1), [0-9], n, 1), right(value5#133, 4)) AS value5#125]
!                     +- RelationV2[key#93, value1#94, value2#95, value3#96, value4#97, col0#98, col1#99, value5#100] testcat.default.src                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          +- RelationV2[key#126, value1#127, value2#128, value3#129, value4#130, col0#131, col1#132, value5#133] testcat.default.src
          

So I think we can inject a rule before RuleApplyDataMaskingStage1 to keep the RelationV2 captured by DataMaskingStage0Marker consistent with the RelationV2 under the Project, as sketched below.

cc @pan3793
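
A hedged sketch of that idea, assuming a simplified marker shape (Stage0Marker and ResyncStage0Marker are hypothetical stand-ins, not the actual Kyuubi classes):

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.catalyst.rules.Rule

// Simplified stand-in for DataMaskingStage0Marker: it keeps a snapshot of
// the original table scan alongside the masked child plan. The real class
// in org.apache.kyuubi.plugin.spark.authz.ranger.datamasking may differ.
case class Stage0Marker(child: LogicalPlan, scan: LogicalPlan) extends UnaryNode {
  override def output: Seq[Attribute] = child.output
  override protected def withNewChildInternal(newChild: LogicalPlan): Stage0Marker =
    copy(child = newChild)
}

// Run before RuleApplyDataMaskingStage1: if the snapshot's attribute IDs no
// longer match any leaf under the marker's child (because DeduplicateRelations
// re-aliased the subtree), re-capture the deduplicated leaf relation.
object ResyncStage0Marker extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case m @ Stage0Marker(child, scan)
        if !child.collectLeaves().exists(_.output == scan.output) =>
      child.collectLeaves().headOption
        .map(fresh => m.copy(scan = fresh))
        .getOrElse(m)
  }
}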

@sauliusvl commented

Coincidentally, I also tried debugging this a bit; here's my summary, hopefully it helps:

Before the join happens, the table is captured here into the following:

DataMaskingStage0Marker Relation spark_catalog.tbl0[portal#0,user_id#1L,id#2,login#3,real_name#4] parquet
+- Project [portal#0, user_id#1L, id#2, null AS login#261, real_name#4]
   +- Relation spark_catalog.tbl0[portal#0,user_id#1L,id#2,login#3,real_name#4] parquet

All good so far; however, when evaluating the join, the deduplication rule transforms it into the following:

Join LeftOuter, ((portal#0 = portal#529) AND (id#2 = id#531))
:- SubqueryAlias a
:  +- Project [portal#0, id#2]
:     +- SubqueryAlias spark_catalog.tbl0
:        +- DataMaskingStage0Marker Relation spark_catalog.tbl0[portal#0,user_id#1L,id#2,login#3,real_name#4] parquet
:           +- Project [portal#0, user_id#1L, id#2, null AS login#261, real_name#4]
:              +- Relation spark_catalog.tbl0[portal#0,user_id#1L,id#2,login#3,real_name#4] parquet
+- SubqueryAlias b
   +- Project [portal#529, id#531]
      +- SubqueryAlias spark_catalog.tbl0
         +- DataMaskingStage0Marker Relation spark_catalog.tbl0[portal#0,user_id#1L,id#2,login#3,real_name#4] parquet
            +- Project [portal#529, user_id#530L, id#531, null AS login#261, real_name#533]
               +- Relation spark_catalog.tbl0[portal#529,user_id#530L,id#531,login#532,real_name#533] parquet

Note that the IDs in the second DataMaskingStage0Marker are not updated. I'd guess that's because the original relation was captured inside DataMaskingStage0Marker, and Spark has no knowledge of it during the deduplication stage.

Because of this, RuleApplyDataMaskingStage1 gets confused: at this point all columns are considered masked in the second relation, because this condition is always false due to the rewritten IDs. The end result of the stage is this:

DataMaskingStage1Marker
+- Join LeftOuter, ((portal#529 = portal#529) AND (id#531 = id#531))
   :- SubqueryAlias a
   :  +- Project [portal#0, id#2]
   :     +- SubqueryAlias spark_catalog.tbl0
   :        +- DataMaskingStage0Marker Relation spark_catalog.tbl0[portal#0,user_id#1L,id#2,login#3,real_name#4] parquet
   :           +- Project [portal#0, user_id#1L, id#2, null AS login#261, real_name#4]
   :              +- Relation spark_catalog.tbl0[portal#0,user_id#1L,id#2,login#3,real_name#4] parquet
   +- SubqueryAlias b
      +- Project [portal#529, id#531]
         +- SubqueryAlias spark_catalog.tbl0
            +- DataMaskingStage0Marker Relation spark_catalog.tbl0[portal#0,user_id#1L,id#2,login#3,real_name#4] parquet
               +- Project [portal#529, user_id#530L, id#531, null AS login#261, real_name#533]
                  +- Relation spark_catalog.tbl0[portal#529,user_id#530L,id#531,login#532,real_name#533] parquet

i.e. it got wrapped in DataMaskingStage1Marker here because the join condition changed (and now looks wrong). Thus we end up with the ClassCastException, as Spark expected this node to be a Join.

It wasn't obvious to me how to best solve this, just leaving my findings here.
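
One concrete way to see why that condition can never hold: attribute equality in Catalyst includes the expression ID, so a column re-aliased by DeduplicateRelations never equals the attribute frozen inside the marker. A minimal illustration (the names mirror the plan above):

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

// e.g. portal#0, as captured by the marker before deduplication
val original = AttributeReference("portal", IntegerType)()
// e.g. portal#529, after DeduplicateRelations assigns a fresh ExprId
val dedup = original.newInstance()

assert(original.name == dedup.name) // same name and type...
assert(original != dedup)           // ...but different ExprIds, so not equal
// Any "is this column masked?" lookup keyed on attribute equality therefore
// misses, every column looks masked, and the plan gets wrapped in
// DataMaskingStage1Marker where downstream code expects a bare Join.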

@pilietis93 commented Jan 23, 2024

@xza-m I found the reason why you're not able to reproduce this.

val df0 = spark.table("default.src").selectExpr("'col0' col0", "'col1' col1")

doAs(
  "bob",
  assert(
    df0.as("a").join(
      right = df0.as("b"),
      joinExprs = $"a.col0" === $"b.col0" && $"a.col1" === $"b.col1",
      joinType = "left_outer").collect() ===
      Seq(Row("col0", "col1", "col0", "col1"))))

Your test uses derived columns as join keys, which are not part of the source table. This generates a different plan that doesn't trigger the bug. I can reproduce it by using the key column instead:

      "bob", {
        val df0 = spark.table("default.src")
        val df1 = df0.as("a").join(
          right = df0.as("b"),
          joinExprs = $"a.key" === $"b.key",
          joinType = "left_outer")
        df1.explain(true) //  java.lang.ClassCastException: org.apache.kyuubi.plugin.spark.authz.ranger.datamasking.DataMaskingStage1Marker cannot be cast to org.apache.spark.sql.catalyst.plans.logical.Join
      }
    )
