[KYUUBI #5323] [AUTHZ] Drop Hive and Iceberg tables with PURGE option in tests

### _Why are the changes needed?_

- `DROP TABLE` for Iceberg tables only removes the table from the catalog by default, which may contaminate other tests that reuse the same table name
- Enable the PURGE option when dropping Iceberg and Hive tables (see the sketch below)
  - Iceberg Spark DDL `DROP TABLE ... PURGE`
  - To drop the table from the catalog and delete the table’s contents
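
For illustration only, a minimal sketch of the behavioral difference; the `iceberg_catalog` catalog name and `default.tbl` table are hypothetical and not part of this patch:

```scala
import org.apache.spark.sql.SparkSession

object DropWithPurgeSketch {
  def main(args: Array[String]): Unit = {
    // Assumes a SparkSession with an Iceberg catalog registered as `iceberg_catalog`.
    val spark = SparkSession.builder().appName("drop-with-purge-sketch").getOrCreate()

    // Plain DROP TABLE only removes the Iceberg table from the catalog; data and
    // metadata files may remain and contaminate later tests reusing the table name.
    // spark.sql("DROP TABLE IF EXISTS iceberg_catalog.default.tbl")

    // DROP TABLE ... PURGE drops the table from the catalog and deletes its contents.
    spark.sql("DROP TABLE IF EXISTS iceberg_catalog.default.tbl PURGE")

    spark.stop()
  }
}
```

The same idea is applied to the shared test teardown in `SparkSessionProvider` below, with PURGE skipped for catalogs that do not support it (the JDBC V2 table catalog).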

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

Closes #5323 from bowenliang123/iceberg-purge.

Closes #5323

ce4188d [Bowen Liang] purge

Authored-by: Bowen Liang <[email protected]>
Signed-off-by: Bowen Liang <[email protected]>
bowenliang123 committed Oct 13, 2023
1 parent 74e52f6 commit 1b229b6
Showing 6 changed files with 33 additions and 14 deletions.
@@ -27,6 +27,7 @@ import org.scalatest.Assertions._

import org.apache.kyuubi.Utils
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._

trait SparkSessionProvider {
@@ -79,7 +80,15 @@ trait SparkSessionProvider {
f
} finally {
res.foreach {
case (t, "table") => doAs(admin, sql(s"DROP TABLE IF EXISTS $t"))
case (t, "table") => doAs(
admin, {
val purgeOption =
if (isSparkV32OrGreater && isCatalogSupportPurge(
spark.sessionState.catalogManager.currentCatalog.name())) {
"PURGE"
} else ""
sql(s"DROP TABLE IF EXISTS $t $purgeOption")
})
case (db, "database") => doAs(admin, sql(s"DROP DATABASE IF EXISTS $db"))
case (fn, "function") => doAs(admin, sql(s"DROP FUNCTION IF EXISTS $fn"))
case (view, "view") => doAs(admin, sql(s"DROP VIEW IF EXISTS $view"))
@@ -96,4 +105,11 @@
doAs(user, assert(sql(query).collect() === result))
}

private def isCatalogSupportPurge(catalogName: String): Boolean = {
val unsupportedCatalogs = Set(v2JdbcTableCatalogClassName)
spark.conf.getOption(s"spark.sql.catalog.$catalogName") match {
case Some(catalog) if !unsupportedCatalogs.contains(catalog) => true
case _ => false
}
}
}
@@ -22,6 +22,7 @@ import scala.util.Try

import org.scalatest.Outcome

import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._
import org.apache.kyuubi.plugin.spark.authz.serde._
import org.apache.kyuubi.util.AssertionUtils._

@@ -38,9 +39,7 @@ class V2JdbcTableCatalogPrivilegesBuilderSuite extends V2CommandsPrivilegesSuite
val jdbcUrl: String = s"$dbUrl;create=true"

override def beforeAll(): Unit = {
spark.conf.set(
s"spark.sql.catalog.$catalogV2",
"org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set(s"spark.sql.catalog.$catalogV2", v2JdbcTableCatalogClassName)
spark.conf.set(s"spark.sql.catalog.$catalogV2.url", jdbcUrl)
spark.conf.set(
s"spark.sql.catalog.$catalogV2.driver",
@@ -170,3 +169,8 @@
}
}
}

object V2JdbcTableCatalogPrivilegesBuilderSuite {
val v2JdbcTableCatalogClassName: String =
"org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog"
}
@@ -24,6 +24,7 @@ import scala.util.Try
import org.apache.kyuubi.plugin.spark.authz.AccessControlException
import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._

/**
@@ -44,9 +45,7 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu
val jdbcUrl: String = s"$dbUrl;create=true"

override def beforeAll(): Unit = {
spark.conf.set(
s"spark.sql.catalog.$catalogV2",
"org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set(s"spark.sql.catalog.$catalogV2", v2JdbcTableCatalogClassName)
spark.conf.set(s"spark.sql.catalog.$catalogV2.url", jdbcUrl)
spark.conf.set(
s"spark.sql.catalog.$catalogV2.driver",
@@ -23,13 +23,13 @@ import scala.util.Try
import org.apache.spark.SparkConf
import org.scalatest.Outcome

import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._

class DataMaskingForJDBCV2Suite extends DataMaskingTestBase {
override protected val extraSparkConf: SparkConf = {
new SparkConf()
.set("spark.sql.defaultCatalog", "testcat")
.set(
"spark.sql.catalog.testcat",
"org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
.set("spark.sql.catalog.testcat", v2JdbcTableCatalogClassName)
.set(s"spark.sql.catalog.testcat.url", "jdbc:derby:memory:testcat;create=true")
.set(
s"spark.sql.catalog.testcat.driver",
@@ -24,13 +24,13 @@ import scala.util.Try
import org.apache.spark.SparkConf
import org.scalatest.Outcome

import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._

class RowFilteringForJDBCV2Suite extends RowFilteringTestBase {
override protected val extraSparkConf: SparkConf = {
new SparkConf()
.set("spark.sql.defaultCatalog", "testcat")
.set(
"spark.sql.catalog.testcat",
"org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
.set("spark.sql.catalog.testcat", v2JdbcTableCatalogClassName)
.set(s"spark.sql.catalog.testcat.url", "jdbc:derby:memory:testcat;create=true")
.set(
s"spark.sql.catalog.testcat.driver",
@@ -133,7 +133,7 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit
}
assert(!rs1.next())
} finally {
statement.execute(s"DROP TABLE IF EXISTS $cg.$db.tbl")
statement.execute(s"DROP TABLE IF EXISTS $cg.$db.tbl PURGE")
}
}
}
