Skip to content

Commit e56a4e7

Browse files
committed
Parallel calls
1 parent fc81d12 commit e56a4e7

File tree

3 files changed

+67
-2
lines changed

3 files changed

+67
-2
lines changed

spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,9 @@ class DeltaHistoryManager(
211211
start,
212212
Some(end),
213213
deltaLog.newDeltaHadoopConf())
214+
if (commits.isEmpty) {
215+
throw DeltaErrors.noHistoryFound(deltaLog.logPath)
216+
}
214217
lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head)
215218
}
216219
}
@@ -710,12 +713,19 @@ object DeltaHistoryManager extends DeltaLogging {
710713
startVersion,
711714
Some(math.min(startVersion + step, end)),
712715
conf.value)
713-
lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head)
716+
if (commits.isEmpty) {
717+
None
718+
} else {
719+
Some(lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head))
720+
}
714721
}
715722
}.collect()
716723

717724
// Spark should return the commits in increasing order as well
718-
val commitList = monotonizeCommitTimestamps(possibleCommits)
725+
val commitList = monotonizeCommitTimestamps(possibleCommits.flatten)
726+
if (commitList.isEmpty) {
727+
throw DeltaErrors.noHistoryFound(new Path(logPath))
728+
}
719729
lastCommitBeforeTimestamp(commitList, time).getOrElse(commitList.head)
720730
}
721731

spark/src/main/scala/org/apache/spark/sql/delta/util/DeltaEncoders.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,4 +116,9 @@ private[delta] trait DeltaEncoders {
116116
implicit def fsPartitionSpecEncoder
117117
: Encoder[(SerializableFileStatus, CatalogTypes.TablePartitionSpec)]
118118
= _fsPartitionSpecEncoder.get
119+
120+
private lazy val _optionalHistoryCommitEncoder =
121+
new DeltaEncoder[Option[DeltaHistoryManager.Commit]]
122+
implicit def optionalHistoryCommitEncoder: Encoder[Option[DeltaHistoryManager.Commit]] =
123+
_optionalHistoryCommitEncoder.get
119124
}

spark/src/test/scala/org/apache/spark/sql/delta/DeltaHistoryManagerSuite.scala

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import java.util.{Date, Locale}
2626
import scala.concurrent.duration._
2727
import scala.language.implicitConversions
2828

29+
import org.apache.spark.sql.delta.DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED
30+
2931
import com.databricks.spark.util.Log4jUsageLogger
3032
import org.apache.spark.sql.delta.DeltaHistoryManagerSuiteShims._
3133
import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile
@@ -660,6 +662,54 @@ abstract class DeltaHistoryManagerBase extends DeltaTimeTravelTests
660662
testGetHistory(start = 2, endOpt = Some(1), versions = Seq.empty, expectedLogUpdates = 0)
661663
}
662664
}
665+
666+
test("getCommitFromNonICTRange should handle empty history by throwing proper error") {
667+
val tblName = "delta_table"
668+
withTable(tblName) {
669+
val start = 1540415658000L
670+
generateCommits(tblName, start)
671+
val deltaLog = DeltaLog.forTable(spark, getTableLocation(tblName))
672+
673+
val deltaFile = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, 0).toUri)
674+
assert(deltaFile.delete(), "Failed to delete delta log file")
675+
676+
val e = intercept[DeltaAnalysisException] {
677+
deltaLog.history.getCommitFromNonICTRange(0, 1, start)
678+
}
679+
680+
assert(e.getMessage.contains("DELTA_NO_COMMITS_FOUND"))
681+
assert(e.getMessage.contains(deltaLog.logPath.toString))
682+
}
683+
}
684+
685+
test("parallel search handles empty commits in a partition correctly") {
686+
if (coordinatedCommitsBackfillBatchSize.isDefined) {
687+
cancel("This test is not compatible with coordinated commits backfill timestamps.")
688+
}
689+
val tblName = "delta_table"
690+
withTable(tblName) {
691+
// Small threshold to trigger parallel search
692+
withSQLConf(
693+
DeltaSQLConf.DELTA_HISTORY_PAR_SEARCH_THRESHOLD.key -> "3",
694+
IN_COMMIT_TIMESTAMPS_ENABLED.key -> "false") {
695+
val start = 1540415658000L
696+
// Generate 10 commits which will be processed in parallel due to threshold=5
697+
val timestamps = (0 to 9).map(i => start + (i * 20).minutes)
698+
generateCommits(tblName, timestamps: _*)
699+
val table = DeltaTableV2(spark, TableIdentifier(tblName))
700+
val deltaLog = table.deltaLog
701+
702+
// Delete all files in first partition to simulate concurrent metadata cleanup
703+
val deltaFiles = (0 to 4).map { version =>
704+
new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri)
705+
}
706+
deltaFiles.foreach(f =>
707+
assert(f.delete(), s"Failed to delete delta log file ${f.getPath}"))
708+
assert(
709+
deltaLog.history.getCommitFromNonICTRange(0, 9, start + (7 * 20).minutes).version == 7)
710+
}
711+
}
712+
}
663713
}
664714

665715
/** Uses V2 resolution code paths */

0 commit comments

Comments
 (0)