diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala index c27b5d1457b..dd329d25b16 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala @@ -176,8 +176,8 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils { // SparkCoreErrors.checkpointRDDBlockIdNotFoundError from LocalCheckpointRDD.compute. case s: SparkException if materializedSourceRDD.nonEmpty && - s.getMessage.matches( - mergeMaterializedSourceRddBlockLostErrorRegex(materializedSourceRDD.get.id)) => + s.getErrorClass() == "CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND" && + s.getMessageParameters().get("blockId") == materializedSourceRDD.get.id.toString => logWarning(log"Materialized ${MDC(DeltaLogKeys.OPERATION, operation)} source RDD block " + log"lost. ${MDC(DeltaLogKeys.OPERATION, operation)} needs to be restarted. " + log"This was attempt number ${MDC(DeltaLogKeys.ATTEMPT, attempt)}.") @@ -436,10 +436,6 @@ object MergeIntoMaterializeSource { MergeIntoMaterializeSourceReason.MATERIALIZED_REASONS.contains(materializeReason)) } - // This depends on SparkCoreErrors.checkpointRDDBlockIdNotFoundError msg - def mergeMaterializedSourceRddBlockLostErrorRegex(rddId: Int): String = - s"(?s).*Checkpoint block rdd_${rddId}_[0-9]+ not found!.*" - /** * @return The columns of the source plan that are used in this MERGE */ diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala index 4506d77ed2d..983239597de 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala @@ -24,7 +24,6 @@ import scala.util.control.NonFatal import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions, UsageRecord} import org.apache.spark.sql.delta.DeltaTestUtils._ import org.apache.spark.sql.delta.commands.merge.{MergeIntoMaterializeSourceError, MergeIntoMaterializeSourceErrorType, MergeIntoMaterializeSourceReason, MergeStats} -import org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.mergeMaterializedSourceRddBlockLostErrorRegex import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaSQLTestUtils @@ -175,9 +174,8 @@ trait MergeIntoMaterializeSourceErrorTests extends MergeIntoMaterializeSourceMix checkpointedDf.collect() } assert(ex.isInstanceOf[SparkException], ex) - assert( - ex.getMessage().matches(mergeMaterializedSourceRddBlockLostErrorRegex(rdd.id)), - s"RDD id ${rdd.id}: Message: ${ex.getMessage}") + assert(ex.asInstanceOf[SparkException].getErrorClass() == "CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND") + assert(ex.asInstanceOf[SparkException].getMessageParameters().get("blockId") == rdd.id.toString) } for {