[Spark] Drop feature support in DeltaTable Scala/Python APIs (#3952)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description


This PR adds drop feature support to the `DeltaTable` API, in both its Scala and Python variants.
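A minimal usage sketch, assuming a Spark session with the Delta extensions enabled; the path is illustrative, and the feature names are the removable test features exercised in the tests below:

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("drop-feature-demo")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

dt = DeltaTable.forPath(spark, "/tmp/delta-table")  # illustrative path

# Drop a supported feature; by default the table history is not truncated.
dt.dropFeatureSupport("testRemovableWriter")

# Or request history truncation explicitly where the drop requires it.
dt.dropFeatureSupport("testRemovableReaderWriter", truncateHistory=True)
```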


## How was this patch tested?

Added unit tests for the new API in both Python and Scala.

## Does this PR introduce _any_ user-facing changes?

Yes. New `dropFeatureSupport` methods are added to `DeltaTable` in Scala and Python; see the description.
andreaschat-db authored Dec 18, 2024
1 parent baa5518 commit 1cd6fed
Showing 6 changed files with 248 additions and 24 deletions.
40 changes: 37 additions & 3 deletions python/delta/tables.py
@@ -594,14 +594,43 @@ def addFeatureSupport(self, featureName: str) -> None:
        DeltaTable._verify_type_str(featureName, "featureName")
        self._jdt.addFeatureSupport(featureName)

    @since(3.4)  # type: ignore[arg-type]
    def dropFeatureSupport(self, featureName: str, truncateHistory: Optional[bool] = None) -> None:
        """
        Modify the protocol to drop a supported feature. The operation always normalizes the
        resulting protocol. Protocol normalization is the process of converting a table features
        protocol to the weakest possible form. This primarily refers to converting a table features
        protocol to a legacy protocol. A table features protocol can be represented with the legacy
        representation only when the feature set of the former exactly matches a legacy protocol.
        Normalization can also decrease the reader version of a table features protocol when it is
        higher than necessary. For example:

            (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3)
            (3, 7, None, {RowTracking}) -> (1, 7, RowTracking)

        The dropFeatureSupport method can be used as follows:

            delta.tables.DeltaTable.dropFeatureSupport("rowTracking")

        :param featureName: The name of the feature to drop.
        :param truncateHistory: Optional flag indicating whether to truncate history. If not
                                specified, the history is not truncated.
        :return: None.
        """
        DeltaTable._verify_type_str(featureName, "featureName")
        if truncateHistory is None:
            self._jdt.dropFeatureSupport(featureName)
        else:
            DeltaTable._verify_type_bool(truncateHistory, "truncateHistory")
            self._jdt.dropFeatureSupport(featureName, truncateHistory)
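The normalization described in this docstring can be observed through `detail()`. A sketch mirroring the unit tests further down, assuming a table `dt` that was upgraded by adding the `testRemovableWriter` feature:

```python
# Before the drop: table-features protocol (1, 7) with an extra writer feature.
d = dt.detail().collect()[0].asDict()
assert (d["minReaderVersion"], d["minWriterVersion"]) == (1, 7)
assert "testRemovableWriter" in d["tableFeatures"]

dt.dropFeatureSupport("testRemovableWriter")

# After the drop, normalization collapses the protocol back to the weakest
# legacy form that still covers appendOnly and invariants: (1, 2).
d = dt.detail().collect()[0].asDict()
assert (d["minReaderVersion"], d["minWriterVersion"]) == (1, 2)
```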

    @since(1.2)  # type: ignore[arg-type]
    def restoreToVersion(self, version: int) -> DataFrame:
        """
        Restore the DeltaTable to an older version of the table specified by version number.

        Example::

-           io.delta.tables.DeltaTable.restoreToVersion(1)
+           delta.tables.DeltaTable.restoreToVersion(1)

        :param version: target version of restored table
        :return: Dataframe with metrics of restore operation.
@@ -622,8 +651,8 @@ def restoreToTimestamp(self, timestamp: str) -> DataFrame:
        Example::

-           io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01')
-           io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01')
+           delta.tables.DeltaTable.restoreToTimestamp('2021-01-01')
+           delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01')

        :param timestamp: target timestamp of restored table
        :return: Dataframe with metrics of restore operation.
@@ -658,6 +687,11 @@ def optimize(self) -> "DeltaOptimizeBuilder":
        jbuilder = self._jdt.optimize()
        return DeltaOptimizeBuilder(self._spark, jbuilder)

    @staticmethod  # type: ignore[arg-type]
    def _verify_type_bool(variable: bool, name: str) -> None:
        if not isinstance(variable, bool) or variable is None:
            raise ValueError("%s needs to be a boolean but got '%s'." % (name, type(variable)))
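A quick sketch of why the strict `isinstance` check is the right test here: Python's `bool` is a subclass of `int`, not the reverse, so integers (and everything else) are rejected:

```python
# isinstance(x, bool) is True only for True/False; ints, strings, containers,
# and None all fail, which is exactly what the tests below rely on.
for bad in (1, 0, "true", [True], {}, None):
    assert not isinstance(bad, bool)
for ok in (True, False):
    assert isinstance(ok, bool)
```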

    @staticmethod  # type: ignore[arg-type]
    def _verify_type_str(variable: str, name: str) -> None:
        if not isinstance(variable, str) or variable is None:
85 changes: 72 additions & 13 deletions python/delta/tests/test_deltatable.py
@@ -1151,16 +1151,19 @@ def test_delta_table_builder_with_bad_args(self) -> None:
        with self.assertRaises(TypeError):
            builder.property("1", 1)  # type: ignore[arg-type]

-    def test_protocolUpgrade(self) -> None:
+    def __create_df_for_feature_tests(self) -> DeltaTable:
        try:
-            self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2')
+            self.spark.conf.set('spark.databricks.delta.minReaderVersion', '1')
+            self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2')
            self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
-            dt = DeltaTable.forPath(self.spark, self.tempFile)
-            dt.upgradeTableProtocol(1, 3)
+            return DeltaTable.forPath(self.spark, self.tempFile)
        finally:
-            self.spark.conf.unset('spark.databricks.delta.minWriterVersion')
+            self.spark.conf.unset('spark.databricks.delta.minReaderVersion')
+            self.spark.conf.unset('spark.databricks.delta.minWriterVersion')
+
+    def test_protocolUpgrade(self) -> None:
+        dt = self.__create_df_for_feature_tests()
+        dt.upgradeTableProtocol(1, 3)

        # cannot downgrade once upgraded
        dt.upgradeTableProtocol(1, 2)
@@ -1189,14 +1192,7 @@ def test_protocolUpgrade(self) -> None:
            dt.upgradeTableProtocol(1, {})  # type: ignore[arg-type]

    def test_addFeatureSupport(self) -> None:
-        try:
-            self.spark.conf.set('spark.databricks.delta.minReaderVersion', '1')
-            self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2')
-            self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
-            dt = DeltaTable.forPath(self.spark, self.tempFile)
-        finally:
-            self.spark.conf.unset('spark.databricks.delta.minReaderVersion')
-            self.spark.conf.unset('spark.databricks.delta.minWriterVersion')
+        dt = self.__create_df_for_feature_tests()

        # bad args
        with self.assertRaisesRegex(Py4JJavaError, "DELTA_UNSUPPORTED_FEATURES_IN_CONFIG"):
@@ -1224,6 +1220,69 @@ def test_addFeatureSupport(self) -> None:
        self.assertEqual(sorted(dt_details["tableFeatures"]),
                         ["appendOnly", "deletionVectors", "invariants"])

    def test_dropFeatureSupport(self) -> None:
        dt = self.__create_df_for_feature_tests()

        dt.addFeatureSupport("testRemovableWriter")
        dt_details = dt.detail().collect()[0].asDict()
        self.assertTrue(dt_details["minReaderVersion"] == 1)
        self.assertTrue(dt_details["minWriterVersion"] == 7, "Should upgrade to table features")
        self.assertEqual(sorted(dt_details["tableFeatures"]),
                         ["appendOnly", "invariants", "testRemovableWriter"])

        # Attempt truncating the history when dropping a feature that is not required.
        # This verifies the truncateHistory option was correctly passed.
        with self.assertRaisesRegex(Exception,
                                    "DELTA_FEATURE_DROP_HISTORY_TRUNCATION_NOT_ALLOWED"):
            dt.dropFeatureSupport("testRemovableWriter", True)

        dt.dropFeatureSupport("testRemovableWriter")
        dt_details = dt.detail().collect()[0].asDict()
        self.assertTrue(dt_details["minReaderVersion"] == 1)
        self.assertTrue(dt_details["minWriterVersion"] == 2, "Should return to legacy protocol")

        dt.addFeatureSupport("testRemovableReaderWriter")
        dt_details = dt.detail().collect()[0].asDict()
        self.assertTrue(dt_details["minReaderVersion"] == 3, "Should upgrade to table features")
        self.assertTrue(dt_details["minWriterVersion"] == 7, "Should upgrade to table features")
        self.assertEqual(sorted(dt_details["tableFeatures"]),
                         ["appendOnly", "invariants", "testRemovableReaderWriter"])

        dt.dropFeatureSupport("testRemovableReaderWriter")
        dt_details = dt.detail().collect()[0].asDict()
        self.assertTrue(dt_details["minReaderVersion"] == 1, "Should return to legacy protocol")
        self.assertTrue(dt_details["minWriterVersion"] == 2, "Should return to legacy protocol")

        # Try to drop an unsupported feature.
        with self.assertRaisesRegex(Exception, "DELTA_FEATURE_DROP_UNSUPPORTED_CLIENT_FEATURE"):
            dt.dropFeatureSupport("__invalid_feature__")

        # Try to drop a feature that is not present in the protocol.
        with self.assertRaisesRegex(Exception, "DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT"):
            dt.dropFeatureSupport("testRemovableReaderWriter")

        # Try to drop a non-removable feature.
        with self.assertRaisesRegex(Exception, "DELTA_FEATURE_DROP_NONREMOVABLE_FEATURE"):
            dt.dropFeatureSupport("testReaderWriter")

        with self.assertRaisesRegex(ValueError, "featureName needs to be a string"):
            dt.dropFeatureSupport(12345)  # type: ignore[arg-type]
        with self.assertRaisesRegex(ValueError, "featureName needs to be a string"):
            dt.dropFeatureSupport([12345])  # type: ignore[arg-type]
        with self.assertRaisesRegex(ValueError, "featureName needs to be a string"):
            dt.dropFeatureSupport({})  # type: ignore[arg-type]
        with self.assertRaisesRegex(ValueError, "featureName needs to be a string"):
            dt.dropFeatureSupport([])  # type: ignore[arg-type]

        with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"):
            dt.dropFeatureSupport("testRemovableWriter", 12345)  # type: ignore[arg-type]
        with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"):
            dt.dropFeatureSupport("testRemovableWriter", [12345])  # type: ignore[arg-type]
        with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"):
            dt.dropFeatureSupport("testRemovableWriter", {})  # type: ignore[arg-type]
        with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"):
            dt.dropFeatureSupport("testRemovableWriter", [])  # type: ignore[arg-type]

    def test_restore_to_version(self) -> None:
        self.__writeDeltaTable([('a', 1), ('b', 2)])
        self.__overwriteDeltaTable([('a', 3), ('b', 2)],
71 changes: 69 additions & 2 deletions spark/src/main/scala/io/delta/tables/DeltaTable.scala
@@ -20,9 +20,9 @@ import scala.collection.JavaConverters._

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaTableUtils.withActiveSession
-import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils}
+import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils
import org.apache.spark.sql.delta.catalog.DeltaTableV2
-import org.apache.spark.sql.delta.commands.AlterTableSetPropertiesDeltaCommand
+import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, AlterTableSetPropertiesDeltaCommand}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import io.delta.tables.execution._
import org.apache.hadoop.fs.Path
@@ -562,6 +562,73 @@ class DeltaTable private[tables](
    toDataset(sparkSession, alterTableCmd)
  }

  private def executeDropFeature(featureName: String, truncateHistory: Option[Boolean]): Unit = {
    val alterTableCmd = AlterTableDropFeatureDeltaCommand(
      table = table,
      featureName = featureName,
      truncateHistory = truncateHistory.getOrElse(false))
    toDataset(sparkSession, alterTableCmd)
  }

  /**
   * Modify the protocol to drop a supported feature. The operation always normalizes the
   * resulting protocol. Protocol normalization is the process of converting a table features
   * protocol to the weakest possible form. This primarily refers to converting a table features
   * protocol to a legacy protocol. A table features protocol can be represented with the legacy
   * representation only when the feature set of the former exactly matches a legacy protocol.
   * Normalization can also decrease the reader version of a table features protocol when it is
   * higher than necessary. For example:
   *
   * (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3)
   * (3, 7, None, {RowTracking}) -> (1, 7, RowTracking)
   *
   * The dropFeatureSupport method can be used as follows:
   * {{{
   *   io.delta.tables.DeltaTable.dropFeatureSupport("rowTracking")
   * }}}
   *
   * See online documentation for more details.
   *
   * @param featureName The name of the feature to drop.
   * @param truncateHistory Whether to truncate history before downgrading the protocol.
   * @return None.
   * @since 3.4.0
   */
  def dropFeatureSupport(
      featureName: String,
      truncateHistory: Boolean): Unit = withActiveSession(sparkSession) {
    executeDropFeature(featureName, Some(truncateHistory))
  }

  /**
   * Modify the protocol to drop a supported feature. The operation always normalizes the
   * resulting protocol. Protocol normalization is the process of converting a table features
   * protocol to the weakest possible form. This primarily refers to converting a table features
   * protocol to a legacy protocol. A table features protocol can be represented with the legacy
   * representation only when the feature set of the former exactly matches a legacy protocol.
   * Normalization can also decrease the reader version of a table features protocol when it is
   * higher than necessary. For example:
   *
   * (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3)
   * (3, 7, None, {RowTracking}) -> (1, 7, RowTracking)
   *
   * The dropFeatureSupport method can be used as follows:
   * {{{
   *   io.delta.tables.DeltaTable.dropFeatureSupport("rowTracking")
   * }}}
   *
   * Note, this command will not truncate history.
   *
   * See online documentation for more details.
   *
   * @param featureName The name of the feature to drop.
   * @return None.
   * @since 3.4.0
   */
  def dropFeatureSupport(featureName: String): Unit = withActiveSession(sparkSession) {
    executeDropFeature(featureName, None)
  }

  /**
   * Clone a DeltaTable to a given destination to mirror the existing table's data and metadata.
   *
4 changes: 2 additions & 2 deletions spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -1044,7 +1044,7 @@ object TestReaderWriterMetadataAutoUpdateFeature
  }
}

-private[sql] object TestRemovableWriterFeature
+object TestRemovableWriterFeature
    extends WriterFeature(name = "testRemovableWriter")
    with FeatureAutomaticallyEnabledByMetadata
    with RemovableFeature {
@@ -1093,7 +1093,7 @@ private[sql] object TestRemovableWriterFeatureWithDependency
    Set(TestRemovableReaderWriterFeature, TestRemovableWriterFeature)
}

-private[sql] object TestRemovableReaderWriterFeature
+object TestRemovableReaderWriterFeature
    extends ReaderWriterFeature(name = "testRemovableReaderWriter")
    with FeatureAutomaticallyEnabledByMetadata
    with RemovableFeature {
2 changes: 1 addition & 1 deletion spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
@@ -322,7 +322,7 @@ case class AlterTableDropFeatureDeltaCommand(
    // Check whether the protocol contains the feature in either the writer features list or
    // the reader+writer features list. Note, the protocol needs to be denormalized to allow
    // dropping features from legacy protocols.
-    val protocol = table.initialSnapshot.protocol
+    val protocol = table.deltaLog.update().protocol
    val protocolContainsFeatureName =
      protocol.implicitlyAndExplicitlySupportedFeatures.map(_.name).contains(featureName)
    if (!protocolContainsFeatureName) {
70 changes: 67 additions & 3 deletions spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala
@@ -22,7 +22,7 @@ import java.util.Locale

import scala.language.postfixOps

// scalastyle:off import.ordering.noEmptyLine
-import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestWriterFeature}
+import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestRemovableReaderWriterFeature, TestRemovableWriterFeature, TestWriterFeature}
import org.apache.spark.sql.delta.actions.{ Metadata, Protocol }
import org.apache.spark.sql.delta.storage.LocalLogStore
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
@@ -247,6 +247,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest
      "cloneAtVersion",
      "delete",
      "detail",
+      "dropFeatureSupport",
      "generate",
      "history",
      "merge",
@@ -631,8 +632,7 @@
    }
  }

-  test(
-    "addFeatureSupport - with filesystem options.") {
+  test("addFeatureSupport - with filesystem options.") {
    withTempDir { dir =>
      val path = fakeFileSystemPath(dir)
      val fsOptions = fakeFileSystemOptions
@@ -670,6 +670,70 @@
    }
  }

test("dropFeatureSupport - with filesystem options.") {
withTempDir { dir =>
val path = fakeFileSystemPath(dir)
val fsOptions = fakeFileSystemOptions

// create a table with a default Protocol.
val testSchema = spark.range(1).schema
val log = DeltaLog.forTable(spark, new Path(path), fsOptions)
log.createLogDirectoriesIfNotExists()
log.store.write(
FileNames.unsafeDeltaFile(log.logPath, 0),
Iterator(Metadata(schemaString = testSchema.json).json, Protocol(1, 2).json),
overwrite = false,
log.newDeltaHadoopConf())
log.update()

// update the protocol to support a writer feature.
val table = DeltaTable.forPath(spark, path, fsOptions)
table.addFeatureSupport(TestRemovableWriterFeature.name)
assert(log.update().protocol === Protocol(1, 7).withFeatures(Seq(
AppendOnlyTableFeature,
InvariantsTableFeature,
TestRemovableWriterFeature)))

// Attempt truncating the history when dropping a feature that is not required.
// This verifies the truncateHistory option was correctly passed.
assert(intercept[DeltaTableFeatureException] {
table.dropFeatureSupport("testRemovableWriter", truncateHistory = true)
}.getErrorClass === "DELTA_FEATURE_DROP_HISTORY_TRUNCATION_NOT_ALLOWED")

// Drop feature.
table.dropFeatureSupport(TestRemovableWriterFeature.name)
// After dropping the feature we should return back to the original protocol.
assert(log.update().protocol === Protocol(1, 2))

table.addFeatureSupport(TestRemovableReaderWriterFeature.name)
assert(
log.update().protocol === Protocol(3, 7).withFeatures(Seq(
AppendOnlyTableFeature,
InvariantsTableFeature,
TestRemovableReaderWriterFeature)))

// Drop feature.
table.dropFeatureSupport(TestRemovableReaderWriterFeature.name)
// After dropping the feature we should return back to the original protocol.
assert(log.update().protocol === Protocol(1, 2))

// Try to drop an unsupported feature.
assert(intercept[DeltaTableFeatureException] {
table.dropFeatureSupport("__invalid_feature__")
}.getErrorClass === "DELTA_FEATURE_DROP_UNSUPPORTED_CLIENT_FEATURE")

// Try to drop a feature that is not present in the protocol.
assert(intercept[DeltaTableFeatureException] {
table.dropFeatureSupport(TestRemovableReaderWriterFeature.name)
}.getErrorClass === "DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT")

// Try to drop a non-removable feature.
assert(intercept[DeltaTableFeatureException] {
table.dropFeatureSupport(TestReaderWriterFeature.name)
}.getErrorClass === "DELTA_FEATURE_DROP_NONREMOVABLE_FEATURE")
}
}
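The Python API can exercise the same flow, since `DeltaTable.forPath` also accepts per-call Hadoop options as a third argument; a sketch under that assumption, with illustrative option keys mirroring the fake file system above:

```python
# Options dict mirrors the fakeFileSystemOptions used in the Scala test;
# the scheme and keys here are illustrative, not part of this PR.
fs_options = {
    "fs.fake.impl": "org.apache.spark.sql.delta.FakeFileSystem",
    "fs.fake.impl.disable.cache": "true",
}
dt = DeltaTable.forPath(spark, "fake://some/table/path", fs_options)
dt.dropFeatureSupport("testRemovableWriter")
```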

test("details - with filesystem options.") {
withTempDir{ dir =>
val path = fakeFileSystemPath(dir)
