Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

History truncation/validation support for writer features in DROP FEATURE command #3296

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,7 @@
},
"DELTA_FEATURE_DROP_HISTORY_TRUNCATION_NOT_ALLOWED" : {
"message" : [
"History truncation is only relevant for reader features."
"The particular feature does not require history truncation."
],
"sqlState" : "0AKDE"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,22 @@ case class TestWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
}
}

/**
 * Pre-downgrade command for [[TestRemovableWriterWithHistoryTruncationFeature]].
 * Removing the feature only requires unsetting its table property.
 */
case class TestWriterWithHistoryValidationFeaturePreDowngradeCommand(table: DeltaTableV2)
  extends PreDowngradeTableFeatureCommand
  with DeltaLogging {

  /**
   * Unsets the feature's table property unless the table's initial snapshot already
   * passes the feature's removal validation.
   *
   * @return false when validation already passes and nothing is done; true after the
   *         unset-properties command has been run.
   */
  override def removeFeatureTracesIfNeeded(): Boolean = {
    val feature = TestRemovableWriterWithHistoryTruncationFeature
    // Nothing to clean up when the initial snapshot carries no trace of the feature.
    val alreadyClean = feature.validateRemoval(table.initialSnapshot)
    if (alreadyClean) {
      false
    } else {
      val keysToUnset = Seq(feature.TABLE_PROP_KEY)
      AlterTableUnsetPropertiesDeltaCommand(table, keysToUnset, ifExists = true)
        .run(table.spark)
      true
    }
  }
}

