From 68a76ba7912ae3c55d748f25b2691c9cdd0585b1 Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Thu, 5 Sep 2024 11:23:00 +0200 Subject: [PATCH 1/4] f --- .../delta/commands/merge/MergeIntoMaterializeSource.scala | 8 ++------ .../spark/sql/delta/MergeIntoMaterializeSourceSuite.scala | 5 ++--- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala index c27b5d1457b..dd329d25b16 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala @@ -176,8 +176,8 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils { // SparkCoreErrors.checkpointRDDBlockIdNotFoundError from LocalCheckpointRDD.compute. case s: SparkException if materializedSourceRDD.nonEmpty && - s.getMessage.matches( - mergeMaterializedSourceRddBlockLostErrorRegex(materializedSourceRDD.get.id)) => + s.getErrorClass() == "CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND" && + s.getMessageParameters().get("blockId") == materializedSourceRDD.get.id.toString => logWarning(log"Materialized ${MDC(DeltaLogKeys.OPERATION, operation)} source RDD block " + log"lost. ${MDC(DeltaLogKeys.OPERATION, operation)} needs to be restarted. " + log"This was attempt number ${MDC(DeltaLogKeys.ATTEMPT, attempt)}.") @@ -436,10 +436,6 @@ object MergeIntoMaterializeSource { MergeIntoMaterializeSourceReason.MATERIALIZED_REASONS.contains(materializeReason)) } - // This depends on SparkCoreErrors.checkpointRDDBlockIdNotFoundError msg - def mergeMaterializedSourceRddBlockLostErrorRegex(rddId: Int): String = - s"(?s).*Checkpoint block rdd_${rddId}_[0-9]+ not found!.*" - /** * @return The columns of the source plan that are used in this MERGE */ diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala index 4506d77ed2d..5f0ddd5c635 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala @@ -175,9 +175,8 @@ trait MergeIntoMaterializeSourceErrorTests extends MergeIntoMaterializeSourceMix checkpointedDf.collect() } assert(ex.isInstanceOf[SparkException], ex) - assert( - ex.getMessage().matches(mergeMaterializedSourceRddBlockLostErrorRegex(rdd.id)), - s"RDD id ${rdd.id}: Message: ${ex.getMessage}") + assert(ex.asInstanceOf[SparkException].getErrorClass() == "CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND") + assert(ex.asInstanceOf[SparkException].getMessageParameters().get("blockId") == rdd.id.toString) } for { From 436d5d77224ccc6f2c372575ac84b5628e7808de Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Thu, 5 Sep 2024 12:11:37 +0200 Subject: [PATCH 2/4] fix import --- .../apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala index 5f0ddd5c635..983239597de 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala @@ -24,7 +24,6 @@ import scala.util.control.NonFatal import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions, UsageRecord} import org.apache.spark.sql.delta.DeltaTestUtils._ import org.apache.spark.sql.delta.commands.merge.{MergeIntoMaterializeSourceError, MergeIntoMaterializeSourceErrorType, MergeIntoMaterializeSourceReason, MergeStats} -import org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.mergeMaterializedSourceRddBlockLostErrorRegex import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaSQLTestUtils From 28b88b177a874e6d5861af355833f3e1af5f2d23 Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Thu, 27 Mar 2025 12:54:56 +0100 Subject: [PATCH 3/4] retrigger From 33a01c59b27256b5670a5920299eebe5152192bf Mon Sep 17 00:00:00 2001 From: Julek Sompolski Date: Wed, 28 May 2025 13:19:52 +0200 Subject: [PATCH 4/4] retrigger