Skip to content

Commit

Permalink
[Spark] Harmonize path-based Delta table resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
ryan-johnson-databricks committed Sep 27, 2023
1 parent 7f8fe8b commit 4ab6c66
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -403,37 +403,6 @@ class DeltaAnalysis(session: SparkSession)
throw DeltaErrors.notADeltaTableException("RESTORE")
}

// Resolve as a resolved table if the path is for delta table. For non delta table, we keep the
// path and pass it along in a ResolvedPathBasedNonDeltaTable. This is needed as DESCRIBE DETAIL
// supports both delta and non delta paths.
case u: UnresolvedPathBasedTable =>
val table = getPathBasedDeltaTable(u.path)
val tableExists = Try(table.tableExists).getOrElse(false)
if (tableExists) {
// Resolve it as a path-based Delta table
val catalog = session.sessionState.catalogManager.currentCatalog.asTableCatalog
ResolvedTable.create(
catalog, Identifier.of(Array(DeltaSourceUtils.ALT_NAME), u.path), table)
} else {
// Resolve it as a placeholder, to identify it as a non-Delta table.
ResolvedPathBasedNonDeltaTable(u.path, u.commandName)
}

case u: UnresolvedPathBasedDeltaTable =>
val table = getPathBasedDeltaTable(u.path)
if (!table.tableExists) {
throw DeltaErrors.notADeltaTableException(u.commandName, u.deltaTableIdentifier)
}
val catalog = session.sessionState.catalogManager.currentCatalog.asTableCatalog
ResolvedTable.create(catalog, u.identifier, table)

case u: UnresolvedPathBasedDeltaTableRelation =>
val table = getPathBasedDeltaTable(u.path, u.options.asScala.toMap)
if (!table.tableExists) {
throw DeltaErrors.notADeltaTableException(u.deltaTableIdentifier)
}
DataSourceV2Relation.create(table, None, Some(u.identifier), u.options)

case d: DescribeDeltaHistory if d.childrenResolved => d.toCommand

// This rule falls back to V1 nodes, since we don't have a V2 reader for Delta right now
Expand Down Expand Up @@ -615,12 +584,6 @@ class DeltaAnalysis(session: SparkSession)
)
}

private def getPathBasedDeltaTable(
path: String,
options: Map[String, String] = Map.empty): DeltaTableV2 = {
DeltaTableV2(session, new Path(path), options = options)
}

/**
* Instantiates a CreateDeltaTableCommand with CloneTableCommand as the child query.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ case class PreprocessTimeTravel(sparkSession: SparkSession) extends Rule[Logical
throw DeltaErrors.notADeltaTableException(commandName)
case tableRelation if tableRelation.resolved =>
tableRelation
case _ =>
case _ if ResolveDeltaPathTable.maybeSQLFile(sparkSession, ur.multipartIdentifier) =>
// If the identifier doesn't exist as a table, try resolving it as a path table.
ResolveDeltaPathTable.resolveAsPathTableRelation(sparkSession, ur).getOrElse {
ur.tableNotFound(ur.multipartIdentifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,63 +16,105 @@

package org.apache.spark.sql.delta

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.TimeTravel
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.util.AnalysisHelper
import org.apache.spark.sql.delta.sources.DeltaSourceUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedRelation, UnresolvedTable}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* Replaces [[UnresolvedTable]]s if the plan is for direct query on files.
*/
case class ResolveDeltaPathTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
import ResolveDeltaPathTable._

