Merge pull request #6 from arenadata/bugfix/ADH-5569
[ADH-5569] (3.5.2) Force a purge during table/partition drop when the 'external.table.purge' table property is present
Asmoday authored Jan 24, 2025
2 parents 2204984 + 753ee77 commit b69d269
Showing 5 changed files with 35 additions and 12 deletions.
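
At the SQL level, the patch makes a drop behave as a purging drop whenever the table carries the 'external.table.purge' property, even if the statement itself has no PURGE clause. A minimal sketch of the intended behavior, assuming a Hive-enabled session; the table name and location are hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("external-table-purge-demo")
  .enableHiveSupport()
  .getOrCreate()

// Hypothetical external table that opts into purge-on-drop via the property.
spark.sql("""
  CREATE EXTERNAL TABLE ext_orders (id INT, amount DOUBLE)
  LOCATION '/tmp/ext_orders'
  TBLPROPERTIES ('external.table.purge' = 'true')
""")

// With this patch the property alone routes the drop through the purge path;
// previously that required an explicit purge flag on the drop.
spark.sql("DROP TABLE ext_orders")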
sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.{DataSourceUtils, InMemoryFileIndex}
@@ -440,4 +441,15 @@ object CommandUtils extends Logging {
       case NonFatal(e) => logWarning(s"Exception when attempting to uncache $name", e)
     }
   }
+
+  def isPurgeableExternalTable(table: CatalogTable): Boolean = {
+    table.properties.get("external.table.purge") match {
+      case Some(value) => value.toBoolean
+      case None => false
+    }
+  }
+
+  def isPurgeableExternalTable(table: Table): Boolean = {
+    Option(table.properties.get("external.table.purge")).exists(_.toBoolean)
+  }
 }
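
Both overloads encode the same rule: a missing property means no purge; otherwise the string value is parsed with toBoolean. The connector Table.properties is a java.util.Map, whose get returns null for an absent key, hence the Option(...) wrapper in the second overload. A standalone sketch of the rule over a plain Scala Map (isPurgeable is an illustrative stand-in, not part of the patch):

def isPurgeable(properties: Map[String, String]): Boolean =
  properties.get("external.table.purge").exists(_.toBoolean)

assert(isPurgeable(Map("external.table.purge" -> "true")))
assert(isPurgeable(Map("external.table.purge" -> "TRUE")))    // toBoolean ignores case
assert(!isPurgeable(Map("external.table.purge" -> "false")))
assert(!isPurgeable(Map.empty))                               // absent => no purge
// Caveat: toBoolean accepts only "true"/"false"; a value such as "yes"
// throws IllegalArgumentException rather than defaulting to false.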
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -45,6 +45,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.errors.QueryExecutionErrors.hiveTableWithAnsiIntervalsError
+import org.apache.spark.sql.execution.command.CommandUtils.isPurgeableExternalTable
 import org.apache.spark.sql.execution.datasources.{DataSource, DataSourceUtils, FileFormat, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -224,9 +225,10 @@ case class DropTableCommand(
     val catalog = sparkSession.sessionState.catalog

     if (catalog.tableExists(tableName)) {
+      val table = catalog.getTableMetadata(tableName)
       // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
       // issue an exception.
-      catalog.getTableMetadata(tableName).tableType match {
+      table.tableType match {
         case CatalogTableType.VIEW if !isView =>
           throw QueryCompilationErrors.wrongCommandForObjectTypeError(
             operation = "DROP TABLE",
@@ -252,8 +254,10 @@
       } catch {
         case NonFatal(e) => log.warn(e.toString, e)
       }
+
       catalog.refreshTable(tableName)
-      catalog.dropTable(tableName, ifExists, purge)
+      val effectivePurge = purge || isPurgeableExternalTable(table)
+      catalog.dropTable(tableName, ifExists, effectivePurge)
     } else if (ifExists) {
       // no-op
     } else {
@@ -622,8 +626,9 @@ case class AlterTableDropPartitionCommand(
         sparkSession.sessionState.conf.resolver)
     }

+    val effectivePurge = purge || isPurgeableExternalTable(table)
     catalog.dropPartitions(
-      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = effectivePurge,
       retainData = retainData)

     sparkSession.catalog.refreshTable(table.identifier.quotedString)
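
In both v1 commands the explicit purge flag is OR-ed with the property check, so an explicit PURGE keeps working and the property can only widen, never suppress, purging. Dropping a partition now follows the same rule; a hypothetical continuation of the session sketched above:

// Partitioned external table that opts into purging (names are hypothetical).
spark.sql("""
  CREATE EXTERNAL TABLE ext_events (id INT)
  PARTITIONED BY (dt STRING)
  LOCATION '/tmp/ext_events'
  TBLPROPERTIES ('external.table.purge' = 'true')
""")

// No PURGE keyword, yet the partition's files take the purge path
// because of the table property.
spark.sql("ALTER TABLE ext_events DROP PARTITION (dt = '2025-01-01')")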
sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIfNeeded}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TableIdentifierHelper
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
+import org.apache.spark.sql.execution.command.CommandUtils.isPurgeableExternalTable
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -460,13 +461,8 @@ case class TruncateTableCommand(
     val catalog = spark.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
     val tableIdentWithDB = table.identifier.quotedString
-    val purgeOption: Option[String] = table.properties.get("external.table.purge")
-    val purge: Boolean = purgeOption match {
-      case Some(value) => value.toBoolean
-      case None => false
-    }

-    if (table.tableType == CatalogTableType.EXTERNAL && !purge) {
+    if (table.tableType == CatalogTableType.EXTERNAL && !isPurgeableExternalTable(table)) {
       throw QueryCompilationErrors.truncateTableOnExternalTablesError(tableIdentWithDB)
     }
     if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
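
TruncateTableCommand already honored the property; this hunk only replaces the inline Option-matching with the shared helper, so the observable rule is unchanged: truncating an external table is still rejected unless the property is set to true. A hypothetical illustration:

// An external table without the property still cannot be truncated.
spark.sql("CREATE EXTERNAL TABLE ext_raw (line STRING) LOCATION '/tmp/ext_raw'")
// spark.sql("TRUNCATE TABLE ext_raw")   // throws truncateTableOnExternalTablesError

// Opting in via the property lifts the restriction.
spark.sql("ALTER TABLE ext_raw SET TBLPROPERTIES ('external.table.purge' = 'true')")
spark.sql("TRUNCATE TABLE ext_raw")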
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropPartitionExec.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, ResolvedPartitionSpec}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsPartitionManagement}
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.command.CommandUtils.isPurgeableExternalTable

 /**
  * Physical plan node for dropping partitions of table.
@@ -48,16 +49,20 @@ case class DropPartitionExec(
     val isTableAltered = existsPartIdents match {
       case Seq() => false // Nothing will be done
       case Seq(partIdent) =>
-        if (purge) table.purgePartition(partIdent) else table.dropPartition(partIdent)
+        if (shouldPurge) table.purgePartition(partIdent) else table.dropPartition(partIdent)
       case _ if table.isInstanceOf[SupportsAtomicPartitionManagement] =>
         val idents = existsPartIdents.toArray
         val atomicTable = table.asAtomicPartitionable
-        if (purge) atomicTable.purgePartitions(idents) else atomicTable.dropPartitions(idents)
+        if (shouldPurge) atomicTable.purgePartitions(idents) else atomicTable.dropPartitions(idents)
       case _ =>
         throw QueryExecutionErrors.cannotDropMultiPartitionsOnNonatomicPartitionTableError(
           table.name())
     }
     if (isTableAltered) refreshCache()
     Seq.empty
   }
+
+  private def shouldPurge: Boolean = {
+    purge || isPurgeableExternalTable(table)
+  }
 }
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.command.CommandUtils.isPurgeableExternalTable

 /**
  * Physical plan node for dropping a table.
@@ -35,7 +36,11 @@ case class DropTableExec(
   override def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
       invalidateCache()
-      if (purge) catalog.purgeTable(ident) else catalog.dropTable(ident)
+      if (purge || isPurgeableExternalTable(catalog.loadTable(ident))) {
+        catalog.purgeTable(ident)
+      } else {
+        catalog.dropTable(ident)
+      }
     } else if (!ifExists) {
       throw QueryCompilationErrors.noSuchTableError(
         catalog.name() +: ident.namespace() :+ ident.name())
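
The v2 execs mirror the v1 rule. DropPartitionExec centralizes the decision in a private shouldPurge helper shared by the single-partition and atomic multi-partition branches, while DropTableExec loads the table solely to inspect its properties, at the cost of one extra catalog lookup per drop of an existing table. A condensed, illustration-only restatement of the v2 decision (shouldPurge here is a free function, unlike the member in the patch):

import org.apache.spark.sql.connector.catalog.Table

// The explicit flag or the table property selects the purging code path;
// Table.properties is a java.util.Map, so a missing key comes back as null.
def shouldPurge(purgeFlag: Boolean, table: Table): Boolean =
  purgeFlag || Option(table.properties.get("external.table.purge")).exists(_.toBoolean)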
