From d5d0dc3cc0f6b32605e1b1409d40c025c1a6a8be Mon Sep 17 00:00:00 2001 From: zml1206 Date: Thu, 23 Nov 2023 22:29:22 +0800 Subject: [PATCH] Support alter path-based table for Delta Lake in Authz --- .../main/resources/table_command_spec.json | 42 ++++++-- .../spark/authz/gen/TableCommands.scala | 13 ++- ...eltaCatalogRangerSparkExtensionSuite.scala | 102 +++++++++++++++++- 3 files changed, 143 insertions(+), 14 deletions(-) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json index e4fb863c35d..eaaa6203d6b 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json +++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json @@ -12,7 +12,11 @@ } ], "opType" : "ALTERTABLE_ADDCOLS", "queryDescs" : [ ], - "uriDescs" : [ ] + "uriDescs" : [ { + "fieldName" : "child", + "fieldExtractor" : "ResolvedTableURIExtractor", + "isInput" : false + } ] }, { "classname" : "org.apache.spark.sql.catalyst.plans.logical.AddPartitions", "tableDescs" : [ { @@ -42,7 +46,11 @@ } ], "opType" : "ALTERTABLE_ADDCOLS", "queryDescs" : [ ], - "uriDescs" : [ ] + "uriDescs" : [ { + "fieldName" : "child", + "fieldExtractor" : "ResolvedTableURIExtractor", + "isInput" : false + } ] }, { "classname" : "org.apache.spark.sql.catalyst.plans.logical.AlterTable", "tableDescs" : [ { @@ -57,7 +65,11 @@ } ], "opType" : "ALTERTABLE_PROPERTIES", "queryDescs" : [ ], - "uriDescs" : [ ] + "uriDescs" : [ { + "fieldName" : "ident", + "fieldExtractor" : "IdentifierURIExtractor", + "isInput" : false + } ] }, { "classname" : "org.apache.spark.sql.catalyst.plans.logical.AppendData", "tableDescs" : [ { @@ -286,7 +298,11 @@ } ], "opType" : "ALTERTABLE_ADDCOLS", "queryDescs" : [ ], - "uriDescs" : [ ] + "uriDescs" : [ { + "fieldName" : "child", + "fieldExtractor" : "ResolvedTableURIExtractor", + "isInput" : false + } ] }, { "classname" : "org.apache.spark.sql.catalyst.plans.logical.DropPartitions", "tableDescs" : [ { @@ -429,7 +445,11 @@ } ], "opType" : "ALTERTABLE_RENAMECOL", "queryDescs" : [ ], - "uriDescs" : [ ] + "uriDescs" : [ { + "fieldName" : "child", + "fieldExtractor" : "ResolvedTableURIExtractor", + "isInput" : false + } ] }, { "classname" : "org.apache.spark.sql.catalyst.plans.logical.RenamePartitions", "tableDescs" : [ { @@ -474,7 +494,11 @@ } ], "opType" : "ALTERTABLE_REPLACECOLS", "queryDescs" : [ ], - "uriDescs" : [ ] + "uriDescs" : [ { + "fieldName" : "child", + "fieldExtractor" : "ResolvedTableURIExtractor", + "isInput" : false + } ] }, { "classname" : "org.apache.spark.sql.catalyst.plans.logical.ReplaceData", "tableDescs" : [ { @@ -606,7 +630,11 @@ } ], "opType" : "ALTERTABLE_PROPERTIES", "queryDescs" : [ ], - "uriDescs" : [ ] + "uriDescs" : [ { + "fieldName" : "table", + "fieldExtractor" : "ResolvedTableURIExtractor", + "isInput" : false + } ] }, { "classname" : "org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable", "tableDescs" : [ { diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala index ee0345a8c26..aced937b9a6 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala @@ -39,7 +39,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] { val AlterTable = { val cmd = "org.apache.spark.sql.catalyst.plans.logical.AlterTable" val tableDesc = TableDesc("ident", classOf[IdentifierTableExtractor]) - TableCommandSpec(cmd, Seq(tableDesc), ALTERTABLE_PROPERTIES) + val uriDescs = Seq(UriDesc("ident", classOf[IdentifierURIExtractor])) + TableCommandSpec(cmd, Seq(tableDesc), ALTERTABLE_PROPERTIES, uriDescs = uriDescs) } val AlterTableAddColumns = { @@ -51,7 +52,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] { val AddColumns = { val cmd = "org.apache.spark.sql.catalyst.plans.logical.AddColumns" - TableCommandSpec(cmd, Seq(resolvedTableDesc), ALTERTABLE_ADDCOLS) + val uriDescs = Seq(UriDesc("child", classOf[ResolvedTableURIExtractor])) + TableCommandSpec(cmd, Seq(resolvedTableDesc), ALTERTABLE_ADDCOLS, uriDescs = uriDescs) } val AlterColumn = { @@ -66,12 +68,12 @@ object TableCommands extends CommandSpecs[TableCommandSpec] { val ReplaceColumns = { val cmd = "org.apache.spark.sql.catalyst.plans.logical.ReplaceColumns" - TableCommandSpec(cmd, Seq(resolvedTableDesc), ALTERTABLE_REPLACECOLS) + AddColumns.copy(classname = cmd, opType = ALTERTABLE_REPLACECOLS) } val RenameColumn = { val cmd = "org.apache.spark.sql.catalyst.plans.logical.RenameColumn" - TableCommandSpec(cmd, Seq(resolvedTableDesc), ALTERTABLE_RENAMECOL) + AddColumns.copy(classname = cmd, opType = ALTERTABLE_RENAMECOL) } val AlterTableAddPartition = { @@ -638,7 +640,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] { val SetTableProperties = { val cmd = "org.apache.spark.sql.catalyst.plans.logical.SetTableProperties" val tableDesc = TableDesc("table", classOf[ResolvedTableTableExtractor]) - TableCommandSpec(cmd, Seq(tableDesc), ALTERTABLE_PROPERTIES) + val uriDescs = Seq(UriDesc("table", classOf[ResolvedTableURIExtractor])) + TableCommandSpec(cmd, Seq(tableDesc), ALTERTABLE_PROPERTIES, uriDescs = uriDescs) } val AddArchivesCommand = { diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala index c1dda69896a..1ce8ad6765f 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala @@ -25,7 +25,7 @@ import org.apache.kyuubi.plugin.spark.authz.AccessControlException import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._ import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._ import org.apache.kyuubi.plugin.spark.authz.ranger.DeltaCatalogRangerSparkExtensionSuite._ -import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.isSparkV32OrGreater +import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.{isSparkV32OrGreater, isSparkV35OrGreater} import org.apache.kyuubi.tags.DeltaTest import org.apache.kyuubi.util.AssertionUtils._ @@ -41,6 +41,14 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { val table1 = "table1_delta" val table2 = "table2_delta" + def propString(props: Map[String, String]): String = + if (props.isEmpty) "" + else { + props + .map { case (key, value) => s"'$key' = '$value'" } + .mkString("TBLPROPERTIES (", ",", ")") + } + def createTableSql(namespace: String, table: String): String = s""" |CREATE TABLE IF NOT EXISTS $namespace.$table ( @@ -53,7 +61,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { |PARTITIONED BY (gender) |""".stripMargin - def createPathBasedTableSql(path: Path): String = + def createPathBasedTableSql(path: Path, props: Map[String, String] = Map.empty): String = s""" |CREATE TABLE IF NOT EXISTS delta.`$path` ( | id INT, @@ -63,6 +71,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { |) |USING DELTA |PARTITIONED BY (gender) + |${propString(props)} |""".stripMargin override def withFixture(test: NoArgTest): Outcome = { @@ -468,6 +477,95 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { doAs(admin, sql(vacuumTableSql2)) }) } + + test("alter path-based table set properties") { + withTempDir(path => { + doAs(admin, sql(createPathBasedTableSql(path))) + val setPropertiesSql = s"ALTER TABLE delta.`$path`" + + s" SET TBLPROPERTIES ('delta.appendOnly' = 'true')" + interceptEndsWith[AccessControlException]( + doAs(someone, sql(setPropertiesSql)))( + s"does not have [write] privilege on [[$path, $path/]]") + doAs(admin, sql(setPropertiesSql)) + }) + } + + test("alter path-based table add columns") { + withTempDir(path => { + doAs(admin, sql(createPathBasedTableSql(path))) + val addColumnsSql = s"ALTER TABLE delta.`$path` ADD COLUMNS (age int)" + interceptEndsWith[AccessControlException]( + doAs(someone, sql(addColumnsSql)))( + s"does not have [write] privilege on [[$path, $path/]]") + doAs(admin, sql(addColumnsSql)) + }) + } + + test("alter path-based table change column") { + withTempDir(path => { + doAs(admin, sql(createPathBasedTableSql(path))) + val changeColumnSql = s"ALTER TABLE delta.`$path`" + + s" CHANGE COLUMN gender gender STRING AFTER birthDate" + interceptEndsWith[AccessControlException]( + doAs(someone, sql(changeColumnSql)))( + s"does not have [write] privilege on [[$path, $path/]]") + doAs(admin, sql(changeColumnSql)) + }) + } + + test("alter path-based table drop column") { + assume( + isSparkV32OrGreater, + "alter table drop column is available in Delta Lake 1.2.0 and above") + + withTempDir(path => { + doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name")))) + val dropColumnSql = s"ALTER TABLE delta.`$path` DROP COLUMN birthDate" + interceptEndsWith[AccessControlException]( + doAs(someone, sql(dropColumnSql)))( + s"does not have [write] privilege on [[$path, $path/]]") + doAs(admin, sql(dropColumnSql)) + }) + } + + test("alter path-based table rename column") { + assume( + isSparkV32OrGreater, + "alter table rename column is available in Delta Lake 1.2.0 and above") + + withTempDir(path => { + doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name")))) + val renameColumnSql = s"ALTER TABLE delta.`$path`" + + s" RENAME COLUMN birthDate TO dateOfBirth" + interceptEndsWith[AccessControlException]( + doAs(someone, sql(renameColumnSql)))( + s"does not have [write] privilege on [[$path, $path/]]") + doAs(admin, sql(renameColumnSql)) + }) + } + + test("alter path-based table replace columns") { + withTempDir(path => { + assume( + isSparkV32OrGreater, + "alter table replace columns is not available in Delta Lake 1.0.1") + + doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name")))) + val replaceColumnsSql = s"ALTER TABLE delta.`$path`" + + s" REPLACE COLUMNS (id INT, name STRING, gender STRING)" + interceptEndsWith[AccessControlException]( + doAs(someone, sql(replaceColumnsSql)))( + s"does not have [write] privilege on [[$path, $path/]]") + + // There was a bug before Delta Lake 3.0, it will throw AnalysisException message + // "Cannot drop column from a struct type with a single field: + // StructType(StructField(birthDate,TimestampType,true))". + // For details, see https://github.com/delta-io/delta/pull/1822 + if (isSparkV35OrGreater) { + doAs(admin, sql(replaceColumnsSql)) + } + }) + } } object DeltaCatalogRangerSparkExtensionSuite {