[Spark] In materialize merge source: Only log storage error if we actually materialized (#2727)

<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Be more defensive about calling `Option.get` on `materializedSourceRDD` by checking
first that it is actually `Some`. More precisely, only enter the branch that logs the
storage error when we actually did materialize the source (and not when we, say, ran
out of disk space before materializing).
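
A minimal, standalone Scala sketch of the pattern (stand-in names only; the actual change is in the diff below): guard the pattern-match case itself on the `Option` being non-empty, so the logging branch never calls `.get` on `None`.

```scala
// Illustrative sketch only; names here are stand-ins, not the Delta implementation.
object GuardedErrorLoggingSketch {
  // Stand-in for materializedSourceRDD: None means we never got to materialize the source.
  private var materializedSource: Option[String] = None

  private def describeFailure(t: Throwable): String = t match {
    // Only enter this branch if we actually materialized (and the error is out-of-disk).
    case e: RuntimeException
        if materializedSource.nonEmpty &&
          e.getMessage.contains("No space left on device") =>
      s"out of disk after materializing at level ${materializedSource.get}"
    case other =>
      s"unrelated failure: ${other.getMessage}"
  }

  def main(args: Array[String]): Unit = {
    val err = new RuntimeException("java.io.IOException: No space left on device")
    println(describeFailure(err)) // guarded branch not taken: nothing was materialized yet
    materializedSource = Some("DISK_ONLY")
    println(describeFailure(err)) // now the guarded branch fires and may safely use .get
  }
}
```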

## How was this patch tested?


This PR adds a new test for the scenario where we throw an out-of-disk-space error
without having materialized the source.
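
Condensed sketch of the new assertion logic (stand-in types; the full test change is in the second diff hunk below): when the source was never materialized, no materialize-source error event should be logged.

```scala
// Sketch with stand-in types; the real test uses MergeIntoMaterializeSourceError
// and Log4jUsageLogger, as shown in the diff further down.
def checkLoggedError(materialized: Boolean, errorOpt: Option[String]): Unit = {
  if (materialized) {
    // Source was materialized, so the out-of-disk event must have been logged.
    assert(errorOpt.isDefined, "expected a materialize-source out-of-disk error to be logged")
  } else {
    // Source was never materialized, so nothing should have been logged for it.
    assert(errorOpt.isEmpty, "did not expect a materialize-source error to be logged")
  }
}

// The newly covered scenario: out of disk space before any materialization happened.
checkLoggedError(materialized = false, errorOpt = None)
```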

## Does this PR introduce _any_ user-facing changes?

No
larsk-db authored Mar 8, 2024
1 parent 77d43ed commit 9c57f7b
Showing 2 changed files with 25 additions and 16 deletions.
@@ -180,9 +180,10 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils {
         RetryHandling.ExhaustedRetries
       }
 
-      // Record if we ran out of executor disk space.
+      // Record if we ran out of executor disk space when we materialized the source.
       case s: SparkException
-        if s.getMessage.contains("java.io.IOException: No space left on device") =>
+        if materializedSourceRDD.nonEmpty &&
+          s.getMessage.contains("java.io.IOException: No space left on device") =>
         // Record situations where we ran out of disk space, possibly because of the space took
         // by the materialized RDD.
         recordDeltaEvent(
@@ -79,21 +79,31 @@ trait MergeIntoMaterializeSourceTests
 
 
 
-  for (eager <- BOOLEAN_DOMAIN)
-  test(s"merge logs out of disk errors - eager=$eager") {
-    withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) {
+  for {
+    eager <- BOOLEAN_DOMAIN
+    materialized <- BOOLEAN_DOMAIN
+  } test(s"merge logs out of disk errors - eager=$eager, materialized=$materialized") {
+    import DeltaSQLConf.MergeMaterializeSource
+    withSQLConf(
+      DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString,
+      DeltaSQLConf.MERGE_MATERIALIZE_SOURCE.key ->
+        (if (materialized) MergeMaterializeSource.AUTO else MergeMaterializeSource.NONE)) {
       val injectEx = new java.io.IOException("No space left on device")
       testWithCustomErrorInjected[SparkException](injectEx) { (thrownEx, errorOpt) =>
         // Compare messages instead of instances, since the equals method for these exceptions
         // takes more into account.
         assert(thrownEx.getCause.getMessage === injectEx.getMessage)
-        assert(errorOpt.isDefined)
-        val error = errorOpt.get
-        assert(error.errorType == MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString)
-        assert(error.attempt == 1)
-        val storageLevel = StorageLevel.fromString(
-          spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL))
-        assert(error.materializedSourceRDDStorageLevel == storageLevel.toString)
+        if (materialized) {
+          assert(errorOpt.isDefined)
+          val error = errorOpt.get
+          assert(error.errorType == MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString)
+          assert(error.attempt == 1)
+          val storageLevel = StorageLevel.fromString(
+            spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL))
+          assert(error.materializedSourceRDDStorageLevel == storageLevel.toString)
+        } else {
+          assert(errorOpt.isEmpty)
+        }
       }
     }
   }
@@ -108,7 +118,7 @@ trait MergeIntoMaterializeSourceTests
     }
   }
 
-  private def testWithCustomErrorInjected[Intercept <: Exception with AnyRef : ClassTag](
+  private def testWithCustomErrorInjected[Intercept >: Null <: Exception with AnyRef : ClassTag](
       inject: Exception)(
       handle: (Intercept, Option[MergeIntoMaterializeSourceError]) => Unit): Unit = {
     {
@@ -125,9 +135,7 @@ trait MergeIntoMaterializeSourceTests
         .toDF("id")
         .withColumn("value", rand())
         .createOrReplaceTempView("s")
-      // I don't know why it this cast is necessary. `Intercept` is marked as `AnyRef` so
-      // it should just let me assign `null`, but the compiler keeps rejecting it.
-      var thrownException: Intercept = null.asInstanceOf[Intercept]
+      var thrownException: Intercept = null
       val events = Log4jUsageLogger
         .track {
           thrownException = intercept[Intercept] {
