Skip to content

Commit

Permalink
Vacuum changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sumeet-db committed Mar 7, 2024
1 parent 7b9c4ed commit 42f72ab
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,8 @@ object TableFeature {
// Row IDs are still under development and only available in testing.
RowTrackingFeature,
InCommitTimestampTableFeature,
TypeWideningTableFeature)
TypeWideningTableFeature,
VacuumProtocolCheckTableFeature)
}
val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap
require(features.size == featureMap.size, "Lowercase feature names must not duplicate.")
Expand Down Expand Up @@ -656,6 +657,27 @@ object InCommitTimestampTableFeature
}
}

/**
 * Reader-writer table feature that guards VACUUM.
 *
 * When the feature is enabled on a table, a writer has to take one of two paths:
 *  - Decline VACUUM entirely. A writer may state outright that it does not support VACUUM on any
 *    table, whether or not the Vacuum Protocol Check table feature is present.
 *  - Check the writer protocol first. A VACUUM implementation must perform a writer protocol
 *    check before it deletes any file.
 *
 * Readers have nothing new to understand or change; they only need to acknowledge that the
 * feature exists.
 */
object VacuumProtocolCheckTableFeature
  extends ReaderWriterFeature(name = "vacuum-protocol-check-dev")
  with FeatureAutomaticallyEnabledByMetadata {

  // Always false: this feature never reports that a table's metadata requires it.
  override def metadataRequiresFeatureToBeEnabled(
      metadata: Metadata,
      spark: SparkSession): Boolean = false

  // Opts in to automatic protocol updates for existing tables. The exact trigger is defined by
  // FeatureAutomaticallyEnabledByMetadata, which is outside this block -- confirm there.
  override def automaticallyUpdateProtocolOfExistingTables: Boolean = true
}

/**
* Features below are for testing only, and are being registered to the system only in the testing
* environment. See [[TableFeature.allSupportedFeaturesMap]] for the registration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.FileNames.deltaFile

import org.apache.spark.SparkConf

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.test.SharedSparkSession
Expand Down Expand Up @@ -459,6 +459,33 @@ class DeltaTableFeatureSuite
}
}

// Registers the same scenario once per property-modifying command, so each command gets its own
// named test case.
Seq("ALTER", "CLONE", "REPLACE", "CREATE OR REPLACE").foreach { commandName =>
  test(s"Vacuum Protocol Check is disabled by default but can be enabled during $commandName") {
    val table = "tbl"
    withTable(table) {
      spark.range(0).write.format("delta").saveAsTable(table)
      val log = DeltaLog.forTable(spark, TableIdentifier(table))
      val featureName = VacuumProtocolCheckTableFeature.name

      // Refreshes the log on every call so that each check sees the protocol produced by the
      // most recent command.
      def featureListed: Boolean =
        log.update().protocol.readerAndWriterFeatureNames.contains(featureName)

      // Runs `commandName` against the table (used as both source and target) with the given
      // table properties.
      def applyProperties(props: Seq[String]): Unit = {
        sql(buildTablePropertyModifyingCommand(
          commandName, targetTableName = table, sourceTableName = table, props))
      }

      // Step 1: a freshly created table does not list the feature.
      assert(!featureListed)

      // Step 2: raising only the writer version must not pull the feature in.
      applyProperties(Seq(s"'delta.minWriterVersion' = $TABLE_FEATURES_MIN_WRITER_VERSION"))
      assert(!featureListed)

      // Step 3: asking for the feature explicitly through its table property enables it.
      applyProperties(Seq(
        s"'${FEATURE_PROP_PREFIX}${featureName}' = 'enabled', " +
          s"'delta.minWriterVersion' = $TABLE_FEATURES_MIN_WRITER_VERSION"))
      assert(featureListed)
    }
  }
}

private def buildTablePropertyModifyingCommand(
commandName: String,
targetTableName: String,
Expand Down

0 comments on commit 42f72ab

Please sign in to comment.