case class TestReaderWriterFeaturePreDowngradeCommand(table: DeltaTableV2)
extends PreDowngradeTableFeatureCommand
with DeltaLogging {
Expand Down
53 changes: 45 additions & 8 deletions spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ sealed trait FeatureAutomaticallyEnabledByMetadata { this: TableFeature =>

/**
* A trait indicating a feature can be removed. Classes that extend the trait need to
* implement the following three functions:
* implement the following four functions:
*
* a) preDowngradeCommand. This is where all required actions for removing the feature are
* implemented. For example, to remove the DVs feature we need to remove metadata config
Expand All @@ -171,13 +171,20 @@ sealed trait FeatureAutomaticallyEnabledByMetadata { this: TableFeature =>
* protocol downgrade is committed to the table. When the protocol downgrade txn conflicts,
* the validation is repeated against the winning txn snapshot. As soon as the protocol
* downgrade succeeds, all subsequent interleaved txns are aborted.
* The implementation should return true if there are no feature traces in the latest
* version. False otherwise.
*
* c) actionUsesFeature. For reader+writer features we check whether past versions contain any
* traces of the removed feature. This is achieved by calling [[actionUsesFeature]] for
* every action of every reachable commit version in the log. Note, a feature may leave traces
* in both data and metadata. Depending on the feature, we need to check several types of
* actions such as Metadata, AddFile, RemoveFile etc.
* Writer features should directly return false.
* c) requiresHistoryTruncation. It indicates whether the table history needs to be clear
* of all feature traces before downgrading the protocol. This is by default true
* for all reader+writer features and false for writer features.
* WARNING: Disabling [[requiresHistoryTruncation]] for relevant features could result in
* incorrect snapshot reconstruction.
*
* d) actionUsesFeature. For features that require history truncation we verify whether past
* versions contain any traces of the removed feature. This is achieved by calling
* [[actionUsesFeature]] for every action of every reachable commit version in the log.
* Note, a feature may leave traces in both data and metadata. Depending on the feature, we
* need to check several types of actions such as Metadata, AddFile, RemoveFile etc.
*
* WARNING: actionUsesFeature should not check Protocol actions for the feature being removed,
* because at the time actionUsesFeature is invoked the protocol downgrade did not happen yet.
Expand All @@ -191,6 +198,7 @@ sealed trait FeatureAutomaticallyEnabledByMetadata { this: TableFeature =>
sealed trait RemovableFeature { self: TableFeature =>
def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand
def validateRemoval(snapshot: Snapshot): Boolean
def requiresHistoryTruncation: Boolean = isReaderWriterFeature
def actionUsesFeature(action: Action): Boolean

/**
Expand All @@ -216,7 +224,7 @@ sealed trait RemovableFeature { self: TableFeature =>
def historyContainsFeature(
spark: SparkSession,
downgradeTxnReadSnapshot: Snapshot): Boolean = {
require(isReaderWriterFeature)
require(requiresHistoryTruncation)
val deltaLog = downgradeTxnReadSnapshot.deltaLog
val earliestCheckpointVersion = deltaLog.findEarliestReliableCheckpoint.getOrElse(0L)
val toVersion = downgradeTxnReadSnapshot.version
Expand Down Expand Up @@ -356,6 +364,7 @@ object TableFeature {
TestReaderWriterMetadataNoAutoUpdateFeature,
TestRemovableWriterFeature,
TestRemovableWriterFeatureWithDependency,
TestRemovableWriterWithHistoryTruncationFeature,
TestRemovableLegacyWriterFeature,
TestRemovableReaderWriterFeature,
TestRemovableLegacyReaderWriterFeature,
Expand Down Expand Up @@ -1012,3 +1021,31 @@ object TestWriterFeatureWithTransitiveDependency

override def requiredFeatures: Set[TableFeature] = Set(TestFeatureWithDependency)
}

/**
 * Removable writer feature that, unlike the writer-feature default, opts into history
 * truncation. The feature is driven entirely by the [[TABLE_PROP_KEY]] table property.
 */
object TestRemovableWriterWithHistoryTruncationFeature
  extends WriterFeature(name = "TestRemovableWriterWithHistoryTruncationFeature")
  with FeatureAutomaticallyEnabledByMetadata
  with RemovableFeature {

  val TABLE_PROP_KEY = "_123TestRemovableWriterWithHistoryTruncationFeature321_"

  /** The feature is required whenever the table property is present and parses to true. */
  override def metadataRequiresFeatureToBeEnabled(
      metadata: Metadata,
      spark: SparkSession): Boolean = {
    val propertyValue = metadata.configuration.get(TABLE_PROP_KEY)
    propertyValue.exists(_.toBoolean)
  }

  /** Removal is valid only when the property is absent or not enabled in the snapshot. */
  override def validateRemoval(snapshot: Snapshot): Boolean = {
    val propertyValue = snapshot.metadata.configuration.get(TABLE_PROP_KEY)
    propertyValue.forall(value => !value.toBoolean)
  }

  /** Builds the command that strips the table property before the protocol downgrade. */
  override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
    TestWriterWithHistoryValidationFeaturePreDowngradeCommand(table)

  /** Only Metadata actions with the property enabled count as traces of the feature. */
  override def actionUsesFeature(action: Action): Boolean = action match {
    case metadataAction: Metadata =>
      metadataAction.configuration.get(TABLE_PROP_KEY).exists(_.toBoolean)
    case _ =>
      false
  }

  /** Overrides the default so that history is validated/truncated for this feature. */
  override def requiresHistoryTruncation: Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ case class AlterTableDropFeatureDeltaCommand(
throw DeltaErrors.dropTableFeatureFeatureNotSupportedByProtocol(featureName)
}

if (truncateHistory && !removableFeature.isReaderWriterFeature) {
if (truncateHistory && !removableFeature.requiresHistoryTruncation) {
throw DeltaErrors.tableFeatureDropHistoryTruncationNotAllowed()
}

Expand All @@ -320,11 +320,11 @@ case class AlterTableDropFeatureDeltaCommand(
//
// Note, for features that cannot be disabled we solely rely for correctness on
// validateRemoval.
val isReaderWriterFeature = removableFeature.isReaderWriterFeature
val requiresHistoryValidation = removableFeature.requiresHistoryTruncation
val startTimeNs = System.nanoTime()
val preDowngradeMadeChanges =
removableFeature.preDowngradeCommand(table).removeFeatureTracesIfNeeded()
if (isReaderWriterFeature) {
if (requiresHistoryValidation) {
// Generate a checkpoint after the cleanup that is based on commits that do not use
// the feature. This intends to help slow-moving tables to qualify for history truncation
// asap. The checkpoint is based on a new commit to avoid creating a checkpoint
Expand Down Expand Up @@ -357,7 +357,7 @@ case class AlterTableDropFeatureDeltaCommand(
// Note, if this txn conflicts, we check all winning commits for traces of the feature.
// Therefore, we do not need to check again for historical versions during conflict
// resolution.
if (isReaderWriterFeature) {
if (requiresHistoryValidation) {
// Clean up expired logs before checking history. This also makes sure there is no
// concurrent metadataCleanup during findEarliestReliableCheckpoint. Note, this
// cleanUpExpiredLogs call truncates the cutoff at a minute granularity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1600,7 +1600,7 @@ trait DeltaErrorsSuiteBase
throw DeltaErrors.tableFeatureDropHistoryTruncationNotAllowed()
}
checkErrorMessage(e, Some("DELTA_FEATURE_DROP_HISTORY_TRUNCATION_NOT_ALLOWED"),
Some("0AKDE"), Some("History truncation is only relevant for reader features."))
Some("0AKDE"), Some("The particular feature does not require history truncation."))
}
{
val logRetention = DeltaConfigs.LOG_RETENTION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3512,6 +3512,51 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
truncateHistory = truncateHistory)
}

// Runs the scenario once per value of truncateHistory: dropping the feature must first fail
// with the retention-period error, and succeed once the clock has moved past retention.
BOOLEAN_DOMAIN.foreach { truncateHistory =>
  test("Writer features that require history validation/truncation." +
    s" - truncateHistory: $truncateHistory") {
    withTempDir { dir =>
      val feature = TestRemovableWriterWithHistoryTruncationFeature
      val clock = new ManualClock(System.currentTimeMillis())
      val deltaLog = DeltaLog.forTable(spark, dir, clock)
      // A fresh table handle is built for every command, as in a separate user invocation.
      def tableHandle: DeltaTableV2 = DeltaTableV2(spark, deltaLog.dataPath)

      createTableWithFeature(deltaLog, feature, feature.TABLE_PROP_KEY)

      // Write some rows into the table.
      spark.range(100).write.format("delta").mode("overwrite").save(dir.getCanonicalPath)

      // First attempt: the drop is rejected with the wait-for-retention error.
      val retentionError = intercept[DeltaTableFeatureException] {
        AlterTableDropFeatureDeltaCommand(tableHandle, feature.name).run(spark)
      }
      checkError(
        exception = retentionError,
        errorClass = "DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD",
        parameters = Map(
          "feature" -> feature.name,
          "logRetentionPeriodKey" -> "delta.logRetentionDuration",
          "logRetentionPeriod" -> "30 days",
          "truncateHistoryLogRetentionPeriod" -> "24 hours"))

      // Move the clock beyond whichever retention period applies to this run.
      val retentionMillis =
        if (truncateHistory) {
          DeltaConfigs.getMilliSeconds(truncateHistoryDefaultLogRetention)
        } else {
          deltaLog.deltaRetentionMillis(deltaLog.update().metadata)
        }
      clock.advance(retentionMillis + TimeUnit.MINUTES.toMillis(5))

      // Second attempt: the drop goes through and the protocol is downgraded.
      AlterTableDropFeatureDeltaCommand(
        table = tableHandle,
        featureName = feature.name,
        truncateHistory = truncateHistory).run(spark)

      assert(deltaLog.update().protocol === Protocol(1, 1))
    }
  }
}

private def dropV2CheckpointsTableFeature(spark: SparkSession, log: DeltaLog): Unit = {
spark.sql(s"ALTER TABLE delta.`${log.dataPath}` DROP FEATURE " +
s"`${V2CheckpointTableFeature.name}`")
Expand Down
Loading