diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala index 22f2ef5333f..6a6134dccca 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala @@ -180,9 +180,10 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils { RetryHandling.ExhaustedRetries } - // Record if we ran out of executor disk space. + // Record if we ran out of executor disk space when we materialized the source. case s: SparkException - if s.getMessage.contains("java.io.IOException: No space left on device") => + if materializedSourceRDD.nonEmpty && + s.getMessage.contains("java.io.IOException: No space left on device") => // Record situations where we ran out of disk space, possibly because of the space took // by the materialized RDD. recordDeltaEvent( diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala index 4505863f087..3381c659701 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala @@ -79,21 +79,31 @@ trait MergeIntoMaterializeSourceTests - for (eager <- BOOLEAN_DOMAIN) - test(s"merge logs out of disk errors - eager=$eager") { - withSQLConf(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString) { + for { + eager <- BOOLEAN_DOMAIN + materialized <- BOOLEAN_DOMAIN + } test(s"merge logs out of disk errors - eager=$eager, materialized=$materialized") { + import DeltaSQLConf.MergeMaterializeSource + withSQLConf( + DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_EAGER.key -> eager.toString, + DeltaSQLConf.MERGE_MATERIALIZE_SOURCE.key -> + (if (materialized) MergeMaterializeSource.AUTO else MergeMaterializeSource.NONE)) { val injectEx = new java.io.IOException("No space left on device") testWithCustomErrorInjected[SparkException](injectEx) { (thrownEx, errorOpt) => // Compare messages instead of instances, since the equals method for these exceptions // takes more into account. assert(thrownEx.getCause.getMessage === injectEx.getMessage) - assert(errorOpt.isDefined) - val error = errorOpt.get - assert(error.errorType == MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString) - assert(error.attempt == 1) - val storageLevel = StorageLevel.fromString( - spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL)) - assert(error.materializedSourceRDDStorageLevel == storageLevel.toString) + if (materialized) { + assert(errorOpt.isDefined) + val error = errorOpt.get + assert(error.errorType == MergeIntoMaterializeSourceErrorType.OUT_OF_DISK.toString) + assert(error.attempt == 1) + val storageLevel = StorageLevel.fromString( + spark.conf.get(DeltaSQLConf.MERGE_MATERIALIZE_SOURCE_RDD_STORAGE_LEVEL)) + assert(error.materializedSourceRDDStorageLevel == storageLevel.toString) + } else { + assert(errorOpt.isEmpty) + } } } } @@ -108,7 +118,7 @@ trait MergeIntoMaterializeSourceTests } } - private def testWithCustomErrorInjected[Intercept <: Exception with AnyRef : ClassTag]( + private def testWithCustomErrorInjected[Intercept >: Null <: Exception with AnyRef : ClassTag]( inject: Exception)( handle: (Intercept, Option[MergeIntoMaterializeSourceError]) => Unit): Unit = { { @@ -125,9 +135,7 @@ trait MergeIntoMaterializeSourceTests .toDF("id") .withColumn("value", rand()) .createOrReplaceTempView("s") - // I don't know why it this cast is necessary. `Intercept` is marked as `AnyRef` so - // it should just let me assign `null`, but the compiler keeps rejecting it. - var thrownException: Intercept = null.asInstanceOf[Intercept] + var thrownException: Intercept = null val events = Log4jUsageLogger .track { thrownException = intercept[Intercept] {