[Spark] Add metadata cleanup logic for V2 Checkpoints
MetadataCleanup has been updated to make it aware of V2 Checkpoint sidecar files. Sidecar files are deleted only if (1) they are not referenced by any non-expired checkpoint, AND (2) they have expired per the log retention period.
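
In code terms, the rule reduces to the predicate sketched below. This is a minimal, hypothetical illustration (the helper name and parameters are not part of this change); the actual logic lives in identifyAndDeleteUnreferencedSidecarFiles in the diff below.

// Hypothetical sketch of the two-condition deletion rule described above.
import org.apache.hadoop.fs.FileStatus

def shouldDeleteSidecar(
    sidecar: FileStatus,
    activeSidecarFileNames: Set[String], // bare file names referenced by non-expired checkpoints
    retentionTimestampMs: Long): Boolean = {
  // Condition 1: no non-expired checkpoint references this sidecar file.
  val unreferenced = !activeSidecarFileNames.contains(sidecar.getPath.getName)
  // Condition 2: the file is older than the log retention cutoff.
  val expired = sidecar.getModificationTime < retentionTimestampMs
  unreferenced && expired
}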

Closes delta-io#2102

GitOrigin-RevId: 82f76853e049f47d5affc8fe82628c7ff7fecbf9
dhruvarya-db authored and vkorukanti committed Oct 3, 2023
1 parent ccf7fbc commit 3768ebf
Showing 3 changed files with 503 additions and 5 deletions.
202 changes: 198 additions & 4 deletions spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
@@ -22,11 +22,12 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.delta.DeltaHistoryManager.BufferingLogDeletionIterator
 import org.apache.spark.sql.delta.TruncationGranularity.{DAY, HOUR, TruncationGranularity}
-import org.apache.spark.sql.delta.actions.Metadata
+import org.apache.spark.sql.delta.actions.{Action, Metadata}
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.util.FileNames
+import org.apache.spark.sql.delta.util.FileNames.{checkpointVersion, listingPrefix, CheckpointFile, DeltaFile}
 import org.apache.commons.lang3.time.DateUtils
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 
 private[delta] object TruncationGranularity extends Enumeration {
   type TruncationGranularity = Value
@@ -71,9 +72,36 @@ trait MetadataCleanup extends DeltaLogging {
 
     val fs = logPath.getFileSystem(newDeltaHadoopConf())
     var numDeleted = 0
-    listExpiredDeltaLogs(fileCutOffTime.getTime).map(_.getPath).foreach { path =>
+    val expiredDeltaLogs = listExpiredDeltaLogs(fileCutOffTime.getTime)
+    if (expiredDeltaLogs.hasNext) {
+      // Trigger the compatibility checkpoint creation logic only when this round of metadata
+      // cleanup is going to delete any delta/checkpoint files.
+      // We need to create the compat checkpoint before deleting delta/checkpoint files so that
+      // there is no window in between where the old checkpoint has been deleted and no
+      // compat checkpoint is available yet.
+      val v2CompatCheckpointMetrics = new V2CompatCheckpointMetrics
+      createSinglePartCheckpointForBackwardCompat(snapshotToCleanup, v2CompatCheckpointMetrics)
+      logInfo(s"Compatibility checkpoint creation metrics: $v2CompatCheckpointMetrics")
+    }
+    var wasCheckpointDeleted = false
+    expiredDeltaLogs.map(_.getPath).foreach { path =>
       // recursive = false
-      if (fs.delete(path, false)) numDeleted += 1
+      if (fs.delete(path, false)) {
+        numDeleted += 1
+        if (FileNames.isCheckpointFile(path)) {
+          wasCheckpointDeleted = true
+        }
+      }
     }
+    if (wasCheckpointDeleted) {
+      // Trigger sidecar deletion only when some checkpoints have been deleted as part of this
+      // round of metadata cleanup.
+      val sidecarDeletionMetrics = new SidecarDeletionMetrics
+      identifyAndDeleteUnreferencedSidecarFiles(
+        snapshotToCleanup,
+        fileCutOffTime.getTime,
+        sidecarDeletionMetrics)
+      logInfo(s"Sidecar deletion metrics: $sidecarDeletionMetrics")
+    }
     logInfo(s"Deleted $numDeleted log files older than $formattedDate")
   }
@@ -126,6 +154,172 @@ trait MetadataCleanup extends DeltaLogging {
     truncateDate(timeMillis, TruncationGranularity.DAY)
   }
 
+  /**
+   * Helper method to create a compatibility classic single-file checkpoint for this table.
+   * This is needed so that any legacy reader that does not understand [[V2CheckpointTableFeature]]
+   * can read the legacy classic checkpoint file and fail gracefully with a protocol requirement
+   * failure.
+   */
+  protected def createSinglePartCheckpointForBackwardCompat(
+      snapshotToCleanup: Snapshot,
+      metrics: V2CompatCheckpointMetrics): Unit = {
+    // Do nothing if this table does not use V2 Checkpoints, or has no checkpoints at all.
+    if (!CheckpointProvider.isV2CheckpointEnabled(snapshotToCleanup)) return
+    if (snapshotToCleanup.checkpointProvider.isEmpty) return
+
+    val startTimeMs = System.currentTimeMillis()
+    val hadoopConf = newDeltaHadoopConf()
+    val checkpointInstance =
+      CheckpointInstance(snapshotToCleanup.checkpointProvider.topLevelFiles.head.getPath)
+    // The current checkpoint provider is already using a checkpoint with the naming
+    // scheme of classic checkpoints. There is no need to create a compatibility checkpoint
+    // in this case.
+    if (checkpointInstance.format != CheckpointInstance.Format.V2) return
+
+    val checkpointVersion = snapshotToCleanup.checkpointProvider.version
+    val checkpoints = listFrom(checkpointVersion)
+      .takeWhile(file => FileNames.getFileVersionOpt(file.getPath).exists(_ <= checkpointVersion))
+      .collect {
+        case file if FileNames.isCheckpointFile(file) => CheckpointInstance(file.getPath)
+      }
+      .filter(_.format != CheckpointInstance.Format.V2)
+      .toArray
+    val availableNonV2Checkpoints =
+      getLatestCompleteCheckpointFromList(checkpoints, Some(checkpointVersion))
+    if (availableNonV2Checkpoints.nonEmpty) {
+      metrics.v2CheckpointCompatLogicTimeTakenMs = System.currentTimeMillis() - startTimeMs
+      return
+    }
+
+    // topLevelFileIndex must be non-empty when topLevelFiles are present.
+    val shallowCopyDf =
+      loadIndex(snapshotToCleanup.checkpointProvider.topLevelFileIndex.get, Action.logSchema)
+    val finalPath =
+      FileNames.checkpointFileSingular(snapshotToCleanup.deltaLog.logPath, checkpointVersion)
+    Checkpoints.createCheckpointV2ParquetFile(
+      spark,
+      shallowCopyDf,
+      finalPath,
+      hadoopConf,
+      useRename = false)
+    metrics.v2CheckpointCompatLogicTimeTakenMs = System.currentTimeMillis() - startTimeMs
+    metrics.checkpointVersion = checkpointVersion
+  }
+
+  /** Deletes any unreferenced files from the sidecar directory `_delta_log/_sidecars`. */
+  protected def identifyAndDeleteUnreferencedSidecarFiles(
+      snapshotToCleanup: Snapshot,
+      checkpointRetention: Long,
+      metrics: SidecarDeletionMetrics): Unit = {
+    val startTimeMs = System.currentTimeMillis()
+    // If v2 checkpoints are not enabled on the table, we don't need to attempt sidecar cleanup.
+    if (!CheckpointProvider.isV2CheckpointEnabled(snapshotToCleanup)) return
+
+    val hadoopConf = newDeltaHadoopConf()
+    val fs = sidecarDirPath.getFileSystem(hadoopConf)
+    // This can happen when the V2 Checkpoint feature is present in the protocol but
+    // only classic checkpoints have been created for the table.
+    if (!fs.exists(sidecarDirPath)) return
+
+    val (parquetCheckpointFiles, otherFiles) = store
+      .listFrom(listingPrefix(logPath, 0), hadoopConf)
+      .collect { case CheckpointFile(status, _) => (status, CheckpointInstance(status.getPath)) }
+      .collect { case (fileStatus, ci) if ci.format.usesSidecars => fileStatus }
+      .toSeq
+      .partition(_.getPath.getName.endsWith("parquet"))
+    val (jsonCheckpointFiles, unknownFormatCheckpointFiles) =
+      otherFiles.partition(_.getPath.getName.endsWith("json"))
+    if (unknownFormatCheckpointFiles.nonEmpty) {
+      logWarning(
+        "Found checkpoint files other than parquet and json: " +
+          s"${unknownFormatCheckpointFiles.map(_.getPath.toString).mkString(",")}")
+    }
+    metrics.numActiveParquetCheckpointFiles = parquetCheckpointFiles.size
+    metrics.numActiveJsonCheckpointFiles = jsonCheckpointFiles.size
+    val parquetCheckpointsFileIndex =
+      DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET, parquetCheckpointFiles)
+    val jsonCheckpointsFileIndex =
+      DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_JSON, jsonCheckpointFiles)
+    val identifyActiveSidecarsStartTimeMs = System.currentTimeMillis()
+    metrics.activeCheckpointsListingTimeTakenMs = identifyActiveSidecarsStartTimeMs - startTimeMs
+    import org.apache.spark.sql.delta.implicits._
+    val df = (parquetCheckpointsFileIndex ++ jsonCheckpointsFileIndex)
+      .map(loadIndex(_, Action.logSchema(Set("sidecar"))))
+      .reduceOption(_ union _)
+      .getOrElse { return }
+
+    val activeSidecarFiles = df
+      .select("sidecar.path")
+      .where("path is not null")
+      .as[String]
+      .collect()
+      .map(p => new Path(p).getName) // Get bare file names.
+      .toSet
+
+    val identifyAndDeleteSidecarsStartTimeMs = System.currentTimeMillis()
+    metrics.identifyActiveSidecarsTimeTakenMs =
+      identifyAndDeleteSidecarsStartTimeMs - identifyActiveSidecarsStartTimeMs
+    // Retain all files created in the checkpoint retention window, irrespective of whether they
+    // are referenced in a checkpoint or not. This makes sure that we don't end up deleting an
+    // in-progress checkpoint's sidecar files.
+    val retentionTimestamp: Long = checkpointRetention
+    val sidecarFilesIterator = new Iterator[FileStatus] {
+      // Hadoop's RemoteIterator is neither a Java nor a Scala Iterator, so we have to wrap it.
+      val remoteIterator = fs.listStatusIterator(sidecarDirPath)
+      override def hasNext(): Boolean = remoteIterator.hasNext()
+      override def next(): FileStatus = remoteIterator.next()
+    }
+    val sidecarFilesToDelete = sidecarFilesIterator
+      .collect { case file if file.getModificationTime < retentionTimestamp => file.getPath }
+      .filterNot(path => activeSidecarFiles.contains(path.getName))
+    val sidecarDeletionStartTimeMs = System.currentTimeMillis()
+    logInfo("Starting the deletion of unreferenced sidecar files")
+    val count = deleteMultiple(fs, sidecarFilesToDelete)
+
+    logInfo(s"Deleted $count sidecar files")
+    metrics.numSidecarFilesDeleted = count
+    val endTimeMs = System.currentTimeMillis()
+    metrics.identifyAndDeleteSidecarsTimeTakenMs =
+      sidecarDeletionStartTimeMs - identifyAndDeleteSidecarsStartTimeMs
+    metrics.overallSidecarProcessingTimeTakenMs = endTimeMs - startTimeMs
+  }
+
+  private def deleteMultiple(fs: FileSystem, paths: Iterator[Path]): Long = {
+    paths.map { path =>
+      if (fs.delete(path, false)) 1L else 0L
+    }.sum
+  }
+
+  /** Class to track metrics related to V2 Checkpoint sidecar deletion. */
+  protected class SidecarDeletionMetrics {
+    // Number of sidecar files deleted.
+    var numSidecarFilesDeleted: Long = -1
+    // Number of active parquet checkpoint files present in the delta log directory.
+    var numActiveParquetCheckpointFiles: Long = -1
+    // Number of active json checkpoint files present in the delta log directory.
+    var numActiveJsonCheckpointFiles: Long = -1
+    // Time taken (in ms) to list and identify active checkpoints.
+    var activeCheckpointsListingTimeTakenMs: Long = -1
+    // Time taken (in ms) to list the sidecar directory to get all sidecars and delete those
+    // which aren't referenced by any checkpoint anymore.
+    var identifyAndDeleteSidecarsTimeTakenMs: Long = -1
+    // Time taken (in ms) to read the active checkpoint json/parquet files and identify active
+    // sidecar files.
+    var identifyActiveSidecarsTimeTakenMs: Long = -1
+    // Time taken (in ms) for everything related to sidecar processing.
+    var overallSidecarProcessingTimeTakenMs: Long = -1
+  }
+
+  /** Class to track metrics related to V2 compatibility checkpoint creation. */
+  protected class V2CompatCheckpointMetrics {
+    // Time taken (in ms) to run the v2 checkpoint compat logic.
+    var v2CheckpointCompatLogicTimeTakenMs: Long = -1
+
+    // The version at which we have created a v2 compat checkpoint; -1 if no compat checkpoint
+    // was created.
+    var checkpointVersion: Long = -1
+  }
+
   /**
    * Finds a checkpoint such that we are able to construct table snapshot for all versions at or
    * greater than the checkpoint version returned.
