From 075d86df97ac44786afc942bd5738ec9a8902440 Mon Sep 17 00:00:00 2001 From: andreaschat-db Date: Wed, 18 Dec 2024 19:04:18 +0100 Subject: [PATCH] improvements --- python/delta/tables.py | 40 ++++++++- python/delta/tests/test_deltatable.py | 85 ++++++++++++++++--- .../scala/io/delta/tables/DeltaTable.scala | 71 +++++++++++++++- .../apache/spark/sql/delta/TableFeature.scala | 4 +- .../commands/alterDeltaTableCommands.scala | 2 +- .../io/delta/tables/DeltaTableSuite.scala | 70 ++++++++++++++- 6 files changed, 248 insertions(+), 24 deletions(-) diff --git a/python/delta/tables.py b/python/delta/tables.py index ee52c77e386..045af3bb037 100644 --- a/python/delta/tables.py +++ b/python/delta/tables.py @@ -594,6 +594,35 @@ def addFeatureSupport(self, featureName: str) -> None: DeltaTable._verify_type_str(featureName, "featureName") self._jdt.addFeatureSupport(featureName) + @since(3.4) # type: ignore[arg-type] + def dropFeatureSupport(self, featureName: str, truncateHistory: Optional[bool] = None) -> None: + """ + Modify the protocol to drop a supported feature. The operation always normalizes the + resulting protocol. Protocol normalization is the process of converting a table features + protocol to the weakest possible form. This primarily refers to converting a table features + protocol to a legacy protocol. A table features protocol can be represented with the legacy + representation only when the feature set of the former exactly matches a legacy protocol. + Normalization can also decrease the reader version of a table features protocol when it is + higher than necessary. For example: + + (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3) + (3, 7, None, {RowTracking}) -> (1, 7, RowTracking) + + The dropFeatureSupport method can be used as follows: + delta.tables.DeltaTable.dropFeatureSupport("rowTracking") + + :param featureName: The name of the feature to drop. + :param truncateHistory: Optional value whether to truncate history. If not specified, + the history is not truncated. + :return: None. + """ + DeltaTable._verify_type_str(featureName, "featureName") + if truncateHistory is None: + self._jdt.dropFeatureSupport(featureName) + else: + DeltaTable._verify_type_bool(truncateHistory, "truncateHistory") + self._jdt.dropFeatureSupport(featureName, truncateHistory) + @since(1.2) # type: ignore[arg-type] def restoreToVersion(self, version: int) -> DataFrame: """ @@ -601,7 +630,7 @@ def restoreToVersion(self, version: int) -> DataFrame: Example:: - io.delta.tables.DeltaTable.restoreToVersion(1) + delta.tables.DeltaTable.restoreToVersion(1) :param version: target version of restored table :return: Dataframe with metrics of restore operation. @@ -622,8 +651,8 @@ def restoreToTimestamp(self, timestamp: str) -> DataFrame: Example:: - io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01') - io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01') + delta.tables.DeltaTable.restoreToTimestamp('2021-01-01') + delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01') :param timestamp: target timestamp of restored table :return: Dataframe with metrics of restore operation. @@ -658,6 +687,11 @@ def optimize(self) -> "DeltaOptimizeBuilder": jbuilder = self._jdt.optimize() return DeltaOptimizeBuilder(self._spark, jbuilder) + @classmethod + def _verify_type_bool(self, variable, name): + if not isinstance(variable, bool) or variable is None: + raise ValueError("%s needs to be a boolean but got '%s'." % (name, type(variable))) + @staticmethod # type: ignore[arg-type] def _verify_type_str(variable: str, name: str) -> None: if not isinstance(variable, str) or variable is None: diff --git a/python/delta/tests/test_deltatable.py b/python/delta/tests/test_deltatable.py index a263e99ec96..12ce95658cf 100644 --- a/python/delta/tests/test_deltatable.py +++ b/python/delta/tests/test_deltatable.py @@ -1151,16 +1151,19 @@ def test_delta_table_builder_with_bad_args(self) -> None: with self.assertRaises(TypeError): builder.property("1", 1) # type: ignore[arg-type] - def test_protocolUpgrade(self) -> None: + def __create_df_for_feature_tests(self) -> DeltaTable: try: - self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2') self.spark.conf.set('spark.databricks.delta.minReaderVersion', '1') + self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2') self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3), ('d', 4)]) - dt = DeltaTable.forPath(self.spark, self.tempFile) - dt.upgradeTableProtocol(1, 3) + return DeltaTable.forPath(self.spark, self.tempFile) finally: - self.spark.conf.unset('spark.databricks.delta.minWriterVersion') self.spark.conf.unset('spark.databricks.delta.minReaderVersion') + self.spark.conf.unset('spark.databricks.delta.minWriterVersion') + + def test_protocolUpgrade(self) -> None: + dt = self.__create_df_for_feature_tests() + dt.upgradeTableProtocol(1, 3) # cannot downgrade once upgraded dt.upgradeTableProtocol(1, 2) @@ -1189,14 +1192,7 @@ def test_protocolUpgrade(self) -> None: dt.upgradeTableProtocol(1, {}) # type: ignore[arg-type] def test_addFeatureSupport(self) -> None: - try: - self.spark.conf.set('spark.databricks.delta.minReaderVersion', '1') - self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2') - self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3), ('d', 4)]) - dt = DeltaTable.forPath(self.spark, self.tempFile) - finally: - self.spark.conf.unset('spark.databricks.delta.minReaderVersion') - self.spark.conf.unset('spark.databricks.delta.minWriterVersion') + dt = self.__create_df_for_feature_tests() # bad args with self.assertRaisesRegex(Py4JJavaError, "DELTA_UNSUPPORTED_FEATURES_IN_CONFIG"): @@ -1224,6 +1220,69 @@ def test_addFeatureSupport(self) -> None: self.assertEqual(sorted(dt_details["tableFeatures"]), ["appendOnly", "deletionVectors", "invariants"]) + def test_dropFeatureSupport(self) -> None: + dt = self.__create_df_for_feature_tests() + + dt.addFeatureSupport("testRemovableWriter") + dt_details = dt.detail().collect()[0].asDict() + self.assertTrue(dt_details["minReaderVersion"] == 1) + self.assertTrue(dt_details["minWriterVersion"] == 7, "Should upgrade to table features") + self.assertEqual(sorted(dt_details["tableFeatures"]), + ["appendOnly", "invariants", "testRemovableWriter"]) + + # Attempt truncating the history when dropping a feature that is not required. + # This verifies the truncateHistory option was correctly passed. + with self.assertRaisesRegex(Exception, + "DELTA_FEATURE_DROP_HISTORY_TRUNCATION_NOT_ALLOWED"): + dt.dropFeatureSupport("testRemovableWriter", True) + + dt.dropFeatureSupport("testRemovableWriter") + dt_details = dt.detail().collect()[0].asDict() + self.assertTrue(dt_details["minReaderVersion"] == 1) + self.assertTrue(dt_details["minWriterVersion"] == 2, "Should return to legacy protocol") + + dt.addFeatureSupport("testRemovableReaderWriter") + dt_details = dt.detail().collect()[0].asDict() + self.assertTrue(dt_details["minReaderVersion"] == 3, "Should upgrade to table features") + self.assertTrue(dt_details["minWriterVersion"] == 7, "Should upgrade to table features") + self.assertEqual(sorted(dt_details["tableFeatures"]), + ["appendOnly", "invariants", "testRemovableReaderWriter"]) + + dt.dropFeatureSupport("testRemovableReaderWriter") + dt_details = dt.detail().collect()[0].asDict() + self.assertTrue(dt_details["minReaderVersion"] == 1, "Should return to legacy protocol") + self.assertTrue(dt_details["minWriterVersion"] == 2, "Should return to legacy protocol") + + # Try to drop an unsupported feature. + with self.assertRaisesRegex(Exception, "DELTA_FEATURE_DROP_UNSUPPORTED_CLIENT_FEATURE"): + dt.dropFeatureSupport("__invalid_feature__") + + # Try to drop a feature that is not present in the protocol. + with self.assertRaisesRegex(Exception, "DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT"): + dt.dropFeatureSupport("testRemovableReaderWriter") + + # Try to drop a non-removable feature. + with self.assertRaisesRegex(Exception, "DELTA_FEATURE_DROP_NONREMOVABLE_FEATURE"): + dt.dropFeatureSupport("testReaderWriter") + + with self.assertRaisesRegex(ValueError, "featureName needs to be a string"): + dt.dropFeatureSupport(12345) # type: ignore[arg-type] + with self.assertRaisesRegex(ValueError, "featureName needs to be a string"): + dt.dropFeatureSupport([12345]) # type: ignore[arg-type] + with self.assertRaisesRegex(ValueError, "featureName needs to be a string"): + dt.dropFeatureSupport({}) # type: ignore[arg-type] + with self.assertRaisesRegex(ValueError, "featureName needs to be a string"): + dt.dropFeatureSupport([]) # type: ignore[arg-type] + + with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"): + dt.dropFeatureSupport("testRemovableWriter", 12345) # type: ignore[arg-type] + with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"): + dt.dropFeatureSupport("testRemovableWriter", [12345]) # type: ignore[arg-type] + with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"): + dt.dropFeatureSupport("testRemovableWriter", {}) # type: ignore[arg-type] + with self.assertRaisesRegex(ValueError, "truncateHistory needs to be a boolean"): + dt.dropFeatureSupport("testRemovableWriter", []) # type: ignore[arg-type] + def test_restore_to_version(self) -> None: self.__writeDeltaTable([('a', 1), ('b', 2)]) self.__overwriteDeltaTable([('a', 3), ('b', 2)], diff --git a/spark/src/main/scala/io/delta/tables/DeltaTable.scala b/spark/src/main/scala/io/delta/tables/DeltaTable.scala index a7b5bb9e031..bfe79484eab 100644 --- a/spark/src/main/scala/io/delta/tables/DeltaTable.scala +++ b/spark/src/main/scala/io/delta/tables/DeltaTable.scala @@ -20,9 +20,9 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.delta._ import org.apache.spark.sql.delta.DeltaTableUtils.withActiveSession -import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils} +import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils import org.apache.spark.sql.delta.catalog.DeltaTableV2 -import org.apache.spark.sql.delta.commands.AlterTableSetPropertiesDeltaCommand +import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, AlterTableSetPropertiesDeltaCommand} import org.apache.spark.sql.delta.sources.DeltaSQLConf import io.delta.tables.execution._ import org.apache.hadoop.fs.Path @@ -562,6 +562,73 @@ class DeltaTable private[tables]( toDataset(sparkSession, alterTableCmd) } + private def executeDropFeature(featureName: String, truncateHistory: Option[Boolean]): Unit = { + val alterTableCmd = AlterTableDropFeatureDeltaCommand( + table = table, + featureName = featureName, + truncateHistory = truncateHistory.getOrElse(false)) + toDataset(sparkSession, alterTableCmd) + } + + /** + * Modify the protocol to drop a supported feature. The operation always normalizes the + * resulting protocol. Protocol normalization is the process of converting a table features + * protocol to the weakest possible form. This primarily refers to converting a table features + * protocol to a legacy protocol. A table features protocol can be represented with the legacy + * representation only when the feature set of the former exactly matches a legacy protocol. + * Normalization can also decrease the reader version of a table features protocol when it is + * higher than necessary. For example: + * + * (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3) + * (3, 7, None, {RowTracking}) -> (1, 7, RowTracking) + * + * The dropFeatureSupport method can be used as follows: + * {{{ + * io.delta.tables.DeltaTable.dropFeatureSupport("rowTracking") + * }}} + * + * See online documentation for more details. + * + * @param featureName The name of the feature to drop. + * @param truncateHistory Whether to truncate history before downgrading the protocol. + * @return None. + * @since 3.4.0 + */ + def dropFeatureSupport( + featureName: String, + truncateHistory: Boolean): Unit = withActiveSession(sparkSession) { + executeDropFeature(featureName, Some(truncateHistory)) + } + + /** + * Modify the protocol to drop a supported feature. The operation always normalizes the + * resulting protocol. Protocol normalization is the process of converting a table features + * protocol to the weakest possible form. This primarily refers to converting a table features + * protocol to a legacy protocol. A table features protocol can be represented with the legacy + * representation only when the feature set of the former exactly matches a legacy protocol. + * Normalization can also decrease the reader version of a table features protocol when it is + * higher than necessary. For example: + * + * (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3) + * (3, 7, None, {RowTracking}) -> (1, 7, RowTracking) + * + * The dropFeatureSupport method can be used as follows: + * {{{ + * io.delta.tables.DeltaTable.dropFeatureSupport("rowTracking") + * }}} + * + * Note, this command will not truncate history. + * + * See online documentation for more details. + * + * @param featureName The name of the feature to drop. + * @return None. + * @since 3.4.0 + */ + def dropFeatureSupport(featureName: String): Unit = withActiveSession(sparkSession) { + executeDropFeature(featureName, None) + } + /** * Clone a DeltaTable to a given destination to mirror the existing table's data and metadata. * diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index 8bf787de260..d972dd1e031 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -1044,7 +1044,7 @@ object TestReaderWriterMetadataAutoUpdateFeature } } -private[sql] object TestRemovableWriterFeature +object TestRemovableWriterFeature extends WriterFeature(name = "testRemovableWriter") with FeatureAutomaticallyEnabledByMetadata with RemovableFeature { @@ -1093,7 +1093,7 @@ private[sql] object TestRemovableWriterFeatureWithDependency Set(TestRemovableReaderWriterFeature, TestRemovableWriterFeature) } -private[sql] object TestRemovableReaderWriterFeature +object TestRemovableReaderWriterFeature extends ReaderWriterFeature(name = "testRemovableReaderWriter") with FeatureAutomaticallyEnabledByMetadata with RemovableFeature { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala index a2f92ef8c53..df7e7371682 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala @@ -322,7 +322,7 @@ case class AlterTableDropFeatureDeltaCommand( // Check whether the protocol contains the feature in either the writer features list or // the reader+writer features list. Note, protocol needs to denormalized to allow dropping // features from legacy protocols. - val protocol = table.initialSnapshot.protocol + val protocol = table.deltaLog.update().protocol val protocolContainsFeatureName = protocol.implicitlyAndExplicitlySupportedFeatures.map(_.name).contains(featureName) if (!protocolContainsFeatureName) { diff --git a/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala b/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala index 4420d23f246..f74baf8ae14 100644 --- a/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala +++ b/spark/src/test/scala/io/delta/tables/DeltaTableSuite.scala @@ -22,7 +22,7 @@ import java.util.Locale import scala.language.postfixOps // scalastyle:off import.ordering.noEmptyLine -import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestWriterFeature} +import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestRemovableReaderWriterFeature, TestRemovableWriterFeature, TestWriterFeature} import org.apache.spark.sql.delta.actions.{ Metadata, Protocol } import org.apache.spark.sql.delta.storage.LocalLogStore import org.apache.spark.sql.delta.test.DeltaSQLCommandTest @@ -247,6 +247,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest "cloneAtVersion", "delete", "detail", + "dropFeatureSupport", "generate", "history", "merge", @@ -631,8 +632,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest } } - test( - "addFeatureSupport - with filesystem options.") { + test("addFeatureSupport - with filesystem options.") { withTempDir { dir => val path = fakeFileSystemPath(dir) val fsOptions = fakeFileSystemOptions @@ -670,6 +670,70 @@ class DeltaTableHadoopOptionsSuite extends QueryTest } } + test("dropFeatureSupport - with filesystem options.") { + withTempDir { dir => + val path = fakeFileSystemPath(dir) + val fsOptions = fakeFileSystemOptions + + // create a table with a default Protocol. + val testSchema = spark.range(1).schema + val log = DeltaLog.forTable(spark, new Path(path), fsOptions) + log.createLogDirectoriesIfNotExists() + log.store.write( + FileNames.unsafeDeltaFile(log.logPath, 0), + Iterator(Metadata(schemaString = testSchema.json).json, Protocol(1, 2).json), + overwrite = false, + log.newDeltaHadoopConf()) + log.update() + + // update the protocol to support a writer feature. + val table = DeltaTable.forPath(spark, path, fsOptions) + table.addFeatureSupport(TestRemovableWriterFeature.name) + assert(log.update().protocol === Protocol(1, 7).withFeatures(Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + TestRemovableWriterFeature))) + + // Attempt truncating the history when dropping a feature that is not required. + // This verifies the truncateHistory option was correctly passed. + assert(intercept[DeltaTableFeatureException] { + table.dropFeatureSupport("testRemovableWriter", truncateHistory = true) + }.getErrorClass === "DELTA_FEATURE_DROP_HISTORY_TRUNCATION_NOT_ALLOWED") + + // Drop feature. + table.dropFeatureSupport(TestRemovableWriterFeature.name) + // After dropping the feature we should return back to the original protocol. + assert(log.update().protocol === Protocol(1, 2)) + + table.addFeatureSupport(TestRemovableReaderWriterFeature.name) + assert( + log.update().protocol === Protocol(3, 7).withFeatures(Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + TestRemovableReaderWriterFeature))) + + // Drop feature. + table.dropFeatureSupport(TestRemovableReaderWriterFeature.name) + // After dropping the feature we should return back to the original protocol. + assert(log.update().protocol === Protocol(1, 2)) + + // Try to drop an unsupported feature. + assert(intercept[DeltaTableFeatureException] { + table.dropFeatureSupport("__invalid_feature__") + }.getErrorClass === "DELTA_FEATURE_DROP_UNSUPPORTED_CLIENT_FEATURE") + + // Try to drop a feature that is not present in the protocol. + assert(intercept[DeltaTableFeatureException] { + table.dropFeatureSupport(TestRemovableReaderWriterFeature.name) + }.getErrorClass === "DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT") + + // Try to drop a non-removable feature. + assert(intercept[DeltaTableFeatureException] { + table.dropFeatureSupport(TestReaderWriterFeature.name) + }.getErrorClass === "DELTA_FEATURE_DROP_NONREMOVABLE_FEATURE") + } + } + test("details - with filesystem options.") { withTempDir{ dir => val path = fakeFileSystemPath(dir)