Redirect implementation #3911

Open · wants to merge 1 commit into base: master
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -2434,6 +2434,12 @@
],
"sqlState" : "0AKDD"
},
"DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC" : {
"message" : [
"The Delta log contains unrecognized table redirect spec '<spec>'."
],
"sqlState" : "42704"
},
"DELTA_TARGET_TABLE_FINAL_SCHEMA_EMPTY" : {
"message" : [
"Target table final schema is empty."
15 changes: 11 additions & 4 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.delta.hooks.AutoCompactType
import org.apache.spark.sql.delta.hooks.PostCommitHook
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.redirect.NoRedirectRule
import org.apache.spark.sql.delta.redirect.RedirectSpec
import org.apache.spark.sql.delta.redirect.RedirectState
import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils, UnsupportedDataTypeInfo}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -345,19 +346,25 @@ trait DeltaErrorsBase
)
}

def unrecognizedRedirectSpec(spec: RedirectSpec): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC",
messageParameters = Array(spec.toString)
)
}

def invalidRedirectStateTransition(
table: String,
oldState: RedirectState,
newState: RedirectState): Unit = {
new DeltaIllegalStateException(
throw new DeltaIllegalStateException(
errorClass = "DELTA_TABLE_INVALID_REDIRECT_STATE_TRANSITION",
messageParameters = Array(
table, table, oldState.name, newState.name)
messageParameters = Array(table, oldState.name, newState.name)
)
}

def invalidRemoveTableRedirect(table: String, currentState: RedirectState): Unit = {
new DeltaIllegalStateException(
throw new DeltaIllegalStateException(
errorClass = "DELTA_TABLE_INVALID_REMOVE_TABLE_REDIRECT",
messageParameters = Array(table, table, currentState.name)
)
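As a quick illustration of how the new helper surfaces (a sketch; `spec` stands for any `RedirectSpec` subclass the reader does not recognize):

```scala
// Raises DeltaIllegalStateException with error class
// DELTA_TABLE_UNRECOGNIZED_REDIRECT_SPEC; the message interpolates spec.toString:
//   "The Delta log contains unrecognized table redirect spec '<spec>'."
throw DeltaErrors.unrecognizedRedirectSpec(spec)
```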
16 changes: 11 additions & 5 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -915,14 +915,14 @@ object DeltaLog extends DeltaLogging {
// scalastyle:on deltahadoopconfiguration
val fs = rawPath.getFileSystem(hadoopConf)
val path = fs.makeQualified(rawPath)
def createDeltaLog(): DeltaLog = recordDeltaOperation(
def createDeltaLog(tablePath: Path = path): DeltaLog = recordDeltaOperation(
null,
"delta.log.create",
Map(TAG_TAHOE_PATH -> path.getParent.toString)) {
Map(TAG_TAHOE_PATH -> tablePath.getParent.toString)) {
AnalysisHelper.allowInvokingTransformsInAnalyzer {
new DeltaLog(
logPath = path,
dataPath = path.getParent,
logPath = tablePath,
dataPath = tablePath.getParent,
options = fileSystemOptions,
allOptions = options,
clock = clock,
@@ -948,7 +948,13 @@
}
}

val deltaLog = getDeltaLogFromCache()
var deltaLog = getDeltaLogFromCache()
if (spark.conf.get(DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE) && deltaLog.tableExists) {
RedirectFeature.withRedirectedLocation(spark, deltaLog, initialCatalogTable) { redirectLoc =>
deltaLog = createDeltaLog(redirectLoc)
getOrCreateCache(spark.sessionState.conf).put(redirectLoc -> fileSystemOptions, deltaLog)
}
}
if (Option(deltaLog.sparkContext.get).map(_.isStopped).getOrElse(true)) {
// Invalidate the cached `DeltaLog` and create a new one because the `SparkContext` of the cached
// `DeltaLog` has been stopped.
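The effect of this hunk, as a hedged sketch: when the feature flag is on and the cached log's table carries a redirect configuration, `DeltaLog.forTable` transparently resolves to the destination (paths below are illustrative):

```scala
// Assumes the redirect feature conf is enabled and /data/source_table carries a
// RedirectReady, path-based redirect pointing at /data/dest_table.
val log = DeltaLog.forTable(spark, "/data/source_table")
// The returned DeltaLog is rebuilt against the destination and cached under it:
assert(log.dataPath.toUri.getPath == "/data/dest_table")
```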
spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1274,12 +1274,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite
op: DeltaOperations.Operation,
redirectConfig: TableRedirectConfiguration
): Unit = {
if (redirectConfig.spec.isRedirectDest(snapshot.path.toUri.getPath)) return
// Find all rules that match the current application name.
// If a rule has no appName, its no-redirect rule is always included.
// If appName is present, its no-redirect rule is included only when it
// matches `spark.app.name`.
val rulesOfMatchedApps = redirectConfig.noRedirectRules.filter { rule =>
rule.appName.forall(_.equalsIgnoreCase(spark.conf.get("spark.app.name")))
rule.appName.forall(_.equalsIgnoreCase(spark.appName))
}
// Determine whether any rule is satisfied by the given operation.
val noRuleSatisfied = !rulesOfMatchedApps.exists(_.allowedOperations.contains(op.name))
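A minimal sketch of the matching semantics described in the comment above. Rule contents are illustrative, the `NoRedirectRule` constructor shape is inferred from its usage in this hunk, and the `spark.appName` accessor mirrors the one the diff introduces:

```scala
// A rule without an appName matches every application; a rule with an appName
// matches only when it equals spark.appName, case-insensitively.
val rules = Seq(
  NoRedirectRule(appName = None, allowedOperations = Set("WRITE")),
  NoRedirectRule(appName = Some("nightly-etl"), allowedOperations = Set("MERGE"))
)
val rulesOfMatchedApps = rules.filter(_.appName.forall(_.equalsIgnoreCase(spark.appName)))
// True when no matched rule explicitly allows the current operation's name:
val noRuleSatisfied = !rulesOfMatchedApps.exists(_.allowedOperations.contains(op.name))
```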
spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -87,9 +87,21 @@ case class DeltaTableV2(
System.currentTimeMillis()
}

// Memoized DeltaLog; cleared by refreshDeltaLog() so the next access rebuilds it.
private var _deltaLog: Option[DeltaLog] = None

def deltaLog: DeltaLog = _deltaLog.getOrElse {
val newDeltaLog = computeDeltaLog()
_deltaLog = Some(newDeltaLog)
newDeltaLog
}

// Drop the memoized DeltaLog, e.g. after table properties such as redirects change.
def refreshDeltaLog(): Unit = {
_deltaLog = None
}

// The loading of the DeltaLog is lazy in order to reduce the number of FileSystem calls,
// in cases where we fall back to the V1 behavior.
lazy val deltaLog: DeltaLog = {
private def computeDeltaLog(): DeltaLog = {
DeltaTableV2.withEnrichedUnsupportedTableException(catalogTable, tableIdentifier) {
// Ideally the table storage properties should always be the same as the options load from
// the Delta log, as Delta CREATE TABLE command guarantees it. However, custom catalogs such
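In short, `deltaLog` is now memoized rather than a `lazy val`, so callers can force a rebuild; a usage sketch:

```scala
val log1 = table.deltaLog   // computed once via computeDeltaLog() and memoized
table.refreshDeltaLog()     // drops the memoized instance
val log2 = table.deltaLog   // recomputed; may now resolve through a redirect
```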
spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
@@ -33,9 +33,11 @@ import org.apache.spark.sql.delta.commands.columnmapping.RemoveColumnMappingCommand
import org.apache.spark.sql.delta.constraints.{CharVarcharConstraint, Constraints}
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.redirect.RedirectFeature
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.schema.SchemaUtils.transformSchema
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.sources.DeltaSQLConf.ENABLE_TABLE_REDIRECT_FEATURE
import org.apache.spark.sql.delta.stats.StatisticsCollection
import org.apache.hadoop.fs.Path

@@ -112,9 +114,16 @@ trait AlterDeltaTableCommand extends DeltaCommand {
case class AlterTableSetPropertiesDeltaCommand(
table: DeltaTableV2,
configuration: Map[String, String])
extends LeafRunnableCommand with AlterDeltaTableCommand with IgnoreCachedData {
extends LeafRunnableCommand
with AlterDeltaTableCommand
with IgnoreCachedData {

override def run(sparkSession: SparkSession): Seq[Row] = {
val isRedirectPropertyUpdate = RedirectFeature.hasRedirectConfig(configuration)
if (!isRedirectPropertyUpdate) {
// Invalidate the cached delta log and refresh DeltaTableV2 to trigger a delta log rebuild.
table.refreshDeltaLog()
}
val deltaLog = table.deltaLog

val rowTrackingPropertyKey = DeltaConfigs.ROW_TRACKING_ENABLED.key
@@ -165,6 +174,8 @@ case class AlterTableSetPropertiesDeltaCommand(

CoordinatedCommitsUtils.validateConfigurationsForAlterTableSetPropertiesDeltaCommand(
metadata.configuration, filteredConfs)
// If the table redirect property is updated, validate it.
RedirectFeature.validateTableRedirect(txn.snapshot, table.catalogTable, configuration)
val newMetadata = metadata.copy(
description = configuration.getOrElse(TableCatalog.PROP_COMMENT, metadata.description),
configuration = metadata.configuration ++ filteredConfs)
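For illustration only, setting the redirect property might look like the following; the JSON payload's exact shape is defined by `parseRedirectConfiguration` in the redirect module below and is elided here:

```scala
// Hypothetical: redirectConfigJson is a serialized TableRedirectConfiguration.
val redirectConfigJson: String = ???  // shape per the PathBasedRedirectSpec docs below
spark.sql(
  s"ALTER TABLE source_table SET TBLPROPERTIES " +
    s"('${DeltaConfigs.REDIRECT_READER_WRITER.key}' = '$redirectConfigJson')")
// Because the configuration contains a redirect config, the command skips the
// refreshDeltaLog() call and instead validates the state transition at commit.
```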
spark/src/main/scala/org/apache/spark/sql/delta/redirect/TableRedirect.scala
@@ -16,8 +16,12 @@

package org.apache.spark.sql.delta.redirect

import java.util.UUID

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.{
DeltaConfig,
DeltaConfigs,
@@ -28,13 +32,16 @@ import org.apache.spark.sql.delta.{
RedirectWriterOnlyFeature,
Snapshot
}
import org.apache.spark.sql.delta.DeltaLog.logPathFor
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.delta.util.JsonUtils
import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable

/**
@@ -106,7 +113,10 @@ case object DropRedirectInProgress extends RedirectState {
* The abstract class for a redirect specification, which stores the information
* needed to access the redirect destination table.
*/
abstract class RedirectSpec()
abstract class RedirectSpec {
def isRedirectDest(logPath: String): Boolean
def isRedirectSource(logPath: String): Boolean
}

/**
* The default redirect spec that is used for OSS delta.
@@ -120,7 +130,11 @@ abstract class RedirectSpec()
* }
* @param tablePath the path of the redirect destination table's location.
*/
class PathBasedRedirectSpec(val tablePath: String) extends RedirectSpec
class PathBasedRedirectSpec(val tablePath: String) extends RedirectSpec {
def isRedirectDest(logPath: String): Boolean = tablePath == logPath

def isRedirectSource(logPath: String): Boolean = !isRedirectDest(logPath)
}
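
A small sketch of the new predicates (paths are illustrative):

```scala
val spec = new PathBasedRedirectSpec("/data/dest_table")
spec.isRedirectDest("/data/dest_table")   // true: this is the destination itself
spec.isRedirectSource("/data/src_table")  // true: any other path is a source
```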

object PathBasedRedirectSpec {
/**
@@ -221,6 +235,26 @@ case class TableRedirectConfiguration(
val isInProgressState: Boolean = {
redirectState == EnableRedirectInProgress || redirectState == DropRedirectInProgress
}

private def isNoRedirectApp(spark: SparkSession): Boolean = {
noRedirectRules.exists { rule =>
rule.appName.exists(_.equalsIgnoreCase(spark.conf.get("spark.app.name")))
}
}

def needRedirect(spark: SparkSession, logPath: Path): Boolean = {
!isNoRedirectApp(spark) && !isInProgressState && spec.isRedirectSource(logPath.toUri.getPath)
}

def getRedirectLocation(deltaLog: DeltaLog, spark: SparkSession): Path = {
spec match {
case spec: PathBasedRedirectSpec =>
val location = new Path(spec.tablePath)
val fs = location.getFileSystem(deltaLog.newDeltaHadoopConf())
fs.makeQualified(location)
case other => throw DeltaErrors.unrecognizedRedirectSpec(other)
}
}
}
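
Putting the two new members together, a caller-side sketch (`redirectConfig` and the source-table `deltaLog` handle are assumed from context):

```scala
// Redirect applies only when the app is not exempted by a no-redirect rule,
// the state is not in progress, and the current log path is a source.
if (redirectConfig.needRedirect(spark, deltaLog.logPath)) {
  // Only PathBasedRedirectSpec is recognized here; any other spec throws
  // DeltaErrors.unrecognizedRedirectSpec.
  val dest: Path = redirectConfig.getRedirectLocation(deltaLog, spark)
  // ... rebuild the DeltaLog against dest, as DeltaLog.scala does above.
}
```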

/**
@@ -238,9 +272,7 @@ class TableRedirect(val config: DeltaConfig[Option[String]]) {
*/
def getRedirectConfiguration(deltaLogMetadata: Metadata): Option[TableRedirectConfiguration] = {
config.fromMetaData(deltaLogMetadata).map { propertyValue =>
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
mapper.readValue(propertyValue, classOf[TableRedirectConfiguration])
RedirectFeature.parseRedirectConfiguration(propertyValue)
}
}

@@ -292,18 +324,7 @@

val currentConfig = currentConfigOpt.get
val redirectState = currentConfig.redirectState
state match {
case RedirectReady =>
if (redirectState != EnableRedirectInProgress && redirectState != RedirectReady) {
DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state)
}
case DropRedirectInProgress =>
if (redirectState != RedirectReady) {
DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state)
}
case _ =>
DeltaErrors.invalidRedirectStateTransition(tableIdent, redirectState, state)
}
RedirectFeature.validateStateTransition(tableIdent, redirectState, state)
val properties = generateRedirectMetadata(currentConfig.`type`, state, spec, noRedirectRules)
val newConfigs = txn.metadata.configuration ++ properties
val newMetadata = txn.metadata.copy(configuration = newConfigs)
@@ -400,6 +421,40 @@ object RedirectFeature {
RedirectWriterOnly.isFeatureSupported(snapshot)
}

private def getProperties(
spark: SparkSession,
deltaLog: DeltaLog,
initialCatalogTable: Option[CatalogTable]): Map[String, String] = {
deltaLog.update().getProperties.toMap
}

def withRedirectedLocation(
spark: SparkSession,
deltaLog: DeltaLog,
initialCatalogTable: Option[CatalogTable])(func: Path => Unit): Unit = {
val properties = getProperties(spark, deltaLog, initialCatalogTable)
val redirectConfiguration = getRedirectConfiguration(properties)
redirectConfiguration.foreach { redirectConfig =>
if (redirectConfig.needRedirect(spark, deltaLog.logPath)) {
val redirectLocation = redirectConfig.getRedirectLocation(deltaLog, spark)
func(redirectLocation)
}
}
}

def parseRedirectConfiguration(configString: String): TableRedirectConfiguration = {
val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
mapper.readValue(configString, classOf[TableRedirectConfiguration])
}

private def getRedirectConfiguration(
properties: Map[String, String]): Option[TableRedirectConfiguration] = {
properties.get(DeltaConfigs.REDIRECT_READER_WRITER.key)
.orElse(properties.get(DeltaConfigs.REDIRECT_WRITER_ONLY.key))
.map(parseRedirectConfiguration)
}

/**
* Determine whether the operation `op` updates the existing redirect-reader-writer or
* redirect-writer-only table property of a table with `snapshot`.
@@ -425,4 +480,52 @@
RedirectReaderWriter.getRedirectConfiguration(snapshot.metadata)
}
}

def hasRedirectConfig(configs: Map[String, String]): Boolean =
getRedirectConfiguration(configs).isDefined

def validateTableRedirect(
snapshot: Snapshot,
catalogTable: Option[CatalogTable],
configs: Map[String, String]
): Unit = {
// Extract redirect configuration from the provided configs
val redirectConfigOpt = configs
.get(DeltaConfigs.REDIRECT_READER_WRITER.key).map(parseRedirectConfiguration)
redirectConfigOpt.foreach { redirectConfig =>
val currentRedirectConfigOpt = getRedirectConfiguration(snapshot)
val identifier = catalogTable
.map(_.identifier.quotedString)
.getOrElse(snapshot.deltaLog.logPath.toString)
val newState = redirectConfig.redirectState
// Validate state transitions based on current and new states
currentRedirectConfigOpt match {
case Some(currentConfig) =>
validateStateTransition(identifier, currentConfig.redirectState, newState)
case None if newState == DropRedirectInProgress =>
DeltaErrors.invalidRedirectStateTransition(identifier, newState, DropRedirectInProgress)
case _ => // No action required for valid transitions
}
}
}

// Helper method to validate state transitions
def validateStateTransition(
identifier: String,
currentState: RedirectState,
newState: RedirectState
): Unit = {
(currentState, newState) match {
case (state, RedirectReady) =>
if (state == DropRedirectInProgress) {
DeltaErrors.invalidRedirectStateTransition(identifier, state, newState)
}
case (state, DropRedirectInProgress) =>
if (state != RedirectReady) {
DeltaErrors.invalidRedirectStateTransition(identifier, state, newState)
}
case (state, _) =>
DeltaErrors.invalidRedirectStateTransition(identifier, state, newState)
}
}
}
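
To summarize the transitions `validateStateTransition` permits, as a reading of the match above (the identifier string is illustrative):

```scala
// Allowed: any state except DropRedirectInProgress -> RedirectReady,
//          and RedirectReady -> DropRedirectInProgress.
// Every other pair throws DELTA_TABLE_INVALID_REDIRECT_STATE_TRANSITION.
RedirectFeature.validateStateTransition("db.tbl", EnableRedirectInProgress, RedirectReady) // ok
RedirectFeature.validateStateTransition("db.tbl", RedirectReady, DropRedirectInProgress)   // ok
RedirectFeature.validateStateTransition("db.tbl", DropRedirectInProgress, RedirectReady)   // throws
```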
spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -2122,6 +2122,13 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

val ENABLE_TABLE_REDIRECT_FEATURE =
buildConf("enableTableRedirectFeature")
.doc("True if disabling the table redirect feature.")
.internal()
.booleanConf
.createWithDefault(false)

val DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS =
buildConf("optimizeWrite.maxShufflePartitions")
.internal()
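Since `buildConf` in this file applies DeltaSQLConf's standard `spark.databricks.delta.` prefix, enabling the new conf in a session would presumably look like:

```scala
// Assumed full key: "spark.databricks.delta." + "enableTableRedirectFeature".
spark.conf.set("spark.databricks.delta.enableTableRedirectFeature", "true")
```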
spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala
@@ -599,7 +599,7 @@ class DeltaLogSuite extends QueryTest
assert(snapshot.version === 0)

val deltaLog2 = DeltaLog.forTable(spark, path)
assert(deltaLog2.snapshot.version === 0) // This shouldn't update
assert(deltaLog2.snapshot.version === 0)
val (_, snapshot2) = DeltaLog.forTableWithSnapshot(spark, path)
assert(snapshot2.version === 1) // This should get the latest snapshot
}