[KYUUBI apache#5964][BUG] Avoid check not fully optimized query for InsertIntoDataSourceDirCommand and InsertIntoDataSourceCommand

# 🔍 Description
## Issue References 🔗

This pull request fixes apache#5964

## Describe Your Solution 🔧

The `query` of `InsertIntoDataSourceDirCommand` and `InsertIntoDataSourceCommand` is not fully optimized, so checking it directly requests privileges the statement never actually uses.
We can safely skip the check on the command's `query` field: the plan the command generates is still checked, so the correct privileges for the SQL are still requested.
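
For illustration, a statement of the shape exercised by the updated suites (the database, table, and directory names are placeholders) goes through `InsertIntoDataSourceDirCommand`:

```scala
// Placeholder names (db1.table1, /tmp/test_dir), mirroring the updated suites.
// Before this change, privilege building also walked the command's unoptimized
// `query` field and could request [select] privileges the executed plan never
// needs; after it, the checks come from the plan the command generates.
sql(
  """INSERT OVERWRITE DIRECTORY '/tmp/test_dir'
    |USING parquet
    |SELECT * FROM db1.table1""".stripMargin)
```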

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️
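
Checking the unoptimized `query` of `InsertIntoDataSourceDirCommand` requested select privileges the executed plan may not need; for the statement above the Ranger suite expected a combined violation: `does not have [select] privilege on [$db1/$table1/id,$db1/$table1/scope], [write] privilege on [[$path, $path/]]`.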

#### Behavior With This Pull Request 🎉
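
The command's `query` field is no longer checked directly, so the same statement reports only `does not have [write] privilege on [[$path, $path/]]`; table and column privileges are still requested through the plan the command generates.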

#### Related Unit Tests
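
`HiveCatalogPrivilegeBuilderSuite` and `HiveCatalogRangerSparkExtensionSuite` are updated to expect the reduced privilege sets.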

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes apache#5983 from AngersZhuuuu/KYUUBI-5964.

Closes apache#5964

1adcf8d [Angerszhuuuu] update
7204c9f [Angerszhuuuu] [KYUUBI-5964][BUG] Avoid check not fully optimized query for InsertIntoDataSourceDirCommand and InsertIntoDataSourceCommand

Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
AngersZhuuuu authored and zhaohehuhu committed Mar 21, 2024
1 parent 5a7e79d commit ad5626b
Showing 4 changed files with 20 additions and 52 deletions.
@@ -1398,11 +1398,7 @@
   "classname" : "org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand",
   "tableDescs" : [ ],
   "opType" : "QUERY",
-  "queryDescs" : [ {
-    "fieldName" : "query",
-    "fieldExtractor" : "LogicalPlanQueryExtractor",
-    "comment" : ""
-  } ],
+  "queryDescs" : [ ],
   "uriDescs" : [ {
     "fieldName" : "storage",
     "fieldExtractor" : "CatalogStorageFormatURIExtractor",
@@ -1625,11 +1621,7 @@
     "comment" : ""
   } ],
   "opType" : "QUERY",
-  "queryDescs" : [ {
-    "fieldName" : "query",
-    "fieldExtractor" : "LogicalPlanQueryExtractor",
-    "comment" : ""
-  } ],
+  "queryDescs" : [ ],
   "uriDescs" : [ ]
 }, {
   "classname" : "org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand",
@@ -1483,16 +1483,7 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite {
       .queryExecution.analyzed
     val (in, out, operationType) = PrivilegesBuilder.build(plan, spark)
     assert(operationType === QUERY)
-    assert(in.size === 1)
-    val po0 = in.head
-    assert(po0.actionType === PrivilegeObjectActionType.OTHER)
-    assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
-    assertEqualsIgnoreCase(reusedDb)(po0.dbname)
-    assert(po0.objectName equalsIgnoreCase reusedPartTable.split("\\.").last)
-    assert(po0.columns === Seq("key", "value", "pid"))
-    checkTableOwner(po0)
-    val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
-    assert(accessType0 === AccessType.SELECT)
+    assert(in.size === 0)

     assert(out.size == 1)
     val po1 = out.head
@@ -1534,18 +1525,7 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite {
     val plan = sql(sqlStr).queryExecution.analyzed
     val (inputs, outputs, operationType) = PrivilegesBuilder.build(plan, spark)
     assert(operationType === QUERY)
-    assert(inputs.size == 1)
-    inputs.foreach { po =>
-      assert(po.actionType === PrivilegeObjectActionType.OTHER)
-      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
-      assert(po.catalog.isEmpty)
-      assertEqualsIgnoreCase(reusedDb)(po.dbname)
-      assertEqualsIgnoreCase(reusedTableShort)(po.objectName)
-      assert(po.columns === Seq("key", "value"))
-      checkTableOwner(po)
-      val accessType = ranger.AccessType(po, operationType, isInput = true)
-      assert(accessType === AccessType.SELECT)
-    }
+    assert(inputs.size === 0)

     assert(outputs.size === 1)
     outputs.foreach { po =>
@@ -1614,16 +1594,7 @@ class HiveCatalogPrivilegeBuilderSuite extends PrivilegesBuilderSuite {
       .queryExecution.analyzed
     val (in, out, operationType) = PrivilegesBuilder.build(plan, spark)
     assert(operationType === QUERY)
-    assert(in.size === 1)
-    val po0 = in.head
-    assert(po0.actionType === PrivilegeObjectActionType.OTHER)
-    assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
-    assertEqualsIgnoreCase(reusedDb)(po0.dbname)
-    assert(po0.objectName equalsIgnoreCase reusedPartTable.split("\\.").last)
-    assert(po0.columns === Seq("key", "value", "pid"))
-    checkTableOwner(po0)
-    val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
-    assert(accessType0 === AccessType.SELECT)
+    assert(in.size === 0)

     assert(out.size == 1)
     val po1 = out.head
@@ -1639,6 +1610,7 @@
   test("InsertIntoHiveDirCommand") {
     val tableDirectory = getClass.getResource("/").getPath + "table_directory"
     val directory = File(tableDirectory).createDirectory()
+    sql("set spark.sql.hive.convertMetastoreInsertDir=false")
     val plan = sql(
       s"""
          |INSERT OVERWRITE DIRECTORY '${directory.path}'
@@ -567,7 +567,7 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
       "logicalRelation",
       classOf[LogicalRelationTableExtractor],
       actionTypeDesc = Some(actionTypeDesc))
-    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
+    TableCommandSpec(cmd, Seq(tableDesc))
   }

   val InsertIntoHiveTable = {
@@ -585,9 +585,8 @@

   val InsertIntoDataSourceDir = {
     val cmd = "org.apache.spark.sql.execution.command.InsertIntoDataSourceDirCommand"
-    val queryDesc = queryQueryDesc
     val uriDesc = UriDesc("storage", classOf[CatalogStorageFormatURIExtractor])
-    TableCommandSpec(cmd, Nil, queryDescs = Seq(queryDesc), uriDescs = Seq(uriDesc))
+    TableCommandSpec(cmd, Nil, uriDescs = Seq(uriDesc))
   }

   val SaveIntoDataSourceCommand = {
@@ -610,6 +609,13 @@
     TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDesc))
   }

+  val InsertIntoHiveDirCommand = {
+    val cmd = "org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand"
+    val queryDesc = queryQueryDesc
+    val uriDesc = UriDesc("storage", classOf[CatalogStorageFormatURIExtractor])
+    TableCommandSpec(cmd, Nil, queryDescs = Seq(queryDesc), uriDescs = Seq(uriDesc))
+  }
+
   val LoadData = {
     val cmd = "org.apache.spark.sql.execution.command.LoadDataCommand"
     val actionTypeDesc = overwriteActionTypeDesc.copy(fieldName = "isOverwrite")
@@ -723,8 +729,7 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
     InsertIntoDataSourceDir,
     SaveIntoDataSourceCommand,
     InsertIntoHadoopFsRelationCommand,
-    InsertIntoDataSourceDir.copy(classname =
-      "org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand"),
+    InsertIntoHiveDirCommand,
     InsertIntoHiveTable,
     LoadData,
     MergeIntoTable,
@@ -757,7 +757,8 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
           s"""INSERT OVERWRITE DIRECTORY '/tmp/test_dir'
              | USING parquet
              | SELECT * FROM $db1.$table;""".stripMargin)))
-      assert(e.getMessage.contains(s"does not have [select] privilege on [$db1/$table/id]"))
+      assert(e.getMessage.contains(
+        s"does not have [write] privilege on [[/tmp/test_dir, /tmp/test_dir/]]"))
     }
   }

@@ -1080,8 +1081,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
               |INSERT OVERWRITE DIRECTORY '$path'
               |USING parquet
               |SELECT * FROM $db1.$table1""".stripMargin)))(
-          s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/scope], " +
-            s"[write] privilege on [[$path, $path/]]")
+          s"does not have [write] privilege on [[$path, $path/]]")
       }
     }
   }
@@ -1122,8 +1122,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
               |INSERT OVERWRITE DIRECTORY '$path'
               |USING parquet
               |SELECT * FROM $db1.$table1""".stripMargin)))(
-          s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/scope], " +
-            s"[write] privilege on [[$path, $path/]]")
+          s"does not have [write] privilege on [[$path, $path/]]")

       doAs(admin, sql(s"SELECT * FROM parquet.`$path`".stripMargin).explain(true))
       interceptEndsWith[AccessControlException](
