Skip to content

Commit

Permalink
Add tests to check the behaviour of CLONE with row IDs.
Browse files Browse the repository at this point in the history
Closes #2678

GitOrigin-RevId: 9473242559e6957b3ce56925cfe1da256972ba1b
  • Loading branch information
longvu-db authored and allisonport-db committed Mar 7, 2024
1 parent 6cdacab commit 2fddb8b
Show file tree
Hide file tree
Showing 7 changed files with 266 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -611,9 +611,6 @@ trait OptimisticTransactionImpl extends TransactionalWrite
newMetadataTmp = MaterializedRowCommitVersion.updateMaterializedColumnName(
protocol, oldMetadata = snapshot.metadata, newMetadataTmp)

RowId.verifyMetadata(
snapshot.protocol, protocol, snapshot.metadata, newMetadataTmp, isCreatingNewTable)

assertMetadata(newMetadataTmp)
logInfo(s"Updated metadata from ${newMetadata.getOrElse("-")} to $newMetadataTmp")
newMetadata = Some(newMetadataTmp)
Expand Down
19 changes: 0 additions & 19 deletions spark/src/main/scala/org/apache/spark/sql/delta/RowId.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,25 +66,6 @@ object RowId {
isEnabled
}

/**
 * Rejects a metadata/protocol update that would switch row IDs on for a table that already
 * exists.
 *
 * Row IDs count as enabled according to [[isEnabled]], evaluated once for the old
 * protocol/metadata pair and once for the new pair. An `UnsupportedOperationException` is
 * thrown when they were not enabled before, are enabled afterwards, and `isCreatingNewTable`
 * is false. In every other case the method returns normally.
 */
private[delta] def verifyMetadata(
    oldProtocol: Protocol,
    newProtocol: Protocol,
    oldMetadata: Metadata,
    newMetadata: Metadata,
    isCreatingNewTable: Boolean): Unit = {
  val enabledBefore = isEnabled(oldProtocol, oldMetadata)
  val enabledAfter = isEnabled(newProtocol, newMetadata)

  // Only the disabled -> enabled transition is of interest here.
  val newlyEnabled = enabledAfter && !enabledBefore
  if (newlyEnabled && !isCreatingNewTable) {
    throw new UnsupportedOperationException(
      "Cannot enable Row IDs on an existing table.")
  }
}

/**
* Assigns fresh row IDs to all AddFiles inside `actions` that do not have row IDs yet and emits
* a [[RowIdHighWaterMark]] action with the new high-water mark.
Expand Down
28 changes: 0 additions & 28 deletions spark/src/main/scala/org/apache/spark/sql/delta/RowTracking.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,32 +60,4 @@ object RowTracking {
throw DeltaErrors.convertToDeltaRowTrackingEnabledWithoutStatsCollection
}
}

/**
 * Builds a copy of `sourceMetadata` whose row tracking table property mirrors the one on
 * `targetMetadata`: the property is first dropped from the source configuration and then
 * re-added with the target's value, if the target configuration defines it.
 */
private[delta] def takeRowTrackingPropertyFromTarget(
    targetMetadata: Metadata,
    sourceMetadata: Metadata): Metadata = {
  val rowTrackingKey = DeltaConfigs.ROW_TRACKING_ENABLED.key
  val sourceConfigWithoutKey = sourceMetadata.configuration - rowTrackingKey
  val mergedConfig = targetMetadata.configuration.get(rowTrackingKey) match {
    case Some(targetValue) => sourceConfigWithoutKey + (rowTrackingKey -> targetValue)
    case None => sourceConfigWithoutKey
  }
  sourceMetadata.copy(configuration = mergedConfig)
}

/**
 * Returns a copy of `metadata` whose configuration no longer contains the row tracking
 * table property. All other configuration entries are kept.
 */
private[delta] def removeRowTrackingProperty(metadata: Metadata): Metadata = {
  val configWithoutRowTracking = metadata.configuration - DeltaConfigs.ROW_TRACKING_ENABLED.key
  metadata.copy(configuration = configWithoutRowTracking)
}

/**
 * Returns a copy of `protocol` whose writer features, when present, no longer include the
 * row tracking table feature name.
 */
