From e56a4e7effe09e5ec4e813f491a4fe2ec5ff349d Mon Sep 17 00:00:00 2001
From: Sumeet Varma
Date: Mon, 16 Dec 2024 10:34:34 -0800
Subject: [PATCH] Handle empty commit lists when searching history by timestamp

getCommits can return an empty list, e.g. when metadata cleanup deletes
delta files concurrently with a timestamp-based history search. Calling
commits.head on that empty list threw NoSuchElementException. Throw
DeltaErrors.noHistoryFound instead, and let each partition of the
parallel search return Option[Commit] so that emptied version ranges are
skipped rather than failing the whole search.

---
 .../spark/sql/delta/DeltaHistoryManager.scala | 14 +++++-
 .../spark/sql/delta/util/DeltaEncoders.scala  |  5 ++
 .../sql/delta/DeltaHistoryManagerSuite.scala  | 50 +++++++++++++++++++
 3 files changed, 67 insertions(+), 2 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
index 079007ed512..92ea74d8a9c 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
@@ -211,6 +211,9 @@ class DeltaHistoryManager(
         start,
         Some(end),
         deltaLog.newDeltaHadoopConf())
+      if (commits.isEmpty) {
+        throw DeltaErrors.noHistoryFound(deltaLog.logPath)
+      }
       lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head)
     }
   }
@@ -710,12 +713,19 @@ object DeltaHistoryManager extends DeltaLogging {
           startVersion,
           Some(math.min(startVersion + step, end)),
           conf.value)
-        lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head)
+        if (commits.isEmpty) {
+          None
+        } else {
+          Some(lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head))
+        }
       }
     }.collect()
 
     // Spark should return the commits in increasing order as well
-    val commitList = monotonizeCommitTimestamps(possibleCommits)
+    val commitList = monotonizeCommitTimestamps(possibleCommits.flatten)
+    if (commitList.isEmpty) {
+      throw DeltaErrors.noHistoryFound(new Path(logPath))
+    }
     lastCommitBeforeTimestamp(commitList, time).getOrElse(commitList.head)
   }
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaEncoders.scala b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaEncoders.scala
index 2720b9a1e51..b7bf6407d82 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaEncoders.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaEncoders.scala
@@ -116,4 +116,9 @@
   implicit def fsPartitionSpecEncoder
     : Encoder[(SerializableFileStatus, CatalogTypes.TablePartitionSpec)] =
     _fsPartitionSpecEncoder.get
+
+  private lazy val _optionalHistoryCommitEncoder =
+    new DeltaEncoder[Option[DeltaHistoryManager.Commit]]
+  implicit def optionalHistoryCommitEncoder: Encoder[Option[DeltaHistoryManager.Commit]] =
+    _optionalHistoryCommitEncoder.get
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala
index aabe0852027..8e439ddd450 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala
@@ -26,6 +26,8 @@ import java.util.{Date, Locale}
 import scala.concurrent.duration._
 import scala.language.implicitConversions
 
+import org.apache.spark.sql.delta.DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED
+
 import com.databricks.spark.util.Log4jUsageLogger
 import org.apache.spark.sql.delta.DeltaHistoryManagerSuiteShims._
 import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
@@ -660,6 +662,54 @@ abstract class DeltaHistoryManagerBase extends DeltaTimeTravelTests
       testGetHistory(start = 2, endOpt = Some(1), versions = Seq.empty, expectedLogUpdates = 0)
     }
   }
+
+  test("getCommitFromNonICTRange should handle empty history by throwing a proper error") {
+    val tblName = "delta_table"
+    withTable(tblName) {
+      val start = 1540415658000L
+      generateCommits(tblName, start)
+      val deltaLog = DeltaLog.forTable(spark, getTableLocation(tblName))
+
+      val deltaFile = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, 0).toUri)
+      assert(deltaFile.delete(), "Failed to delete delta log file")
+
+      val e = intercept[DeltaAnalysisException] {
+        deltaLog.history.getCommitFromNonICTRange(0, 1, start)
+      }
+
+      assert(e.getMessage.contains("DELTA_NO_COMMITS_FOUND"))
+      assert(e.getMessage.contains(deltaLog.logPath.toString))
+    }
+  }
+
+  test("parallel search handles empty commits in a partition correctly") {
+    if (coordinatedCommitsBackfillBatchSize.isDefined) {
+      cancel("This test is not compatible with coordinated commits backfill.")
+    }
+    val tblName = "delta_table"
+    withTable(tblName) {
+      // Small threshold to trigger parallel search
+      withSQLConf(
+          DeltaSQLConf.DELTA_HISTORY_PAR_SEARCH_THRESHOLD.key -> "3",
+          IN_COMMIT_TIMESTAMPS_ENABLED.key -> "false") {
+        val start = 1540415658000L
+        // Generate 10 commits so the version range exceeds the threshold of 3
+        val timestamps = (0 to 9).map(i => start + (i * 20).minutes)
+        generateCommits(tblName, timestamps: _*)
+        val table = DeltaTableV2(spark, TableIdentifier(tblName))
+        val deltaLog = table.deltaLog
+
+        // Delete the first five delta files to simulate concurrent metadata cleanup
+        val deltaFiles = (0 to 4).map { version =>
+          new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
+        }
+        deltaFiles.foreach(f =>
+          assert(f.delete(), s"Failed to delete delta log file ${f.getPath}"))
+        assert(
+          deltaLog.history.getCommitFromNonICTRange(0, 9, start + (7 * 20).minutes).version == 7)
+      }
+    }
+  }
 }
 
 /** Uses V2 resolution code paths */