[KYUUBI #5757][AUTHZ] Support alter path-based table for Delta Lake in Authz #5760

Merged · 1 commit · Nov 29, 2023
Changes from all commits
@@ -215,4 +215,4 @@
   } ],
   "opType" : "SWITCHDATABASE",
   "uriDescs" : [ ]
-} ]
+} ]
@@ -111,4 +111,4 @@
     "comment" : ""
   } ],
   "opType" : "RELOADFUNCTION"
-} ]
+} ]
@@ -111,4 +111,4 @@
     "comment" : ""
   } ],
   "uriDescs" : [ ]
-} ]
+} ]
@@ -13,7 +13,12 @@
   } ],
   "opType" : "ALTERTABLE_ADDCOLS",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "child",
+    "fieldExtractor" : "ResolvedTableURIExtractor",
+    "isInput" : false,
+    "comment" : ""
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.AddPartitions",
   "tableDescs" : [ {
@@ -45,7 +50,12 @@
   } ],
   "opType" : "ALTERTABLE_ADDCOLS",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "child",
+    "fieldExtractor" : "ResolvedTableURIExtractor",
+    "isInput" : false,
+    "comment" : ""
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.AlterTable",
   "tableDescs" : [ {
@@ -61,7 +71,12 @@
   } ],
   "opType" : "ALTERTABLE_PROPERTIES",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "ident",
+    "fieldExtractor" : "IdentifierURIExtractor",
+    "isInput" : false,
+    "comment" : ""
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.AppendData",
   "tableDescs" : [ {
@@ -319,7 +334,12 @@
   } ],
   "opType" : "ALTERTABLE_ADDCOLS",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "child",
+    "fieldExtractor" : "ResolvedTableURIExtractor",
+    "isInput" : false,
+    "comment" : ""
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.DropPartitions",
   "tableDescs" : [ {
@@ -478,7 +498,12 @@
   } ],
   "opType" : "ALTERTABLE_RENAMECOL",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "child",
+    "fieldExtractor" : "ResolvedTableURIExtractor",
+    "isInput" : false,
+    "comment" : ""
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.RenamePartitions",
   "tableDescs" : [ {
@@ -526,7 +551,12 @@
   } ],
   "opType" : "ALTERTABLE_REPLACECOLS",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "child",
+    "fieldExtractor" : "ResolvedTableURIExtractor",
+    "isInput" : false,
+    "comment" : ""
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.ReplaceData",
   "tableDescs" : [ {
@@ -676,7 +706,12 @@
   } ],
   "opType" : "ALTERTABLE_PROPERTIES",
   "queryDescs" : [ ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "ResolvedTableURIExtractor",
+    "isInput" : false,
+    "comment" : ""
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.ShowCreateTable",
   "tableDescs" : [ {
@@ -2528,4 +2563,4 @@
     "isInput" : false,
     "comment" : "Delta"
   } ]
-} ]
+} ]
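
The uriDescs entries added above are the heart of this change: each one names a field of the Spark logical plan ("child", "ident", "table") plus a URI extractor that resolves that field to the table's storage location, so a path-based table such as delta.`/some/path` is authorized as a read/write check on the path itself. These JSON spec files are generated from the Scala spec definitions in TableCommands (next file); a sketch of the correspondence, using the AddColumns spec from this diff with serialization details simplified:

  // From the Scala side of this PR:
  val AddColumns = {
    val cmd = "org.apache.spark.sql.catalyst.plans.logical.AddColumns"
    val uriDescs = Seq(UriDesc("child", classOf[ResolvedTableURIExtractor]))
    TableCommandSpec(cmd, Seq(resolvedTableDesc), ALTERTABLE_ADDCOLS, uriDescs = uriDescs)
  }
  // ...which serializes to the JSON entry above:
  //   "classname" : "org.apache.spark.sql.catalyst.plans.logical.AddColumns",
  //   "opType" : "ALTERTABLE_ADDCOLS",
  //   "uriDescs" : [ { "fieldName" : "child",
  //                    "fieldExtractor" : "ResolvedTableURIExtractor",
  //                    "isInput" : false, "comment" : "" } ]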
@@ -39,7 +39,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
   val AlterTable = {
     val cmd = "org.apache.spark.sql.catalyst.plans.logical.AlterTable"
     val tableDesc = TableDesc("ident", classOf[IdentifierTableExtractor])
-    TableCommandSpec(cmd, Seq(tableDesc), ALTERTABLE_PROPERTIES)
+    val uriDescs = Seq(UriDesc("ident", classOf[IdentifierURIExtractor]))
+    TableCommandSpec(cmd, Seq(tableDesc), ALTERTABLE_PROPERTIES, uriDescs = uriDescs)
   }

   val AlterTableAddColumns = {
@@ -51,7 +52,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {

   val AddColumns = {
     val cmd = "org.apache.spark.sql.catalyst.plans.logical.AddColumns"
-    TableCommandSpec(cmd, Seq(resolvedTableDesc), ALTERTABLE_ADDCOLS)
+    val uriDescs = Seq(UriDesc("child", classOf[ResolvedTableURIExtractor]))
+    TableCommandSpec(cmd, Seq(resolvedTableDesc), ALTERTABLE_ADDCOLS, uriDescs = uriDescs)
   }

   val AlterColumn = {
@@ -66,12 +68,12 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {

   val ReplaceColumns = {
     val cmd = "org.apache.spark.sql.catalyst.plans.logical.ReplaceColumns"
-    TableCommandSpec(cmd, Seq(resolvedTableDesc), ALTERTABLE_REPLACECOLS)
+    AddColumns.copy(classname = cmd, opType = ALTERTABLE_REPLACECOLS)
   }

   val RenameColumn = {
     val cmd = "org.apache.spark.sql.catalyst.plans.logical.RenameColumn"
-    TableCommandSpec(cmd, Seq(resolvedTableDesc), ALTERTABLE_RENAMECOL)
+    AddColumns.copy(classname = cmd, opType = ALTERTABLE_RENAMECOL)
   }

   val AlterTableAddPartition = {
@@ -638,7 +640,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
   val SetTableProperties = {
     val cmd = "org.apache.spark.sql.catalyst.plans.logical.SetTableProperties"
     val tableDesc = TableDesc("table", classOf[ResolvedTableTableExtractor])
-    TableCommandSpec(cmd, Seq(tableDesc), ALTERTABLE_PROPERTIES)
+    val uriDescs = Seq(UriDesc("table", classOf[ResolvedTableURIExtractor]))
+    TableCommandSpec(cmd, Seq(tableDesc), ALTERTABLE_PROPERTIES, uriDescs = uriDescs)
   }

   val AddArchivesCommand = {
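
Two extractor shapes appear in the specs above, matching the two plan shapes involved. A condensed sketch (both UriDesc lines taken from this diff; everything else elided):

  // v2 ALTER plans that carry a ResolvedTable node (AddColumns via "child",
  // SetTableProperties via "table"): the URI comes from the resolved table's
  // storage location.
  UriDesc("child", classOf[ResolvedTableURIExtractor])

  // Plans that only carry an identifier (AlterTable): the URI comes from the
  // identifier itself, e.g. the path in delta.`/path/to/table`.
  UriDesc("ident", classOf[IdentifierURIExtractor])

ReplaceColumns and RenameColumn avoid repeating this wiring by cloning the AddColumns spec with copy(classname = ..., opType = ...).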
@@ -25,7 +25,7 @@ import org.apache.kyuubi.plugin.spark.authz.AccessControlException
 import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
 import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
 import org.apache.kyuubi.plugin.spark.authz.ranger.DeltaCatalogRangerSparkExtensionSuite._
-import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.isSparkV32OrGreater
+import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils.{isSparkV32OrGreater, isSparkV35OrGreater}
 import org.apache.kyuubi.tags.DeltaTest
 import org.apache.kyuubi.util.AssertionUtils._

@@ -41,6 +41,14 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
   val table1 = "table1_delta"
   val table2 = "table2_delta"

+  def propString(props: Map[String, String]): String =
+    if (props.isEmpty) ""
+    else {
+      props
+        .map { case (key, value) => s"'$key' = '$value'" }
+        .mkString("TBLPROPERTIES (", ",", ")")
+    }
+
   def createTableSql(namespace: String, table: String): String =
     s"""
        |CREATE TABLE IF NOT EXISTS $namespace.$table (
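
For reference, here is what the new propString helper renders; a standalone check with the same body as above, with outputs shown as comments:

  def propString(props: Map[String, String]): String =
    if (props.isEmpty) ""
    else {
      props
        .map { case (key, value) => s"'$key' = '$value'" }
        .mkString("TBLPROPERTIES (", ",", ")")
    }

  propString(Map.empty)
  // => ""
  propString(Map("delta.columnMapping.mode" -> "name"))
  // => TBLPROPERTIES ('delta.columnMapping.mode' = 'name')
  propString(Map("a" -> "1", "b" -> "2"))
  // => TBLPROPERTIES ('a' = '1','b' = '2')

When props is empty, the interpolation in createPathBasedTableSql below contributes only a blank line to the generated SQL, which the parser tolerates.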
@@ -53,7 +61,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
        |PARTITIONED BY (gender)
        |""".stripMargin

-  def createPathBasedTableSql(path: Path): String =
+  def createPathBasedTableSql(path: Path, props: Map[String, String] = Map.empty): String =
     s"""
        |CREATE TABLE IF NOT EXISTS delta.`$path` (
        |  id INT,
@@ -63,6 +71,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
        |)
        |USING DELTA
        |PARTITIONED BY (gender)
+       |${propString(props)}
        |""".stripMargin

   override def withFixture(test: NoArgTest): Outcome = {
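
Putting the helpers together, createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name")) renders roughly as follows (a sketch: the column list is partly cut off in this diff, so the middle columns are inferred from the tests below, and $path stands for the temp directory):

  sql(s"""
    |CREATE TABLE IF NOT EXISTS delta.`$path` (
    |  id INT,
    |  name STRING,
    |  gender STRING,
    |  birthDate TIMESTAMP
    |)
    |USING DELTA
    |PARTITIONED BY (gender)
    |TBLPROPERTIES ('delta.columnMapping.mode' = 'name')
    |""".stripMargin)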
@@ -468,6 +477,95 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
       doAs(admin, sql(vacuumTableSql2))
     })
   }
+
+  test("alter path-based table set properties") {
+    withTempDir(path => {
+      doAs(admin, sql(createPathBasedTableSql(path)))
+      val setPropertiesSql = s"ALTER TABLE delta.`$path`" +
+        s" SET TBLPROPERTIES ('delta.appendOnly' = 'true')"
+      interceptEndsWith[AccessControlException](
+        doAs(someone, sql(setPropertiesSql)))(
+        s"does not have [write] privilege on [[$path, $path/]]")
+      doAs(admin, sql(setPropertiesSql))
+    })
+  }
+
+  test("alter path-based table add columns") {
+    withTempDir(path => {
+      doAs(admin, sql(createPathBasedTableSql(path)))
+      val addColumnsSql = s"ALTER TABLE delta.`$path` ADD COLUMNS (age int)"
+      interceptEndsWith[AccessControlException](
+        doAs(someone, sql(addColumnsSql)))(
+        s"does not have [write] privilege on [[$path, $path/]]")
+      doAs(admin, sql(addColumnsSql))
+    })
+  }
+

Review thread on the [write]-privilege assertion above:

Member: This behavior looks strange to me; adding a column seems to have nothing to do with the path.

zml1206 (Contributor, Author), Nov 24, 2023: This is how I understand it. The privileges of a path-based table are only read and write. ALTER TABLE ... ADD COLUMNS modifies the schema of the table, so the write privilege is required.

+  test("alter path-based table change column") {
+    withTempDir(path => {
+      doAs(admin, sql(createPathBasedTableSql(path)))
+      val changeColumnSql = s"ALTER TABLE delta.`$path`" +
+        s" CHANGE COLUMN gender gender STRING AFTER birthDate"
+      interceptEndsWith[AccessControlException](
+        doAs(someone, sql(changeColumnSql)))(
+        s"does not have [write] privilege on [[$path, $path/]]")
+      doAs(admin, sql(changeColumnSql))
+    })
+  }
+
+  test("alter path-based table drop column") {
+    assume(
+      isSparkV32OrGreater,
+      "alter table drop column is available in Delta Lake 1.2.0 and above")
+
+    withTempDir(path => {
+      doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name"))))
+      val dropColumnSql = s"ALTER TABLE delta.`$path` DROP COLUMN birthDate"
+      interceptEndsWith[AccessControlException](
+        doAs(someone, sql(dropColumnSql)))(
+        s"does not have [write] privilege on [[$path, $path/]]")
+      doAs(admin, sql(dropColumnSql))
+    })
+  }
+
+  test("alter path-based table rename column") {
+    assume(
+      isSparkV32OrGreater,
+      "alter table rename column is available in Delta Lake 1.2.0 and above")
+
+    withTempDir(path => {
+      doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name"))))
+      val renameColumnSql = s"ALTER TABLE delta.`$path`" +
+        s" RENAME COLUMN birthDate TO dateOfBirth"
+      interceptEndsWith[AccessControlException](
+        doAs(someone, sql(renameColumnSql)))(
+        s"does not have [write] privilege on [[$path, $path/]]")
+      doAs(admin, sql(renameColumnSql))
+    })
+  }
+
+  test("alter path-based table replace columns") {
+    withTempDir(path => {
+      assume(
+        isSparkV32OrGreater,
+        "alter table replace columns is not available in Delta Lake 1.0.1")
+
+      doAs(admin, sql(createPathBasedTableSql(path, Map("delta.columnMapping.mode" -> "name"))))
+      val replaceColumnsSql = s"ALTER TABLE delta.`$path`" +
+        s" REPLACE COLUMNS (id INT, name STRING, gender STRING)"
+      interceptEndsWith[AccessControlException](
+        doAs(someone, sql(replaceColumnsSql)))(
+        s"does not have [write] privilege on [[$path, $path/]]")
+
+      // Before Delta Lake 3.0, a bug caused this statement to throw an AnalysisException
+      // with the message "Cannot drop column from a struct type with a single field:
+      // StructType(StructField(birthDate,TimestampType,true))".
+      // For details, see https://github.com/delta-io/delta/pull/1822
+      if (isSparkV35OrGreater) {
+        doAs(admin, sql(replaceColumnsSql))
+      }
+    })
+  }
 }

 object DeltaCatalogRangerSparkExtensionSuite {