From 0af9f2a79cfde766b36685fbf2db32fbcec8fb7c Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Sun, 4 Feb 2024 16:39:55 +0800 Subject: [PATCH] [KYUUBI #5594][AUTHZ] BuildQuery should respect normal node's input MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References ๐Ÿ”— This pull request fixes #5594 ## Describe Your Solution ๐Ÿ”ง For case ``` def filter_func(iterator): for pdf in iterator: yield pdf[pdf.id == 1] df = spark.read.table("test_mapinpandas") execute_result = df.mapInPandas(filter_func, df.schema).show() ``` The logical plan is ``` GlobalLimit 21 +- LocalLimit 21 +- Project [cast(id#5 as string) AS id#11, name#6] +- MapInPandas filter_func(id#0, name#1), [id#5, name#6] +- HiveTableRelation [`default`.`test_mapinpandas`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#0, name#1], Partition Cols: []] ``` When handle `MapInPandas`, we didn't match its input with `HiveTableRelation`, cause we miss input table's columns. This pr fix this In this pr, we remove the branch of each project such as `Project`, `Aggregate` etc, handle it together. ## Types of changes :bookmark: - [x] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: For case ``` def filter_func(iterator): for pdf in iterator: yield pdf[pdf.id == 1] df = spark.read.table("test_mapinpandas") execute_result = df.mapInPandas(filter_func, df.schema).show() ``` We miss column info of table `test_mapinpandas` #### Behavior With This Pull Request :tada: We got privilege object of table `test_mapinpandas` with it's column info. #### Related Unit Tests --- # Checklists ## ๐Ÿ“ Author Self Checklist - [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project - [x] I have performed a self-review - [x] I have commented my code, particularly in hard-to-understand areas - [x] I have made corresponding changes to the documentation - [x] My changes generate no new warnings - [x] I have added tests that prove my fix is effective or that my feature works - [x] New and existing unit tests pass locally with my changes - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) ## ๐Ÿ“ Committer Pre-Merge Checklist - [x] Pull request title is okay. - [x] No license issues. - [x] Milestone correctly set? - [x] Test coverage is ok - [x] Assignees are selected. - [x] Minimum number of approvals - [x] No changes are requested **Be nice. Be informative.** Closes #5787 from AngersZhuuuu/KYUUBI-5594-approach2. Closes #5594 e08545599 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala 49f09fb0a [Angerszhuuuu] Update RangerSparkExtensionSuite.scala 4781f75b9 [Angerszhuuuu] Update PrivilegesBuilderSuite.scala 9e9208d38 [Angerszhuuuu] Update V2JdbcTableCatalogRangerSparkExtensionSuite.scala 626d3dd88 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala 3d69997de [Angerszhuuuu] Update PrivilegesBuilderSuite.scala 6eb4b8e1a [Angerszhuuuu] Update RangerSparkExtensionSuite.scala 61efb8ae3 [Angerszhuuuu] update 794ebb7be [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2 a236da86b [Angerszhuuuu] Update PrivilegesBuilderSuite.scala 74bd3f4d5 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala 4acbc4276 [Angerszhuuuu] Merge branch 'KYUUBI-5594-approach2' of https://github.com/AngersZhuuuu/incubator-kyuubi into KYUUBI-5594-approach2 266f7e877 [Angerszhuuuu] update a6c784546 [Angerszhuuuu] Update PrivilegesBuilder.scala d785d5fdf [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2 014ef3b84 [Angerszhuuuu] Update PrivilegesBuilder.scala 7e1cd37a1 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2 71d266162 [Angerszhuuuu] update db9594170 [Angerszhuuuu] update 490eb95c2 [Angerszhuuuu] update 70d110e89 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2 e6a587718 [Angerszhuuuu] Update PrivilegesBuilder.scala 5ff22b103 [Angerszhuuuu] Update PrivilegesBuilder.scala e6843014b [Angerszhuuuu] Update PrivilegesBuilder.scala 594b202f7 [Angerszhuuuu] Update PrivilegesBuilder.scala 2f87c61e1 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala 1de8c1c68 [Angerszhuuuu] Update PrivilegesBuilder.scala ad17255d7 [Angerszhuuuu] Update PrivilegesBuilderSuite.scala 4f5e8505f [Angerszhuuuu] update 64349ed97 [Angerszhuuuu] Update PrivilegesBuilder.scala 11b7a4c13 [Angerszhuuuu] Update PrivilegesBuilder.scala 9a58fb0c4 [Angerszhuuuu] update d0b022ec9 [Angerszhuuuu] Update RuleApplyPermanentViewMarker.scala e0f28a640 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594 0ebdd5de5 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594 8e53236ac [Angerszhuuuu] update 3bafa7ca5 [Angerszhuuuu] update d6e984e07 [Angerszhuuuu] update b00bf5e20 [Angerszhuuuu] Update PrivilegesBuilder.scala 821422852 [Angerszhuuuu] update 93fc6892b [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594 04184e39d [Angerszhuuuu] update 0bb762467 [Angerszhuuuu] Revert "Revert "Update PrivilegesBuilder.scala"" f481283ae [Angerszhuuuu] Revert "Update PrivilegesBuilder.scala" 9f871822f [Angerszhuuuu] Revert "Update PrivilegesBuilder.scala" 29b67c457 [Angerszhuuuu] Update PrivilegesBuilder.scala 8785ad1ab [Angerszhuuuu] Update PrivilegesBuilder.scala 270f21dcc [Angerszhuuuu] Update RangerSparkExtensionSuite.scala 60872efcb [Angerszhuuuu] Update RangerSparkExtensionSuite.scala c34f32ea2 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594 86fc4756a [Angerszhuuuu] Update PrivilegesBuilder.scala 404f1ea4c [Angerszhuuuu] Update PrivilegesBuilder.scala dcca394e0 [Angerszhuuuu] Update PrivilegesBuilder.scala c2c6fa447 [Angerszhuuuu] Update PrivilegesBuilder.scala 6f6a36e5b [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594]-AUTH]BuildQuery-should-respect-normal-node's-input 4dd47a124 [Angerszhuuuu] update c549b6a1a [Angerszhuuuu] update 80013b981 [Angerszhuuuu] Update PrivilegesBuilder.scala 3cbba422a [Angerszhuuuu] Update PrivilegesBuilder.scala Authored-by: Angerszhuuuu Signed-off-by: Cheng Pan --- .../spark/authz/PrivilegesBuilder.scala | 93 ++++++---- .../spark/authz/PrivilegesBuilderSuite.scala | 44 +++-- .../authz/V2CommandsPrivilegesSuite.scala | 4 +- ...eltaCatalogRangerSparkExtensionSuite.scala | 33 ++-- ...HudiCatalogRangerSparkExtensionSuite.scala | 2 +- ...bergCatalogRangerSparkExtensionSuite.scala | 126 ++++++++------ .../ranger/RangerSparkExtensionSuite.scala | 163 ++++++++++-------- ...ableCatalogRangerSparkExtensionSuite.scala | 155 +++++++++-------- 8 files changed, 337 insertions(+), 283 deletions(-) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala index 2d452ba9d67..01266eb2c85 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala @@ -20,7 +20,7 @@ package org.apache.kyuubi.plugin.spark.authz import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, NamedExpression} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command.ExplainCommand import org.slf4j.LoggerFactory @@ -69,44 +69,20 @@ object PrivilegesBuilder { if (projectionList.isEmpty) { privilegeObjects += PrivilegeObject(table, plan.output.map(_.name)) } else { - val cols = (projectionList ++ conditionList).flatMap(collectLeaves) - .filter(plan.outputSet.contains).map(_.name).distinct - privilegeObjects += PrivilegeObject(table, cols) + val cols = columnPrune(projectionList ++ conditionList, plan.outputSet) + privilegeObjects += PrivilegeObject(table, cols.map(_.name).distinct) } } + def columnPrune(projectionList: Seq[Expression], output: AttributeSet): Seq[NamedExpression] = { + (projectionList ++ conditionList) + .flatMap(collectLeaves) + .filter(output.contains) + } + plan match { case p if p.getTagValue(KYUUBI_AUTHZ_TAG).nonEmpty => - case p: Project => buildQuery(p.child, privilegeObjects, p.projectList, conditionList, spark) - - case j: Join => - val cols = - conditionList ++ j.condition.map(expr => collectLeaves(expr)).getOrElse(Nil) - buildQuery(j.left, privilegeObjects, projectionList, cols, spark) - buildQuery(j.right, privilegeObjects, projectionList, cols, spark) - - case f: Filter => - val cols = conditionList ++ collectLeaves(f.condition) - buildQuery(f.child, privilegeObjects, projectionList, cols, spark) - - case w: Window => - val orderCols = w.orderSpec.flatMap(orderSpec => collectLeaves(orderSpec)) - val partitionCols = w.partitionSpec.flatMap(partitionSpec => collectLeaves(partitionSpec)) - val cols = conditionList ++ orderCols ++ partitionCols - buildQuery(w.child, privilegeObjects, projectionList, cols, spark) - - case s: Sort => - val sortCols = s.order.flatMap(sortOrder => collectLeaves(sortOrder)) - val cols = conditionList ++ sortCols - buildQuery(s.child, privilegeObjects, projectionList, cols, spark) - - case a: Aggregate => - val aggCols = - (a.aggregateExpressions ++ a.groupingExpressions).flatMap(e => collectLeaves(e)) - val cols = conditionList ++ aggCols - buildQuery(a.child, privilegeObjects, projectionList, cols, spark) - case scan if isKnownScan(scan) && scan.resolved => val tables = getScanSpec(scan).tables(scan, spark) // If the the scan is table-based, we check privileges on the table we found @@ -125,7 +101,33 @@ object PrivilegesBuilder { case p => for (child <- p.children) { - buildQuery(child, privilegeObjects, projectionList, conditionList, spark) + // If current plan's references don't have relation to it's input, have two cases + // 1. `MapInPandas`, `ScriptTransformation` + // 2. `Project` output only have constant value + if (columnPrune(p.references.toSeq ++ p.output, p.inputSet).isEmpty) { + // If plan is project and output don't have relation to input, can ignore. + if (!p.isInstanceOf[Project]) { + buildQuery( + child, + privilegeObjects, + p.inputSet.map(_.toAttribute).toSeq, + Nil, + spark) + } + } else { + buildQuery( + child, + privilegeObjects, + // Here we use `projectList ++ p.reference` do column prune. + // For `Project`, `Aggregate`, plan's output is contained by plan's referenced + // For `Filter`, `Sort` etc... it rely on upper `Project` node, + // since we wrap a `Project` before call `buildQuery()`. + // So here we use upper node's projectionList and current's references + // to do column pruning can get the correct column. + columnPrune(projectionList ++ p.references.toSeq, p.inputSet).distinct, + conditionList ++ p.references, + spark) + } } } } @@ -221,7 +223,26 @@ object PrivilegesBuilder { LOG.debug(ud.error(plan, e)) } } - spec.queries(plan).foreach(buildQuery(_, inputObjs, spark = spark)) + spec.queries(plan).foreach { p => + if (p.resolved) { + buildQuery(Project(p.output, p), inputObjs, spark = spark) + } else { + try { + // For spark 3.1, Some command such as CreateTableASSelect, its query was unresolved, + // Before this pr, we just ignore it, now we support this. + val analyzed = spark.sessionState.analyzer.execute(p) + buildQuery(Project(analyzed.output, analyzed), inputObjs, spark = spark) + } catch { + case e: Exception => + LOG.debug( + s""" + |Failed to analyze unresolved + |$p + |due to ${e.getMessage}""".stripMargin, + e) + } + } + } spec.operationType case classname if FUNCTION_COMMAND_SPECS.contains(classname) => @@ -315,7 +336,7 @@ object PrivilegesBuilder { case cmd: Command => buildCommand(cmd, inputObjs, outputObjs, spark) // Queries case _ => - buildQuery(plan, inputObjs, spark = spark) + buildQuery(Project(plan.output, plan), inputObjs, spark = spark) OperationType.QUERY } (inputObjs, outputObjs, opType) diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala index 673a2e43726..d8b672a5680 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala @@ -59,11 +59,15 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite protected def checkColumns(plan: LogicalPlan, cols: Seq[String]): Unit = { val (in, out, _) = PrivilegesBuilder.build(plan, spark) assert(out.isEmpty, "Queries shall not check output privileges") - val po = in.head - assert(po.actionType === PrivilegeObjectActionType.OTHER) - assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW) - assert(po.columns === cols) - checkTableOwner(po) + if (in.nonEmpty) { + val po = in.head + assert(po.actionType === PrivilegeObjectActionType.OTHER) + assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW) + assert(po.columns === cols) + checkTableOwner(po) + } else { + assert(cols.isEmpty) + } } protected def checkColumns(query: String, cols: Seq[String]): Unit = { @@ -365,7 +369,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite assertEqualsIgnoreCase(reusedPartTableShort)(po0.objectName) if (isSparkV32OrGreater) { // Query in AlterViewAsCommand can not be resolved before SPARK-34698 - assert(po0.columns === Seq("key", "value", "pid")) + assert(po0.columns === Seq("key", "pid", "value")) checkTableOwner(po0) } val accessType0 = ranger.AccessType(po0, operationType, isInput = true) @@ -526,12 +530,8 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW) assertEqualsIgnoreCase(reusedDb)(po0.dbname) assertEqualsIgnoreCase(reusedTableShort)(po0.objectName) - if (isSparkV32OrGreater) { - assert(po0.columns.head === "key") - checkTableOwner(po0) - } else { - assert(po0.columns.isEmpty) - } + assert(po0.columns.head === "key") + checkTableOwner(po0) val accessType0 = ranger.AccessType(po0, operationType, isInput = true) assert(accessType0 === AccessType.SELECT) @@ -549,12 +549,8 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW) assertEqualsIgnoreCase(reusedDb)(po0.dbname) assertEqualsIgnoreCase(reusedTableShort)(po0.objectName) - if (isSparkV32OrGreater) { - assert(po0.columns === Seq("key", "value")) - checkTableOwner(po0) - } else { - assert(po0.columns.isEmpty) - } + assert(po0.columns === Seq("key", "value")) + checkTableOwner(po0) val accessType0 = ranger.AccessType(po0, operationType, isInput = true) assert(accessType0 === AccessType.SELECT) @@ -1050,7 +1046,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite assertEqualsIgnoreCase(reusedDb)(po.dbname) assertStartsWithIgnoreCase(reusedTableShort)(po.objectName) assert( - po.columns === Seq("value", "pid", "key"), + po.columns === Seq("value", "key", "pid"), s"$reusedPartTable both 'key', 'value' and 'pid' should be authenticated") checkTableOwner(po) val accessType = ranger.AccessType(po, operationType, isInput = true) @@ -1107,7 +1103,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite assertEqualsIgnoreCase(reusedDb)(po.dbname) assertStartsWithIgnoreCase(reusedTableShort)(po.objectName) assert( - po.columns === Seq("key", "value"), + po.columns.sorted === Seq("key", "value").sorted, s"$reusedPartTable 'key' is the join key and 'pid' is omitted") checkTableOwner(po) val accessType = ranger.AccessType(po, operationType, isInput = true) @@ -1218,7 +1214,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite assertEqualsIgnoreCase(reusedDb)(po.dbname) assertStartsWithIgnoreCase(reusedTableShort)(po.objectName) assert( - po.columns === Seq("key", "value", "pid"), + po.columns === Seq("key", "pid", "value"), s"$reusedPartTable both 'key', 'value' and 'pid' should be authenticated") checkTableOwner(po) val accessType = ranger.AccessType(po, operationType, isInput = true) @@ -1625,7 +1621,7 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite { assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW) assertEqualsIgnoreCase(reusedDb)(po0.dbname) assert(po0.objectName equalsIgnoreCase reusedPartTable.split("\\.").last) - assert(po0.columns === Seq("key", "value", "pid")) + assert(po0.columns === Seq("key", "pid", "value")) checkTableOwner(po0) val accessType0 = ranger.AccessType(po0, operationType, isInput = true) assert(accessType0 === AccessType.SELECT) @@ -1721,7 +1717,7 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite { assert(out1.isEmpty) val pi1 = in1.head assert(pi1.columns.size === 3) - assert(pi1.columns === Seq("key", "value", "pid")) + assert(pi1.columns === Seq("key", "pid", "value")) // case2: Some columns are involved, and the group column is not selected. val plan2 = sql(s"SELECT COUNT(key) FROM $reusedPartTable GROUP BY pid") @@ -1741,7 +1737,7 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite { assert(out3.isEmpty) val pi3 = in3.head assert(pi3.columns.size === 2) - assert(pi3.columns === Seq("key", "pid")) + assert(pi3.columns === Seq("pid", "key")) // case4: HAVING & GROUP clause val plan4 = sql(s"SELECT COUNT(key) FROM $reusedPartTable GROUP BY pid HAVING MAX(key) > 1000") diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala index 62b7939b3cb..1b6e07b77dd 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala @@ -127,7 +127,7 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { assert(po0.catalog.isEmpty) assertEqualsIgnoreCase(reusedDb)(po0.dbname) assertEqualsIgnoreCase(reusedTableShort)(po0.objectName) - assert(po0.columns.take(2) === Seq("key", "value")) + assert(po0.columns === Seq("a", "key", "value")) checkTableOwner(po0) assert(outputs.size === 1) @@ -186,7 +186,7 @@ abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite { assert(po0.catalog.isEmpty) assertEqualsIgnoreCase(reusedDb)(po0.dbname) assertEqualsIgnoreCase(reusedTableShort)(po0.objectName) - assert(po0.columns.take(2) === Seq("key", "value")) + assert(po0.columns === Seq("a", "key", "value")) checkTableOwner(po0) assert(outputs.size === 1) diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala index 1ce8ad6765f..dbf88d7d028 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala @@ -215,8 +215,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { s" SELECT * FROM $namespace1.$table2" interceptEndsWith[AccessControlException]( doAs(someone, sql(insertIntoSql)))( - s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," + - s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," + + s"does not have [select] privilege on " + + s"[$namespace1/$table2/birthDate,$namespace1/$table2/gender," + + s"$namespace1/$table2/id,$namespace1/$table2/name]," + s" [update] privilege on [$namespace1/$table1]") doAs(admin, sql(insertIntoSql)) @@ -225,8 +226,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { s" SELECT * FROM $namespace1.$table2" interceptEndsWith[AccessControlException]( doAs(someone, sql(insertOverwriteSql)))( - s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," + - s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," + + s"does not have [select] privilege on " + + s"[$namespace1/$table2/birthDate,$namespace1/$table2/gender," + + s"$namespace1/$table2/id,$namespace1/$table2/name]," + s" [update] privilege on [$namespace1/$table1]") doAs(admin, sql(insertOverwriteSql)) } @@ -283,8 +285,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { |""".stripMargin interceptEndsWith[AccessControlException]( doAs(someone, sql(mergeIntoSql)))( - s"does not have [select] privilege on [$namespace1/$table2/id,$namespace1/$table2/name," + - s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," + + s"does not have [select] privilege on " + + s"[$namespace1/$table2/birthDate,$namespace1/$table2/gender," + + s"$namespace1/$table2/id,$namespace1/$table2/name]," + s" [update] privilege on [$namespace1/$table1]") doAs(admin, sql(mergeIntoSql)) } @@ -378,9 +381,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { val insertIntoSql = s"INSERT INTO delta.`$path` SELECT * FROM $namespace1.$table2" interceptEndsWith[AccessControlException]( doAs(someone, sql(insertIntoSql)))( - s"does not have [select] privilege on [$namespace1/$table2/id," + - s"$namespace1/$table2/name,$namespace1/$table2/gender," + - s"$namespace1/$table2/birthDate], [write] privilege on [[$path, $path/]]") + s"does not have [select] privilege on [$namespace1/$table2/birthDate," + + s"$namespace1/$table2/gender,$namespace1/$table2/id," + + s"$namespace1/$table2/name], [write] privilege on [[$path, $path/]]") doAs(admin, sql(insertIntoSql)) // insert overwrite @@ -388,9 +391,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { s"INSERT OVERWRITE delta.`$path` SELECT * FROM $namespace1.$table2" interceptEndsWith[AccessControlException]( doAs(someone, sql(insertOverwriteSql)))( - s"does not have [select] privilege on [$namespace1/$table2/id," + - s"$namespace1/$table2/name,$namespace1/$table2/gender," + - s"$namespace1/$table2/birthDate], [write] privilege on [[$path, $path/]]") + s"does not have [select] privilege on [$namespace1/$table2/birthDate," + + s"$namespace1/$table2/gender,$namespace1/$table2/id," + + s"$namespace1/$table2/name], [write] privilege on [[$path, $path/]]") doAs(admin, sql(insertOverwriteSql)) }) } @@ -433,9 +436,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { |""".stripMargin interceptEndsWith[AccessControlException]( doAs(someone, sql(mergeIntoSql)))( - s"does not have [select] privilege on [$namespace1/$table2/id," + - s"$namespace1/$table2/name,$namespace1/$table2/gender," + - s"$namespace1/$table2/birthDate], [write] privilege on [[$path, $path/]]") + s"does not have [select] privilege on [$namespace1/$table2/birthDate," + + s"$namespace1/$table2/gender,$namespace1/$table2/id," + + s"$namespace1/$table2/name], [write] privilege on [[$path, $path/]]") doAs(admin, sql(mergeIntoSql)) }) } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala index a7c0c7662a2..72d4130ef95 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala @@ -507,7 +507,7 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { interceptEndsWith[AccessControlException] { doAs(someone, sql(mergeIntoSQL)) }(s"does not have [select] privilege on " + - s"[$namespace1/$table2/id,$namespace1/$table2/name,$namespace1/$table2/city], " + + s"[$namespace1/$table2/city,$namespace1/$table2/id,$namespace1/$table2/name], " + s"[update] privilege on [$namespace1/$table1]") doAs(admin, sql(mergeIntoSQL)) } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala index 677b3945dda..cf798cdfae8 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala @@ -93,73 +93,87 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite } test("[KYUUBI #3515] MERGE INTO") { - val mergeIntoSql = - s""" - |MERGE INTO $catalogV2.$bobNamespace.$bobSelectTable AS target - |USING $catalogV2.$namespace1.$table1 AS source - |ON target.id = source.id - |WHEN MATCHED AND (target.name='delete') THEN DELETE - |WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city + withSingleCallEnabled { + val mergeIntoSql = + s""" + |MERGE INTO $catalogV2.$bobNamespace.$bobSelectTable AS target + |USING $catalogV2.$namespace1.$table1 AS source + |ON target.id = source.id + |WHEN MATCHED AND (target.name='delete') THEN DELETE + |WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city """.stripMargin - // MergeIntoTable: Using a MERGE INTO Statement - val e1 = intercept[AccessControlException]( - doAs( - someone, - sql(mergeIntoSql))) - assert(e1.getMessage.contains(s"does not have [select] privilege" + - s" on [$namespace1/$table1/id]")) - - withSingleCallEnabled { - interceptEndsWith[AccessControlException](doAs(someone, sql(mergeIntoSql)))( - if (isSparkV35OrGreater) { - s"does not have [select] privilege on [$namespace1/table1/id" + - s",$namespace1/$table1/name,$namespace1/$table1/city]" - } else { - "does not have " + - s"[select] privilege on [$namespace1/$table1/id,$namespace1/$table1/name,$namespace1/$table1/city]," + - s" [update] privilege on [$bobNamespace/$bobSelectTable]" - }) + // MergeIntoTable: Using a MERGE INTO Statement + val e1 = intercept[AccessControlException]( + doAs( + someone, + sql(mergeIntoSql))) + assert(e1.getMessage.contains(s"does not have [select] privilege" + + s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]")) + + withSingleCallEnabled { + interceptEndsWith[AccessControlException](doAs(someone, sql(mergeIntoSql)))( + if (isSparkV35OrGreater) { + s"does not have [select] privilege on [$namespace1/table1/city" + + s",$namespace1/$table1/id,$namespace1/$table1/name]" + } else { + "does not have " + + s"[select] privilege on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]," + + s" [update] privilege on [$bobNamespace/$bobSelectTable]" + }) + + interceptEndsWith[AccessControlException] { + doAs(bob, sql(mergeIntoSql)) + }(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]") + } - interceptEndsWith[AccessControlException] { - doAs(bob, sql(mergeIntoSql)) - }(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]") + doAs(admin, sql(mergeIntoSql)) } - - doAs(admin, sql(mergeIntoSql)) } test("[KYUUBI #3515] UPDATE TABLE") { - // UpdateTable - interceptEndsWith[AccessControlException] { - doAs(someone, sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' WHERE id=1")) - }(if (isSparkV35OrGreater) { - s"does not have [select] privilege on [$namespace1/$table1/id]" - } else { - s"does not have [update] privilege on [$namespace1/$table1]" - }) + withSingleCallEnabled { + // UpdateTable + interceptEndsWith[AccessControlException] { + doAs( + someone, + sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' WHERE id=1")) + }(if (isSparkV35OrGreater) { + s"does not have [select] privilege on " + + s"[$namespace1/$table1/_file,$namespace1/$table1/_pos," + + s"$namespace1/$table1/id,$namespace1/$table1/name,$namespace1/$table1/city], " + + s"[update] privilege on [$namespace1/$table1]" + } else { + s"does not have [update] privilege on [$namespace1/$table1]" + }) - doAs( - admin, - sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " + - " WHERE id=1")) + doAs( + admin, + sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " + + " WHERE id=1")) + } } test("[KYUUBI #3515] DELETE FROM TABLE") { - // DeleteFromTable - interceptEndsWith[AccessControlException] { - doAs(someone, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2")) - }(if (isSparkV34OrGreater) { - s"does not have [select] privilege on [$namespace1/$table1/id]" - } else { - s"does not have [update] privilege on [$namespace1/$table1]" - }) - - interceptEndsWith[AccessControlException] { - doAs(bob, sql(s"DELETE FROM $catalogV2.$bobNamespace.$bobSelectTable WHERE id=2")) - }(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]") - - doAs(admin, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2")) + withSingleCallEnabled { + // DeleteFromTable + interceptEndsWith[AccessControlException] { + doAs(someone, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2")) + }(if (isSparkV34OrGreater) { + s"does not have [select] privilege on " + + s"[$namespace1/$table1/_file,$namespace1/$table1/_pos," + + s"$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name], " + + s"[update] privilege on [$namespace1/$table1]" + } else { + s"does not have [update] privilege on [$namespace1/$table1]" + }) + + interceptEndsWith[AccessControlException] { + doAs(bob, sql(s"DELETE FROM $catalogV2.$bobNamespace.$bobSelectTable WHERE id=2")) + }(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]") + + doAs(admin, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2")) + } } test("[KYUUBI #3666] Support {OWNER} variable for queries run on CatalogV2") { diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala index 43333ea7763..6feb63eb95c 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala @@ -23,12 +23,15 @@ import java.nio.file.Path import scala.util.Try import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.sql.{Row, SparkSessionExtensions} +import org.apache.spark.sql.{DataFrame, Row, SparkSessionExtensions} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.expressions.PythonUDF import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.execution.columnar.InMemoryRelation import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{IntegerType, StructField, StructType} import org.scalatest.BeforeAndAfterAll // scalastyle:off import org.scalatest.funsuite.AnyFunSuite @@ -555,11 +558,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { val e2 = intercept[AccessControlException]( doAs(someone, sql(s"CREATE VIEW $permView AS SELECT * FROM $table"))) - if (isSparkV32OrGreater) { - assert(e2.getMessage.contains(s"does not have [select] privilege on [default/$table/id]")) - } else { - assert(e2.getMessage.contains(s"does not have [select] privilege on [$table]")) - } + assert(e2.getMessage.contains(s"does not have [select] privilege on [default/$table/id]")) } } @@ -638,14 +637,12 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { s" FROM $db1.$srcTable1 as tb1" + s" JOIN $db1.$srcTable2 as tb2" + s" on tb1.id = tb2.id" - val e1 = intercept[AccessControlException](doAs(someone, sql(insertSql1))) - assert(e1.getMessage.contains(s"does not have [select] privilege on [$db1/$srcTable1/id]")) withSingleCallEnabled { - val e2 = intercept[AccessControlException](doAs(someone, sql(insertSql1))) - assert(e2.getMessage.contains(s"does not have" + + val e = intercept[AccessControlException](doAs(someone, sql(insertSql1))) + assert(e.getMessage.contains(s"does not have" + s" [select] privilege on" + - s" [$db1/$srcTable1/id,$db1/$srcTable1/name,$db1/$srcTable1/city," + + s" [$db1/$srcTable1/city,$db1/$srcTable1/id,$db1/$srcTable1/name," + s"$db1/$srcTable2/age,$db1/$srcTable2/id]," + s" [update] privilege on [$db1/$sinkTable1/id,$db1/$sinkTable1/age," + s"$db1/$sinkTable1/name,$db1/$sinkTable1/city]")) @@ -675,11 +672,13 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { sql(s"CREATE TABLE IF NOT EXISTS $db1.$srcTable1" + s" (id int, name string, city string)")) - val e1 = intercept[AccessControlException]( - doAs(someone, sql(s"CACHE TABLE $cacheTable2 select * from $db1.$srcTable1"))) - assert( - e1.getMessage.contains(s"does not have [select] privilege on [$db1/$srcTable1/id]")) - + withSingleCallEnabled { + val e1 = intercept[AccessControlException]( + doAs(someone, sql(s"CACHE TABLE $cacheTable2 select * from $db1.$srcTable1"))) + assert( + e1.getMessage.contains(s"does not have [select] privilege on " + + s"[$db1/$srcTable1/city,$db1/$srcTable1/id,$db1/$srcTable1/name]")) + } doAs(admin, sql(s"CACHE TABLE $cacheTable3 SELECT 1 AS a, 2 AS b ")) doAs(someone, sql(s"CACHE TABLE $cacheTable4 select 1 as a, 2 as b ")) } @@ -888,9 +887,15 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { doAs( someone, sql(s"SELECT id as new_id, name, max_scope FROM $db1.$view1".stripMargin).show())) - assert(e2.getMessage.contains( - s"does not have [select] privilege on " + - s"[$db1/$view1/id,$db1/$view1/name,$db1/$view1/max_scope]")) + if (isSparkV35OrGreater) { + assert(e2.getMessage.contains( + s"does not have [select] privilege on " + + s"[$db1/$view1/id,$db1/$view1/max_scope,$db1/$view1/name]")) + } else { + assert(e2.getMessage.contains( + s"does not have [select] privilege on " + + s"[$db1/$view1/name,$db1/$view1/id,$db1/$view1/max_scope]")) + } } } } @@ -927,17 +932,11 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { |AS |SELECT count(*) as cnt, sum(id) as sum_id FROM $db1.$table1 """.stripMargin)) - interceptEndsWith[AccessControlException]( - doAs(someone, sql(s"SELECT count(*) FROM $db1.$table1").show()))( - s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/scope]") + checkAnswer(someone, s"SELECT count(*) FROM $db1.$table1", Row(0) :: Nil) - interceptEndsWith[AccessControlException]( - doAs(someone, sql(s"SELECT count(*) FROM $db1.$view1").show()))( - s"does not have [select] privilege on [$db1/$view1/id,$db1/$view1/scope]") + checkAnswer(someone, s"SELECT count(*) FROM $db1.$view1", Row(0) :: Nil) - interceptEndsWith[AccessControlException]( - doAs(someone, sql(s"SELECT count(*) FROM $db1.$view2").show()))( - s"does not have [select] privilege on [$db1/$view2/cnt,$db1/$view2/sum_id]") + checkAnswer(someone, s"SELECT count(*) FROM $db1.$view2", Row(1) :: Nil) interceptEndsWith[AccessControlException]( doAs(someone, sql(s"SELECT count(id) FROM $db1.$table1 WHERE id > 10").show()))( @@ -1321,7 +1320,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { doAs( someone, sql(s"SELECT typeof(id), typeof(typeof(day)) FROM $db1.$table1").collect()))( - s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/day]") + s"does not have [select] privilege on [$db1/$table1/day,$db1/$table1/id]") interceptEndsWith[AccessControlException]( doAs( someone, @@ -1331,7 +1330,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { |typeof(cast(id as string)), |typeof(substring(day, 1, 3)) |FROM $db1.$table1""".stripMargin).collect()))( - s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/day]") + s"does not have [select] privilege on [$db1/$table1/day,$db1/$table1/id]") checkAnswer( admin, s""" @@ -1414,60 +1413,76 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } } - test("[KYUUBI #5884] PVM should inherit MultiInstance and wrap with new exprId") { + test("[KYUUBI #5594][AUTHZ] BuildQuery should respect normal node's input ") { + assume(!isSparkV35OrGreater, "mapInPandas not supported after spark 3.5") val db1 = defaultDb val table1 = "table1" - val perm_view = "perm_view" val view1 = "view1" - val view2 = "view2" - val view3 = "view3" withSingleCallEnabled { - withCleanTmpResources(Seq.empty) { - sql("set spark.sql.legacy.storeAnalyzedPlanForView=true") - doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1(id int, scope int)")) - doAs(admin, sql(s"CREATE VIEW $db1.$perm_view AS SELECT * FROM $db1.$table1")) - - doAs( - admin, - sql( - s""" - |CREATE OR REPLACE TEMPORARY VIEW $view1 AS - |SELECT * - |FROM $db1.$perm_view - |WHERE id > 10 - |""".stripMargin)) + withCleanTmpResources(Seq((s"$db1.$table1", "table"), (s"$db1.$view1", "view"))) { + doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int, scope int)")) + doAs(admin, sql(s"CREATE VIEW $db1.$view1 AS SELECT * FROM $db1.$table1")) - doAs( - admin, - sql( - s""" - |CREATE OR REPLACE TEMPORARY VIEW $view2 AS - |SELECT * - |FROM $view1 - |WHERE scope < 10 - |""".stripMargin)) + val table = spark.read.table(s"$db1.$table1") + val mapTableInPandasUDF = PythonUDF( + "mapInPandasUDF", + null, + StructType(Seq(StructField("id", IntegerType), StructField("scope", IntegerType))), + table.queryExecution.analyzed.output, + 205, + true) + interceptContains[AccessControlException]( + doAs( + someone, + invokeAs( + table, + "mapInPandas", + (classOf[PythonUDF], mapTableInPandasUDF)) + .asInstanceOf[DataFrame].select(col("id"), col("scope")).limit(1).show(true)))( + s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/scope]") - doAs( - admin, - sql( - s""" - |CREATE OR REPLACE TEMPORARY VIEW $view3 AS - |SELECT * - |FROM $view1 - |WHERE scope is not null - |""".stripMargin)) + val view = spark.read.table(s"$db1.$view1") + val mapViewInPandasUDF = PythonUDF( + "mapInPandasUDF", + null, + StructType(Seq(StructField("id", IntegerType), StructField("scope", IntegerType))), + view.queryExecution.analyzed.output, + 205, + true) + interceptContains[AccessControlException]( + doAs( + someone, + invokeAs( + view, + "mapInPandas", + (classOf[PythonUDF], mapViewInPandasUDF)) + .asInstanceOf[DataFrame].select(col("id"), col("scope")).limit(1).show(true)))( + s"does not have [select] privilege on [$db1/$view1/id,$db1/$view1/scope]") + } + } + } + test("[KYUUBI #5594][AUTHZ] BuildQuery should respect sort agg input") { + val db1 = defaultDb + val table1 = "table1" + val view1 = "view1" + withSingleCallEnabled { + withCleanTmpResources(Seq((s"$db1.$table1", "table"), (s"$db1.$view1", "view"))) { + doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int, scope int)")) + doAs(admin, sql(s"CREATE VIEW $db1.$view1 AS SELECT * FROM $db1.$table1")) + checkAnswer( + someone, + s"SELECT count(*) FROM $db1.$table1 WHERE id > 1", + Row(0) :: Nil) + checkAnswer( + someone, + s"SELECT count(*) FROM $db1.$view1 WHERE id > 1", + Row(0) :: Nil) interceptContains[AccessControlException]( doAs( someone, - sql( - s""" - |SELECT a.*, b.scope as new_scope - |FROM $view2 a - |JOIN $view3 b - |ON a.id == b.id - |""".stripMargin).collect()))(s"does not have [select] privilege on " + - s"[$db1/$perm_view/id,$db1/$perm_view/scope,$db1/$perm_view/scope,$db1/$perm_view/id]") + sql(s"SELECT count(id) FROM $db1.$view1 WHERE id > 1").collect()))( + s"does not have [select] privilege on [$db1/$view1/id]") } } } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala index 046052d558d..3a22f45d5b4 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala @@ -107,20 +107,23 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu } test("[KYUUBI #3424] CREATE TABLE") { - // CreateTable - val e2 = intercept[AccessControlException]( - doAs(someone, sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2"))) - assert(e2.getMessage.contains(s"does not have [create] privilege" + - s" on [$namespace1/$table2]")) + withSingleCallEnabled { + // CreateTable + val e2 = intercept[AccessControlException]( + doAs(someone, sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2"))) + assert(e2.getMessage.contains(s"does not have [create] privilege" + + s" on [$namespace1/$table2]")) - // CreateTableAsSelect - val e21 = intercept[AccessControlException]( - doAs( - someone, - sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2" + - s" AS select * from $catalogV2.$namespace1.$table1"))) - assert(e21.getMessage.contains(s"does not have [select] privilege" + - s" on [$namespace1/$table1/id]")) + // CreateTableAsSelect + + val e21 = intercept[AccessControlException]( + doAs( + someone, + sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2" + + s" AS select * from $catalogV2.$namespace1.$table1"))) + assert(e21.getMessage.contains(s"does not have [select] privilege" + + s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]")) + } } test("[KYUUBI #3424] DROP TABLE") { @@ -133,69 +136,74 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu test("[KYUUBI #3424] INSERT TABLE") { // AppendData: Insert Using a VALUES Clause - val e4 = intercept[AccessControlException]( - doAs( - someone, - sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" + - s" VALUES (1, 'bowenliang123', 'Guangzhou')"))) - assert(e4.getMessage.contains(s"does not have [update] privilege" + - s" on [$namespace1/$outputTable1]")) + withSingleCallEnabled { - // AppendData: Insert Using a TABLE Statement - val e42 = intercept[AccessControlException]( - doAs( - someone, - sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" + - s" TABLE $catalogV2.$namespace1.$table1"))) - assert(e42.getMessage.contains(s"does not have [select] privilege" + - s" on [$namespace1/$table1/id]")) + val e4 = intercept[AccessControlException]( + doAs( + someone, + sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" + + s" VALUES (1, 'bowenliang123', 'Guangzhou')"))) + assert(e4.getMessage.contains(s"does not have [update] privilege" + + s" on [$namespace1/$outputTable1]")) - // AppendData: Insert Using a SELECT Statement - val e43 = intercept[AccessControlException]( - doAs( - someone, - sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" + - s" SELECT * from $catalogV2.$namespace1.$table1"))) - assert(e43.getMessage.contains(s"does not have [select] privilege" + - s" on [$namespace1/$table1/id]")) + // AppendData: Insert Using a TABLE Statement + val e42 = intercept[AccessControlException]( + doAs( + someone, + sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" + + s" TABLE $catalogV2.$namespace1.$table1"))) + assert(e42.getMessage.contains(s"does not have [select] privilege" + + s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]")) - // OverwriteByExpression: Insert Overwrite - val e44 = intercept[AccessControlException]( - doAs( - someone, - sql(s"INSERT OVERWRITE $catalogV2.$namespace1.$outputTable1 (id, name, city)" + - s" VALUES (1, 'bowenliang123', 'Guangzhou')"))) - assert(e44.getMessage.contains(s"does not have [update] privilege" + - s" on [$namespace1/$outputTable1]")) + // AppendData: Insert Using a SELECT Statement + val e43 = intercept[AccessControlException]( + doAs( + someone, + sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" + + s" SELECT * from $catalogV2.$namespace1.$table1"))) + assert(e43.getMessage.contains(s"does not have [select] privilege" + + s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]")) + + // OverwriteByExpression: Insert Overwrite + val e44 = intercept[AccessControlException]( + doAs( + someone, + sql(s"INSERT OVERWRITE $catalogV2.$namespace1.$outputTable1 (id, name, city)" + + s" VALUES (1, 'bowenliang123', 'Guangzhou')"))) + assert(e44.getMessage.contains(s"does not have [update] privilege" + + s" on [$namespace1/$outputTable1]")) + } } test("[KYUUBI #3424] MERGE INTO") { - val mergeIntoSql = - s""" - |MERGE INTO $catalogV2.$namespace1.$outputTable1 AS target - |USING $catalogV2.$namespace1.$table1 AS source - |ON target.id = source.id - |WHEN MATCHED AND (target.name='delete') THEN DELETE - |WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city + withSingleCallEnabled { + val mergeIntoSql = + s""" + |MERGE INTO $catalogV2.$namespace1.$outputTable1 AS target + |USING $catalogV2.$namespace1.$table1 AS source + |ON target.id = source.id + |WHEN MATCHED AND (target.name='delete') THEN DELETE + |WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city """.stripMargin - // MergeIntoTable: Using a MERGE INTO Statement - val e1 = intercept[AccessControlException]( - doAs( - someone, - sql(mergeIntoSql))) - assert(e1.getMessage.contains(s"does not have [select] privilege" + - s" on [$namespace1/$table1/id]")) - - withSingleCallEnabled { - val e2 = intercept[AccessControlException]( + // MergeIntoTable: Using a MERGE INTO Statement + val e1 = intercept[AccessControlException]( doAs( someone, sql(mergeIntoSql))) - assert(e2.getMessage.contains(s"does not have" + - s" [select] privilege" + - s" on [$namespace1/$table1/id,$namespace1/table1/name,$namespace1/$table1/city]," + - s" [update] privilege on [$namespace1/$outputTable1]")) + assert(e1.getMessage.contains(s"does not have [select] privilege" + + s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]")) + + withSingleCallEnabled { + val e2 = intercept[AccessControlException]( + doAs( + someone, + sql(mergeIntoSql))) + assert(e2.getMessage.contains(s"does not have" + + s" [select] privilege" + + s" on [$namespace1/$table1/city,$namespace1/table1/id,$namespace1/$table1/name]," + + s" [update] privilege on [$namespace1/$outputTable1]")) + } } } @@ -220,17 +228,14 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu test("[KYUUBI #3424] CACHE TABLE") { // CacheTable - val e7 = intercept[AccessControlException]( - doAs( - someone, - sql(s"CACHE TABLE $cacheTable1" + - s" AS select * from $catalogV2.$namespace1.$table1"))) - if (isSparkV32OrGreater) { - assert(e7.getMessage.contains(s"does not have [select] privilege" + - s" on [$namespace1/$table1/id]")) - } else { + withSingleCallEnabled { + val e7 = intercept[AccessControlException]( + doAs( + someone, + sql(s"CACHE TABLE $cacheTable1" + + s" AS select * from $catalogV2.$namespace1.$table1"))) assert(e7.getMessage.contains(s"does not have [select] privilege" + - s" on [$catalogV2.$namespace1/$table1]")) + s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]")) } }