From abaa3698cb49a17a931ee8c4c6ab24e0dddf95e8 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Sat, 21 Oct 2023 22:46:26 +0800 Subject: [PATCH] [KYUUBI #5447][AUTHZ] Support Hudi DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand ### _Why are the changes needed?_ To close #5447. Kyuubi authz supports Hudi DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand - DeleteHoodieTableCommand: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/DeleteHoodieTableCommand.scala - UpdateHoodieTableCommand: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/UpdateHoodieTableCommand.scala - MergeIntoHoodieTableCommand: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No Closes #5482 from AngersZhuuuu/KYUUBI-5447. 
Closes #5447 2598af203 [Angerszhuuuu] Update HudiCatalogRangerSparkExtensionSuite.scala 08be589b7 [Angerszhuuuu] Update org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor 19497d12c [Angerszhuuuu] Update tableExtractors.scala df6e244e2 [Angerszhuuuu] update 1a72f1323 [Angerszhuuuu] update f7ca6846c [Angerszhuuuu] Merge branch 'master' into KYUUBI-5447 37006869b [Angerszhuuuu] [KYUUBI #5447][AUTHZ] Support hudi DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand Authored-by: Angerszhuuuu Signed-off-by: liangbowen --- ...bi.plugin.spark.authz.serde.QueryExtractor | 1 + ...bi.plugin.spark.authz.serde.TableExtractor | 2 + .../main/resources/table_command_spec.json | 63 +++++++++++++++++ .../spark/authz/serde/queryExtractors.scala | 8 +++ .../spark/authz/serde/tableExtractors.scala | 32 ++++++++- .../plugin/spark/authz/gen/HudiCommands.scala | 40 ++++++++++- ...HudiCatalogRangerSparkExtensionSuite.scala | 67 ++++++++++++++++++- 7 files changed, 209 insertions(+), 4 deletions(-) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor index c659114f944..2406a40e196 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor +++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.QueryExtractor @@ -15,5 +15,6 @@ # limitations under the License. 
# +org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoSourceTableExtractor org.apache.kyuubi.plugin.spark.authz.serde.LogicalPlanOptionQueryExtractor org.apache.kyuubi.plugin.spark.authz.serde.LogicalPlanQueryExtractor diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor index 78f836c65cd..33c8b8759fc 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor +++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor @@ -19,6 +19,8 @@ org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableOptionTableExtractor org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableTableExtractor org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor org.apache.kyuubi.plugin.spark.authz.serde.ExpressionSeqTableExtractor +org.apache.kyuubi.plugin.spark.authz.serde.HudiDataSourceV2RelationTableExtractor +org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoTargetTableExtractor org.apache.kyuubi.plugin.spark.authz.serde.IdentifierTableExtractor org.apache.kyuubi.plugin.spark.authz.serde.LogicalRelationTableExtractor org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json index c739fe295de..1d2b5dc88a2 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json +++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json @@ -1604,6 +1604,27 @@ } ], "opType" : "CREATETABLE", "queryDescs" : [ ] +}, { + "classname" : 
"org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand", + "tableDescs" : [ { + "fieldName" : "dft", + "fieldExtractor" : "HudiDataSourceV2RelationTableExtractor", + "columnDesc" : null, + "actionTypeDesc" : { + "fieldName" : null, + "fieldExtractor" : null, + "actionType" : "UPDATE" + }, + "tableTypeDesc" : null, + "catalogDesc" : null, + "isInput" : false, + "setCurrentDatabaseIfMissing" : false + } ], + "opType" : "QUERY", + "queryDescs" : [ { + "fieldName" : "query", + "fieldExtractor" : "LogicalPlanQueryExtractor" + } ] }, { "classname" : "org.apache.spark.sql.hudi.command.DropHoodieTableCommand", "tableDescs" : [ { @@ -1643,6 +1664,27 @@ "fieldName" : "query", "fieldExtractor" : "LogicalPlanQueryExtractor" } ] +}, { + "classname" : "org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand", + "tableDescs" : [ { + "fieldName" : "mergeInto", + "fieldExtractor" : "HudiMergeIntoTargetTableExtractor", + "columnDesc" : null, + "actionTypeDesc" : { + "fieldName" : null, + "fieldExtractor" : null, + "actionType" : "UPDATE" + }, + "tableTypeDesc" : null, + "catalogDesc" : null, + "isInput" : false, + "setCurrentDatabaseIfMissing" : false + } ], + "opType" : "QUERY", + "queryDescs" : [ { + "fieldName" : "mergeInto", + "fieldExtractor" : "HudiMergeIntoSourceTableExtractor" + } ] }, { "classname" : "org.apache.spark.sql.hudi.command.RepairHoodieTableCommand", "tableDescs" : [ { @@ -1705,4 +1747,25 @@ } ], "opType" : "TRUNCATETABLE", "queryDescs" : [ ] +}, { + "classname" : "org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand", + "tableDescs" : [ { + "fieldName" : "ut", + "fieldExtractor" : "HudiDataSourceV2RelationTableExtractor", + "columnDesc" : null, + "actionTypeDesc" : { + "fieldName" : null, + "fieldExtractor" : null, + "actionType" : "UPDATE" + }, + "tableTypeDesc" : null, + "catalogDesc" : null, + "isInput" : false, + "setCurrentDatabaseIfMissing" : false + } ], + "opType" : "QUERY", + "queryDescs" : [ { + "fieldName" : "query", + 
"fieldExtractor" : "LogicalPlanQueryExtractor" + } ] } ] \ No newline at end of file diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/queryExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/queryExtractors.scala index f6fc19ac280..4ac87e100e4 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/queryExtractors.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/queryExtractors.scala @@ -19,6 +19,8 @@ package org.apache.kyuubi.plugin.spark.authz.serde import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs + trait QueryExtractor extends (AnyRef => Option[LogicalPlan]) with Extractor object QueryExtractor { @@ -44,3 +46,9 @@ class LogicalPlanOptionQueryExtractor extends QueryExtractor { v1.asInstanceOf[Option[LogicalPlan]] } } + +class HudiMergeIntoSourceTableExtractor extends QueryExtractor { + override def apply(v1: AnyRef): Option[LogicalPlan] = { + new LogicalPlanQueryExtractor().apply(invokeAs[LogicalPlan](v1, "sourceTable")) + } +} diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala index 57eab9634f7..47c486af360 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTable import 
org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._ import org.apache.kyuubi.util.reflect.ReflectUtils._ @@ -80,7 +80,9 @@ class TableIdentifierTableExtractor extends TableExtractor { val catalogTable = spark.sessionState.catalog.getTableMetadata(identifier) Option(catalogTable.owner).filter(_.nonEmpty) } catch { - case _: Exception => None + case e: Exception => + e.printStackTrace() + None } Some(Table(None, identifier.database, identifier.table, owner)) } @@ -240,3 +242,29 @@ class TableTableExtractor extends TableExtractor { lookupExtractor[StringTableExtractor].apply(spark, tableName) } } + +class HudiDataSourceV2RelationTableExtractor extends TableExtractor { + override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = { + invokeAs[LogicalPlan](v1, "table") match { + // Match multipartIdentifier with tableAlias + case SubqueryAlias(_, SubqueryAlias(identifier, _)) => + new StringTableExtractor().apply(spark, identifier.toString()) + // Match multipartIdentifier without tableAlias + case SubqueryAlias(identifier, _) => + new StringTableExtractor().apply(spark, identifier.toString()) + } + } +} + +class HudiMergeIntoTargetTableExtractor extends TableExtractor { + override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = { + invokeAs[LogicalPlan](v1, "targetTable") match { + // Match multipartIdentifier with tableAlias + case SubqueryAlias(_, SubqueryAlias(identifier, relation)) => + new StringTableExtractor().apply(spark, identifier.toString()) + // Match multipartIdentifier without tableAlias + case SubqueryAlias(identifier, _) => + new StringTableExtractor().apply(spark, identifier.toString()) + } + } +} diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala index d7e40237bfc..522059f27b6 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/HudiCommands.scala @@ -18,6 +18,7 @@ package org.apache.kyuubi.plugin.spark.authz.gen import org.apache.kyuubi.plugin.spark.authz.OperationType._ +import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._ import org.apache.kyuubi.plugin.spark.authz.serde._ import org.apache.kyuubi.plugin.spark.authz.serde.TableType._ @@ -165,6 +166,40 @@ object HudiCommands { TableCommandSpec(cmd, Seq(tableDesc), SHOWPARTITIONS) } + val DeleteHoodieTableCommand = { + val cmd = "org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand" + val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE)) + val tableDesc = + TableDesc( + "dft", + classOf[HudiDataSourceV2RelationTableExtractor], + actionTypeDesc = Some(actionTypeDesc)) + TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(QueryDesc("query"))) + } + + val UpdateHoodieTableCommand = { + val cmd = "org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand" + val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE)) + val tableDesc = + TableDesc( + "ut", + classOf[HudiDataSourceV2RelationTableExtractor], + actionTypeDesc = Some(actionTypeDesc)) + TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(QueryDesc("query"))) + } + + val MergeIntoHoodieTableCommand = { + val cmd = "org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand" + val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE)) + val tableDesc = + TableDesc( + "mergeInto", + classOf[HudiMergeIntoTargetTableExtractor], + actionTypeDesc = Some(actionTypeDesc)) + val queryDescs = QueryDesc("mergeInto", classOf[HudiMergeIntoSourceTableExtractor]) + TableCommandSpec(cmd, 
Seq(tableDesc), queryDescs = Seq(queryDescs)) + } + val data: Array[TableCommandSpec] = Array( AlterHoodieTableAddColumnsCommand, AlterHoodieTableChangeColumnCommand, @@ -176,10 +211,13 @@ object HudiCommands { CreateHoodieTableLikeCommand, CompactionHoodieTableCommand, CompactionShowHoodieTableCommand, + DeleteHoodieTableCommand, DropHoodieTableCommand, InsertIntoHoodieTableCommand, + MergeIntoHoodieTableCommand, RepairHoodieTableCommand, TruncateHoodieTableCommand, ShowHoodieTablePartitionsCommand, - Spark31AlterTableCommand) + Spark31AlterTableCommand, + UpdateHoodieTableCommand) } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala index 193446bb24f..fd7acd1295e 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala @@ -33,7 +33,7 @@ import org.apache.kyuubi.util.AssertionUtils.interceptContains */ @HudiTest class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { - override protected val catalogImpl: String = "hive" + override protected val catalogImpl: String = "in-memory" // TODO: Apache Hudi not support Spark 3.5 and Scala 2.13 yet, // should change after Apache Hudi support Spark 3.5 and Scala 2.13. 
private def isSupportedVersion = !isSparkV35OrGreater && !isScalaV213 @@ -407,4 +407,69 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } } } + + test("DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand") { + withSingleCallEnabled { + withCleanTmpResources(Seq( + (s"$namespace1.$table1", "table"), + (s"$namespace1.$table2", "table"), + (namespace1, "database"))) { + doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1")) + doAs( + admin, + sql( + s""" + |CREATE TABLE IF NOT EXISTS $namespace1.$table1(id int, name string, city string) + |USING HUDI + |OPTIONS ( + | type = 'cow', + | primaryKey = 'id', + | 'hoodie.datasource.hive_sync.enable' = 'false' + |) + |PARTITIONED BY(city) + |""".stripMargin)) + + doAs( + admin, + sql( + s""" + |CREATE TABLE IF NOT EXISTS $namespace1.$table2(id int, name string, city string) + |USING HUDI + |OPTIONS ( + | type = 'cow', + | primaryKey = 'id', + | 'hoodie.datasource.hive_sync.enable' = 'false' + |) + |PARTITIONED BY(city) + |""".stripMargin)) + + val deleteFrom = s"DELETE FROM $namespace1.$table1 WHERE id = 10" + interceptContains[AccessControlException] { + doAs(someone, sql(deleteFrom)) + }(s"does not have [update] privilege on [$namespace1/$table1]") + doAs(admin, sql(deleteFrom)) + + val updateSql = s"UPDATE $namespace1.$table1 SET name = 'test' WHERE id > 10" + interceptContains[AccessControlException] { + doAs(someone, sql(updateSql)) + }(s"does not have [update] privilege on [$namespace1/$table1]") + doAs(admin, sql(updateSql)) + + val mergeIntoSQL = + s""" + |MERGE INTO $namespace1.$table1 target + |USING $namespace1.$table2 source + |ON target.id = source.id + |WHEN MATCHED + |AND target.name == 'test' + | THEN UPDATE SET id = source.id, name = source.name, city = source.city + |""".stripMargin + interceptContains[AccessControlException] { + doAs(someone, sql(mergeIntoSQL)) + }(s"does not have [select] privilege on " + + 
s"[$namespace1/$table2/id,$namespace1/$table2/name,$namespace1/$table2/city]") + doAs(admin, sql(mergeIntoSQL)) + } + } + } }