Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AUTHZ] Drop Hive and Iceberg tables with PURGE option in tests #5323

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.scalatest.Assertions._

import org.apache.kyuubi.Utils
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._

trait SparkSessionProvider {
Expand Down Expand Up @@ -79,7 +80,15 @@ trait SparkSessionProvider {
f
} finally {
res.foreach {
case (t, "table") => doAs(admin, sql(s"DROP TABLE IF EXISTS $t"))
case (t, "table") => doAs(
admin, {
val purgeOption =
if (isSparkV32OrGreater && isCatalogSupportPurge(
spark.sessionState.catalogManager.currentCatalog.name())) {
"PURGE"
} else ""
sql(s"DROP TABLE IF EXISTS $t $purgeOption")
})
case (db, "database") => doAs(admin, sql(s"DROP DATABASE IF EXISTS $db"))
case (fn, "function") => doAs(admin, sql(s"DROP FUNCTION IF EXISTS $fn"))
case (view, "view") => doAs(admin, sql(s"DROP VIEW IF EXISTS $view"))
Expand All @@ -96,4 +105,11 @@ trait SparkSessionProvider {
doAs(user, assert(sql(query).collect() === result))
}

private def isCatalogSupportPurge(catalogName: String): Boolean = {
val unsupportedCatalogs = Set(v2JdbcTableCatalogClassName)
spark.conf.getOption(s"spark.sql.catalog.$catalogName") match {
case Some(catalog) if !unsupportedCatalogs.contains(catalog) => true
case _ => false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import scala.util.Try

import org.scalatest.Outcome

import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._
import org.apache.kyuubi.plugin.spark.authz.serde._
import org.apache.kyuubi.util.AssertionUtils._

Expand All @@ -38,9 +39,7 @@ class V2JdbcTableCatalogPrivilegesBuilderSuite extends V2CommandsPrivilegesSuite
val jdbcUrl: String = s"$dbUrl;create=true"

override def beforeAll(): Unit = {
spark.conf.set(
s"spark.sql.catalog.$catalogV2",
"org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set(s"spark.sql.catalog.$catalogV2", v2JdbcTableCatalogClassName)
spark.conf.set(s"spark.sql.catalog.$catalogV2.url", jdbcUrl)
spark.conf.set(
s"spark.sql.catalog.$catalogV2.driver",
Expand Down Expand Up @@ -170,3 +169,8 @@ class V2JdbcTableCatalogPrivilegesBuilderSuite extends V2CommandsPrivilegesSuite
}
}
}

object V2JdbcTableCatalogPrivilegesBuilderSuite {
val v2JdbcTableCatalogClassName: String =
"org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog"
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.util.Try
import org.apache.kyuubi.plugin.spark.authz.AccessControlException
import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._

/**
Expand All @@ -44,9 +45,7 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu
val jdbcUrl: String = s"$dbUrl;create=true"

override def beforeAll(): Unit = {
spark.conf.set(
s"spark.sql.catalog.$catalogV2",
"org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
spark.conf.set(s"spark.sql.catalog.$catalogV2", v2JdbcTableCatalogClassName)
spark.conf.set(s"spark.sql.catalog.$catalogV2.url", jdbcUrl)
spark.conf.set(
s"spark.sql.catalog.$catalogV2.driver",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import scala.util.Try
import org.apache.spark.SparkConf
import org.scalatest.Outcome

import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._

class DataMaskingForJDBCV2Suite extends DataMaskingTestBase {
override protected val extraSparkConf: SparkConf = {
new SparkConf()
.set("spark.sql.defaultCatalog", "testcat")
.set(
"spark.sql.catalog.testcat",
"org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
.set("spark.sql.catalog.testcat", v2JdbcTableCatalogClassName)
.set(s"spark.sql.catalog.testcat.url", "jdbc:derby:memory:testcat;create=true")
.set(
s"spark.sql.catalog.testcat.driver",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import scala.util.Try
import org.apache.spark.SparkConf
import org.scalatest.Outcome

import org.apache.kyuubi.plugin.spark.authz.V2JdbcTableCatalogPrivilegesBuilderSuite._

class RowFilteringForJDBCV2Suite extends RowFilteringTestBase {
override protected val extraSparkConf: SparkConf = {
new SparkConf()
.set("spark.sql.defaultCatalog", "testcat")
.set(
"spark.sql.catalog.testcat",
"org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
.set("spark.sql.catalog.testcat", v2JdbcTableCatalogClassName)
.set(s"spark.sql.catalog.testcat.url", "jdbc:derby:memory:testcat;create=true")
.set(
s"spark.sql.catalog.testcat.driver",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit
}
assert(!rs1.next())
} finally {
statement.execute(s"DROP TABLE IF EXISTS $cg.$db.tbl")
statement.execute(s"DROP TABLE IF EXISTS $cg.$db.tbl PURGE")
}
}
}
Expand Down