Skip to content

Commit

Permalink
[KYUUBI apache#5447][AUTHZ] Support hudi DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand
Browse files Browse the repository at this point in the history
  • Loading branch information
AngersZhuuuu committed Oct 19, 2023
1 parent 48bdc7d commit 3700686
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@

org.apache.kyuubi.plugin.spark.authz.serde.LogicalPlanOptionQueryExtractor
org.apache.kyuubi.plugin.spark.authz.serde.LogicalPlanQueryExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoSourceTableExtractor
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,5 @@ org.apache.kyuubi.plugin.spark.authz.serde.ResolvedTableTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.StringTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.TableIdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.TableTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiDataSourceV2RelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiMergeIntoTargetTableExtractor
Original file line number Diff line number Diff line change
Expand Up @@ -1604,6 +1604,27 @@
} ],
"opType" : "CREATETABLE",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand",
"tableDescs" : [ {
"fieldName" : "dft",
"fieldExtractor" : "HudiDataSourceV2RelationTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ {
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.DropHoodieTableCommand",
"tableDescs" : [ {
Expand Down Expand Up @@ -1643,6 +1664,27 @@
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand",
"tableDescs" : [ {
"fieldName" : "mergeInto",
"fieldExtractor" : "HudiMergeIntoTargetTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ {
"fieldName" : "mergeInto",
"fieldExtractor" : "HudiMergeIntoSourceTableExtractor"
} ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.RepairHoodieTableCommand",
"tableDescs" : [ {
Expand Down Expand Up @@ -1688,4 +1730,25 @@
} ],
"opType" : "TRUNCATETABLE",
"queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand",
"tableDescs" : [ {
"fieldName" : "ut",
"fieldExtractor" : "HudiDataSourceV2RelationTableExtractor",
"columnDesc" : null,
"actionTypeDesc" : {
"fieldName" : null,
"fieldExtractor" : null,
"actionType" : "UPDATE"
},
"tableTypeDesc" : null,
"catalogDesc" : null,
"isInput" : false,
"setCurrentDatabaseIfMissing" : false
} ],
"opType" : "QUERY",
"queryDescs" : [ {
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ]
} ]
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.kyuubi.plugin.spark.authz.serde

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs

trait QueryExtractor extends (AnyRef => Option[LogicalPlan]) with Extractor

object QueryExtractor {
Expand All @@ -44,3 +46,9 @@ class LogicalPlanOptionQueryExtractor extends QueryExtractor {
v1.asInstanceOf[Option[LogicalPlan]]
}
}

class HudiMergeIntoSourceTableExtractor extends QueryExtractor {
  /**
   * Extracts the source-side plan of a Hudi MERGE INTO command.
   *
   * Reflectively reads the command's `sourceTable` field (a LogicalPlan) and
   * delegates to [[LogicalPlanQueryExtractor]] to wrap it as an Option.
   */
  override def apply(v1: AnyRef): Option[LogicalPlan] = {
    val sourcePlan = invokeAs[LogicalPlan](v1, "sourceTable")
    new LogicalPlanQueryExtractor().apply(sourcePlan)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ package org.apache.kyuubi.plugin.spark.authz.serde
import java.util.{Map => JMap}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

import org.apache.kyuubi.plugin.spark.authz.{PrivilegeObject, PrivilegesBuilder}
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
import org.apache.kyuubi.util.reflect.ReflectUtils._

Expand Down Expand Up @@ -240,3 +242,38 @@ class TableTableExtractor extends TableExtractor {
lookupExtractor[StringTableExtractor].apply(spark, tableName)
}
}

class HudiDataSourceV2RelationTableExtractor extends TableExtractor {
  /**
   * Resolves the target table of a Hudi command (e.g. DELETE/UPDATE).
   *
   * Reflectively reads the command's `table` plan, collects the privilege
   * objects it references via [[PrivilegesBuilder.buildQuery]], and surfaces
   * the first one as the authz [[Table]], or None when nothing was resolved.
   */
  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
    val privilegeObjects = new ArrayBuffer[PrivilegeObject]
    val tablePlan = invokeAs[LogicalPlan](v1, "table")
    PrivilegesBuilder.buildQuery(tablePlan, privilegeObjects, spark = spark)
    // Only the first resolved object identifies the target table.
    privilegeObjects.headOption.map { po =>
      Table(po.catalog, Option(po.dbname), po.objectName, po.owner)
    }
  }
}

class HudiMergeIntoTargetTableExtractor extends TableExtractor {
  /**
   * Resolves the target table of a Hudi MERGE INTO command.
   *
   * Reflectively reads the command's `targetTable` plan, collects its
   * privilege objects via [[PrivilegesBuilder.buildQuery]], and maps the
   * first one to the authz [[Table]]; None when no object was resolved.
   */
  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
    val privilegeObjects = new ArrayBuffer[PrivilegeObject]
    val targetPlan = invokeAs[LogicalPlan](v1, "targetTable")
    PrivilegesBuilder.buildQuery(targetPlan, privilegeObjects, spark = spark)
    // Only the first resolved object identifies the merge target.
    privilegeObjects.headOption.map { po =>
      Table(po.catalog, Option(po.dbname), po.objectName, po.owner)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kyuubi.plugin.spark.authz.gen

import org.apache.kyuubi.plugin.spark.authz.OperationType._
import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
import org.apache.kyuubi.plugin.spark.authz.serde._
import org.apache.kyuubi.plugin.spark.authz.serde.TableType._

Expand Down Expand Up @@ -154,6 +155,40 @@ object HudiCommands {
TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(QueryDesc("query")))
}

// Spec for Hudi DELETE: the target table (field `dft`) requires the UPDATE
// action; the command's `query` field is authorized as a plain query.
val DeleteHoodieTableCommand = {
  val cmd = "org.apache.spark.sql.hudi.command.DeleteHoodieTableCommand"
  val tableDesc = TableDesc(
    "dft",
    classOf[HudiDataSourceV2RelationTableExtractor],
    actionTypeDesc = Some(ActionTypeDesc(actionType = Some(UPDATE))))
  TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(QueryDesc("query")))
}

// Spec for Hudi UPDATE: the target table (field `ut`) requires the UPDATE
// action; the command's `query` field is authorized as a plain query.
val UpdateHoodieTableCommand = {
  val cmd = "org.apache.spark.sql.hudi.command.UpdateHoodieTableCommand"
  val tableDesc = TableDesc(
    "ut",
    classOf[HudiDataSourceV2RelationTableExtractor],
    actionTypeDesc = Some(ActionTypeDesc(actionType = Some(UPDATE))))
  TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(QueryDesc("query")))
}

// Spec for Hudi MERGE INTO: both the target table and the source query are
// carried by the same `mergeInto` field, split by two dedicated extractors —
// the target needs the UPDATE action, the source is authorized as input.
val MergeIntoHoodieTableCommand = {
  val cmd = "org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand"
  val tableDesc = TableDesc(
    "mergeInto",
    classOf[HudiMergeIntoTargetTableExtractor],
    actionTypeDesc = Some(ActionTypeDesc(actionType = Some(UPDATE))))
  val sourceQueryDesc = QueryDesc("mergeInto", classOf[HudiMergeIntoSourceTableExtractor])
  TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(sourceQueryDesc))
}

val data: Array[TableCommandSpec] = Array(
AlterHoodieTableAddColumnsCommand,
AlterHoodieTableChangeColumnCommand,
Expand All @@ -165,9 +200,12 @@ object HudiCommands {
CreateHoodieTableLikeCommand,
CompactionHoodieTableCommand,
CompactionShowHoodieTableCommand,
DeleteHoodieTableCommand,
DropHoodieTableCommand,
InsertIntoHoodieTableCommand,
MergeIntoHoodieTableCommand,
RepairHoodieTableCommand,
TruncateHoodieTableCommand,
Spark31AlterTableCommand)
Spark31AlterTableCommand,
UpdateHoodieTableCommand)
}
Original file line number Diff line number Diff line change
Expand Up @@ -370,4 +370,66 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
}
}
}

