From d14f409d43e284bf097ca944c40eb0c8f9c5650f Mon Sep 17 00:00:00 2001 From: Kam Cheung Ting Date: Sun, 1 Dec 2024 00:23:35 -0800 Subject: [PATCH] redirect implementation --- .../resources/error/delta-error-classes.json | 6 + .../apache/spark/sql/delta/DeltaErrors.scala | 15 +- .../org/apache/spark/sql/delta/DeltaLog.scala | 16 +- .../sql/delta/OptimisticTransaction.scala | 3 +- .../sql/delta/catalog/DeltaTableV2.scala | 14 +- .../commands/alterDeltaTableCommands.scala | 13 +- .../sql/delta/redirect/TableRedirect.scala | 137 +++++++++++++++--- .../sql/delta/sources/DeltaSQLConf.scala | 7 + .../spark/sql/delta/DeltaLogSuite.scala | 2 +- .../spark/sql/delta/TableRedirectSuite.scala | 96 +++++++++++- 10 files changed, 271 insertions(+), 38 deletions(-) diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index 6906fddd141..d09f68c8bc2 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -2434,6 +2434,12 @@ ], "sqlState" : "0AKDD" }, + "DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC" : { + "message" : [ + "The Delta log contains unrecognized table redirect spec '<spec>'." + ], + "sqlState" : "42704" + }, "DELTA_TARGET_TABLE_FINAL_SCHEMA_EMPTY" : { "message" : [ "Target table final schema is empty." 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index 82efebb44ab..bbf0bf4b02c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.delta.hooks.AutoCompactType import org.apache.spark.sql.delta.hooks.PostCommitHook import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.redirect.NoRedirectRule +import org.apache.spark.sql.delta.redirect.RedirectSpec import org.apache.spark.sql.delta.redirect.RedirectState import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo} import org.apache.spark.sql.delta.sources.DeltaSQLConf @@ -345,19 +346,25 @@ trait DeltaErrorsBase ) } + def unrecognizedRedirectSpec(spec: RedirectSpec): Throwable = { + new DeltaIllegalStateException( + errorClass = "DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC", + messageParameters = Array(spec.toString) + ) + } + def invalidRedirectStateTransition( table: String, oldState: RedirectState, newState: RedirectState): Unit = { - new DeltaIllegalStateException( + throw new DeltaIllegalStateException( errorClass = "DELTA_TABLE_INVALID_REDIRECT_STATE_TRANSITION", - messageParameters = Array( - table, table, oldState.name, newState.name) + messageParameters = Array(table, oldState.name, newState.name) ) } def invalidRemoveTableRedirect(table: String, currentState: RedirectState): Unit = { - new DeltaIllegalStateException( + throw new DeltaIllegalStateException( errorClass = "DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT", messageParameters = Array(table, table, currentState.name) ) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala index 991f9597ad1..af23c0cc5ac 100644 --- 
a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala @@ -915,14 +915,14 @@ object DeltaLog extends DeltaLogging { // scalastyle:on deltahadoopconfiguration val fs = rawPath.getFileSystem(hadoopConf) val path = fs.makeQualified(rawPath) - def createDeltaLog(): DeltaLog = recordDeltaOperation( + def createDeltaLog(tablePath: Path = path): DeltaLog = recordDeltaOperation( null, "delta.log.create", - Map(TAG_TAHOE_PATH -> path.getParent.toString)) { + Map(TAG_TAHOE_PATH -> tablePath.getParent.toString)) { AnalysisHelper.allowInvokingTransformsInAnalyzer { new DeltaLog( - logPath = path, - dataPath = path.getParent, + logPath = tablePath, + dataPath = tablePath.getParent, options = fileSystemOptions, allOptions = options, clock = clock, @@ -948,7 +948,13 @@ object DeltaLog extends DeltaLogging { } } - val deltaLog = getDeltaLogFromCache() + var deltaLog = getDeltaLogFromCache() + if (spark.conf.get(DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE) && deltaLog.tableExists) { + RedirectFeature.withRedirectedLocation(spark, deltaLog, initialCatalogTable) { redirectLoc => + deltaLog = createDeltaLog(redirectLoc) + getOrCreateCache(spark.sessionState.conf).put(redirectLoc -> fileSystemOptions, deltaLog) + } + } if (Option(deltaLog.sparkContext.get).map(_.isStopped).getOrElse(true)) { // Invalid the cached `DeltaLog` and create a new one because the `SparkContext` of the cached // `DeltaLog` has been stopped. 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index dfb23a8e0fb..38cf68d1dfc 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -1274,12 +1274,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite op: DeltaOperations.Operation, redirectConfig: TableRedirectConfiguration ): Unit = { + if (redirectConfig.spec.isRedirectDest(snapshot.path.toUri.getPath)) return // Find all rules that match with the current application name. // If appName is not present, its no-redirect-rule are included. // If appName is present, includes its no-redirect-rule only when appName // matches with "spark.app.name". val rulesOfMatchedApps = redirectConfig.noRedirectRules.filter { rule => - rule.appName.forall(_.equalsIgnoreCase(spark.conf.get("spark.app.name"))) + rule.appName.forall(_.equalsIgnoreCase(spark.appName)) } // Determine whether any rule is satisfied the given operation. 
val noRuleSatisfied = !rulesOfMatchedApps.exists(_.allowedOperations.contains(op.name)) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala index ac35d2ae9ed..ddffd01648f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala @@ -87,9 +87,21 @@ case class DeltaTableV2( System.currentTimeMillis() } + private var _deltaLog: Option[DeltaLog] = None + + def deltaLog: DeltaLog = _deltaLog.getOrElse { + val newDeltaLog = computeDeltaLog() + _deltaLog = Some(newDeltaLog) + newDeltaLog + } + + def refreshDeltaLog(): Unit = { + _deltaLog = None + } + // The loading of the DeltaLog is lazy in order to reduce the amount of FileSystem calls, // in cases where we will fallback to the V1 behavior. - lazy val deltaLog: DeltaLog = { + private def computeDeltaLog(): DeltaLog = { DeltaTableV2.withEnrichedUnsupportedTableException(catalogTable, tableIdentifier) { // Ideally the table storage properties should always be the same as the options load from // the Delta log, as Delta CREATE TABLE command guarantees it. 
However, custom catalogs such diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala index 95c0810b55f..e7ccd7a404d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala @@ -33,9 +33,11 @@ import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingComm import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints} import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils import org.apache.spark.sql.delta.logging.DeltaLogKeys +import org.apache.spark.sql.delta.redirect.RedirectFeature import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils} import org.apache.spark.sql.delta.schema.SchemaUtils.transformSchema import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.sources.DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE import org.apache.spark.sql.delta.stats.StatisticsCollection import org.apache.hadoop.fs.Path @@ -112,9 +114,16 @@ trait AlterDeltaTableCommand extends DeltaCommand { case class AlterTableSetPropertiesDeltaCommand( table: DeltaTableV2, configuration: Map[String, String]) - extends LeafRunnableCommand with AlterDeltaTableCommand with IgnoreCachedData { + extends LeafRunnableCommand + with AlterDeltaTableCommand + with IgnoreCachedData { override def run(sparkSession: SparkSession): Seq[Row] = { + val updateRedirectProperty = !RedirectFeature.hasRedirectConfig(configuration) + if (updateRedirectProperty) { + // Invalidate the cache delta log and refresh DeltaTableV2 to trigger delta log rebuild. 
+ table.refreshDeltaLog() + } val deltaLog = table.deltaLog val rowTrackingPropertyKey = DeltaConfigs.ROW_TRACKING_ENABLED.key @@ -165,6 +174,8 @@ case class AlterTableSetPropertiesDeltaCommand( CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand( metadata.configuration, filteredConfs) + // If table redirect feature is updated, validates its property. + RedirectFeature.validateTableRedirect(txn.snapshot, table.catalogTable, configuration) val newMetadata = metadata.copy( description = configuration.getOrElse(TableCatalog.PROP_COMMENT, metadata.description), configuration = metadata.configuration ++ filteredConfs) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala b/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala index 4de0a92e21c..5710a732914 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala @@ -16,8 +16,12 @@ package org.apache.spark.sql.delta.redirect +import java.util.UUID + +import scala.collection.JavaConverters._ import scala.reflect.ClassTag +// scalastyle:off import.ordering.noEmptyLine import org.apache.spark.sql.delta.{ DeltaConfig, DeltaConfigs, @@ -28,13 +32,16 @@ import org.apache.spark.sql.delta.{ RedirectWriterOnlyFeature, Snapshot } +import org.apache.spark.sql.delta.DeltaLog.logPathFor import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.delta.util.JsonUtils import com.fasterxml.jackson.annotation.JsonIgnore import com.fasterxml.jackson.annotation.JsonProperty import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog.CatalogTable /** @@ -106,7 +113,10 @@ case object DropRedirectInProgress extends RedirectState { * This is the 
abstract class of the redirect specification, which stores the information * of accessing the redirect destination table. */ -abstract class RedirectSpec() +abstract class RedirectSpec { + def isRedirectDest(logPath: String): Boolean + def isRedirectSource(logPath: String): Boolean +} /** * The default redirect spec that is used for OSS delta. @@ -120,7 +130,11 @@ abstract class RedirectSpec() * } * @param tablePath this is the path where stores the redirect destination table's location. */ -class PathBasedRedirectSpec(val tablePath: String) extends RedirectSpec +class PathBasedRedirectSpec(val tablePath: String) extends RedirectSpec { + def isRedirectDest(logPath: String): Boolean = tablePath == logPath + + def isRedirectSource(logPath: String): Boolean = !isRedirectDest(logPath) +} object PathBasedRedirectSpec { /** @@ -221,6 +235,26 @@ case class TableRedirectConfiguration( val isInProgressState: Boolean = { redirectState == EnableRedirectInProgress || redirectState == DropRedirectInProgress } + + private def isNoRedirectApp(spark: SparkSession): Boolean = { + noRedirectRules.exists { rule => + rule.appName.exists(_.equalsIgnoreCase(spark.conf.get("spark.app.name"))) + } + } + + def needRedirect(spark: SparkSession, logPath: Path): Boolean = { + !isNoRedirectApp(spark) && !isInProgressState && spec.isRedirectSource(logPath.toUri.getPath) + } + + def getRedirectLocation(deltaLog: DeltaLog, spark: SparkSession): Path = { + spec match { + case spec: PathBasedRedirectSpec => + val location = new Path(spec.tablePath) + val fs = location.getFileSystem(deltaLog.newDeltaHadoopConf()) + fs.makeQualified(location) + case other => throw DeltaErrors.unrecognizedRedirectSpec(other) + } + } } /** @@ -238,9 +272,7 @@ class TableRedirect(val config: DeltaConfig[Option[String]]) { */ def getRedirectConfiguration(deltaLogMetadata: Metadata): Option[TableRedirectConfiguration] = { config.fromMetaData(deltaLogMetadata).map { propertyValue => - val mapper = new ObjectMapper() - 
mapper.registerModule(DefaultScalaModule) - mapper.readValue(propertyValue, classOf[TableRedirectConfiguration]) + RedirectFeature.parseRedirectConfiguration(propertyValue) } } @@ -292,18 +324,7 @@ class TableRedirect(val config: DeltaConfig[Option[String]]) { val currentConfig = currentConfigOpt.get val redirectState = currentConfig.redirectState - state match { - case RedirectReady => - if (redirectState != EnableRedirectInProgress && redirectState != RedirectReady) { - DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state) - } - case DropRedirectInProgress => - if (redirectState != RedirectReady) { - DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state) - } - case _ => - DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state) - } + RedirectFeature.validateStateTransition(tableIdent, redirectState, state) val properties = generateRedirectMetadata(currentConfig.`type`, state, spec, noRedirectRules) val newConfigs = txn.metadata.configuration ++ properties val newMetadata = txn.metadata.copy(configuration = newConfigs) @@ -400,6 +421,40 @@ object RedirectFeature { RedirectWriterOnly.isFeatureSupported(snapshot) } + private def getProperties( + spark: SparkSession, + deltaLog: DeltaLog, + initialCatalogTable: Option[CatalogTable]): Map[String, String] = { + deltaLog.update().getProperties.toMap + } + + def withRedirectedLocation( + spark: SparkSession, + deltaLog: DeltaLog, + initialCatalogTable: Option[CatalogTable])(func: Path => Unit): Unit = { + val properties = getProperties(spark, deltaLog, initialCatalogTable) + val redirectConfiguration = getRedirectConfiguration(properties) + redirectConfiguration.foreach { redirectConfig => + if (redirectConfig.needRedirect(spark, deltaLog.logPath)) { + val redirectLocation = redirectConfig.getRedirectLocation(deltaLog, spark) + func(redirectLocation) + } + } + } + + def parseRedirectConfiguration(configString: String): TableRedirectConfiguration = { + val 
mapper = new ObjectMapper() + mapper.registerModule(DefaultScalaModule) + mapper.readValue(configString, classOf[TableRedirectConfiguration]) + } + + private def getRedirectConfiguration( + properties: Map[String, String]): Option[TableRedirectConfiguration] = { + properties.get(DeltaConfigs.REDIRECT_READER_WRITER.key) + .orElse(properties.get(DeltaConfigs.REDIRECT_WRITER_ONLY.key)) + .map(parseRedirectConfiguration) + } + /** * Determine whether the operation `op` updates the existing redirect-reader-writer or * redirect-writer-only table property of a table with `snapshot`. @@ -425,4 +480,52 @@ object RedirectFeature { RedirectReaderWriter.getRedirectConfiguration(snapshot.metadata) } } + + def hasRedirectConfig(configs: Map[String, String]): Boolean = + getRedirectConfiguration(configs).isDefined + + def validateTableRedirect( + snapshot: Snapshot, + catalogTable: Option[CatalogTable], + configs: Map[String, String] + ): Unit = { + // Extract redirect configuration from the provided configs + val redirectConfigOpt = configs + .get(DeltaConfigs.REDIRECT_READER_WRITER.key).map(parseRedirectConfiguration) + redirectConfigOpt.foreach { redirectConfig => + val currentRedirectConfigOpt = getRedirectConfiguration(snapshot) + val identifier = catalogTable + .map(_.identifier.quotedString) + .getOrElse(snapshot.deltaLog.logPath.toString) + val newState = redirectConfig.redirectState + // Validate state transitions based on current and new states + currentRedirectConfigOpt match { + case Some(currentConfig) => + validateStateTransition(identifier, currentConfig.redirectState, newState) + case None if newState == DropRedirectInProgress => + DeltaErrors.invalidRedirectStateTransition(identifier, newState, DropRedirectInProgress) + case _ => // No action required for valid transitions + } + } + } + + // Helper method to validate state transitions + def validateStateTransition( + identifier: String, + currentState: RedirectState, + newState: RedirectState + ): Unit = { + 
(currentState, newState) match { + case (state, RedirectReady) => + if (state == DropRedirectInProgress) { + DeltaErrors.invalidRedirectStateTransition(identifier, state, newState) + } + case (state, DropRedirectInProgress) => + if (state != RedirectReady) { + DeltaErrors.invalidRedirectStateTransition(identifier, state, newState) + } + case (state, _) => + DeltaErrors.invalidRedirectStateTransition(identifier, state, newState) + } + } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index fc99ea8e2b4..735e360a982 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -2122,6 +2122,13 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(false) + val ENABLE_TABLE_REDIRECT_FEATURE = + buildConf("enableTableRedirectFeature") + .doc("True if enabling the table redirect feature.") + .internal() + .booleanConf + .createWithDefault(false) + val DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS = buildConf("optimizeWrite.maxShufflePartitions") .internal() diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala index 1423d85db0e..539b65cefbc 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala @@ -599,7 +599,7 @@ class DeltaLogSuite extends QueryTest assert(snapshot.version === 0) val deltaLog2 = DeltaLog.forTable(spark, path) - assert(deltaLog2.snapshot.version === 0) // This shouldn't update + assert(deltaLog2.snapshot.version === 0) val (_, snapshot2) = DeltaLog.forTableWithSnapshot(spark, path) assert(snapshot2.version === 1) // This should get the latest snapshot } diff --git 
a/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala index f8ccc1ee708..10f7045261c 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/TableRedirectSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta import java.io.File +import com.databricks.sql.managedcatalog.{ManagedCatalogSessionCatalog, TestMode} import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite import org.apache.spark.sql.delta.redirect.{ DropRedirectInProgress, @@ -26,15 +27,19 @@ import org.apache.spark.sql.delta.redirect.{ PathBasedRedirectSpec, RedirectReaderWriter, RedirectReady, + RedirectSpec, RedirectState, RedirectWriterOnly, - TableRedirect + TableRedirect, + TableRedirectConfiguration } import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.{DeltaSQLCommandTest, DeltaSQLTestUtils} +import org.apache.spark.sql.delta.util.JsonUtils +import org.apache.commons.text.StringEscapeUtils import org.apache.hadoop.fs.Path -import org.apache.spark.sql.{QueryTest, SaveMode, SparkSession} +import org.apache.spark.sql.{QueryTest, SaveMode} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.test.SharedSparkSession @@ -84,15 +89,20 @@ class TableRedirectSuite extends QueryTest } } - def redirectTest(label: String)(f: (DeltaLog, File, File, CatalogTable) => Unit): Unit = { + def redirectTest( + label: String, + accessSource: Boolean = false + )(f: (DeltaLog, File, File, CatalogTable) => Unit): Unit = { test(s"basic table redirect: $label") { withTempDir { sourceTablePath => withTempDir { destTablePath => - withTable("t1") { - sql(s"CREATE external TABLE t1(c0 long)USING delta LOCATION '$sourceTablePath';") - val catalogTable = 
spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) - val deltaLog = DeltaLog.forTable(spark, new Path(sourceTablePath.getCanonicalPath)) - f(deltaLog, sourceTablePath, destTablePath, catalogTable) + withSQLConf(DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE.key -> accessSource.toString) { + withTable("t1", "t2") { + sql(s"CREATE external TABLE t1(c0 long) USING delta LOCATION '$sourceTablePath';") + val catalogTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1")) + val deltaLog = DeltaLog.forTable(spark, new Path(sourceTablePath.getCanonicalPath)) + f(deltaLog, sourceTablePath, destTablePath, catalogTable) + } } } } @@ -293,5 +303,75 @@ class TableRedirectSuite extends QueryTest sql(s"delete from delta.`$source` where c0 = 1") } } + + def alterRedirect( + table: String, + redirectType: String, + redirectState: RedirectState, + spec: RedirectSpec, + noRedirectRules: Set[NoRedirectRule] + ): Unit = { + val enableConfig = TableRedirectConfiguration( + redirectType, + redirectState.name, + JsonUtils.toJson(spec), + noRedirectRules + ) + val enableConfigJson = StringEscapeUtils.escapeJson(JsonUtils.toJson(enableConfig)) + sql(s"alter table $table set TBLPROPERTIES('$featureName' = '$enableConfigJson')") + } + + redirectTest(s"Redirect $featureName: modify table property", accessSource = true) { + case (_, source, dest, catalogTable) => + if (!TestMode.unityCatalogTestsEnabled) { + val redirectSpec = new PathBasedRedirectSpec(dest.getCanonicalPath + "/_delta_log") + val redirectType = PathBasedRedirectSpec.REDIRECT_TYPE + val destPath = dest.toString + val srcPath = source.toString + sql(s"CREATE external TABLE t2(c0 long) USING delta LOCATION '$dest';") + sql(s"insert into t2 values(1),(2),(3),(4),(5)") + val destTable = s"delta.`$destPath`" + val srcTable = s"delta.`$srcPath`" + // Initialize the redirection by moving table into EnableRedirectInProgress state. 
+ alterRedirect(srcTable, redirectType, EnableRedirectInProgress, redirectSpec, Set.empty) + alterRedirect(destTable, redirectType, EnableRedirectInProgress, redirectSpec, Set.empty) + // Delta log is cloned, then moves both redirect destination table and redirect source + // table to RedirectReady state. + alterRedirect(srcTable, redirectType, RedirectReady, redirectSpec, Set.empty) + alterRedirect(destTable, redirectType, RedirectReady, redirectSpec, Set.empty) + sql(s"insert into $srcTable values(1), (2), (3)") + sql(s"insert into $destTable values(1), (2), (3)") + sql(s"insert into t1 values(1), (2), (3)") + sql(s"insert into t2 values(1), (2), (3)") + + var result = sql("select * from t1").collect() + assert(result.length == 17) + result = sql("select * from t2").collect() + assert(result.length == 17) + result = sql(s"select * from $srcTable ").collect() + assert(result.length == 17) + result = sql(s"select * from $destTable ").collect() + assert(result.length == 17) + val root = new Path(catalogTable.location) + val fs = root.getFileSystem(spark.sessionState.newHadoopConf()) + var files = fs.listStatus(new Path(srcPath + "/_delta_log")) + .filter(_.getPath.toString.endsWith(".json")) + assert(files.length == 3) + files = fs.listStatus(new Path(destPath + "/_delta_log")) + .filter(_.getPath.toString.endsWith(".json")) + assert(files.length == 9) + // Drop redirection by moving both redirect destination table and redirect source table to + // DropRedirectInProgress. + alterRedirect(destTable, redirectType, DropRedirectInProgress, redirectSpec, Set.empty) + alterRedirect(srcTable, redirectType, DropRedirectInProgress, redirectSpec, Set.empty) + // Remove table redirect feature from redirect source table and verify table content. 
+ sql(s"alter table $srcTable unset TBLPROPERTIES('$featureName')") + result = sql("select * from t1").collect() + assert(result.length == 0) + sql("insert into t1 values(1), (2), (3), (4)") + result = sql("select * from t1").collect() + assert(result.length == 4) + } + } } }