Skip to content

Commit

Permalink
Invalid legacy protocol normalization
Browse files Browse the repository at this point in the history
  • Loading branch information
andreaschat-db committed Jul 3, 2024
1 parent ce5e24e commit 469529a
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ trait TableFeatureSupport { this: Protocol =>
.withWriterFeatures(mergedWriterFeatures)
.withFeatures(mergedImplicitFeatures)

mergedProtocol.downgradeProtocolVersionsIfNeeded
mergedProtocol.denormalize.normalize
}

/**
Expand Down Expand Up @@ -325,7 +325,7 @@ trait TableFeatureSupport { this: Protocol =>
case f =>
throw DeltaErrors.dropTableFeatureNonRemovableFeature(f.name)
}
newProtocol.downgradeProtocolVersionsIfNeeded
newProtocol.normalize
}

/**
Expand All @@ -337,7 +337,7 @@ trait TableFeatureSupport { this: Protocol =>
* Note, when a table is initialized with table features (3, 7), by default there are no legacy
* features. After we remove the last native feature we downgrade the protocol to (1, 1).
*/
def downgradeProtocolVersionsIfNeeded: Protocol = {
def normalize: Protocol = {
if (!supportsWriterFeatures) return this

val (minReaderVersion, minWriterVersion) =
Expand All @@ -360,7 +360,7 @@ trait TableFeatureSupport { this: Protocol =>
def denormalize: Protocol = {
if (supportsWriterFeatures) return this

val (minReaderVersion, minWriterVersion) =
val (minReaderVersion, _) =
TableFeatureProtocolUtils.minimumRequiredVersions(implicitlySupportedFeatures.toSeq)

Protocol(minReaderVersion, TABLE_FEATURES_MIN_WRITER_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ object Protocol {
minProtocolComponentsFromMetadata(spark, metadata)
Protocol(readerVersion, writerVersion)
.withFeatures(enabledFeatures)
.downgradeProtocolVersionsIfNeeded
.denormalize
.normalize
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,29 @@ class DeltaProtocolTransitionsSuite
expectedProtocol = Protocol(1, 2))
}

test("CREATE TABLE invalid legacy protocols") {
/*
// , (1, 5)
for ((readerVersion, writerVersion) <- Seq((2, 1), (2, 2), (2, 3), (2, 4)))
test("Invalid legacy protocol normalization" +
s" - invalidProtocol($readerVersion, $writerVersion)") {

val expectedReaderVersion = 1
val expectedWriterVersion = Math.min(writerVersion, 4)

testProtocolTransition(
createTableProtocol = Some(Protocol(1, 5)),
expectedProtocol = Protocol(1, 4))
*/
createTableProtocol = Some(Protocol(readerVersion, writerVersion)),
expectedProtocol = Protocol(expectedReaderVersion, expectedWriterVersion))

withSQLConf(
DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_READER_VERSION.key -> readerVersion.toString,
DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key -> writerVersion.toString) {
testProtocolTransition(
expectedProtocol = Protocol(expectedReaderVersion, expectedWriterVersion))

testProtocolTransition(
createTableProtocol = Some(Protocol(1, 1)),
alterTableProtocol = Some(Protocol(readerVersion, writerVersion)),
expectedProtocol = Protocol(expectedReaderVersion, expectedWriterVersion))
}
}

test("TABLE CREATION with enabled features by default") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -767,31 +767,31 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest

test("protocol downgrade is a no-op") {
withTempDir { path =>
val log = createTableWithProtocol(Protocol(2, 3), path)
assert(log.update().protocol === Protocol(2, 3))
val log = createTableWithProtocol(Protocol(2, 5), path)
assert(log.update().protocol === Protocol(2, 5))

{ // DeltaLog API. This API is internal-only and will fail when downgrade.

val e = intercept[ProtocolDowngradeException] {
log.upgradeProtocol(Protocol(1, 2))
}
assert(log.update().protocol == Protocol(2, 3))
assert(log.update().protocol == Protocol(2, 5))
assert(e.getErrorClass.contains("DELTA_INVALID_PROTOCOL_DOWNGRADE"))
}
{ // DeltaTable API
val table = io.delta.tables.DeltaTable.forPath(spark, path.getCanonicalPath)
val events = Log4jUsageLogger.track {
table.upgradeTableProtocol(1, 2)
}
assert(log.update().protocol == Protocol(2, 3))
assert(log.update().protocol == Protocol(2, 5))
assert(events.count(_.tags.get("opType").contains("delta.protocol.downgradeIgnored")) === 1)
}
{ // SQL API
val events = Log4jUsageLogger.track {
sql(s"ALTER TABLE delta.`${path.getCanonicalPath}` " +
"SET TBLPROPERTIES (delta.minWriterVersion = 2)")
}
assert(log.update().protocol == Protocol(2, 3))
assert(log.update().protocol == Protocol(2, 5))
assert(events.count(_.tags.get("opType").contains("delta.protocol.downgradeIgnored")) === 1)
}
}
Expand Down Expand Up @@ -1674,23 +1674,23 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest

testAlterTable(
name = "downgrade reader version is a no-op",
tableProtocol = Protocol(2, 2),
tableProtocol = Protocol(2, 5),
props = Map(DeltaConfigs.MIN_READER_VERSION.key -> "1"),
expectedFinalProtocol = Some(Protocol(2, 2)))
expectedFinalProtocol = Some(Protocol(2, 5)))

testAlterTable(
name = "downgrade writer version is a no-op",
tableProtocol = Protocol(2, 2),
tableProtocol = Protocol(1, 3),
props = Map(DeltaConfigs.MIN_WRITER_VERSION.key -> "1"),
expectedFinalProtocol = Some(Protocol(2, 2)))
expectedFinalProtocol = Some(Protocol(1, 3)))

testAlterTable(
name = "downgrade both reader and versions version is a no-op",
tableProtocol = Protocol(2, 2),
tableProtocol = Protocol(2, 5),
props = Map(
DeltaConfigs.MIN_READER_VERSION.key -> "1",
DeltaConfigs.MIN_WRITER_VERSION.key -> "1"),
expectedFinalProtocol = Some(Protocol(2, 2)))
expectedFinalProtocol = Some(Protocol(2, 5)))

testAlterTable(
name = "downgrade reader but upgrade writer versions (legacy protocol)",
Expand All @@ -1711,12 +1711,12 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest

testAlterTable(
name = "downgrade while enabling a feature will become an upgrade",
tableProtocol = Protocol(2, 2),
tableProtocol = Protocol(1, 2),
props = Map(
DeltaConfigs.MIN_READER_VERSION.key -> "1",
DeltaConfigs.MIN_WRITER_VERSION.key -> "1",
DeltaConfigs.CHANGE_DATA_FEED.key -> "true"),
expectedFinalProtocol = Some(Protocol(2, 4)))
expectedFinalProtocol = Some(Protocol(1, 4)))

testAlterTable(
"legacy protocol, legacy feature, metadata",
Expand Down

0 comments on commit 469529a

Please sign in to comment.