Skip to content

Commit

Permalink
improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
andreaschat-db committed Dec 18, 2024
1 parent f577290 commit 075d86d
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 24 deletions.
40 changes: 37 additions & 3 deletions python/delta/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,14 +594,43 @@ def addFeatureSupport(self, featureName: str) -> None:
DeltaTable._verify_type_str(featureName, "featureName")
self._jdt.addFeatureSupport(featureName)

@since(3.4)  # type: ignore[arg-type]
def dropFeatureSupport(self, featureName: str, truncateHistory: Optional[bool] = None) -> None:
    """
    Drop a supported feature from the table protocol.

    The resulting protocol is always normalized. Normalization converts a table features
    protocol to its weakest possible form, which primarily means converting it to a legacy
    protocol. That is only possible when the feature set of the table features protocol
    exactly matches a legacy protocol. Normalization can also lower the reader version of a
    table features protocol when it is higher than necessary. For example:

        (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3)
        (3, 7, None, {RowTracking}) -> (1, 7, RowTracking)

    Example::

        delta.tables.DeltaTable.dropFeatureSupport("rowTracking")

    :param featureName: The name of the feature to drop.
    :param truncateHistory: Optional flag stating whether to truncate history. When it is
                            not specified, the history is not truncated.
    :return: None.
    """
    DeltaTable._verify_type_str(featureName, "featureName")
    if truncateHistory is not None:
        # An explicit flag was given: validate it and use the two-argument JVM overload.
        DeltaTable._verify_type_bool(truncateHistory, "truncateHistory")
        self._jdt.dropFeatureSupport(featureName, truncateHistory)
        return
    # No flag given: use the single-argument JVM overload.
    self._jdt.dropFeatureSupport(featureName)

