Commit

redirect implementation
kamcheungting-db committed Dec 1, 2024
1 parent 4d2c5cf commit d14f409
Showing 10 changed files with 271 additions and 38 deletions.
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -2434,6 +2434,12 @@
],
"sqlState" : "0AKDD"
},
"DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC" : {
"message" : [
"The Delta log contains unrecognized table redirect spec '<spec>'."
],
"sqlState" : "42704"
},
"DELTA_TARGET_TABLE_FINAL_SCHEMA_EMPTY" : {
"message" : [
"Target table final schema is empty."
15 changes: 11 additions & 4 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.delta.hooks.AutoCompactType
import org.apache.spark.sql.delta.hooks.PostCommitHook
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.redirect.NoRedirectRule
import org.apache.spark.sql.delta.redirect.RedirectSpec
import org.apache.spark.sql.delta.redirect.RedirectState
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -345,19 +346,25 @@ trait DeltaErrorsBase
)
}

def unrecognizedRedirectSpec(spec: RedirectSpec): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC",
messageParameters = Array(spec.toString)
)
}

def invalidRedirectStateTransition(
table: String,
oldState: RedirectState,
newState: RedirectState): Unit = {
new DeltaIllegalStateException(
throw new DeltaIllegalStateException(
errorClass = "DELTA_TABLE_INVALID_REDIRECT_STATE_TRANSITION",
messageParameters = Array(
table, table, oldState.name, newState.name)
messageParameters = Array(table, oldState.name, newState.name)
)
}

def invalidRemoveTableRedirect(table: String, currentState: RedirectState): Unit = {
new DeltaIllegalStateException(
throw new DeltaIllegalStateException(
errorClass = "DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT",
messageParameters = Array(table, table, currentState.name)
)
16 changes: 11 additions & 5 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -915,14 +915,14 @@ object DeltaLog extends DeltaLogging {
// scalastyle:on deltahadoopconfiguration
val fs = rawPath.getFileSystem(hadoopConf)
val path = fs.makeQualified(rawPath)
def createDeltaLog(): DeltaLog = recordDeltaOperation(
def createDeltaLog(tablePath: Path = path): DeltaLog = recordDeltaOperation(
null,
"delta.log.create",
Map(TAG_TAHOE_PATH -> path.getParent.toString)) {
Map(TAG_TAHOE_PATH -> tablePath.getParent.toString)) {
AnalysisHelper.allowInvokingTransformsInAnalyzer {
new DeltaLog(
logPath = path,
dataPath = path.getParent,
logPath = tablePath,
dataPath = tablePath.getParent,
options = fileSystemOptions,
allOptions = options,
clock = clock,
@@ -948,7 +948,13 @@
}
}

val deltaLog = getDeltaLogFromCache()
var deltaLog = getDeltaLogFromCache()
if (spark.conf.get(DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE) && deltaLog.tableExists) {
RedirectFeature.withRedirectedLocation(spark, deltaLog, initialCatalogTable) { redirectLoc =>
deltaLog = createDeltaLog(redirectLoc)
getOrCreateCache(spark.sessionState.conf).put(redirectLoc -> fileSystemOptions, deltaLog)
}
}
if (Option(deltaLog.sparkContext.get).map(_.isStopped).getOrElse(true)) {
// Invalidate the cached `DeltaLog` and create a new one because the `SparkContext` of the
// cached `DeltaLog` has been stopped.
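Taken together, these changes let DeltaLog.forTable transparently resolve a redirected table when the session flag is on. A hedged sketch of the flow with hypothetical paths (note that, per createDeltaLog above, the redirect location is used as the new log path):

spark.conf.set(DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE.key, "true")
val log = DeltaLog.forTable(spark, new Path("/data/src"))
// If /data/src carries a RedirectReady spec pointing at /data/dst/_delta_log,
// the returned DeltaLog is rebuilt there and cached under the redirect
// location, so log.dataPath is /data/dst rather than /data/src.
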
@@ -1274,12 +1274,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite
op: DeltaOperations.Operation,
redirectConfig: TableRedirectConfiguration
): Unit = {
if (redirectConfig.spec.isRedirectDest(snapshot.path.toUri.getPath)) return
// Find all no-redirect rules that match the current application name.
// A rule without an appName always applies; a rule with an appName applies
// only when that name matches "spark.app.name".
val rulesOfMatchedApps = redirectConfig.noRedirectRules.filter { rule =>
rule.appName.forall(_.equalsIgnoreCase(spark.conf.get("spark.app.name")))
rule.appName.forall(_.equalsIgnoreCase(spark.appName))
}
// Determine whether any matched rule allows the given operation.
val noRuleSatisfied = !rulesOfMatchedApps.exists(_.allowedOperations.contains(op.name))
@@ -87,9 +87,21 @@ case class DeltaTableV2(
System.currentTimeMillis()
}

private var _deltaLog: Option[DeltaLog] = None

def deltaLog: DeltaLog = _deltaLog.getOrElse {
val newDeltaLog = computeDeltaLog()
_deltaLog = Some(newDeltaLog)
newDeltaLog
}

def refreshDeltaLog(): Unit = {
_deltaLog = None
}

// The loading of the DeltaLog is lazy in order to reduce the number of FileSystem calls,
// in cases where we fall back to the V1 behavior.
lazy val deltaLog: DeltaLog = {
private def computeDeltaLog(): DeltaLog = {
DeltaTableV2.withEnrichedUnsupportedTableException(catalogTable, tableIdentifier) {
// Ideally the table storage properties should always be the same as the options load from
// the Delta log, as Delta CREATE TABLE command guarantees it. However, custom catalogs such
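The memoized _deltaLog handle replaces the old lazy val so a table object can drop and rebuild its DeltaLog after a property change. A usage sketch (the DeltaTableV2 constructor arguments shown are illustrative, not the full signature):

val table = DeltaTableV2(spark, new Path("/data/src")) // hypothetical args
val first = table.deltaLog   // computed by computeDeltaLog() and memoized
table.refreshDeltaLog()      // clears the memoized instance
val second = table.deltaLog  // recomputed, possibly following a new redirect
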
@@ -33,9 +33,11 @@ import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingComm
import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints}
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.redirect.RedirectFeature
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.schema.SchemaUtils.transformSchema
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.sources.DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE
import org.apache.spark.sql.delta.stats.StatisticsCollection
import org.apache.hadoop.fs.Path

@@ -112,9 +114,16 @@ trait AlterDeltaTableCommand extends DeltaCommand {
case class AlterTableSetPropertiesDeltaCommand(
table: DeltaTableV2,
configuration: Map[String, String])
extends LeafRunnableCommand with AlterDeltaTableCommand with IgnoreCachedData {
extends LeafRunnableCommand
with AlterDeltaTableCommand
with IgnoreCachedData {

override def run(sparkSession: SparkSession): Seq[Row] = {
val isRedirectPropertyUpdate = RedirectFeature.hasRedirectConfig(configuration)
if (!isRedirectPropertyUpdate) {
// Invalidate the cached delta log and refresh DeltaTableV2 to trigger a delta log rebuild.
table.refreshDeltaLog()
}
val deltaLog = table.deltaLog

val rowTrackingPropertyKey = DeltaConfigs.ROW_TRACKING_ENABLED.key
@@ -165,6 +174,8 @@ case class AlterTableSetPropertiesDeltaCommand(

CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand(
metadata.configuration, filteredConfs)
// If the table redirect property is being updated, validate it.
RedirectFeature.validateTableRedirect(txn.snapshot, table.catalogTable, configuration)
val newMetadata = metadata.copy(
description = configuration.getOrElse(TableCatalog.PROP_COMMENT, metadata.description),
configuration = metadata.configuration ++ filteredConfs)
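For reference, a hedged sketch of how a caller reaches this validation path; the table name and JSON payload are placeholders, and only the config key comes from this commit:

val redirectJson: String = ??? // placeholder: JSON form of TableRedirectConfiguration
val key = DeltaConfigs.REDIRECT_READER_WRITER.key
spark.sql(s"ALTER TABLE src_tbl SET TBLPROPERTIES ('$key' = '$redirectJson')")
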
@@ -16,8 +16,12 @@

package org.apache.spark.sql.delta.redirect

import java.util.UUID

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.{
DeltaConfig,
DeltaConfigs,
@@ -28,13 +32,16 @@ import org.apache.spark.sql.delta.{
RedirectWriterOnlyFeature,
Snapshot
}
import org.apache.spark.sql.delta.DeltaLog.logPathFor
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.util.JsonUtils
import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable

/**
@@ -106,7 +113,10 @@ case object DropRedirectInProgress extends RedirectState {
* This is the abstract class of the redirect specification, which stores the information
* needed to access the redirect destination table.
*/
abstract class RedirectSpec()
abstract class RedirectSpec {
def isRedirectDest(logPath: String): Boolean
def isRedirectSource(logPath: String): Boolean
}

/**
* The default redirect spec that is used for OSS delta.
@@ -120,7 +130,11 @@ abstract class RedirectSpec()
* }
* @param tablePath the location of the redirect destination table.
*/
class PathBasedRedirectSpec(val tablePath: String) extends RedirectSpec
class PathBasedRedirectSpec(val tablePath: String) extends RedirectSpec {
def isRedirectDest(logPath: String): Boolean = tablePath == logPath

def isRedirectSource(logPath: String): Boolean = !isRedirectDest(logPath)
}
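
A minimal sketch of the spec's contract, assuming tablePath stores the destination's log path, as its use in DeltaLog.createDeltaLog suggests (paths are hypothetical):

val spec = new PathBasedRedirectSpec("/data/dst/_delta_log")
spec.isRedirectDest("/data/dst/_delta_log")   // true: already at the destination
spec.isRedirectSource("/data/src/_delta_log") // true: any other path is a source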

object PathBasedRedirectSpec {
/**
@@ -221,6 +235,26 @@ case class TableRedirectConfiguration(
val isInProgressState: Boolean = {
redirectState == EnableRedirectInProgress || redirectState == DropRedirectInProgress
}

private def isNoRedirectApp(spark: SparkSession): Boolean = {
noRedirectRules.exists { rule =>
rule.appName.exists(_.equalsIgnoreCase(spark.conf.get("spark.app.name")))
}
}

def needRedirect(spark: SparkSession, logPath: Path): Boolean = {
!isNoRedirectApp(spark) && !isInProgressState && spec.isRedirectSource(logPath.toUri.getPath)
}

def getRedirectLocation(deltaLog: DeltaLog, spark: SparkSession): Path = {
spec match {
case spec: PathBasedRedirectSpec =>
val location = new Path(spec.tablePath)
val fs = location.getFileSystem(deltaLog.newDeltaHadoopConf())
fs.makeQualified(location)
case other => throw DeltaErrors.unrecognizedRedirectSpec(other)
}
}
}
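
Putting the two new members together, a hedged sketch of how a caller follows a redirect (this mirrors what withRedirectedLocation in RedirectFeature does below):

val config: TableRedirectConfiguration = ??? // parsed from the table property
if (config.needRedirect(spark, deltaLog.logPath)) {
  // Not a no-redirect app, not mid-transition, and this log is a source.
  val destination: Path = config.getRedirectLocation(deltaLog, spark)
  // ...open a DeltaLog rooted at `destination` instead of the original path.
}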

/**
@@ -238,9 +272,7 @@ class TableRedirect(val config: DeltaConfig[Option[String]]) {
*/
def getRedirectConfiguration(deltaLogMetadata: Metadata): Option[TableRedirectConfiguration] = {
config.fromMetaData(deltaLogMetadata).map { propertyValue =>
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
mapper.readValue(propertyValue, classOf[TableRedirectConfiguration])
RedirectFeature.parseRedirectConfiguration(propertyValue)
}
}

@@ -292,18 +324,7 @@

val currentConfig = currentConfigOpt.get
val redirectState = currentConfig.redirectState
state match {
case RedirectReady =>
if (redirectState != EnableRedirectInProgress && redirectState != RedirectReady) {
DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state)
}
case DropRedirectInProgress =>
if (redirectState != RedirectReady) {
DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state)
}
case _ =>
DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state)
}
RedirectFeature.validateStateTransition(tableIdent, redirectState, state)
val properties = generateRedirectMetadata(currentConfig.`type`, state, spec, noRedirectRules)
val newConfigs = txn.metadata.configuration ++ properties
val newMetadata = txn.metadata.copy(configuration = newConfigs)
@@ -400,6 +421,40 @@ object RedirectFeature {
RedirectWriterOnly.isFeatureSupported(snapshot)
}

private def getProperties(
spark: SparkSession,
deltaLog: DeltaLog,
initialCatalogTable: Option[CatalogTable]): Map[String, String] = {
deltaLog.update().getProperties.toMap
}

def withRedirectedLocation(
spark: SparkSession,
deltaLog: DeltaLog,
initialCatalogTable: Option[CatalogTable])(func: Path => Unit): Unit = {
val properties = getProperties(spark, deltaLog, initialCatalogTable)
val redirectConfiguration = getRedirectConfiguration(properties)
redirectConfiguration.foreach { redirectConfig =>
if (redirectConfig.needRedirect(spark, deltaLog.logPath)) {
val redirectLocation = redirectConfig.getRedirectLocation(deltaLog, spark)
func(redirectLocation)
}
}
}

def parseRedirectConfiguration(configString: String): TableRedirectConfiguration = {
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
mapper.readValue(configString, classOf[TableRedirectConfiguration])
}

private def getRedirectConfiguration(
properties: Map[String, String]): Option[TableRedirectConfiguration] = {
properties.get(DeltaConfigs.REDIRECT_READER_WRITER.key)
.orElse(properties.get(DeltaConfigs.REDIRECT_WRITER_ONLY.key))
.map(parseRedirectConfiguration)
}

/**
* Determine whether the operation `op` updates the existing redirect-reader-writer or
* redirect-writer-only table property of a table with `snapshot`.
Expand All @@ -425,4 +480,52 @@ object RedirectFeature {
RedirectReaderWriter.getRedirectConfiguration(snapshot.metadata)
}
}

def hasRedirectConfig(configs: Map[String, String]): Boolean =
getRedirectConfiguration(configs).isDefined

def validateTableRedirect(
snapshot: Snapshot,
catalogTable: Option[CatalogTable],
configs: Map[String, String]
): Unit = {
// Extract redirect configuration from the provided configs
val redirectConfigOpt = configs
.get(DeltaConfigs.REDIRECT_READER_WRITER.key).map(parseRedirectConfiguration)
redirectConfigOpt.foreach { redirectConfig =>
val currentRedirectConfigOpt = getRedirectConfiguration(snapshot)
val identifier = catalogTable
.map(_.identifier.quotedString)
.getOrElse(snapshot.deltaLog.logPath.toString)
val newState = redirectConfig.redirectState
// Validate state transitions based on current and new states
currentRedirectConfigOpt match {
case Some(currentConfig) =>
validateStateTransition(identifier, currentConfig.redirectState, newState)
case None if newState == DropRedirectInProgress =>
DeltaErrors.invalidRedirectStateTransition(identifier, newState, DropRedirectInProgress)
case _ => // No action required for valid transitions
}
}
}

// Helper method to validate state transitions
def validateStateTransition(
identifier: String,
currentState: RedirectState,
newState: RedirectState
): Unit = {
(currentState, newState) match {
case (state, RedirectReady) =>
if (state == DropRedirectInProgress) {
DeltaErrors.invalidRedirectStateTransition(identifier, state, newState)
}
case (state, DropRedirectInProgress) =>
if (state != RedirectReady) {
DeltaErrors.invalidRedirectStateTransition(identifier, state, newState)
}
case (state, _) =>
DeltaErrors.invalidRedirectStateTransition(identifier, state, newState)
}
}
}
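
The accepted transitions, read directly off validateStateTransition (every other pair throws, since invalidRedirectStateTransition in DeltaErrors now throws):

// EnableRedirectInProgress -> RedirectReady            allowed
// RedirectReady            -> RedirectReady            allowed (config update)
// RedirectReady            -> DropRedirectInProgress   allowed
// any state                -> EnableRedirectInProgress throws
// DropRedirectInProgress   -> RedirectReady            throws
RedirectFeature.validateStateTransition("tbl", EnableRedirectInProgress, RedirectReady) // ok
RedirectFeature.validateStateTransition("tbl", DropRedirectInProgress, RedirectReady)   // throws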
@@ -2122,6 +2122,13 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

val ENABLE_TABLE_REDIRECT_FEATURE =
buildConf("enableTableRedirectFeature")
.doc("True if disabling the table redirect feature.")
.internal()
.booleanConf
.createWithDefault(false)

val DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS =
buildConf("optimizeWrite.maxShufflePartitions")
.internal()
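A one-line sketch for turning the feature on in a session, assuming the standard "spark.databricks.delta." prefix that DeltaSQLConf.buildConf applies:

spark.conf.set("spark.databricks.delta.enableTableRedirectFeature", "true")
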
@@ -599,7 +599,7 @@ class DeltaLogSuite extends QueryTest
assert(snapshot.version === 0)

val deltaLog2 = DeltaLog.forTable(spark, path)
assert(deltaLog2.snapshot.version === 0) // This shouldn't update
assert(deltaLog2.snapshot.version === 0)
val (_, snapshot2) = DeltaLog.forTableWithSnapshot(spark, path)
assert(snapshot2.version === 1) // This should get the latest snapshot
}