[KYUUBI #5690][AUTHZ] Support insert into/overwrite path-based table for Delta Lake in Authz

### _Why are the changes needed?_
To close #5690.
Support insert into/overwrite for path-based tables for Delta Lake in the Authz plugin.
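For context, a path-based Delta table is addressed by its storage location rather than by a catalog name. A minimal sketch of the statements this change covers, assuming a live `spark` session; the path `/tmp/delta/tbl` and source table `db1.src` are illustrative:

```scala
// Illustrative only: `/tmp/delta/tbl` and `db1.src` are made-up names.
// A Delta table addressed by path rather than by catalog name:
spark.sql("CREATE TABLE delta.`/tmp/delta/tbl` (id INT, name STRING) USING delta")

// With this patch, Authz maps both writes below to a [write] privilege check
// on the path, in addition to the [select] checks on the source columns:
spark.sql("INSERT INTO delta.`/tmp/delta/tbl` SELECT id, name FROM db1.src")
spark.sql("INSERT OVERWRITE delta.`/tmp/delta/tbl` SELECT id, name FROM db1.src")
```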

### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

### _Was this patch authored or co-authored using generative AI tooling?_
No.

Closes #5691 from zml1206/KYUUBI-5690.

Closes #5690

e1506ca [zml1206] update
704ce2c [zml1206] fix
bf34711 [zml1206] Support insert into/overwrite path-based table for Delta Lake in Authz

Authored-by: zml1206 <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
zml1206 authored and yaooqinn committed Nov 16, 2023
1 parent 10d0aec commit 23f32cf
Showing 6 changed files with 73 additions and 13 deletions.
File 1 of 6: URIExtractor service registration list

```diff
@@ -18,6 +18,7 @@
 org.apache.kyuubi.plugin.spark.authz.serde.BaseRelationFileIndexURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.CatalogStorageFormatURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableURIExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.IdentifierURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.PartitionLocsSeqURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.PropertiesLocationUriExtractor
```
File 2 of 6: table command spec JSON

```diff
@@ -79,7 +79,11 @@
     "fieldName" : "query",
     "fieldExtractor" : "LogicalPlanQueryExtractor"
   } ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "DataSourceV2RelationURIExtractor",
+    "isInput" : false
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.CacheTable",
   "tableDescs" : [ ],
@@ -365,7 +369,11 @@
     "fieldName" : "query",
     "fieldExtractor" : "LogicalPlanQueryExtractor"
   } ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "DataSourceV2RelationURIExtractor",
+    "isInput" : false
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.OverwritePartitionsDynamic",
   "tableDescs" : [ {
@@ -387,7 +395,11 @@
     "fieldName" : "query",
     "fieldExtractor" : "LogicalPlanQueryExtractor"
   } ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "DataSourceV2RelationURIExtractor",
+    "isInput" : false
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.RefreshTable",
   "tableDescs" : [ {
```
File 3 of 6: table extractors (Scala)

```diff
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -186,18 +187,18 @@ class ExpressionSeqTableExtractor extends TableExtractor {
 class DataSourceV2RelationTableExtractor extends TableExtractor {
   override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
     val plan = v1.asInstanceOf[LogicalPlan]
-    val maybeV2Relation = plan.find(_.getClass.getSimpleName == "DataSourceV2Relation")
-    maybeV2Relation match {
-      case None => None
-      case Some(v2Relation) =>
-        val maybeCatalogPlugin = invokeAs[Option[AnyRef]](v2Relation, "catalog")
-        val maybeCatalog = maybeCatalogPlugin.flatMap(catalogPlugin =>
+    plan.find(_.getClass.getSimpleName == "DataSourceV2Relation").get match {
+      case v2Relation: DataSourceV2Relation
+          if v2Relation.identifier == None ||
+            !isPathIdentifier(v2Relation.identifier.get.name(), spark) =>
+        val maybeCatalog = v2Relation.catalog.flatMap(catalogPlugin =>
           lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogPlugin))
-        lookupExtractor[TableTableExtractor].apply(spark, invokeAs[AnyRef](v2Relation, "table"))
+        lookupExtractor[TableTableExtractor].apply(spark, v2Relation.table)
           .map { table =>
             val maybeOwner = TableExtractor.getOwner(v2Relation)
             table.copy(catalog = maybeCatalog, owner = maybeOwner)
           }
+      case _ => None
     }
   }
 }
```
File 4 of 6: URI extractors (Scala)

```diff
@@ -19,9 +19,10 @@ package org.apache.kyuubi.plugin.spark.authz.serde
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
-import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 
 import org.apache.kyuubi.plugin.spark.authz.util.PathIdentifier._
 import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
@@ -115,3 +116,16 @@ class SubqueryAliasURIExtractor extends URIExtractor {
     Seq(identifier.name).map(Uri)
   }
 }
+
+class DataSourceV2RelationURIExtractor extends URIExtractor {
+  override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
+    val plan = v1.asInstanceOf[LogicalPlan]
+    plan.find(_.getClass.getSimpleName == "DataSourceV2Relation").get match {
+      case v2Relation: DataSourceV2Relation
+          if v2Relation.identifier != None &&
+            isPathIdentifier(v2Relation.identifier.get.name, spark) =>
+        Seq(v2Relation.identifier.get.name).map(Uri)
+      case _ => Nil
+    }
+  }
+}
```
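As a usage sketch (not part of the patch), the new extractor yields a URI only when the relation's identifier is path-based. The harness below is hypothetical: `spark` and `plan` stand in for a live session and the analyzed plan of a write to a path-based Delta table.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import org.apache.kyuubi.plugin.spark.authz.serde.{DataSourceV2RelationURIExtractor, Uri}

// Hypothetical helper: the URIs Authz would check for a v2 write plan.
def writeUris(spark: SparkSession, plan: LogicalPlan): Seq[Uri] =
  new DataSourceV2RelationURIExtractor().apply(spark, plan)

// Path-based relation (e.g. delta.`/tmp/delta/tbl`) => Seq(Uri("/tmp/delta/tbl")),
// so a [write] URI check fires; a catalog-named table => Nil, leaving the
// table-level check from DataSourceV2RelationTableExtractor to apply instead.
```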
File 5 of 6: TableCommands specs (Scala)

```diff
@@ -270,7 +270,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
       "table",
       classOf[DataSourceV2RelationTableExtractor],
       actionTypeDesc = Some(actionTypeDesc))
-    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
+    val uriDescs = Seq(UriDesc("table", classOf[DataSourceV2RelationURIExtractor]))
+    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc), uriDescs = uriDescs)
   }
 
   val ReplaceData = {
@@ -308,7 +309,8 @@
       "table",
       classOf[DataSourceV2RelationTableExtractor],
       actionTypeDesc = Some(actionTypeDesc))
-    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
+    val uriDescs = Seq(UriDesc("table", classOf[DataSourceV2RelationURIExtractor]))
+    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc), uriDescs = uriDescs)
   }
 
   val OverwritePartitionsDynamic = {
```
File 6 of 6: DeltaCatalogRangerSparkExtensionSuite (Scala)

```diff
@@ -357,6 +357,36 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
       doAs(admin, sql(updateTableSql))
     })
   }
+
+  test("insert path-based table") {
+    withSingleCallEnabled {
+      withCleanTmpResources(Seq((s"$namespace1.$table2", "table"), (s"$namespace1", "database"))) {
+        doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+        doAs(admin, sql(createTableSql(namespace1, table2)))
+        withTempDir(path => {
+          doAs(admin, sql(createPathBasedTableSql(path)))
+          // insert into
+          val insertIntoSql = s"INSERT INTO delta.`$path` SELECT * FROM $namespace1.$table2"
+          interceptContains[AccessControlException](
+            doAs(someone, sql(insertIntoSql)))(
+            s"does not have [select] privilege on [$namespace1/$table2/id," +
+              s"$namespace1/$table2/name,$namespace1/$table2/gender," +
+              s"$namespace1/$table2/birthDate], [write] privilege on [[$path, $path/]]")
+          doAs(admin, sql(insertIntoSql))
+
+          // insert overwrite
+          val insertOverwriteSql =
+            s"INSERT OVERWRITE delta.`$path` SELECT * FROM $namespace1.$table2"
+          interceptContains[AccessControlException](
+            doAs(someone, sql(insertOverwriteSql)))(
+            s"does not have [select] privilege on [$namespace1/$table2/id," +
+              s"$namespace1/$table2/name,$namespace1/$table2/gender," +
+              s"$namespace1/$table2/birthDate], [write] privilege on [[$path, $path/]]")
+          doAs(admin, sql(insertOverwriteSql))
+        })
+      }
+    }
+  }
 }
 
 object DeltaCatalogRangerSparkExtensionSuite {
```