@since(1.2) # type: ignore[arg-type]
def restoreToVersion(self, version: int) -> DataFrame:
"""
Restore the DeltaTable to an older version of the table specified by version number.
Example::
io.delta.tables.DeltaTable.restoreToVersion(1)
delta.tables.DeltaTable.restoreToVersion(1)
:param version: target version of restored table
:return: Dataframe with metrics of restore operation.
Expand All @@ -622,8 +651,8 @@ def restoreToTimestamp(self, timestamp: str) -> DataFrame:
Example::
io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01')
io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01')
delta.tables.DeltaTable.restoreToTimestamp('2021-01-01')
delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01')
:param timestamp: target timestamp of restored table
:return: Dataframe with metrics of restore operation.
Expand Down Expand Up @@ -658,6 +687,11 @@ def optimize(self) -> "DeltaOptimizeBuilder":
jbuilder = self._jdt.optimize()
return DeltaOptimizeBuilder(self._spark, jbuilder)

@classmethod
def _verify_type_bool(self, variable, name):
if not isinstance(variable, bool) or variable is None:
raise ValueError("%s needs to be a boolean but got '%s'." % (name, type(variable)))

@staticmethod # type: ignore[arg-type]
def _verify_type_str(variable: str, name: str) -> None:
if not isinstance(variable, str) or variable is None:
Expand Down
85 changes: 72 additions & 13 deletions python/delta/tests/test_deltatable.py
Original file line number Diff line number Diff line change
Expand Up @@ -1151,16 +1151,19 @@ def test_delta_table_builder_with_bad_args(self) -> None:
with self.assertRaises(TypeError):
builder.property("1", 1) # type: ignore[arg-type]

def test_protocolUpgrade(self) -> None:
def __create_df_for_feature_tests(self) -> DeltaTable:
try:
self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2')
self.spark.conf.set('spark.databricks.delta.minReaderVersion', '1')
self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2')
self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
dt = DeltaTable.forPath(self.spark, self.tempFile)
dt.upgradeTableProtocol(1, 3)
return DeltaTable.forPath(self.spark, self.tempFile)
finally:
self.spark.conf.unset('spark.databricks.delta.minWriterVersion')
self.spark.conf.unset('spark.databricks.delta.minReaderVersion')
self.spark.conf.unset('spark.databricks.delta.minWriterVersion')

def test_protocolUpgrade(self) -> None:
dt = self.__create_df_for_feature_tests()
dt.upgradeTableProtocol(1, 3)

# cannot downgrade once upgraded
dt.upgradeTableProtocol(1, 2)
Expand Down Expand Up @@ -1189,14 +1192,7 @@ def test_protocolUpgrade(self) -> None:
dt.upgradeTableProtocol(1, {}) # type: ignore[arg-type]

def test_addFeatureSupport(self) -> None:
try:
self.spark.conf.set('spark.databricks.delta.minReaderVersion', '1')
self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2')
self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
dt = DeltaTable.forPath(self.spark, self.tempFile)
finally:
self.spark.conf.unset('spark.databricks.delta.minReaderVersion')
self.spark.conf.unset('spark.databricks.delta.minWriterVersion')
dt = self.__create_df_for_feature_tests()

# bad args
with self.assertRaisesRegex(Py4JJavaError, "DELTA_UNSUPPORTED_FEATURES_IN_CONFIG"):
Expand Down Expand Up @@ -1224,6 +1220,69 @@ def test_addFeatureSupport(self) -> None:
self.assertEqual(sorted(dt_details["tableFeatures"]),
["appendOnly", "deletionVectors", "invariants"])

def test_dropFeatureSupport(self) -> None:
    """
    Check ``DeltaTable.dropFeatureSupport``: dropping a writer feature and a reader+writer
    feature, the errors raised for features that cannot be dropped, and the validation of
    the ``featureName`` and ``truncateHistory`` arguments.
    """
    dt = self.__create_df_for_feature_tests()

    # Writer feature: adding it moves the table to a table features protocol.
    dt.addFeatureSupport("testRemovableWriter")
    details_after_writer_add = dt.detail().collect()[0].asDict()
    self.assertTrue(details_after_writer_add["minReaderVersion"] == 1)
    self.assertTrue(details_after_writer_add["minWriterVersion"] == 7,
                    "Should upgrade to table features")
    self.assertEqual(sorted(details_after_writer_add["tableFeatures"]),
                     ["appendOnly", "invariants", "testRemovableWriter"])

    # Attempt truncating the history when dropping a feature that is not required.
    # This verifies the truncateHistory option was correctly passed.
    truncation_error = "DELTA_FEATURE_DROP_HISTORY_TRUNCATION_NOT_ALLOWED"
    with self.assertRaisesRegex(Exception, truncation_error):
        dt.dropFeatureSupport("testRemovableWriter", True)

    dt.dropFeatureSupport("testRemovableWriter")
    details_after_writer_drop = dt.detail().collect()[0].asDict()
    self.assertTrue(details_after_writer_drop["minReaderVersion"] == 1)
    self.assertTrue(details_after_writer_drop["minWriterVersion"] == 2,
                    "Should return to legacy protocol")

    # Reader+writer feature: adding it raises both protocol versions.
    dt.addFeatureSupport("testRemovableReaderWriter")
    details_after_rw_add = dt.detail().collect()[0].asDict()
    self.assertTrue(details_after_rw_add["minReaderVersion"] == 3,
                    "Should upgrade to table features")
    self.assertTrue(details_after_rw_add["minWriterVersion"] == 7,
                    "Should upgrade to table features")
    self.assertEqual(sorted(details_after_rw_add["tableFeatures"]),
                     ["appendOnly", "invariants", "testRemovableReaderWriter"])

    dt.dropFeatureSupport("testRemovableReaderWriter")
    details_after_rw_drop = dt.detail().collect()[0].asDict()
    self.assertTrue(details_after_rw_drop["minReaderVersion"] == 1,
                    "Should return to legacy protocol")
    self.assertTrue(details_after_rw_drop["minWriterVersion"] == 2,
                    "Should return to legacy protocol")

    # Features that cannot be dropped, paired with the expected error class:
    # an unsupported feature, a feature not present in the protocol, a non-removable feature.
    rejected_drops = (
        ("__invalid_feature__", "DELTA_FEATURE_DROP_UNSUPPORTED_CLIENT_FEATURE"),
        ("testRemovableReaderWriter", "DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT"),
        ("testReaderWriter", "DELTA_FEATURE_DROP_NONREMOVABLE_FEATURE"),
    )
    for feature, error_class in rejected_drops:
        with self.assertRaisesRegex(Exception, error_class):
            dt.dropFeatureSupport(feature)

    # Argument validation happens on the Python side and raises ValueError.
    for bad_feature_name in (12345, [12345], {}, []):
        with self.assertRaisesRegex(ValueError, "featureName needs to be a string"):
            dt.dropFeatureSupport(bad_feature_name)  # type: ignore[arg-type]

    for bad_truncate_flag in (12345, [12345], {}, []):
        with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"):
            dt.dropFeatureSupport(
                "testRemovableWriter", bad_truncate_flag)  # type: ignore[arg-type]

def test_restore_to_version(self) -> None:
self.__writeDeltaTable([('a', 1), ('b', 2)])
self.__overwriteDeltaTable([('a', 3), ('b', 2)],
Expand Down
71 changes: 69 additions & 2 deletions spark/src/main/scala/io/delta/tables/DeltaTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaTableUtils.withActiveSession
import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.AlterTableSetPropertiesDeltaCommand
import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, AlterTableSetPropertiesDeltaCommand}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import io.delta.tables.execution._
import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -562,6 +562,73 @@ class DeltaTable private[tables](
toDataset(sparkSession, alterTableCmd)
}

/**
 * Builds an [[AlterTableDropFeatureDeltaCommand]] for this table and passes it to `toDataset`.
 *
 * @param featureName The name of the feature to drop.
 * @param truncateHistory Optional truncate-history flag; `None` is treated as `false`.
 */
private def executeDropFeature(featureName: String, truncateHistory: Option[Boolean]): Unit = {
  // A missing flag means history is not truncated.
  val shouldTruncateHistory = truncateHistory.getOrElse(false)
  val dropFeatureCmd = AlterTableDropFeatureDeltaCommand(
    table = table,
    featureName = featureName,
    truncateHistory = shouldTruncateHistory)
  toDataset(sparkSession, dropFeatureCmd)
}

/**
 * Modify the protocol to drop a supported feature. The operation always normalizes the
 * resulting protocol. Protocol normalization is the process of converting a table features
 * protocol to the weakest possible form. This primarily refers to converting a table features
 * protocol to a legacy protocol. A table features protocol can be represented with the legacy
 * representation only when the feature set of the former exactly matches a legacy protocol.
 * Normalization can also decrease the reader version of a table features protocol when it is
 * higher than necessary. For example:
 *
 * (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3)
 * (3, 7, None, {RowTracking}) -> (1, 7, RowTracking)
 *
 * The dropFeatureSupport method can be used as follows:
 * {{{
 *   io.delta.tables.DeltaTable.dropFeatureSupport("rowTracking")
 * }}}
 *
 * This overload takes an explicit `truncateHistory` flag. Use the single-argument overload
 * to drop a feature without truncating history.
 *
 * See online documentation for more details.
 *
 * @param featureName The name of the feature to drop.
 * @param truncateHistory Whether to truncate history before downgrading the protocol.
 * @since 3.4.0
 */
def dropFeatureSupport(
    featureName: String,
    truncateHistory: Boolean): Unit = withActiveSession(sparkSession) {
  // The explicit flag is always forwarded as `Some(...)`.
  executeDropFeature(featureName, Some(truncateHistory))
}

/**
 * Modify the protocol to drop a supported feature. The operation always normalizes the
 * resulting protocol. Protocol normalization is the process of converting a table features
 * protocol to the weakest possible form. This primarily refers to converting a table features
 * protocol to a legacy protocol. A table features protocol can be represented with the legacy
 * representation only when the feature set of the former exactly matches a legacy protocol.
 * Normalization can also decrease the reader version of a table features protocol when it is
 * higher than necessary. For example:
 *
 * (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3)
 * (3, 7, None, {RowTracking}) -> (1, 7, RowTracking)
 *
 * The dropFeatureSupport method can be used as follows:
 * {{{
 *   io.delta.tables.DeltaTable.dropFeatureSupport("rowTracking")
 * }}}
 *
 * Note, this command will not truncate history.
 *
 * See online documentation for more details.
 *
 * @param featureName The name of the feature to drop.
 * @since 3.4.0
 */
def dropFeatureSupport(featureName: String): Unit = withActiveSession(sparkSession) {
  // No truncate-history flag is supplied, so `None` is passed on.
  executeDropFeature(featureName, None)
}

/**
* Clone a DeltaTable to a given destination to mirror the existing table's data and metadata.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ object TestReaderWriterMetadataAutoUpdateFeature
}
}

private[sql] object TestRemovableWriterFeature
object TestRemovableWriterFeature
extends WriterFeature(name = "testRemovableWriter")
with FeatureAutomaticallyEnabledByMetadata
with RemovableFeature {
Expand Down Expand Up @@ -1093,7 +1093,7 @@ private[sql] object TestRemovableWriterFeatureWithDependency
Set(TestRemovableReaderWriterFeature, TestRemovableWriterFeature)
}

private[sql] object TestRemovableReaderWriterFeature
object TestRemovableReaderWriterFeature
extends ReaderWriterFeature(name = "testRemovableReaderWriter")
with FeatureAutomaticallyEnabledByMetadata
with RemovableFeature {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ case class AlterTableDropFeatureDeltaCommand(
// Check whether the protocol contains the feature in either the writer features list or
// the reader+writer features list. Note, protocol needs to be denormalized to allow dropping
// features from legacy protocols.
val protocol = table.initialSnapshot.protocol
val protocol = table.deltaLog.update().protocol
val protocolContainsFeatureName =
protocol.implicitlyAndExplicitlySupportedFeatures.map(_.name).contains(featureName)
if (!protocolContainsFeatureName) {
Expand Down
70 changes: 67 additions & 3 deletions spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.Locale
import scala.language.postfixOps

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestWriterFeature}
import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestRemovableReaderWriterFeature, TestRemovableWriterFeature, TestWriterFeature}
import org.apache.spark.sql.delta.actions.{ Metadata, Protocol }
import org.apache.spark.sql.delta.storage.LocalLogStore
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
Expand Down Expand Up @@ -247,6 +247,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
"cloneAtVersion",
"delete",
"detail",
"dropFeatureSupport",
"generate",
"history",
"merge",
Expand Down Expand Up @@ -631,8 +632,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
}
}

test(
"addFeatureSupport - with filesystem options.") {
test("addFeatureSupport - with filesystem options.") {
withTempDir { dir =>
val path = fakeFileSystemPath(dir)
val fsOptions = fakeFileSystemOptions
Expand Down Expand Up @@ -670,6 +670,70 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
}
}

test("dropFeatureSupport - with filesystem options.") {
  withTempDir { dir =>
    val path = fakeFileSystemPath(dir)
    val fsOptions = fakeFileSystemOptions

    // Create a table with a default Protocol by writing version 0 directly to the log.
    val testSchema = spark.range(1).schema
    val log = DeltaLog.forTable(spark, new Path(path), fsOptions)
    log.createLogDirectoriesIfNotExists()
    val initialActions = Iterator(
      Metadata(schemaString = testSchema.json).json,
      Protocol(1, 2).json)
    log.store.write(
      FileNames.unsafeDeltaFile(log.logPath, 0),
      initialActions,
      overwrite = false,
      log.newDeltaHadoopConf())
    log.update()

    // Asserts that the latest snapshot of the log carries the expected protocol.
    def assertProtocol(expected: Protocol): Unit = {
      assert(log.update().protocol === expected)
    }

    // Asserts that the drop attempt fails with the given error class.
    def assertDropFails(expectedErrorClass: String)(dropFeature: => Unit): Unit = {
      val error = intercept[DeltaTableFeatureException] {
        dropFeature
      }
      assert(error.getErrorClass === expectedErrorClass)
    }

    // Update the protocol to support a writer feature.
    val table = DeltaTable.forPath(spark, path, fsOptions)
    table.addFeatureSupport(TestRemovableWriterFeature.name)
    assertProtocol(Protocol(1, 7).withFeatures(Seq(
      AppendOnlyTableFeature,
      InvariantsTableFeature,
      TestRemovableWriterFeature)))

    // Attempt truncating the history when dropping a feature that is not required.
    // This verifies the truncateHistory option was correctly passed.
    assertDropFails("DELTA_FEATURE_DROP_HISTORY_TRUNCATION_NOT_ALLOWED") {
      table.dropFeatureSupport("testRemovableWriter", truncateHistory = true)
    }

    // Drop the writer feature; the table should return to the original protocol.
    table.dropFeatureSupport(TestRemovableWriterFeature.name)
    assertProtocol(Protocol(1, 2))

    // Repeat with a reader+writer feature.
    table.addFeatureSupport(TestRemovableReaderWriterFeature.name)
    assertProtocol(Protocol(3, 7).withFeatures(Seq(
      AppendOnlyTableFeature,
      InvariantsTableFeature,
      TestRemovableReaderWriterFeature)))

    // Drop the reader+writer feature; the table should return to the original protocol.
    table.dropFeatureSupport(TestRemovableReaderWriterFeature.name)
    assertProtocol(Protocol(1, 2))

    // Try to drop an unsupported feature.
    assertDropFails("DELTA_FEATURE_DROP_UNSUPPORTED_CLIENT_FEATURE") {
      table.dropFeatureSupport("__invalid_feature__")
    }

    // Try to drop a feature that is not present in the protocol.
    assertDropFails("DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT") {
      table.dropFeatureSupport(TestRemovableReaderWriterFeature.name)
    }

    // Try to drop a non-removable feature.
    assertDropFails("DELTA_FEATURE_DROP_NONREMOVABLE_FEATURE") {
      table.dropFeatureSupport(TestReaderWriterFeature.name)
    }
  }
}

test("details - with filesystem options.") {
withTempDir{ dir =>
val path = fakeFileSystemPath(dir)
Expand Down

0 comments on commit 075d86d

Please sign in to comment.