Skip to content

Commit

Permalink
AuthZ should check hoodie procedures path resource privileges
Browse files Browse the repository at this point in the history
  • Loading branch information
yikf committed Jan 14, 2024
1 parent 3af7551 commit 14cfec1
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ org.apache.kyuubi.plugin.spark.authz.serde.BaseRelationFileIndexURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.CatalogStorageFormatURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiCallProcedureInputUriExtractor
org.apache.kyuubi.plugin.spark.authz.serde.HudiCallProcedureOutputUriExtractor
org.apache.kyuubi.plugin.spark.authz.serde.IdentifierURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.PartitionLocsSeqURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.PropertiesLocationUriExtractor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2033,7 +2033,17 @@
} ],
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
"uriDescs" : [ {
"fieldName" : "clone",
"fieldExtractor" : "HudiCallProcedureInputUriExtractor",
"isInput" : true,
"comment" : "Hudi"
}, {
"fieldName" : "clone",
"fieldExtractor" : "HudiCallProcedureOutputUriExtractor",
"isInput" : false,
"comment" : "Hudi"
} ]
}, {
"classname" : "org.apache.spark.sql.hudi.command.CompactionHoodiePathCommand",
"tableDescs" : [ ],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ class HudiMergeIntoTargetTableExtractor extends TableExtractor {
}
}

abstract class HudiCallProcedureTableExtractor extends TableExtractor {
trait HudiCallProcedureExtractor {

protected def extractTableIdentifier(
procedure: AnyRef,
Expand All @@ -334,204 +334,236 @@ abstract class HudiCallProcedureTableExtractor extends TableExtractor {
}
}

case class ProcedureArgsInputOutputPair(
input: Option[String] = None,
output: Option[String] = None)
case class ProcedureArgsInputOutputTuple(
inputTable: Option[String] = None,
outputTable: Option[String] = None,
inputPath: Option[String] = None,
outputPath: Option[String] = None)

protected val PROCEDURE_CLASS_PATH = "org.apache.spark.sql.hudi.command.procedures"

protected val INVALID_INDEX = -1

// These pairs are used to get the procedure input/output args which user passed in call command.
protected val procedureArgsInputOutputPairs: Map[String, ProcedureArgsInputOutputPair] = Map(
protected val procedureArgsInputOutputPairs: Map[String, ProcedureArgsInputOutputTuple] = Map(
(
s"$PROCEDURE_CLASS_PATH.ArchiveCommitsProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.CommitsCompareProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.CopyToTableProcedure",
ProcedureArgsInputOutputPair(
input = Some("table"),
output = Some("new_table"))),
ProcedureArgsInputOutputTuple(
inputTable = Some("table"),
outputTable = Some("new_table"))),
(
s"$PROCEDURE_CLASS_PATH.CopyToTempViewProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.CreateMetadataTableProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.CreateSavepointProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.DeleteMarkerProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.DeleteMetadataTableProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.DeleteSavepointProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ExportInstantsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.HdfsParquetImportProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.HelpProcedure",
ProcedureArgsInputOutputPair()),
ProcedureArgsInputOutputTuple()),
(
s"$PROCEDURE_CLASS_PATH.HiveSyncProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.InitMetadataTableProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RepairAddpartitionmetaProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RepairCorruptedCleanFilesProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RepairDeduplicateProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RepairMigratePartitionMetaProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RepairOverwriteHoodiePropsProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RollbackToInstantTimeProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RollbackToSavepointProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RunBootstrapProcedure",
ProcedureArgsInputOutputPair(
input = Some("table"),
output = Some("table"))),
ProcedureArgsInputOutputTuple(
inputTable = Some("table"),
outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RunCleanProcedure",
ProcedureArgsInputOutputPair(
input = Some("table"),
output = Some("table"))),
ProcedureArgsInputOutputTuple(
inputTable = Some("table"),
outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.RunClusteringProcedure",
ProcedureArgsInputOutputPair(
input = Some("table"),
output = Some("table"))),
ProcedureArgsInputOutputTuple(
inputTable = Some("table"),
outputTable = Some("table"),
outputPath = Some("path"))),
(
s"$PROCEDURE_CLASS_PATH.RunCompactionProcedure",
ProcedureArgsInputOutputPair(
input = Some("table"),
output = Some("table"))),
ProcedureArgsInputOutputTuple(
inputTable = Some("table"),
outputTable = Some("table"),
outputPath = Some("path"))),
(
s"$PROCEDURE_CLASS_PATH.ShowArchivedCommitsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowBootstrapMappingProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowClusteringProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"), inputPath = Some("path"))),
(
s"$PROCEDURE_CLASS_PATH.ShowCommitsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowCommitExtraMetadataProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowCommitFilesProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowCommitPartitionsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowCommitWriteStatsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowCompactionProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"), inputPath = Some("path"))),
(
s"$PROCEDURE_CLASS_PATH.ShowFileSystemViewProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowFsPathDetailProcedure",
ProcedureArgsInputOutputPair()),
ProcedureArgsInputOutputTuple()),
(
s"$PROCEDURE_CLASS_PATH.ShowHoodieLogFileMetadataProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowHoodieLogFileRecordsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowInvalidParquetProcedure",
ProcedureArgsInputOutputPair()),
ProcedureArgsInputOutputTuple()),
(
s"$PROCEDURE_CLASS_PATH.ShowMetadataTableFilesProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowMetadataTablePartitionsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowMetadataTableStatsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowRollbacksProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowSavepointsProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ShowTablePropertiesProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.StatsFileSizeProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.StatsWriteAmplificationProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))),
ProcedureArgsInputOutputTuple(inputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.UpgradeOrDowngradeProcedure",
ProcedureArgsInputOutputPair(output = Some("table"))),
ProcedureArgsInputOutputTuple(outputTable = Some("table"))),
(
s"$PROCEDURE_CLASS_PATH.ValidateHoodieSyncProcedure",
ProcedureArgsInputOutputPair(
input = Some("src_table"),
output = Some("dst_table"))),
ProcedureArgsInputOutputTuple(
inputTable = Some("src_table"),
outputTable = Some("dst_table"))),
(
s"$PROCEDURE_CLASS_PATH.ValidateMetadataTableFilesProcedure",
ProcedureArgsInputOutputPair(input = Some("table"))))
ProcedureArgsInputOutputTuple(inputTable = Some("table"))))
}