override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperators {
case u: UnresolvedTable =>
ResolveDeltaPathTable.resolveAsPathTable(sparkSession, u.multipartIdentifier).getOrElse(u)

// Attempts to resolve a path-based Delta table, returning ResolvedPathBasedNonDeltaTable if
// unsuccessful. This is needed as DESCRIBE DETAIL supports both Delta and non-Delta paths.
case u: UnresolvedPathBasedTable =>
resolveAsPathTable(sparkSession, multipartIdentifier(u.path)).getOrElse {
// Resolve it as a placeholder, to identify it as a non-Delta table.
ResolvedPathBasedNonDeltaTable(u.path, u.commandName)
}

// Resolves a known path-based Delta table as a Table.
case u: UnresolvedPathBasedDeltaTable =>
resolveAsPathTable(sparkSession, multipartIdentifier(u.path)).getOrElse {
throw DeltaErrors.notADeltaTableException(u.commandName, u.deltaTableIdentifier)
}

// Resolves a known path-based Delta table as a Relation.
case u: UnresolvedPathBasedDeltaTableRelation =>
resolveAsPathTableRelation(sparkSession, multipartIdentifier(u.path), u.options).getOrElse {
throw DeltaErrors.notADeltaTableException(u.deltaTableIdentifier)
}

// Resolves delta.`/path/to/table` as a path-based Delta table, now that catalog lookup failed.
case u: UnresolvedTable if maybeSQLFile(sparkSession, u.multipartIdentifier) =>
resolveAsPathTable(sparkSession, u.multipartIdentifier).getOrElse(u)
}

// Helper that converts a path into the multipartIdentifier for a path-based Delta table
private def multipartIdentifier(path: String): Seq[String] = Seq(DeltaSourceUtils.ALT_NAME, path)
}