private[delta] def removeRowTrackingTableFeature(protocol: Protocol): Protocol = {
  protocol.copy(
    writerFeatures = protocol.writerFeatures.map(features => features - RowTrackingFeature.name))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -280,16 +280,10 @@ abstract class CloneTableBase(
id = UUID.randomUUID().toString,
name = targetSnapshot.metadata.name,
description = targetSnapshot.metadata.description)
// If it's a new table, we remove the row tracking table property to create a 1:1 CLONE of
// the source, just without row tracking. If it's an existing table, we take whatever
// setting is currently on the target, as the setting should be independent between
// target and source.
if (!tableExists(targetSnapshot)) {
clonedMetadata = RowTracking.removeRowTrackingProperty(clonedMetadata)
} else {
clonedMetadata = RowTracking.takeRowTrackingPropertyFromTarget(
targetMetadata = targetSnapshot.metadata,
sourceMetadata = clonedMetadata)
// Existing target table
if (tableExists(targetSnapshot)) {
// Set the ID equal to the target ID
clonedMetadata = clonedMetadata.copy(id = targetSnapshot.metadata.id)
}
clonedMetadata
}
Expand Down Expand Up @@ -356,14 +350,12 @@ abstract class CloneTableBase(
conf.getConf(DeltaSQLConf.RESTORE_TABLE_PROTOCOL_DOWNGRADE_ALLOWED) ||
// It's not a real downgrade if the table doesn't exist before the CLONE.
!tableExists(txn.snapshot)
val sourceProtocolWithoutRowTracking = RowTracking.removeRowTrackingTableFeature(sourceProtocol)

if (protocolDowngradeAllowed) {
minReaderVersion = minReaderVersion.max(sourceProtocol.minReaderVersion)
minWriterVersion = minWriterVersion.max(sourceProtocol.minWriterVersion)
val minProtocol = Protocol(minReaderVersion, minWriterVersion).withFeatures(enabledFeatures)
// Row tracking settings should be independent between target and source.
sourceProtocolWithoutRowTracking.merge(minProtocol)
sourceProtocol.merge(minProtocol)
} else {
// Take the maximum of all protocol versions being merged to ensure that table features
// from table property overrides are correctly added to the table feature list or are only
Expand All @@ -373,8 +365,7 @@ abstract class CloneTableBase(
minWriterVersion = Seq(
targetProtocol.minWriterVersion, sourceProtocol.minWriterVersion, minWriterVersion).max
val minProtocol = Protocol(minReaderVersion, minWriterVersion).withFeatures(enabledFeatures)
// Row tracking settings should be independent between target and source.
targetProtocol.merge(sourceProtocolWithoutRowTracking, minProtocol)
targetProtocol.merge(sourceProtocol, minProtocol)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ case class AlterTableSetPropertiesDeltaCommand(

override def run(sparkSession: SparkSession): Seq[Row] = {
val deltaLog = table.deltaLog

val rowTrackingPropertyKey = DeltaConfigs.ROW_TRACKING_ENABLED.key
val enableRowTracking = configuration.keySet.contains(rowTrackingPropertyKey) &&
configuration(rowTrackingPropertyKey).toBoolean

if (enableRowTracking) {
// TODO(longvu-db): This will be removed once we support backfill.
throw new UnsupportedOperationException("Cannot enable Row IDs on an existing table.")
}
val columnMappingPropertyKey = DeltaConfigs.COLUMN_MAPPING_MODE.key
val disableColumnMapping = configuration.get(columnMappingPropertyKey).contains("none")
val columnMappingRemovalAllowed = sparkSession.sessionState.conf.getConf(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.rowid

import org.apache.spark.sql.delta.{DeltaConfigs, DeltaIllegalStateException, DeltaLog, RowId}
import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.TABLE_FEATURES_MIN_WRITER_VERSION
import org.apache.spark.sql.delta.sources.DeltaSQLConf

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.test.SharedSparkSession

/**
 * Tests for how `CREATE OR REPLACE TABLE ... SHALLOW CLONE` interacts with row IDs / row
 * tracking, covering non-existing, empty and non-empty targets as well as table property
 * overrides passed to the CLONE command.
 */
class RowIdCloneSuite
  extends QueryTest
  with SharedSparkSession
  with RowIdTestUtils {

  /** Number of rows written into every table that is set up as non-empty. */
  val numRows = 10

  test("clone assigns fresh row IDs when explicitly adding row IDs support") {
    withTables(
      TableSetupInfo(tableName = "source",
        rowIdsEnabled = false, tableState = TableState.NON_EMPTY),
      TableSetupInfo(tableName = "target",
        rowIdsEnabled = false, tableState = TableState.NON_EXISTING)) {
      cloneTable(
        targetTableName = "target",
        sourceTableName = "source",
        tblProperties = s"'$rowTrackingFeatureName' = 'supported'" :: Nil)

      assertRowTrackingOnTarget(expectEnabled = false)
    }
  }

  test("clone a table with row tracking enabled into non-existing target " +
    "enables row tracking even if disabled by default") {
    withTables(
      TableSetupInfo(tableName = "source",
        rowIdsEnabled = true, tableState = TableState.NON_EMPTY),
      TableSetupInfo(tableName = "target",
        rowIdsEnabled = false, tableState = TableState.NON_EXISTING)) {
      withRowTrackingEnabled(enabled = false) {
        cloneTable(targetTableName = "target", sourceTableName = "source")

        assertRowTrackingOnTarget(expectEnabled = true)
      }
    }
  }

  test("clone that add row ID feature using table property override " +
    "doesn't enable row IDs on target") {
    withTables(
      TableSetupInfo(tableName = "source",
        rowIdsEnabled = false, tableState = TableState.NON_EMPTY),
      TableSetupInfo(tableName = "target",
        rowIdsEnabled = false, tableState = TableState.EMPTY)) {

      cloneTable(
        targetTableName = "target",
        sourceTableName = "source",
        tblProperties = s"'$rowTrackingFeatureName' = 'supported'" ::
          s"'delta.minWriterVersion' = $TABLE_FEATURES_MIN_WRITER_VERSION" :: Nil)

      assertRowTrackingOnTarget(expectEnabled = false)
    }
  }

  test("clone can disable row IDs using property override") {
    withTables(
      TableSetupInfo(tableName = "source",
        rowIdsEnabled = true, tableState = TableState.NON_EMPTY),
      TableSetupInfo(tableName = "target",
        rowIdsEnabled = true, tableState = TableState.EMPTY)) {

      cloneTable(
        targetTableName = "target",
        sourceTableName = "source",
        tblProperties = s"'${DeltaConfigs.ROW_TRACKING_ENABLED.key}' = false" :: Nil)

      assertRowTrackingOnTarget(expectEnabled = false)
    }
  }

  test("clone throws error when assigning row IDs without stats") {
    withSQLConf(
      DeltaSQLConf.DELTA_COLLECT_STATS.key -> "false") {
      withTable("source", "target") {
        withRowTrackingEnabled(enabled = false) {
          spark.range(end = numRows)
            .write.format("delta").saveAsTable("source")
        }
        withRowTrackingEnabled(enabled = true) {
          // enable stats to create table with row IDs
          withSQLConf(DeltaSQLConf.DELTA_COLLECT_STATS.key -> "true") {
            spark.range(0).write.format("delta").saveAsTable("target")
          }
          val err = intercept[DeltaIllegalStateException] {
            cloneTable(targetTableName = "target", sourceTableName = "source")
          }
          checkError(err, "DELTA_ROW_ID_ASSIGNMENT_WITHOUT_STATS")
        }
      }
    }
  }

  test("clone can enable row tracking on empty target using property override") {
    withTables(
      TableSetupInfo(tableName = "source",
        rowIdsEnabled = false, tableState = TableState.NON_EMPTY),
      TableSetupInfo(tableName = "target",
        rowIdsEnabled = false, tableState = TableState.EMPTY)) {

      cloneTable(
        targetTableName = "target",
        sourceTableName = "source",
        tblProperties = s"'${DeltaConfigs.ROW_TRACKING_ENABLED.key}' = true" :: Nil)

      assertRowTrackingOnTarget(expectEnabled = true)
    }
  }

  test("clone assigns fresh row IDs for empty target") {
    withTables(
      TableSetupInfo(tableName = "source",
        rowIdsEnabled = false, tableState = TableState.NON_EMPTY),
      TableSetupInfo(tableName = "target",
        rowIdsEnabled = true, tableState = TableState.EMPTY)) {
      cloneTable(targetTableName = "target", sourceTableName = "source")

      assertRowTrackingOnTarget(expectEnabled = false)
    }
  }

  test("clone can't assign row IDs for non-empty target") {
    withTables(
      TableSetupInfo(tableName = "source",
        rowIdsEnabled = false, tableState = TableState.NON_EMPTY),
      TableSetupInfo(tableName = "target",
        rowIdsEnabled = true, tableState = TableState.NON_EMPTY)) {
      assert(intercept[DeltaIllegalStateException] {
        cloneTable(targetTableName = "target", sourceTableName = "source")
      }.getErrorClass === "DELTA_UNSUPPORTED_NON_EMPTY_CLONE")
    }
  }

  test("clone from source with row tracking enabled into existing empty target " +
    "without row tracking enables row tracking") {
    withTables(
      TableSetupInfo(tableName = "source",
        rowIdsEnabled = true, tableState = TableState.NON_EMPTY),
      TableSetupInfo(tableName = "target",
        rowIdsEnabled = false, tableState = TableState.EMPTY)) {
      cloneTable(targetTableName = "target", sourceTableName = "source")

      assertRowTrackingOnTarget(expectEnabled = true)
    }
  }

  /**
   * Shared post-CLONE verification for the table named "target": checks its row IDs with
   * `assertRowIdsAreValid`, asserts that the protocol supports row IDs, and asserts that row
   * IDs are enabled exactly when `expectEnabled` is true.
   *
   * Protocol and metadata are both read from the snapshot returned together with the log, so
   * every test inspects the same table version.
   */
  private def assertRowTrackingOnTarget(expectEnabled: Boolean): Unit = {
    val (targetLog, snapshot) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier("target"))
    assertRowIdsAreValid(targetLog)
    assert(RowId.isSupported(snapshot.protocol))
    assert(RowId.isEnabled(snapshot.protocol, snapshot.metadata) === expectEnabled)
  }

  /**
   * Runs `CREATE OR REPLACE TABLE <target> SHALLOW CLONE <source>`, appending a
   * `TBLPROPERTIES (...)` clause when `tblProperties` is non-empty. Each element of
   * `tblProperties` must already be a rendered `key = value` SQL fragment.
   */
  def cloneTable(
      targetTableName: String,
      sourceTableName: String,
      tblProperties: Seq[String] = Seq.empty): Unit = {
    val tblPropertiesStr = if (tblProperties.nonEmpty) {
      s"TBLPROPERTIES ${tblProperties.mkString("(", ",", ")")}"
    } else {
      ""
    }
    sql(
      s"""
         |CREATE OR REPLACE TABLE $targetTableName
         |SHALLOW CLONE $sourceTableName
         |$tblPropertiesStr
         |""".stripMargin)
  }

  /** State a table should be in before the test body runs. */
  object TableState extends Enumeration {
    type TableState = Value
    val NON_EMPTY, EMPTY, NON_EXISTING = Value
  }

  /**
   * Describes one table to prepare for a test.
   *
   * @param tableName name the table is saved under
   * @param rowIdsEnabled value passed to `withRowTrackingEnabled` while the table is written
   * @param tableState whether the table is created with rows, created empty, or not created
   */
  case class TableSetupInfo(
      tableName: String,
      rowIdsEnabled: Boolean,
      tableState: TableState.TableState)

  /**
   * Prepares the given tables in order, runs `f`, and relies on the nested `withTable` scopes
   * to clean each table up afterwards.
   */
  private def withTables(tables: TableSetupInfo*)(f: => Unit): Unit = {
    tables.headOption match {
      case None => f
      case Some(firstTable) =>
        withTable(firstTable.tableName) {
          createTable(firstTable)
          withTables(tables.tail: _*)(f)
        }
    }
  }

  /** Writes the table described by `table`, or does nothing for `NON_EXISTING`. */
  private def createTable(table: TableSetupInfo): Unit = {
    val rowsToWrite = table.tableState match {
      case TableState.NON_EMPTY =>
        Some(spark.range(start = 0, end = numRows, step = 1, numPartitions = 1))
      case TableState.EMPTY =>
        Some(spark.range(0))
      case TableState.NON_EXISTING =>
        None
    }
    rowsToWrite.foreach { rows =>
      withRowTrackingEnabled(enabled = table.rowIdsEnabled) {
        rows.write.format("delta").saveAsTable(table.tableName)
      }
    }
  }
}
Loading

0 comments on commit 2fddb8b

Please sign in to comment.