-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Spark] Drop feature support in DeltaTable Scala/Python APIs #3952
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -594,14 +594,43 @@ def addFeatureSupport(self, featureName: str) -> None: | |
DeltaTable._verify_type_str(featureName, "featureName") | ||
self._jdt.addFeatureSupport(featureName) | ||
|
||
@since(3.4) # type: ignore[arg-type] | ||
def dropFeatureSupport(self, featureName: str, truncateHistory: Optional[bool] = None) -> None: | ||
""" | ||
Modify the protocol to drop a supported feature. The operation always normalizes the | ||
resulting protocol. Protocol normalization is the process of converting a table features | ||
andreaschat-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
protocol to the weakest possible form. This primarily refers to converting a table features | ||
andreaschat-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
protocol to a legacy protocol. A table features protocol can be represented with the legacy | ||
representation only when the feature set of the former exactly matches a legacy protocol. | ||
Normalization can also decrease the reader version of a table features protocol when it is | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't we make the Scala text description and Python text description identical? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That was my initial intention but I diverted later on :D. |
||
higher than necessary. For example: | ||
|
||
(1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3) | ||
(3, 7, None, {RowTracking}) -> (1, 7, RowTracking) | ||
|
||
The dropFeatureSupport method can be used as follows: | ||
delta.tables.DeltaTable.dropFeatureSupport("rowTracking") | ||
|
||
:param featureName: The name of the feature to drop. | ||
:param truncateHistory: Optional value whether to truncate history. If not specified, | ||
the history is not truncated. | ||
:return: None. | ||
""" | ||
DeltaTable._verify_type_str(featureName, "featureName") | ||
if truncateHistory is None: | ||
self._jdt.dropFeatureSupport(featureName) | ||
else: | ||
DeltaTable._verify_type_bool(truncateHistory, "truncateHistory") | ||
self._jdt.dropFeatureSupport(featureName, truncateHistory) | ||
|
||
@since(1.2) # type: ignore[arg-type] | ||
def restoreToVersion(self, version: int) -> DataFrame: | ||
""" | ||
Restore the DeltaTable to an older version of the table specified by version number. | ||
|
||
Example:: | ||
|
||
io.delta.tables.DeltaTable.restoreToVersion(1) | ||
delta.tables.DeltaTable.restoreToVersion(1) | ||
|
||
:param version: target version of restored table | ||
:return: Dataframe with metrics of restore operation. | ||
|
@@ -622,8 +651,8 @@ def restoreToTimestamp(self, timestamp: str) -> DataFrame: | |
|
||
Example:: | ||
|
||
io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01') | ||
io.delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01') | ||
delta.tables.DeltaTable.restoreToTimestamp('2021-01-01') | ||
delta.tables.DeltaTable.restoreToTimestamp('2021-01-01 01:01:01') | ||
|
||
:param timestamp: target timestamp of restored table | ||
:return: Dataframe with metrics of restore operation. | ||
|
@@ -658,6 +687,11 @@ def optimize(self) -> "DeltaOptimizeBuilder": | |
jbuilder = self._jdt.optimize() | ||
return DeltaOptimizeBuilder(self._spark, jbuilder) | ||
|
||
@classmethod # type: ignore[arg-type] | ||
def _verify_type_bool(self, variable: bool, name: str) -> None: | ||
if not isinstance(variable, bool) or variable is None: | ||
raise ValueError("%s needs to be a boolean but got '%s'." % (name, type(variable))) | ||
|
||
@staticmethod # type: ignore[arg-type] | ||
def _verify_type_str(variable: str, name: str) -> None: | ||
if not isinstance(variable, str) or variable is None: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,9 +20,9 @@ import scala.collection.JavaConverters._ | |
|
||
import org.apache.spark.sql.delta._ | ||
import org.apache.spark.sql.delta.DeltaTableUtils.withActiveSession | ||
import org.apache.spark.sql.delta.actions.{Protocol, TableFeatureProtocolUtils} | ||
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils | ||
import org.apache.spark.sql.delta.catalog.DeltaTableV2 | ||
import org.apache.spark.sql.delta.commands.AlterTableSetPropertiesDeltaCommand | ||
import org.apache.spark.sql.delta.commands.{AlterTableDropFeatureDeltaCommand, AlterTableSetPropertiesDeltaCommand} | ||
import org.apache.spark.sql.delta.sources.DeltaSQLConf | ||
import io.delta.tables.execution._ | ||
import org.apache.hadoop.fs.Path | ||
|
@@ -562,6 +562,73 @@ class DeltaTable private[tables]( | |
toDataset(sparkSession, alterTableCmd) | ||
} | ||
|
||
private def executeDropFeature(featureName: String, truncateHistory: Option[Boolean]): Unit = { | ||
val alterTableCmd = AlterTableDropFeatureDeltaCommand( | ||
table = table, | ||
featureName = featureName, | ||
truncateHistory = truncateHistory.getOrElse(false)) | ||
toDataset(sparkSession, alterTableCmd) | ||
} | ||
|
||
/** | ||
* Modify the protocol to drop a supported feature. The operation always normalizes the | ||
* resulting protocol. Protocol normalization is the process of converting a table features | ||
andreaschat-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* protocol to the weakest possible form. This primarily refers to converting a table features | ||
andreaschat-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* protocol to a legacy protocol. A table features protocol can be represented with the legacy | ||
* representation only when the feature set of the former exactly matches a legacy protocol. | ||
* Normalization can also decrease the reader version of a table features protocol when it is | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "decrease the reader version" -> This should apply to writer version as well, so any version? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No it only applies to the reader version.
andreaschat-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* higher than necessary. For example: | ||
* | ||
* (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3) | ||
* (3, 7, None, {RowTracking}) -> (1, 7, RowTracking) | ||
* | ||
* The dropFeatureSupport method can be used as follows: | ||
* {{{ | ||
* io.delta.tables.DeltaTable.dropFeatureSupport("rowTracking") | ||
* }}} | ||
* | ||
* See online documentation for more details. | ||
* | ||
* @param featureName The name of the feature to drop. | ||
* @param truncateHistory Whether to truncate history before downgrading the protocol. | ||
* @return None. | ||
* @since 3.4.0 | ||
*/ | ||
def dropFeatureSupport( | ||
featureName: String, | ||
truncateHistory: Boolean): Unit = withActiveSession(sparkSession) { | ||
executeDropFeature(featureName, Some(truncateHistory)) | ||
} | ||
|
||
/** | ||
* Modify the protocol to drop a supported feature. The operation always normalizes the | ||
* resulting protocol. Protocol normalization is the process of converting a table features | ||
* protocol to the weakest possible form. This primarily refers to converting a table features | ||
* protocol to a legacy protocol. A table features protocol can be represented with the legacy | ||
* representation only when the feature set of the former exactly matches a legacy protocol. | ||
* Normalization can also decrease the reader version of a table features protocol when it is | ||
* higher than necessary. For example: | ||
* | ||
* (1, 7, None, {AppendOnly, Invariants, CheckConstraints}) -> (1, 3) | ||
* (3, 7, None, {RowTracking}) -> (1, 7, RowTracking) | ||
* | ||
* The dropFeatureSupport method can be used as follows: | ||
* {{{ | ||
* io.delta.tables.DeltaTable.dropFeatureSupport("rowTracking") | ||
* }}} | ||
* | ||
* Note, this command will not truncate history. | ||
* | ||
* See online documentation for more details. | ||
* | ||
* @param featureName The name of the feature to drop. | ||
* @return None. | ||
* @since 3.4.0 | ||
*/ | ||
def dropFeatureSupport(featureName: String): Unit = withActiveSession(sparkSession) { | ||
executeDropFeature(featureName, None) | ||
} | ||
|
||
andreaschat-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/** | ||
* Clone a DeltaTable to a given destination to mirror the existing table's data and metadata. | ||
* | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -322,7 +322,7 @@ case class AlterTableDropFeatureDeltaCommand( | |
// Check whether the protocol contains the feature in either the writer features list or | ||
// the reader+writer features list. Note, protocol needs to denormalized to allow dropping | ||
// features from legacy protocols. | ||
val protocol = table.initialSnapshot.protocol | ||
val protocol = table.deltaLog.update().protocol | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm happy with changing this to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah good question. That command was until now primarily only accessible with spark SQL. It seems that when using SQL the command would get table with a fresh snapshot. However, now with the DeltaTable API the user directly controls which |
||
val protocolContainsFeatureName = | ||
protocol.implicitlyAndExplicitlySupportedFeatures.map(_.name).contains(featureName) | ||
if (!protocolContainsFeatureName) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,7 @@ import java.util.Locale | |
import scala.language.postfixOps | ||
|
||
// scalastyle:off import.ordering.noEmptyLine | ||
import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestWriterFeature} | ||
import org.apache.spark.sql.delta.{AppendOnlyTableFeature, DeltaIllegalArgumentException, DeltaLog, DeltaTableFeatureException, FakeFileSystem, InvariantsTableFeature, TestReaderWriterFeature, TestRemovableReaderWriterFeature, TestRemovableWriterFeature, TestWriterFeature} | ||
import org.apache.spark.sql.delta.actions.{ Metadata, Protocol } | ||
import org.apache.spark.sql.delta.storage.LocalLogStore | ||
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest | ||
|
@@ -247,6 +247,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest | |
"cloneAtVersion", | ||
"delete", | ||
"detail", | ||
"dropFeatureSupport", | ||
"generate", | ||
"history", | ||
"merge", | ||
|
@@ -631,8 +632,7 @@ class DeltaTableHadoopOptionsSuite extends QueryTest | |
} | ||
} | ||
|
||
test( | ||
"addFeatureSupport - with filesystem options.") { | ||
test("addFeatureSupport - with filesystem options.") { | ||
withTempDir { dir => | ||
val path = fakeFileSystemPath(dir) | ||
val fsOptions = fakeFileSystemOptions | ||
|
@@ -670,6 +670,70 @@ class DeltaTableHadoopOptionsSuite extends QueryTest | |
} | ||
} | ||
|
||
test("dropFeatureSupport - with filesystem options.") { | ||
withTempDir { dir => | ||
val path = fakeFileSystemPath(dir) | ||
val fsOptions = fakeFileSystemOptions | ||
|
||
// create a table with a default Protocol. | ||
val testSchema = spark.range(1).schema | ||
val log = DeltaLog.forTable(spark, new Path(path), fsOptions) | ||
log.createLogDirectoriesIfNotExists() | ||
log.store.write( | ||
FileNames.unsafeDeltaFile(log.logPath, 0), | ||
Iterator(Metadata(schemaString = testSchema.json).json, Protocol(1, 2).json), | ||
overwrite = false, | ||
log.newDeltaHadoopConf()) | ||
log.update() | ||
|
||
// update the protocol to support a writer feature. | ||
val table = DeltaTable.forPath(spark, path, fsOptions) | ||
table.addFeatureSupport(TestRemovableWriterFeature.name) | ||
assert(log.update().protocol === Protocol(1, 7).withFeatures(Seq( | ||
AppendOnlyTableFeature, | ||
InvariantsTableFeature, | ||
TestRemovableWriterFeature))) | ||
|
||
// Attempt truncating the history when dropping a feature that is not required. | ||
// This verifies the truncateHistory option was correctly passed. | ||
assert(intercept[DeltaTableFeatureException] { | ||
table.dropFeatureSupport("testRemovableWriter", truncateHistory = true) | ||
}.getErrorClass === "DELTA_FEATURE_DROP_HISTORY_TRUNCATION_NOT_ALLOWED") | ||
|
||
// Drop feature. | ||
table.dropFeatureSupport(TestRemovableWriterFeature.name) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add a test where after dropping the feature, the protocol doesn't change? Like dropping a readerWriterFeature from a set of 2 readerWriterFeatures should remain (3, 7) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All there cases (and more) are covered in the |
||
// After dropping the feature we should return back to the original protocol. | ||
assert(log.update().protocol === Protocol(1, 2)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a way to check explicitly the list of features still remain here? Or that we are implicitly having all the supported features of 1, 2 due to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
table.addFeatureSupport(TestRemovableReaderWriterFeature.name) | ||
assert( | ||
log.update().protocol === Protocol(3, 7).withFeatures(Seq( | ||
AppendOnlyTableFeature, | ||
InvariantsTableFeature, | ||
TestRemovableReaderWriterFeature))) | ||
|
||
// Drop feature. | ||
table.dropFeatureSupport(TestRemovableReaderWriterFeature.name) | ||
// After dropping the feature we should return back to the original protocol. | ||
assert(log.update().protocol === Protocol(1, 2)) | ||
andreaschat-db marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Try to drop an unsupported feature. | ||
assert(intercept[DeltaTableFeatureException] { | ||
table.dropFeatureSupport("__invalid_feature__") | ||
}.getErrorClass === "DELTA_FEATURE_DROP_UNSUPPORTED_CLIENT_FEATURE") | ||
|
||
// Try to drop a feature that is not present in the protocol. | ||
assert(intercept[DeltaTableFeatureException] { | ||
table.dropFeatureSupport(TestRemovableReaderWriterFeature.name) | ||
}.getErrorClass === "DELTA_FEATURE_DROP_FEATURE_NOT_PRESENT") | ||
|
||
// Try to drop a non-removable feature. | ||
assert(intercept[DeltaTableFeatureException] { | ||
table.dropFeatureSupport(TestReaderWriterFeature.name) | ||
}.getErrorClass === "DELTA_FEATURE_DROP_NONREMOVABLE_FEATURE") | ||
} | ||
} | ||
|
||
test("details - with filesystem options.") { | ||
withTempDir{ dir => | ||
val path = fakeFileSystemPath(dir) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Similar to the
alterDeltaTableCommand
's description.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could not find where this is mentioned in
alterDeltaTableCommand
but "existing" is redundant in that case since we are already using "supported."