class HudiCallProcedureOutputTableExtractor
extends HudiCallProcedureTableExtractor {
extends TableExtractor with HudiCallProcedureExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
val procedure = invokeAs[AnyRef](v1, "procedure")
val args = invokeAs[AnyRef](v1, "args")
procedureArgsInputOutputPairs.get(procedure.getClass.getName)
.filter(_.output.isDefined)
.filter(_.outputTable.isDefined)
.map { argsPairs =>
val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.output.get)
val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.outputTable.get)
lookupExtractor[StringTableExtractor].apply(spark, tableIdentifier.get).orNull
}
}
}

class HudiCallProcedureInputTableExtractor
extends HudiCallProcedureTableExtractor {
extends TableExtractor with HudiCallProcedureExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
val procedure = invokeAs[AnyRef](v1, "procedure")
val args = invokeAs[AnyRef](v1, "args")
procedureArgsInputOutputPairs.get(procedure.getClass.getName)
.filter(_.input.isDefined)
.filter(_.inputTable.isDefined)
.map { argsPairs =>
val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.input.get)
val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.inputTable.get)
lookupExtractor[StringTableExtractor].apply(spark, tableIdentifier.get).orNull
}
}
}

class HudiCallProcedureInputUriExtractor
extends URIExtractor with HudiCallProcedureExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
val procedure = invokeAs[AnyRef](v1, "procedure")
val args = invokeAs[AnyRef](v1, "args")
procedureArgsInputOutputPairs.get(procedure.getClass.getName)
.filter(_.inputPath.isDefined)
.map { argsPairs =>
val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.inputPath.get)
lookupExtractor[StringURIExtractor].apply(spark, tableIdentifier.get)
}.getOrElse(Nil)
}
}

class HudiCallProcedureOutputUriExtractor
extends URIExtractor with HudiCallProcedureExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
val procedure = invokeAs[AnyRef](v1, "procedure")
val args = invokeAs[AnyRef](v1, "args")
procedureArgsInputOutputPairs.get(procedure.getClass.getName)
.filter(_.outputPath.isDefined)
.map { argsPairs =>
val tableIdentifier = extractTableIdentifier(procedure, args, argsPairs.outputPath.get)
lookupExtractor[StringURIExtractor].apply(spark, tableIdentifier.get)
}.getOrElse(Nil)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,14 @@ object HudiCommands extends CommandSpecs[TableCommandSpec] {
"clone",
classOf[HudiCallProcedureOutputTableExtractor],
actionTypeDesc = Some(ActionTypeDesc(actionType = Some(UPDATE))),
setCurrentDatabaseIfMissing = true)))
setCurrentDatabaseIfMissing = true)),
uriDescs = Seq(
UriDesc(
"clone",
classOf[HudiCallProcedureInputUriExtractor],
isInput = true,
comment = "Hudi"),
UriDesc("clone", classOf[HudiCallProcedureOutputUriExtractor], comment = "Hudi")))
}

override def specs: Seq[TableCommandSpec] = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,4 +644,28 @@ class HudiCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
doAs(admin, sql(showCommitsSql))
}
}

test("ShowClusteringProcedure") {
val path = "hdfs://demo/test/hudi/path"
val showCommitsSql = s"CALL SHOW_CLUSTERING(path => '$path')"
interceptEndsWith[AccessControlException] {
doAs(someone, sql(showCommitsSql))
}(s"does not have [read] privilege on [[$path, $path/]]")
}

test("RunClusteringProcedure") {
val path = "hdfs://demo/test/hudi/path"
val showCommitsSql = s"CALL RUN_CLUSTERING(path => '$path')"
interceptEndsWith[AccessControlException] {
doAs(someone, sql(showCommitsSql))
}(s"does not have [write] privilege on [[$path, $path/]]")
}

test("RunCompactionProcedure") {
val path = "hdfs://demo/test/hudi/path"
val showCommitsSql = s"CALL RUN_COMPACTION(path => '$path')"
interceptEndsWith[AccessControlException] {
doAs(someone, sql(showCommitsSql))
}(s"does not have [write] privilege on [[$path, $path/]]")
}
}

0 comments on commit 14cfec1

Please sign in to comment.