From 3768ebf0545504b08c3e9ac4b0425243e5a5bb98 Mon Sep 17 00:00:00 2001
From: Dhruv Arya
Date: Fri, 29 Sep 2023 09:51:38 -0700
Subject: [PATCH] [Spark] Add metadata cleanup logic for V2 Checkpoints

MetadataCleanup has been updated to make it aware of V2 Checkpoint sidecar
files. Sidecar files are only deleted if:
1. They are not being used by any non-expired checkpoints, AND
2. They have expired as per the log retention period.

Closes delta-io/delta#2102

GitOrigin-RevId: 82f76853e049f47d5affc8fe82628c7ff7fecbf9
---
 .../spark/sql/delta/MetadataCleanup.scala     | 202 +++++++++++++++++-
 .../spark/sql/delta/DeltaRetentionSuite.scala | 176 +++++++++++++++
 .../sql/delta/DeltaRetentionSuiteBase.scala   | 130 ++++++++++-
 3 files changed, 503 insertions(+), 5 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala b/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
index f6764232833..a839195d4f4 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
@@ -22,11 +22,12 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.delta.DeltaHistoryManager.BufferingLogDeletionIterator
 import org.apache.spark.sql.delta.TruncationGranularity.{DAY, HOUR, TruncationGranularity}
-import org.apache.spark.sql.delta.actions.Metadata
+import org.apache.spark.sql.delta.actions.{Action, Metadata}
 import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.util.FileNames
 import org.apache.spark.sql.delta.util.FileNames.{checkpointVersion, listingPrefix, CheckpointFile, DeltaFile}
 import org.apache.commons.lang3.time.DateUtils
-import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 
 private[delta] object TruncationGranularity extends Enumeration {
   type TruncationGranularity = Value
@@ -71,9 +72,36 @@ trait MetadataCleanup extends DeltaLogging {
       val fs = logPath.getFileSystem(newDeltaHadoopConf())
       var numDeleted = 0
-      listExpiredDeltaLogs(fileCutOffTime.getTime).map(_.getPath).foreach { path =>
+      val expiredDeltaLogs = listExpiredDeltaLogs(fileCutOffTime.getTime)
+      if (expiredDeltaLogs.hasNext) {
+        // Trigger compatibility checkpoint creation logic only when this round of metadata cleanup
+        // is going to delete any delta/checkpoint files.
+        // We need to create the compat checkpoint before deleting delta/checkpoint files so that
+        // we don't have a window in between where the old checkpoint is deleted and there is no
+        // compat checkpoint available.
+        val v2CompatCheckpointMetrics = new V2CompatCheckpointMetrics
+        createSinglePartCheckpointForBackwardCompat(snapshotToCleanup, v2CompatCheckpointMetrics)
+        logInfo(s"Compatibility checkpoint creation metrics: $v2CompatCheckpointMetrics")
+      }
+      var wasCheckpointDeleted = false
+      expiredDeltaLogs.map(_.getPath).foreach { path =>
         // recursive = false
-        if (fs.delete(path, false)) numDeleted += 1
+        if (fs.delete(path, false)) {
+          numDeleted += 1
+          if (FileNames.isCheckpointFile(path)) {
+            wasCheckpointDeleted = true
+          }
+        }
+      }
+      if (wasCheckpointDeleted) {
+        // Trigger sidecar deletion only when some checkpoints have been deleted as part of this
+        // round of metadata cleanup.
+        val sidecarDeletionMetrics = new SidecarDeletionMetrics
+        identifyAndDeleteUnreferencedSidecarFiles(
+          snapshotToCleanup,
+          fileCutOffTime.getTime,
+          sidecarDeletionMetrics)
+        logInfo(s"Sidecar deletion metrics: $sidecarDeletionMetrics")
       }
       logInfo(s"Deleted $numDeleted log files older than $formattedDate")
     }
 
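For orientation, the ordering enforced by this hunk (compat checkpoint first, then delta/checkpoint deletion, then sidecar deletion) is exercised end to end through cleanUpExpiredLogs, which the new tests below call directly. A minimal usage sketch, not part of the patch: it assumes an active SparkSession named spark, an illustrative table path, and calling code that lives in the org.apache.spark.sql.delta package (like the tests), since these internals are not a public API.

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.delta.DeltaLog

    val log = DeltaLog.forTable(spark, new Path("/tmp/delta/events"))
    // Inside cleanUpExpiredLogs: (1) write a classic single-file checkpoint if only v2
    // checkpoints exist, (2) delete expired delta/checkpoint files, and (3) delete sidecar
    // files that are both expired and unreferenced by the surviving checkpoints.
    log.cleanUpExpiredLogs(log.update())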
@@ -126,6 +154,172 @@ trait MetadataCleanup extends DeltaLogging {
     truncateDate(timeMillis, TruncationGranularity.DAY)
   }
 
+  /**
+   * Helper method to create a backward-compatibility classic single-file checkpoint for this
+   * table. This is needed so that a legacy reader that does not understand
+   * [[V2CheckpointTableFeature]] can still read the classic checkpoint file and fail gracefully
+   * with a protocol requirement failure.
+   */
+  protected def createSinglePartCheckpointForBackwardCompat(
+      snapshotToCleanup: Snapshot,
+      metrics: V2CompatCheckpointMetrics): Unit = {
+    // Do nothing if this table does not use V2 Checkpoints, or has no checkpoints at all.
+    if (!CheckpointProvider.isV2CheckpointEnabled(snapshotToCleanup)) return
+    if (snapshotToCleanup.checkpointProvider.isEmpty) return
+
+    val startTimeMs = System.currentTimeMillis()
+    val hadoopConf = newDeltaHadoopConf()
+    val checkpointInstance =
+      CheckpointInstance(snapshotToCleanup.checkpointProvider.topLevelFiles.head.getPath)
+    // The current checkpoint provider is already using a checkpoint with the naming
+    // scheme of classic checkpoints. There is no need to create a compatibility checkpoint
+    // in this case.
+    if (checkpointInstance.format != CheckpointInstance.Format.V2) return
+
+    val checkpointVersion = snapshotToCleanup.checkpointProvider.version
+    val checkpoints = listFrom(checkpointVersion)
+      .takeWhile(file => FileNames.getFileVersionOpt(file.getPath).exists(_ <= checkpointVersion))
+      .collect {
+        case file if FileNames.isCheckpointFile(file) => CheckpointInstance(file.getPath)
+      }
+      .filter(_.format != CheckpointInstance.Format.V2)
+      .toArray
+    val availableNonV2Checkpoints =
+      getLatestCompleteCheckpointFromList(checkpoints, Some(checkpointVersion))
+    if (availableNonV2Checkpoints.nonEmpty) {
+      metrics.v2CheckpointCompatLogicTimeTakenMs = System.currentTimeMillis() - startTimeMs
+      return
+    }
+
+    // topLevelFileIndex must be non-empty when topLevelFiles are present
+    val shallowCopyDf =
+      loadIndex(snapshotToCleanup.checkpointProvider.topLevelFileIndex.get, Action.logSchema)
+    val finalPath =
+      FileNames.checkpointFileSingular(snapshotToCleanup.deltaLog.logPath, checkpointVersion)
+    Checkpoints.createCheckpointV2ParquetFile(
+      spark,
+      shallowCopyDf,
+      finalPath,
+      hadoopConf,
+      useRename = false)
+    metrics.v2CheckpointCompatLogicTimeTakenMs = System.currentTimeMillis() - startTimeMs
+    metrics.checkpointVersion = checkpointVersion
+  }
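The compat logic above hinges on checkpoint file naming: legacy readers only discover checkpoints that follow the classic naming scheme, so a classic single-file checkpoint is written at the same version whenever the newest checkpoint uses V2 naming. A small illustrative sketch of that distinction via CheckpointInstance (file names and UUID below are made up, and the snippet assumes code compiled inside the org.apache.spark.sql.delta package, like this trait):

    import org.apache.hadoop.fs.Path

    // Classic single-file checkpoint name: visible to every reader.
    val classic = CheckpointInstance(new Path("00000000000000000012.checkpoint.parquet"))
    // V2 checkpoint name (<version>.checkpoint.<uniqueId>.<json|parquet>): only
    // v2-aware readers resolve snapshots from these.
    val v2 = CheckpointInstance(
      new Path("00000000000000000012.checkpoint.80a083e8-7026-4e79-81be-64bd76c43a11.json"))
    assert(classic.format == CheckpointInstance.Format.SINGLE)
    assert(v2.format == CheckpointInstance.Format.V2)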
+
+  /** Deletes any unreferenced files from the sidecar directory `_delta_log/_sidecar`. */
+  protected def identifyAndDeleteUnreferencedSidecarFiles(
+      snapshotToCleanup: Snapshot,
+      checkpointRetention: Long,
+      metrics: SidecarDeletionMetrics): Unit = {
+    val startTimeMs = System.currentTimeMillis()
+    // If v2 checkpoints are not enabled on the table, we don't need to attempt the sidecar cleanup.
+    if (!CheckpointProvider.isV2CheckpointEnabled(snapshotToCleanup)) return
+
+    val hadoopConf = newDeltaHadoopConf()
+    val fs = sidecarDirPath.getFileSystem(hadoopConf)
+    // This can happen when the V2 Checkpoint feature is present in the Protocol but
+    // only Classic checkpoints have been created for the table.
+    if (!fs.exists(sidecarDirPath)) return
+
+    val (parquetCheckpointFiles, otherFiles) = store
+      .listFrom(listingPrefix(logPath, 0), hadoopConf)
+      .collect { case CheckpointFile(status, _) => (status, CheckpointInstance(status.getPath)) }
+      .collect { case (fileStatus, ci) if ci.format.usesSidecars => fileStatus }
+      .toSeq
+      .partition(_.getPath.getName.endsWith("parquet"))
+    val (jsonCheckpointFiles, unknownFormatCheckpointFiles) =
+      otherFiles.partition(_.getPath.getName.endsWith("json"))
+    if (unknownFormatCheckpointFiles.nonEmpty) {
+      logWarning(
+        "Found checkpoint files other than parquet and json: " +
+          s"${unknownFormatCheckpointFiles.map(_.getPath.toString).mkString(",")}")
+    }
+    metrics.numActiveParquetCheckpointFiles = parquetCheckpointFiles.size
+    metrics.numActiveJsonCheckpointFiles = jsonCheckpointFiles.size
+    val parquetCheckpointsFileIndex =
+      DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_PARQUET, parquetCheckpointFiles)
+    val jsonCheckpointsFileIndex =
+      DeltaLogFileIndex(DeltaLogFileIndex.CHECKPOINT_FILE_FORMAT_JSON, jsonCheckpointFiles)
+    val identifyActiveSidecarsStartTimeMs = System.currentTimeMillis()
+    metrics.activeCheckpointsListingTimeTakenMs = identifyActiveSidecarsStartTimeMs - startTimeMs
+    import org.apache.spark.sql.delta.implicits._
+    val df = (parquetCheckpointsFileIndex ++ jsonCheckpointsFileIndex)
+      .map(loadIndex(_, Action.logSchema(Set("sidecar"))))
+      .reduceOption(_ union _)
+      .getOrElse { return }
+
+    val activeSidecarFiles = df
+      .select("sidecar.path")
+      .where("path is not null")
+      .as[String]
+      .collect()
+      .map(p => new Path(p).getName) // Get bare file names
+      .toSet
+
+    val identifyAndDeleteSidecarsStartTimeMs = System.currentTimeMillis()
+    metrics.identifyActiveSidecarsTimeTakenMs =
+      identifyAndDeleteSidecarsStartTimeMs - identifyActiveSidecarsStartTimeMs
+    // Retain all files created in the checkpoint retention window, irrespective of whether they
+    // are referenced in a checkpoint or not. This is to make sure that we don't end up deleting
+    // an in-progress checkpoint.
+    val retentionTimestamp: Long = checkpointRetention
+    val sidecarFilesIterator = new Iterator[FileStatus] {
+      // Hadoop's RemoteIterator is neither a Java nor a Scala Iterator, so we have to wrap it.
+      val remoteIterator = fs.listStatusIterator(sidecarDirPath)
+      override def hasNext(): Boolean = remoteIterator.hasNext()
+      override def next(): FileStatus = remoteIterator.next()
+    }
+    val sidecarFilesToDelete = sidecarFilesIterator
+      .collect { case file if file.getModificationTime < retentionTimestamp => file.getPath }
+      .filterNot(path => activeSidecarFiles.contains(path.getName))
+    val sidecarDeletionStartTimeMs = System.currentTimeMillis()
+    logInfo(s"Starting the deletion of unreferenced sidecar files")
+    val count = deleteMultiple(fs, sidecarFilesToDelete)
+
+    logInfo(s"Deleted $count sidecar files")
+    metrics.numSidecarFilesDeleted = count
+    val endTimeMs = System.currentTimeMillis()
+    metrics.identifyAndDeleteSidecarsTimeTakenMs =
+      sidecarDeletionStartTimeMs - identifyAndDeleteSidecarsStartTimeMs
+    metrics.overallSidecarProcessingTimeTakenMs = endTimeMs - startTimeMs
+  }
+
+  private def deleteMultiple(fs: FileSystem, paths: Iterator[Path]): Long = {
+    paths.map { path =>
+      if (fs.delete(path, false)) 1L else 0L
+    }.sum
+  }
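Taken together, the listing and filtering above reduce to the two conditions stated in the commit message. A condensed restatement of the per-file rule, not new behavior; retentionTimestamp and activeSidecarFiles stand for the values computed inside identifyAndDeleteUnreferencedSidecarFiles:

    import org.apache.hadoop.fs.FileStatus

    // A file in _delta_log/_sidecar is deleted only if it is BOTH expired per the log
    // retention period AND not referenced by any checkpoint surviving this cleanup round.
    def shouldDeleteSidecar(
        file: FileStatus,
        retentionTimestamp: Long,
        activeSidecarFiles: Set[String]): Boolean = {
      file.getModificationTime < retentionTimestamp &&
        !activeSidecarFiles.contains(file.getPath.getName)
    }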
+
+  /** Class to track metrics related to V2 checkpoint sidecar deletion. */
+  protected class SidecarDeletionMetrics {
+    // number of sidecar files deleted
+    var numSidecarFilesDeleted: Long = -1
+    // number of active parquet checkpoint files present in the delta log directory
+    var numActiveParquetCheckpointFiles: Long = -1
+    // number of active json checkpoint files present in the delta log directory
+    var numActiveJsonCheckpointFiles: Long = -1
+    // time taken (in ms) to list and identify active checkpoints
+    var activeCheckpointsListingTimeTakenMs: Long = -1
+    // time taken (in ms) to list the sidecar directory to get all sidecars and delete those which
+    // aren't referenced by any checkpoint anymore
+    var identifyAndDeleteSidecarsTimeTakenMs: Long = -1
+    // time taken (in ms) to read the active checkpoint json / parquet files and identify active
+    // sidecar files
+    var identifyActiveSidecarsTimeTakenMs: Long = -1
+    // time taken (in ms) for everything related to sidecar processing
+    var overallSidecarProcessingTimeTakenMs: Long = -1
+  }
+
+  /** Class to track metrics related to V2 compatibility checkpoint creation. */
+  protected class V2CompatCheckpointMetrics {
+    // time taken (in ms) to run the v2 checkpoint compat logic
+    var v2CheckpointCompatLogicTimeTakenMs: Long = -1
+
+    // the version at which we have created a v2 compat checkpoint, -1 if no compat checkpoint was
+    // created.
+    var checkpointVersion: Long = -1
+  }
+
   /**
    * Finds a checkpoint such that we are able to construct table snapshot for all versions at or
    * greater than the checkpoint version returned.
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala
index 7a820f33334..b66d74693e5 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuite.scala
@@ -20,7 +20,9 @@ import java.io.File
 
 import scala.language.postfixOps
 
+import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
 import org.apache.spark.sql.delta.actions.{Action, AddFile, RemoveFile, SetTransaction}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
 import org.apache.spark.sql.delta.util.FileNames
@@ -322,4 +324,178 @@ class DeltaRetentionSuite extends QueryTest
       assert(log.snapshot.numOfSetTransactions == 0)
     }
   }
+
+  protected def cleanUpExpiredLogs(log: DeltaLog): Unit = {
+    val snapshot = log.update()
+
+    val checkpointVersion = snapshot.logSegment.checkpointProvider.version
+    logInfo(s"snapshot version: ${snapshot.version} checkpoint: $checkpointVersion")
+
+    log.cleanUpExpiredLogs(snapshot)
+  }
+
+  for (v2CheckpointFormat <- V2Checkpoint.Format.ALL_AS_STRINGS)
+    test(s"sidecar file cleanup [v2CheckpointFormat: $v2CheckpointFormat]") {
+      val checkpointPolicy = CheckpointPolicy.V2.name
+      withSQLConf((DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> v2CheckpointFormat)) {
+        withTempDir { tempDir =>
+          val startTime = System.currentTimeMillis()
+          val clock = new ManualClock(System.currentTimeMillis())
+          val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
+          val logPath = new File(log.logPath.toUri)
+          val visitedFiles = scala.collection.mutable.Set.empty[String]
+
+          spark.sql(s"""CREATE TABLE delta.`${tempDir.toString()}` (id Int) USING delta
+            | TBLPROPERTIES(
+            |-- Disable the async log cleanup as this test needs to manually trigger log
+            |-- clean up.
+            |'delta.enableExpiredLogCleanup' = 'false',
+            |'${DeltaConfigs.CHECKPOINT_POLICY.key}' = '$checkpointPolicy',
+            |'${DeltaConfigs.CHECKPOINT_WRITE_STATS_AS_STRUCT.key}' = 'false',
+            |'delta.checkpointInterval' = '100000',
+            |'delta.logRetentionDuration' = 'interval 6 days')
+            """.stripMargin)
+
+          // Day 1: create a commit with 4 AddFiles.
+          clock.setTime(day(startTime, day = 1))
+          val file = (1 to 4).map(i => createTestAddFile(i.toString))
+          log.startTransaction().commit(file, testOp)
+          setModificationTimeOfNewFiles(log, clock, visitedFiles)
+
+          // Trigger 1 commit and 1 checkpoint daily for the next 8 days.
+          val sidecarFiles = scala.collection.mutable.Map.empty[Long, String]
+          val oddCommitSidecarFile_1 = createSidecarFile(log, Seq(1))
+          val evenCommitSidecarFile_1 = createSidecarFile(log, Seq(1))
+          def commitAndCheckpoint(dayNumber: Int): Unit = {
+            clock.setTime(day(startTime, dayNumber))
+
+            // Write a new commit on each day.
+            log.startTransaction().commit(Seq(log.unsafeVolatileSnapshot.metadata), testOp)
+            setModificationTimeOfNewFiles(log, clock, visitedFiles)
+
+            // Write a new checkpoint on each day. Each checkpoint has 2 sidecars:
+            // 1. Common sidecar - one of oddCommitSidecarFile_1/evenCommitSidecarFile_1
+            // 2. A new sidecar just created for this checkpoint.
+            val sidecarFile1 =
+              if (dayNumber % 2 == 0) evenCommitSidecarFile_1 else oddCommitSidecarFile_1
+            val sidecarFile2 = createSidecarFile(log, Seq(2, 3, 4))
+            val checkpointVersion = log.update().version
+            createV2CheckpointWithSidecarFile(
+              log,
+              checkpointVersion,
+              sidecarFileNames = Seq(sidecarFile1, sidecarFile2))
+            setModificationTimeOfNewFiles(log, clock, visitedFiles)
+            sidecarFiles.put(checkpointVersion, sidecarFile2)
+          }
+
+          (2 to 9).foreach { dayNumber => commitAndCheckpoint(dayNumber) }
+          clock.setTime(day(startTime, day = 10))
+          log.update()
+
+          // Assert all log files are present.
+          compareVersions(getCheckpointVersions(logPath), "checkpoint", 2 to 9)
+          compareVersions(getDeltaVersions(logPath), "delta", 0 to 9)
+          assert(
+            getSidecarFiles(log) ===
+              Set(
+                evenCommitSidecarFile_1,
+                oddCommitSidecarFile_1) ++ sidecarFiles.values.toIndexedSeq)
+
+          // Trigger metadata cleanup and validate that only the last 6 days of deltas and
+          // checkpoints have been retained.
+          cleanUpExpiredLogs(log)
+          compareVersions(getCheckpointVersions(logPath), "checkpoint", 4 to 9)
+          compareVersions(getDeltaVersions(logPath), "delta", 4 to 9)
+          // Check that all active sidecars are retained and expired ones are deleted.
+          assert(
+            getSidecarFiles(log) ===
+              Set(evenCommitSidecarFile_1, oddCommitSidecarFile_1) ++
+                (4 to 9).map(sidecarFiles(_)))
+
+          // Advance 1 day and again run metadata cleanup.
+          clock.setTime(day(startTime, day = 11))
+          cleanUpExpiredLogs(log)
+          setModificationTimeOfNewFiles(log, clock, visitedFiles)
+          // Commit 4 and checkpoint 4 have expired and were deleted.
+          compareVersions(getCheckpointVersions(logPath), "checkpoint", 5 to 9)
+          compareVersions(getDeltaVersions(logPath), "delta", 5 to 9)
+          assert(
+            getSidecarFiles(log) ===
+              Set(evenCommitSidecarFile_1, oddCommitSidecarFile_1) ++
+                (5 to 9).map(sidecarFiles(_)))
+
+          // Do 1 more commit and checkpoint on day 13 and run metadata cleanup.
+          commitAndCheckpoint(dayNumber = 13) // commit and checkpoint 10
+          compareVersions(getCheckpointVersions(logPath), "checkpoint", 5 to 10)
+          compareVersions(getDeltaVersions(logPath), "delta", 5 to 10)
+          cleanUpExpiredLogs(log)
+          setModificationTimeOfNewFiles(log, clock, visitedFiles)
+          // Version 5 and 6 checkpoints and deltas have expired and were deleted.
+          compareVersions(getCheckpointVersions(logPath), "checkpoint", 7 to 10)
+          compareVersions(getDeltaVersions(logPath), "delta", 7 to 10)
+
+          assert(
+            getSidecarFiles(log) ===
+              Set(evenCommitSidecarFile_1, oddCommitSidecarFile_1) ++
+                (7 to 10).map(sidecarFiles(_)))
+        }
+      }
+    }
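One timing detail worth noting before the second test: the tests never rely on wall-clock aging. A ManualClock drives the simulated day, and setModificationTimeOfNewFiles (added to DeltaRetentionSuiteBase further down) pins the modification time of each newly written _delta_log file to that clock, which is exactly what the retention cutoff comparison reads. A condensed sketch of one simulated day using the same helpers; the day number is illustrative and the snippet assumes the test-local clock, log, startTime and visitedFiles values:

    // Advance the manual clock to "day 12", commit, then stamp the files just written.
    clock.setTime(day(startTime, day = 12))
    log.startTransaction().commit(Seq(log.unsafeVolatileSnapshot.metadata), testOp)
    setModificationTimeOfNewFiles(log, clock, visitedFiles)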
+
+  for (v2CheckpointFormat <- V2Checkpoint.Format.ALL_AS_STRINGS)
+    test(
+      s"compat file created with metadata cleanup when checkpoints are deleted" +
+        s" [v2CheckpointFormat: $v2CheckpointFormat]") {
+      val checkpointPolicy = CheckpointPolicy.V2.name
+      withSQLConf((DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key -> v2CheckpointFormat)) {
+        withTempDir { tempDir =>
+          val startTime = System.currentTimeMillis()
+          val clock = new ManualClock(System.currentTimeMillis())
+          val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
+          val logPath = new File(log.logPath.toUri)
+          val visitedFiles = scala.collection.mutable.Set.empty[String]
+
+          spark.sql(s"""CREATE TABLE delta.`${tempDir.toString()}` (id Int) USING delta
+            | TBLPROPERTIES(
+            |-- Disable the async log cleanup as this test needs to manually trigger log
+            |-- clean up.
+            |'delta.enableExpiredLogCleanup' = 'false',
+            |'${DeltaConfigs.CHECKPOINT_POLICY.key}' = '$checkpointPolicy',
+            |'${DeltaConfigs.CHECKPOINT_WRITE_STATS_AS_STRUCT.key}' = 'false',
+            |'delta.checkpointInterval' = '100000',
+            |'delta.logRetentionDuration' = 'interval 6 days')
+            """.stripMargin)
+
+          (1 to 10).foreach { dayNum =>
+            clock.setTime(day(startTime, dayNum))
+            log.startTransaction().commit(Seq(), testOp)
+            setModificationTimeOfNewFiles(log, clock, visitedFiles)
+            clock.setTime(day(startTime, dayNum) + 10)
+            log.checkpoint(log.update())
+            setModificationTimeOfNewFiles(log, clock, visitedFiles)
+          }
+          clock.setTime(day(startTime, 11))
+          log.update()
+          compareVersions(getCheckpointVersions(logPath), "checkpoint", 1 to 10)
+          compareVersions(getDeltaVersions(logPath), "delta", 0 to 10)
+
+          // Day 11: run metadata cleanup.
+          clock.setTime(day(startTime, 11))
+          cleanUpExpiredLogs(log)
+          compareVersions(getCheckpointVersions(logPath), "checkpoint", 5 to 10)
+          compareVersions(getDeltaVersions(logPath), "delta", 5 to 10)
+          val checkpointInstancesForV10 =
+            getCheckpointFiles(logPath)
+              .filter(f => getFileVersions(Seq(f)).head == 10)
+              .map(f => new Path(f.getAbsolutePath))
+              .sortBy(_.getName)
+              .map(CheckpointInstance.apply)
+
+          assert(checkpointInstancesForV10.size == 2)
+          assert(
+            checkpointInstancesForV10.map(_.format) ===
+              Seq(CheckpointInstance.Format.V2, CheckpointInstance.Format.SINGLE))
+        }
+      }
+    }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuiteBase.scala
index e36dc6b6100..f1325a26401 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaRetentionSuiteBase.scala
@@ -18,10 +18,15 @@ package org.apache.spark.sql.delta
 
 import java.io.File
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.delta.DeltaOperations.Truncate
-import org.apache.spark.sql.delta.actions.Metadata
+import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
+import org.apache.spark.sql.delta.actions.{CheckpointMetadata, Metadata, SidecarFile}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaTestImplicits._
 import org.apache.spark.sql.delta.util.FileNames
+import org.apache.spark.sql.delta.util.FileNames.{newV2CheckpointJsonFile, newV2CheckpointParquetFile}
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkConf
@@ -29,6 +34,7 @@ import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.ManualClock
 
 trait DeltaRetentionSuiteBase extends QueryTest
   with SharedSparkSession {
@@ -51,6 +57,128 @@ trait DeltaRetentionSuiteBase extends QueryTest
 
   protected def getLogFiles(dir: File): Seq[File]
 
+  protected def getFileVersions(files: Seq[File]): Set[Long] = {
+    files.map(f => f.getName()).map(s => s.substring(0, s.indexOf(".")).toLong).toSet
+  }
+
+  protected def getDeltaVersions(dir: File): Set[Long] = {
+    getFileVersions(getDeltaFiles(dir))
+  }
+
+  protected def getSidecarFiles(log: DeltaLog): Set[String] = {
+    new java.io.File(log.sidecarDirPath.toUri)
+      .listFiles()
+      .filter(_.getName.endsWith(".parquet"))
+      .map(_.getName)
+      .toSet
+  }
+
+  protected def getCheckpointVersions(dir: File): Set[Long] = {
+    getFileVersions(getCheckpointFiles(dir))
+  }
+
+  /** Compares the given versions with expected and generates a nice error message. */
+  protected def compareVersions(
+      versions: Set[Long],
+      logType: String,
+      expected: Iterable[Int]): Unit = {
+    val expectedSet = expected.map(_.toLong).toSet
+    val deleted = expectedSet -- versions
+    val notDeleted = versions -- expectedSet
+    if (!(deleted.isEmpty && notDeleted.isEmpty)) {
+      fail(s"""Mismatch in log clean up for ${logType}s:
+        |Shouldn't be deleted but deleted: ${deleted.toArray.sorted.mkString("[", ", ", "]")}
+        |Should be deleted but not: ${notDeleted.toArray.sorted.mkString("[", ", ", "]")}
+        """.stripMargin)
+    }
+  }
+
+  // Set modification time of the new files in the _delta_log directory and mark them as visited.
+ def setModificationTimeOfNewFiles( + log: DeltaLog, + clock: ManualClock, + visitedFiled: mutable.Set[String]): Unit = { + val fs = log.logPath.getFileSystem(log.newDeltaHadoopConf()) + val allFiles = fs.listFiles(log.logPath, true) + while (allFiles.hasNext) { + val file = allFiles.next() + if (!visitedFiled.contains(file.getPath.toString)) { + visitedFiled += file.getPath.toString + fs.setTimes(file.getPath, clock.getTimeMillis(), 0) + } + } + } + + protected def day(startTime: Long, day: Int): Long = + startTime + intervalStringToMillis(s"interval $day days") + + // Create a sidecar file with given AddFiles inside it. + protected def createSidecarFile(log: DeltaLog, files: Seq[Int]): String = { + val sparkSession = spark + // scalastyle:off sparkimplicits + import sparkSession.implicits._ + // scalastyle:on sparkimplicits + var sidecarFileName: String = "" + withTempDir { dir => + val adds = files.map(i => createTestAddFile(i.toString)) + adds.map(_.wrap).toDF.repartition(1).write.mode("overwrite").parquet(dir.getAbsolutePath) + val srcPath = + new Path(dir.listFiles().filter(_.getName.endsWith("parquet")).head.getAbsolutePath) + val dstPath = new Path(log.sidecarDirPath, srcPath.getName) + val fs = srcPath.getFileSystem(log.newDeltaHadoopConf()) + fs.mkdirs(log.sidecarDirPath) + fs.rename(srcPath, dstPath) + sidecarFileName = fs.getFileStatus(dstPath).getPath.getName + } + sidecarFileName + } + + // Create a V2 Checkpoint at given version with given sidecar files. + protected def createV2CheckpointWithSidecarFile( + log: DeltaLog, + version: Long, + sidecarFileNames: Seq[String]): Unit = { + val hadoopConf = log.newDeltaHadoopConf() + val fs = log.logPath.getFileSystem(hadoopConf) + val sidecarFiles = sidecarFileNames.map { fileName => + val sidecarPath = new Path(log.sidecarDirPath, fileName) + val fileStatus = SerializableFileStatus.fromStatus(fs.getFileStatus(sidecarPath)) + SidecarFile(fileStatus) + } + val snapshot = log.getSnapshotAt(version) + val actionsForCheckpoint = + snapshot.nonFileActions ++ sidecarFiles :+ CheckpointMetadata(version) + val v2CheckpointFormat = + spark.conf.getOption(DeltaSQLConf.CHECKPOINT_V2_TOP_LEVEL_FILE_FORMAT.key) + v2CheckpointFormat match { + case Some(V2Checkpoint.Format.JSON.name) | None => + log.store.write( + newV2CheckpointJsonFile(log.logPath, version), + actionsForCheckpoint.map(_.json).toIterator, + overwrite = true, + hadoopConf = hadoopConf) + case Some(V2Checkpoint.Format.PARQUET.name) => + val parquetFile = newV2CheckpointParquetFile(log.logPath, version) + val sparkSession = spark + // scalastyle:off sparkimplicits + import sparkSession.implicits._ + // scalastyle:on sparkimplicits + val dfToWrite = actionsForCheckpoint.map(_.wrap).toDF + Checkpoints.createCheckpointV2ParquetFile( + spark, + dfToWrite, + parquetFile, + hadoopConf, + useRename = false) + case _ => + assert(false, "Invalid v2 checkpoint format") + } + log.writeLastCheckpointFile( + log, + LastCheckpointInfo(version, -1, None, None, None, None), + false) + } + /** * Start a txn that disables automatic log cleanup. Some tests may need to manually clean up logs * to get deterministic behaviors.