From 23f32cf1512fe28839e2af450a5f733fa257cbeb Mon Sep 17 00:00:00 2001
From: zml1206
Date: Thu, 16 Nov 2023 10:12:43 +0800
Subject: [PATCH] [KYUUBI #5690][AUTHZ] Support insert into/overwrite path-based table for Delta Lake in Authz

### _Why are the changes needed?_

To close #5690. Support INSERT INTO/OVERWRITE for path-based Delta Lake tables in the Authz plugin, so that writes addressed by a location are checked as [write] privileges on the table's path instead of being resolved as catalog tables. An illustrative sketch of the statements now covered follows.
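A minimal sketch of the statements this patch brings under path-level authorization. The location `/tmp/delta_tbl`, the schema, and the `spark` session are illustrative assumptions, not code from this PR:

```scala
// Illustrative sketch; `spark` is assumed to be an active SparkSession
// with the Delta Lake and Kyuubi Authz extensions enabled.
// A "path-based" Delta table is addressed by location, not by catalog name.
spark.sql("CREATE TABLE delta.`/tmp/delta_tbl` (id INT, name STRING) USING delta")

// Plans as AppendData over a DataSourceV2Relation whose identifier is the
// path; with this patch it is checked as a [write] privilege on that URI.
spark.sql("INSERT INTO delta.`/tmp/delta_tbl` SELECT 1, 'a'")

// Plans as OverwriteByExpression (or OverwritePartitionsDynamic when
// dynamic partition overwrite applies); likewise checked against the path.
spark.sql("INSERT OVERWRITE delta.`/tmp/delta_tbl` SELECT 2, 'b'")
```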
"org.apache.spark.sql.catalyst.plans.logical.OverwritePartitionsDynamic", "tableDescs" : [ { @@ -387,7 +395,11 @@ "fieldName" : "query", "fieldExtractor" : "LogicalPlanQueryExtractor" } ], - "uriDescs" : [ ] + "uriDescs" : [ { + "fieldName" : "table", + "fieldExtractor" : "DataSourceV2RelationURIExtractor", + "isInput" : false + } ] }, { "classname" : "org.apache.spark.sql.catalyst.plans.logical.RefreshTable", "tableDescs" : [ { diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala index ce595c66b19..82448f9cdad 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.types.DataType import org.apache.spark.unsafe.types.UTF8String @@ -186,18 +187,18 @@ class ExpressionSeqTableExtractor extends TableExtractor { class DataSourceV2RelationTableExtractor extends TableExtractor { override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = { val plan = v1.asInstanceOf[LogicalPlan] - val maybeV2Relation = plan.find(_.getClass.getSimpleName == "DataSourceV2Relation") - maybeV2Relation match { - case None => None - case Some(v2Relation) => - val maybeCatalogPlugin = invokeAs[Option[AnyRef]](v2Relation, "catalog") - val maybeCatalog = maybeCatalogPlugin.flatMap(catalogPlugin => + plan.find(_.getClass.getSimpleName == "DataSourceV2Relation").get match { + case v2Relation: DataSourceV2Relation + if v2Relation.identifier == None || + !isPathIdentifier(v2Relation.identifier.get.name(), spark) => + val maybeCatalog = v2Relation.catalog.flatMap(catalogPlugin => lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogPlugin)) - lookupExtractor[TableTableExtractor].apply(spark, invokeAs[AnyRef](v2Relation, "table")) + lookupExtractor[TableTableExtractor].apply(spark, v2Relation.table) .map { table => val maybeOwner = TableExtractor.getOwner(v2Relation) table.copy(catalog = maybeCatalog, owner = maybeOwner) } + case _ => None } } } diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala index 7d483351668..eff8427462d 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala @@ -19,9 +19,10 @@ package org.apache.kyuubi.plugin.spark.authz.serde import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable} -import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} import org.apache.spark.sql.connector.catalog.Identifier import 
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index ce595c66b19..82448f9cdad 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -186,18 +187,18 @@ class ExpressionSeqTableExtractor extends TableExtractor {
 class DataSourceV2RelationTableExtractor extends TableExtractor {
   override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
     val plan = v1.asInstanceOf[LogicalPlan]
-    val maybeV2Relation = plan.find(_.getClass.getSimpleName == "DataSourceV2Relation")
-    maybeV2Relation match {
-      case None => None
-      case Some(v2Relation) =>
-        val maybeCatalogPlugin = invokeAs[Option[AnyRef]](v2Relation, "catalog")
-        val maybeCatalog = maybeCatalogPlugin.flatMap(catalogPlugin =>
+    plan.find(_.getClass.getSimpleName == "DataSourceV2Relation").get match {
+      case v2Relation: DataSourceV2Relation
+          if v2Relation.identifier == None ||
+            !isPathIdentifier(v2Relation.identifier.get.name(), spark) =>
+        val maybeCatalog = v2Relation.catalog.flatMap(catalogPlugin =>
           lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogPlugin))
-        lookupExtractor[TableTableExtractor].apply(spark, invokeAs[AnyRef](v2Relation, "table"))
+        lookupExtractor[TableTableExtractor].apply(spark, v2Relation.table)
           .map { table =>
             val maybeOwner = TableExtractor.getOwner(v2Relation)
             table.copy(catalog = maybeCatalog, owner = maybeOwner)
           }
+      case _ => None
     }
   }
 }
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala
index 7d483351668..eff8427462d 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala
@@ -19,9 +19,10 @@ package org.apache.kyuubi.plugin.spark.authz.serde
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
-import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 
 import org.apache.kyuubi.plugin.spark.authz.util.PathIdentifier._
 import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
@@ -115,3 +116,16 @@ class SubqueryAliasURIExtractor extends URIExtractor {
       Seq(identifier.name).map(Uri)
   }
 }
+
+class DataSourceV2RelationURIExtractor extends URIExtractor {
+  override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
+    val plan = v1.asInstanceOf[LogicalPlan]
+    plan.find(_.getClass.getSimpleName == "DataSourceV2Relation").get match {
+      case v2Relation: DataSourceV2Relation
+          if v2Relation.identifier != None &&
+            isPathIdentifier(v2Relation.identifier.get.name, spark) =>
+        Seq(v2Relation.identifier.get.name).map(Uri)
+      case _ => Nil
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
index 4d7dc2ac5da..ee0345a8c26 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
@@ -270,7 +270,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
       "table",
       classOf[DataSourceV2RelationTableExtractor],
       actionTypeDesc = Some(actionTypeDesc))
-    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
+    val uriDescs = Seq(UriDesc("table", classOf[DataSourceV2RelationURIExtractor]))
+    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc), uriDescs = uriDescs)
   }
 
   val ReplaceData = {
@@ -308,7 +309,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
       "table",
       classOf[DataSourceV2RelationTableExtractor],
       actionTypeDesc = Some(actionTypeDesc))
-    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
+    val uriDescs = Seq(UriDesc("table", classOf[DataSourceV2RelationURIExtractor]))
+    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc), uriDescs = uriDescs)
   }
 
   val OverwritePartitionsDynamic = {
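Taken together, the two extractor changes split responsibility by identifier kind: for a path identifier the table extractor now yields no table, and the new URI extractor yields the path. A hedged illustration of that contract, where `spark` and `appendDataPlan` (the analyzed plan of the INSERT sketched earlier) are assumptions, not code from this PR:

```scala
import org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor
import org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationURIExtractor
import org.apache.kyuubi.plugin.spark.authz.serde.Uri

// `appendDataPlan.table` is assumed to be the DataSourceV2Relation for
// "INSERT INTO delta.`/tmp/delta_tbl` ...", identified by its path.
val maybeTable = new DataSourceV2RelationTableExtractor().apply(spark, appendDataPlan.table)
assert(maybeTable.isEmpty) // path identifier: no table-level privilege derived

val uris = new DataSourceV2RelationURIExtractor().apply(spark, appendDataPlan.table)
assert(uris == Seq(Uri("/tmp/delta_tbl"))) // instead, [write] is checked on the URI
```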
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
index 514b7970b41..331bd380df0 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
@@ -357,6 +357,36 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
       doAs(admin, sql(updateTableSql))
     })
   }
+
+  test("insert path-based table") {
+    withSingleCallEnabled {
+      withCleanTmpResources(Seq((s"$namespace1.$table2", "table"), (s"$namespace1", "database"))) {
+        doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+        doAs(admin, sql(createTableSql(namespace1, table2)))
+        withTempDir(path => {
+          doAs(admin, sql(createPathBasedTableSql(path)))
+          // insert into
+          val insertIntoSql = s"INSERT INTO delta.`$path` SELECT * FROM $namespace1.$table2"
+          interceptContains[AccessControlException](
+            doAs(someone, sql(insertIntoSql)))(
+            s"does not have [select] privilege on [$namespace1/$table2/id," +
+              s"$namespace1/$table2/name,$namespace1/$table2/gender," +
+              s"$namespace1/$table2/birthDate], [write] privilege on [[$path, $path/]]")
+          doAs(admin, sql(insertIntoSql))
+
+          // insert overwrite
+          val insertOverwriteSql =
+            s"INSERT OVERWRITE delta.`$path` SELECT * FROM $namespace1.$table2"
+          interceptContains[AccessControlException](
+            doAs(someone, sql(insertOverwriteSql)))(
+            s"does not have [select] privilege on [$namespace1/$table2/id," +
+              s"$namespace1/$table2/name,$namespace1/$table2/gender," +
+              s"$namespace1/$table2/birthDate], [write] privilege on [[$path, $path/]]")
+          doAs(admin, sql(insertOverwriteSql))
+        })
+      }
+    }
+  }
 }
 
 object DeltaCatalogRangerSparkExtensionSuite {
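One helper the new test relies on, `createPathBasedTableSql`, is pre-existing suite code and not part of this diff. A hypothetical sketch of such a helper, with the column list inferred from the expected error message above; this is an assumption, not the suite's actual body:

```scala
// Hypothetical sketch only; the real createPathBasedTableSql lives in
// DeltaCatalogRangerSparkExtensionSuite and may differ in signature,
// schema, and table options.
def createPathBasedTableSql(path: java.nio.file.Path): String =
  s"""
     |CREATE TABLE IF NOT EXISTS delta.`$path` (
     |  id INT,
     |  name STRING,
     |  gender STRING,
     |  birthDate TIMESTAMP)
     |USING DELTA
     |""".stripMargin
```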