object ResolveDeltaPathTable
{
object ResolveDeltaPathTable {

/** Adapted from spark's [[ResolveSQLOnFile#maybeSQLFile]] */
def maybeSQLFile(sparkSession: SparkSession, multipartIdentifier: Seq[String]): Boolean = {
sparkSession.sessionState.conf.runSQLonFile && multipartIdentifier.size == 2
}

/** Convenience wrapper for UnresolvedRelation */
private[delta] def resolveAsPathTableRelation(
sparkSession: SparkSession,
u: UnresolvedRelation) : Option[DataSourceV2Relation] = {
resolveAsPathTableRelation(sparkSession, u.multipartIdentifier, u.options)
}

/**
* Try resolving the input table as a Path table.
* If the path table exists, return a [[DataSourceV2Relation]] instance. Otherwise, return None.
*/
def resolveAsPathTableRelation(
private[delta] def resolveAsPathTableRelation(
sparkSession: SparkSession,
u: UnresolvedRelation) : Option[DataSourceV2Relation] = {
resolveAsPathTable(sparkSession, u.multipartIdentifier).map { resolvedTable =>
DataSourceV2Relation.create(
resolvedTable.table, Some(resolvedTable.catalog), Some(resolvedTable.identifier))
multipartIdentifier: Seq[String],
options: CaseInsensitiveStringMap) : Option[DataSourceV2Relation] = {
// NOTE: [[ResolvedTable]] always provides a [[TableCatalog]], even for path-based tables, but
// we ignore it here because [[ResolvedRelation]] for Delta tables do not specify a catalog.
resolveAsPathTable(sparkSession, multipartIdentifier, options.asScala.toMap).map { resolved =>
DataSourceV2Relation.create(resolved.table, None, Some(resolved.identifier), options)
}
}

/**
* Try resolving the input table as a Path table.
* If the path table exists, return a [[ResolvedTable]] instance. Otherwise, return None.
*/
def resolveAsPathTable(
private[delta] def resolveAsPathTable(
sparkSession: SparkSession,
multipartIdentifier: Seq[String]): Option[ResolvedTable] = {
val sessionState = sparkSession.sessionState
if (!sessionState.conf.runSQLonFile || multipartIdentifier.size != 2) {
return None
}
multipartIdentifier: Seq[String],
options: Map[String, String] = Map.empty): Option[ResolvedTable] = {
val tableId = multipartIdentifier.asTableIdentifier
if (DeltaTableUtils.isValidPath(tableId)) {
val deltaTableV2 = DeltaTableV2(sparkSession, new Path(tableId.table))
val sessionCatalog = sessionState.catalogManager.v2SessionCatalog.asTableCatalog
Some(ResolvedTable.create(sessionCatalog, multipartIdentifier.asIdentifier, deltaTableV2))
} else {
None
val deltaTableV2 = DeltaTableV2(sparkSession, new Path(tableId.table), options = options)
if (deltaTableV2.tableExists) {
val identifier = multipartIdentifier.asIdentifier
val catalog = sparkSession.sessionState.catalogManager.v2SessionCatalog.asTableCatalog
return Some(ResolvedTable.create(catalog, identifier, deltaTableV2))
}
}
None // Not a Delta table
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,26 +318,44 @@ case class DeltaTableV2(
object DeltaTableV2 {
/** Resolves a path into a DeltaTableV2, leveraging standard v2 table resolution. */
def apply(spark: SparkSession, tablePath: Path, cmd: String): DeltaTableV2 =
resolve(spark, UnresolvedPathBasedDeltaTable(tablePath.toString, cmd), cmd)
extractFromResolvedTable(resolve(spark, tablePath, cmd), cmd)

/** Resolves a table identifier into a DeltaTableV2, leveraging standard v2 table resolution. */
def apply(spark: SparkSession, tableId: TableIdentifier, cmd: String): DeltaTableV2 = {
resolve(spark, UnresolvedTable(tableId.nameParts, cmd, None), cmd)
def apply(spark: SparkSession, tableId: TableIdentifier, cmd: String): DeltaTableV2 =
extractFromResolvedTable(resolve(spark, tableId, cmd), cmd)

/** Attempts to resolve a path-based Delta table, using standard v2 table resolution. */
def resolve(spark: SparkSession, tablePath: Path, cmd: String): LogicalPlan =
resolve(spark, UnresolvedPathBasedDeltaTable(tablePath.toString, cmd))

/** Attempts to resolve a Delta table by name, using standard v2 table resolution. */
def resolve(spark: SparkSession, tableId: TableIdentifier, cmd: String): LogicalPlan = {
resolve(spark, UnresolvedTable(tableId.nameParts, cmd, None))
}

/** Applies standard v2 table resolution to an unresolved Delta table plan node */
def resolve(spark: SparkSession, unresolved: LogicalPlan, cmd: String): DeltaTableV2 =
extractFrom(spark.sessionState.analyzer.ResolveRelations(unresolved), cmd)
def resolve(spark: SparkSession, unresolved: LogicalPlan): LogicalPlan = {
// Resolve catalog-based tables first, then deal with any path-based tables that remain.
val catalogResolved = spark.sessionState.analyzer.ResolveRelations(unresolved)
ResolveDeltaPathTable(spark)(catalogResolved)
}

/**
* Extracts the DeltaTableV2 from a resolved Delta table plan node, throwing "table not found" if
* the node does not actually represent a resolved Delta table.
*/
def extractFrom(plan: LogicalPlan, cmd: String): DeltaTableV2 = plan match {
case ResolvedTable(_, _, d: DeltaTableV2, _) => d
def extractFromResolvedTable(resolved: LogicalPlan, cmd: String): DeltaTableV2 = {
extractFromResolvedTableOpt(resolved).getOrElse {
throw DeltaErrors.notADeltaTableException(cmd)
}
}

/** Attempts to extract a resolved DeltaTableV2 from a plan node. */
def extractFromResolvedTableOpt(resolved: LogicalPlan): Option[DeltaTableV2] = resolved match {
case ResolvedTable(_, _, d: DeltaTableV2, _) => Some(d)
case ResolvedTable(_, _, t: V1Table, _) if DeltaTableUtils.isDeltaTable(t.catalogTable) =>
DeltaTableV2(SparkSession.active, new Path(t.v1Table.location), Some(t.v1Table))
case _ => throw DeltaErrors.notADeltaTableException(cmd)
Some(DeltaTableV2(SparkSession.active, new Path(t.v1Table.location), Some(t.v1Table)))
case _ => None
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ trait DeltaCommand extends DeltaLogging {
*/
def getDeltaTable(target: LogicalPlan, cmd: String): DeltaTableV2 = {
// TODO: Remove this wrapper and let former callers invoke DeltaTableV2.extractFrom directly.
DeltaTableV2.extractFrom(target, cmd)
DeltaTableV2.extractFromResolvedTable(target, cmd)
}

/**
Expand Down

0 comments on commit 4ab6c66

Please sign in to comment.