test("DeleteHoodieTableCommand/UpdateHoodieTableCommand/MergeIntoHoodieTableCommand") {
  withSingleCallEnabled {
    withCleanTmpResources(Seq(
      (s"$namespace1.$table1", "table"),
      (s"$namespace1.$table2", "table"),
      (namespace1, "database"))) {
      doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))

      // Both tables share the same partitioned COW layout; create them in a loop.
      Seq(table1, table2).foreach { table =>
        doAs(
          admin,
          sql(
            s"""
               |CREATE TABLE IF NOT EXISTS $namespace1.$table(id int, name string, city string)
               |USING HUDI
               |OPTIONS (
               | type = 'cow',
               | primaryKey = 'id',
               | 'hoodie.datasource.hive_sync.enable' = 'false'
               |)
               |PARTITIONED BY(city)
               |""".stripMargin))
      }

      // DELETE on the target table must demand the update privilege.
      interceptContains[AccessControlException] {
        doAs(someone, sql(s"DELETE FROM $namespace1.$table1 WHERE id = 10"))
      }(s"does not have [update] privilege on [$namespace1/$table1]")

      // UPDATE on the target table must demand the update privilege.
      interceptContains[AccessControlException] {
        doAs(someone, sql(s"UPDATE $namespace1.$table1 SET name = 'test' WHERE id > 10"))
      }(s"does not have [update] privilege on [$namespace1/$table1]")

      // MERGE INTO must additionally demand select on the source table's columns.
      val mergeIntoSQL =
        s"""
           |MERGE INTO $namespace1.$table1 target
           |USING $namespace1.$table2 source
           |ON target.id = source.id
           |WHEN MATCHED
           |AND target.name == 'test'
           | THEN UPDATE SET id = source.id, name = source.name, city = source.city
           |""".stripMargin
      interceptContains[AccessControlException] {
        doAs(someone, sql(mergeIntoSQL))
      }(s"does not have [select] privilege on " +
        s"[$namespace1/$table2/id,$namespace1/$table2/name,$namespace1/$table2/city]")
    }
  }
}
}

0 comments on commit 3700686

Please sign in to comment.