[Spark] Update Delta File Resolution Logic with introduction of Managed Commits (#2799)

#### Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR adjusts how we locate delta files, prompted by the adoption of
managed commits. Previously, certain code paths assumed that the delta file
for a given version exists at a predictable path, `_delta_log/$x.json`. With
managed commits this assumption no longer holds: a delta file may instead be
located at `_delta_log/_commits/$x.$uuid.json`. We now resolve the correct
delta file from the Snapshot's LogSegment.
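
As a rough illustration, the resolution rule boils down to the sketch below. This is a
simplified stand-in, not the exact code: the real logic lives in the new
`DeltaCommitFileProvider` introduced in this PR, which works with Hadoop `Path`s and derives
the version-to-UUID map from the snapshot's `LogSegment`.

```scala
// Simplified sketch: resolve the delta file for `version`, given the map of
// unbackfilled managed-commit versions to the UUID embedded in their file name.
def resolveDeltaFile(logPath: String, version: Long, uuids: Map[Long, String]): String =
  uuids.get(version) match {
    // Unbackfilled managed commit: _delta_log/_commits/<version>.<uuid>.json
    case Some(uuid) => f"$logPath/_commits/$version%020d.$uuid.json"
    // Backfilled (classic) commit: _delta_log/<version>.json
    case None => f"$logPath/$version%020d.json"
  }
```

Versions absent from the map are assumed to have been backfilled to the standard location.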

## How was this patch tested?

Enabled managed commits in some of the existing unit tests.

## Does this PR introduce _any_ user-facing changes?

No
sumeet-db authored Mar 26, 2024
1 parent aa0af00 commit dbed7a9
Showing 14 changed files with 171 additions and 44 deletions.
@@ -25,7 +25,7 @@ import scala.collection.mutable
import org.apache.spark.sql.delta.actions.{Action, CommitInfo, CommitMarker, JobInfo, NotebookInfo}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.{DateTimeUtils, FileNames, TimestampFormatter}
import org.apache.spark.sql.delta.util.{DateTimeUtils, DeltaCommitFileProvider, FileNames, TimestampFormatter}
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -73,20 +73,23 @@ class DeltaHistoryManager(
*/
def getHistory(
start: Long,
end: Option[Long] = None): Seq[DeltaHistory] = {
endOpt: Option[Long] = None): Seq[DeltaHistory] = {
import org.apache.spark.sql.delta.implicits._
val conf = getSerializableHadoopConf
val logPath = deltaLog.logPath.toString
val snapshot = endOpt.map(end => deltaLog.getSnapshotAt(end - 1)).getOrElse(deltaLog.update())
val commitFileProvider = DeltaCommitFileProvider(snapshot)
// We assume that commits are contiguous, therefore we try to load all of them in order
val info = spark.range(start, end.getOrElse(deltaLog.update().version) + 1)
val info = spark.range(start, snapshot.version + 1)
.mapPartitions { versions =>
val logStore = LogStore(SparkEnv.get.conf, conf.value)
val basePath = new Path(logPath)
val fs = basePath.getFileSystem(conf.value)
versions.flatMap { commit =>
try {
val ci = DeltaHistoryManager.getCommitInfo(logStore, basePath, commit, conf.value)
val metadata = fs.getFileStatus(FileNames.deltaFile(basePath, commit))
val deltaFile = commitFileProvider.deltaFile(commit)
val ci = DeltaHistoryManager.getCommitInfo(logStore, deltaFile, conf.value)
val metadata = fs.getFileStatus(deltaFile)
Some(ci.withTimestamp(metadata.getModificationTime))
} catch {
case _: FileNotFoundException =>
@@ -258,14 +261,13 @@ object DeltaHistoryManager extends DeltaLogging {
/** Get the persisted commit info (if available) for the given delta file. */
def getCommitInfoOpt(
logStore: LogStore,
basePath: Path,
version: Long,
deltaFile: Path,
hadoopConf: Configuration): Option[CommitInfo] = {
val logs = logStore.readAsIterator(FileNames.deltaFile(basePath, version), hadoopConf)
val logs = logStore.readAsIterator(deltaFile, hadoopConf)
try {
logs
.map(Action.fromJson)
.collectFirst { case c: CommitInfo => c.copy(version = Some(version)) }
.collectFirst { case c: CommitInfo => c.copy(version = Some(deltaVersion(deltaFile))) }
} finally {
logs.close()
}
@@ -278,11 +280,10 @@
*/
private def getCommitInfo(
logStore: LogStore,
basePath: Path,
version: Long,
deltaFile: Path,
hadoopConf: Configuration): CommitInfo = {
getCommitInfoOpt(logStore, basePath, version, hadoopConf).getOrElse {
CommitInfo.empty(Some(version))
getCommitInfoOpt(logStore, deltaFile, hadoopConf).getOrElse {
CommitInfo.empty(Some(deltaVersion(deltaFile)))
}
}

@@ -41,6 +41,7 @@ import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats._
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.DeltaCommitFileProvider
import org.apache.spark.sql.util.ScalaExtensions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -1368,9 +1369,11 @@ trait OptimisticTransactionImpl extends TransactionalWrite
deltaLog,
"delta.commitLarge.failure",
data = Map("exception" -> Utils.exceptionString(e), "operation" -> op.name))
// Actions of a commit which went in before ours
// Actions of a commit which went in before ours.
// Requires updating deltaLog to retrieve these actions, as another writer may have used
// CommitStore for writing.
val logs = deltaLog.store.readAsIterator(
deltaFile(deltaLog.logPath, attemptVersion),
DeltaCommitFileProvider(deltaLog.update()).deltaFile(attemptVersion),
deltaLog.newDeltaHadoopConf())
try {
val winningCommitActions = logs.map(Action.fromJson)
@@ -28,6 +28,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DataSkippingReader
import org.apache.spark.sql.delta.stats.DeltaStatsColumnSpec
import org.apache.spark.sql.delta.stats.StatisticsCollection
import org.apache.spark.sql.delta.util.DeltaCommitFileProvider
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.StateCache
import org.apache.spark.sql.util.ScalaExtensions._
@@ -122,8 +123,7 @@ class Snapshot(
try {
val commitInfoOpt = DeltaHistoryManager.getCommitInfoOpt(
deltaLog.store,
deltaLog.logPath,
version,
DeltaCommitFileProvider(this).deltaFile(version),
deltaLog.newDeltaHadoopConf())
CommitInfo.getRequiredInCommitTimestamp(commitInfoOpt, version.toString)
} catch {
@@ -191,7 +191,8 @@ trait SnapshotManagement { self: DeltaLog =>
.toArray
if (resultTuplesFromFsListingOpt.isEmpty && resultFromCommitStoreFiltered.nonEmpty) {
throw new IllegalStateException("No files found from the file system listing, but " +
"files found from the commit store. This is unexpected.")
s"files found from the commit store. This is unexpected. Commit Files: " +
s"${resultFromCommitStoreFiltered.map(_.getPath).mkString("Array(", ", ", ")")}")
}
// If result from fs listing is None and result from commit-store is empty, return none.
// This is used by caller to distinguish whether table doesn't exist.
@@ -22,7 +22,7 @@ import java.sql.Timestamp
import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils, ClusteringColumnInfo}
import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, Snapshot, UnresolvedPathOrIdentifier}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.DeltaCommitFileProvider
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{Row, SparkSession}
@@ -158,7 +158,7 @@ case class DescribeDeltaDetailCommand(
deltaLog: DeltaLog,
snapshot: Snapshot,
tableMetadata: Option[CatalogTable]): Seq[Row] = {
val currentVersionPath = FileNames.deltaFile(deltaLog.logPath, snapshot.version)
val currentVersionPath = DeltaCommitFileProvider(snapshot).deltaFile(snapshot.version)
val fs = currentVersionPath.getFileSystem(deltaLog.newDeltaHadoopConf())
val tableName = tableMetadata.map(_.qualifiedName).getOrElse(snapshot.metadata.name)
val featureNames = (
@@ -100,7 +100,7 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging {
commitVersion: Long,
actions: Iterator[String]): FileStatus = {
val uuidStr = generateUUID()
val commitPath = FileNames.uuidDeltaFile(logPath, commitVersion, Some(uuidStr))
val commitPath = FileNames.unbackfilledDeltaFile(logPath, commitVersion, Some(uuidStr))
logStore.write(commitPath, actions, overwrite = false, hadoopConf)
commitPath.getFileSystem(hadoopConf).getFileStatus(commitPath)
}
@@ -126,9 +126,7 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingC
}
}

def registerTable(
logPath: Path,
maxCommitVersion: Long): Unit = {
def registerTable(logPath: Path, maxCommitVersion: Long): Unit = {
val newPerTableData = new PerTableData(maxCommitVersion)
if (perTableMap.putIfAbsent(logPath, newPerTableData) != null) {
throw new IllegalStateException(s"Table $logPath already exists in the commit store.")
@@ -0,0 +1,49 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.util

import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.hadoop.fs.Path

case class DeltaCommitFileProvider(logPath: String, maxVersion: Long, uuids: Map[Long, String]) {
// Ensure the Path object is reused across Delta Files but not stored as part of the object state
// since it is not serializable.
@transient lazy val resolvedPath: Path = new Path(logPath)

def deltaFile(version: Long): Path = {
if (version > maxVersion) {
throw new IllegalStateException(s"Cannot resolve Delta table at version $version as the " +
s"state is currently at version $maxVersion. The requested version may be incorrect or " +
"the state may be outdated. Please verify the requested version, update the state if " +
"necessary, and try again.")
}
uuids.get(version) match {
case Some(uuid) => FileNames.unbackfilledDeltaFile(resolvedPath, version, Some(uuid))
case _ => FileNames.deltaFile(resolvedPath, version)
}
}
}

object DeltaCommitFileProvider {
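// Derives the version -> uuid mapping of unbackfilled delta files from the snapshot's
// LogSegment, so that those versions resolve to _delta_log/_commits/ paths.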
def apply(snapshot: Snapshot): DeltaCommitFileProvider = {
val uuids = snapshot.logSegment.deltas
.collect { case UnbackfilledDeltaFile(_, version, uuid) => version -> uuid }
.toMap
new DeltaCommitFileProvider(snapshot.path.toString, snapshot.version, uuids)
}
}
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
object FileNames {

val deltaFileRegex = raw"(\d+)\.json".r
val uuidDeltaFileRegex = raw"(\d+)\.[^.]+\.json".r
val uuidDeltaFileRegex = raw"(\d+)\.([^.]+)\.json".r
val compactedDeltaFileRegex = raw"(\d+).(\d+).compacted.json".r
val checksumFileRegex = raw"(\d+)\.crc".r
val checkpointFileRegex = raw"(\d+)\.checkpoint((\.\d+\.\d+)?\.parquet|\.[^.]+\.(json|parquet))".r
@@ -44,7 +44,10 @@
* @param version The version of the delta file.
* @return The path to the un-backfilled delta file: <logPath>/_commits/<version>.<uuid>.json
*/
def uuidDeltaFile(logPath: Path, version: Long, uuidString: Option[String] = None): Path = {
def unbackfilledDeltaFile(
logPath: Path,
version: Long,
uuidString: Option[String] = None): Path = {
val basePath = commitDirPath(logPath)
val uuid = uuidString.getOrElse(UUID.randomUUID.toString)
new Path(basePath, f"$version%020d.$uuid.json")
@@ -120,6 +123,9 @@ object FileNames {
def isDeltaFile(path: Path): Boolean = DeltaFile.unapply(path).isDefined
def isDeltaFile(file: FileStatus): Boolean = isDeltaFile(file.getPath)

def isUnbackfilledDeltaFile(path: Path): Boolean = UnbackfilledDeltaFile.unapply(path).isDefined
def isUnbackfilledDeltaFile(file: FileStatus): Boolean = isUnbackfilledDeltaFile(file.getPath)

def isChecksumFile(path: Path): Boolean = checksumFilePattern.matcher(path.getName).matches()
def isChecksumFile(file: FileStatus): Boolean = isChecksumFile(file.getPath)

@@ -173,7 +179,7 @@ object FileNames {
unapply(f.getPath).map { case (_, version) => (f, version) }
def unapply(path: Path): Option[(Path, Long)] = {
val parentDirName = path.getParent.getName
// If parent is _commits dir, then match against uuid commit file.
// If parent is _commits dir, then match against unbackfilled commit file.
val regex = if (parentDirName == COMMIT_SUBDIR) uuidDeltaFileRegex else deltaFileRegex
regex.unapplySeq(path.getName).map(path -> _.head.toLong)
}
@@ -192,6 +198,21 @@
}
}

object UnbackfilledDeltaFile {
def unapply(f: FileStatus): Option[(FileStatus, Long, String)] =
unapply(f.getPath).map { case (_, version, uuidString) => (f, version, uuidString) }
def unapply(path: Path): Option[(Path, Long, String)] = {
// If parent is _commits dir, then match against uuid commit file.
if (path.getParent.getName == COMMIT_SUBDIR) {
uuidDeltaFileRegex
.unapplySeq(path.getName)
.collect { case Seq(version, uuidString) => (path, version.toLong, uuidString) }
} else {
None
}
}
}

object FileType extends Enumeration {
val DELTA, CHECKPOINT, CHECKSUM, COMPACTED_DELTA, OTHER = Value
}
@@ -26,11 +26,12 @@ import scala.concurrent.duration._
import scala.language.implicitConversions

import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.StatsUtils
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.DeltaCommitFileProvider
import org.apache.spark.sql.delta.util.FileNames
import org.scalatest.GivenWhenThen

@@ -49,7 +50,8 @@ trait DeltaTimeTravelTests extends QueryTest
with SharedSparkSession
with GivenWhenThen
with DeltaSQLCommandTest
with StatsUtils {
with StatsUtils
with ManagedCommitBaseSuite {
protected implicit def durationToLong(duration: FiniteDuration): Long = {
duration.toMillis
}
@@ -131,7 +133,8 @@ trait DeltaTimeTravelTests extends QueryTest
val rangeStart = startVersion * 10
val rangeEnd = rangeStart + 10
spark.range(rangeStart, rangeEnd).write.format("delta").mode("append").saveAsTable(table)
val file = new File(FileNames.deltaFile(deltaLog.logPath, startVersion).toUri)
val file = new File(DeltaCommitFileProvider(
deltaLog.update()).deltaFile(startVersion).toUri)
file.setLastModified(ts)
startVersion += 1
}
@@ -612,3 +615,7 @@ class DeltaHistoryManagerSuite extends DeltaHistoryManagerBase {
super.sparkConf.set(SQLConf.USE_V1_SOURCE_LIST.key, "parquet,json")
}
}

class ManagedCommitFill1DeltaHistoryManagerSuite extends DeltaHistoryManagerSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
}
@@ -19,18 +19,18 @@ package org.apache.spark.sql.delta
import java.io.File
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, TimeZone}
import java.util.Date

import scala.concurrent.duration._
import scala.language.implicitConversions

import org.apache.spark.sql.delta.DeltaHistoryManager.BufferingLogDeletionIterator
import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.managedcommit.ManagedCommitBaseSuite
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
import org.apache.spark.sql.delta.test.DeltaTestImplicits._
import org.apache.spark.sql.delta.util.FileNames
import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames}
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql.{functions, AnalysisException, QueryTest, Row}
@@ -39,7 +39,8 @@ import org.apache.spark.sql.test.SharedSparkSession
class DeltaTimeTravelSuite extends QueryTest
with SharedSparkSession
with DeltaSQLTestUtils
with DeltaSQLCommandTest {
with DeltaSQLCommandTest
with ManagedCommitBaseSuite {

import testImplicits._

@@ -52,7 +53,7 @@ class DeltaTimeTravelSuite extends QueryTest
private implicit def longToTimestamp(ts: Long): Timestamp = new Timestamp(ts)

private def modifyCommitTimestamp(deltaLog: DeltaLog, version: Long, ts: Long): Unit = {
val file = new File(FileNames.deltaFile(deltaLog.logPath, version).toUri)
val file = new File(DeltaCommitFileProvider(deltaLog.update()).deltaFile(version).toUri)
file.setLastModified(ts)
val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri)
if (crc.exists()) {
@@ -762,3 +763,7 @@ class DeltaTimeTravelSuite extends QueryTest
}
}
}

class ManagedCommitFill1DeltaTimeTravelSuite extends DeltaTimeTravelSuite {
override val managedCommitBackfillBatchSize: Option[Int] = Some(1)
}
@@ -24,7 +24,7 @@ import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
import org.apache.spark.sql.delta.actions.{Action, CommitInfo}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames, JsonUtils}
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.QueryTest
@@ -42,8 +42,11 @@ class InCommitTimestampSuite
}

private def getInCommitTimestamp(deltaLog: DeltaLog, version: Long): Long = {
val deltaFile = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(version)
val commitInfo = DeltaHistoryManager.getCommitInfoOpt(
deltaLog.store, deltaLog.logPath, version, deltaLog.newDeltaHadoopConf())
deltaLog.store,
deltaFile,
deltaLog.newDeltaHadoopConf())
commitInfo.get.inCommitTimestamp.get
}

@@ -75,7 +75,7 @@ class ManagedCommitSuite
actions: Iterator[String],
updatedActions: UpdatedActions): CommitResponse = {
val uuidFile =
FileNames.uuidDeltaFile(logPath, commitVersion)
FileNames.unbackfilledDeltaFile(logPath, commitVersion)
logStore.write(uuidFile, actions, overwrite = false, hadoopConf)
val uuidFileStatus = uuidFile.getFileSystem(hadoopConf).getFileStatus(uuidFile)
val commitTime = uuidFileStatus.getModificationTime