From bf2495529c3df91df708cdb65a3627e2729ebc0c Mon Sep 17 00:00:00 2001 From: Paddy Xu Date: Fri, 3 Oct 2025 09:49:17 +0000 Subject: [PATCH 1/4] . --- .../MergeIntoMaterializeSourceShims.scala | 25 ++++++++++++++++++ .../MergeIntoMaterializeSourceShims.scala | 26 +++++++++++++++++++ .../merge/MergeIntoMaterializeSource.scala | 8 ++---- 3 files changed, 53 insertions(+), 6 deletions(-) create mode 100644 spark/src/main/scala-spark-3.5/shims/MergeIntoMaterializeSourceShims.scala create mode 100644 spark/src/main/scala-spark-master/shims/MergeIntoMaterializeSourceShims.scala diff --git a/spark/src/main/scala-spark-3.5/shims/MergeIntoMaterializeSourceShims.scala b/spark/src/main/scala-spark-3.5/shims/MergeIntoMaterializeSourceShims.scala new file mode 100644 index 00000000000..e29ccd3e7ed --- /dev/null +++ b/spark/src/main/scala-spark-3.5/shims/MergeIntoMaterializeSourceShims.scala @@ -0,0 +1,25 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.databricks.sql.transaction.tahoe.commands.merge + +object MergeIntoMaterializeSourceShims { + + /** In Spark 3.5 we can only check for the error message :( */ + def mergeMaterializedSourceRddBlockLostError(e: SparkException, rddId: Int): Boolean = { + s.getMessage.matches(s"(?s).*Checkpoint block rdd_${rddId}_[0-9]+ not found!.*") + } +} diff --git a/spark/src/main/scala-spark-master/shims/MergeIntoMaterializeSourceShims.scala b/spark/src/main/scala-spark-master/shims/MergeIntoMaterializeSourceShims.scala new file mode 100644 index 00000000000..8a2435ecb88 --- /dev/null +++ b/spark/src/main/scala-spark-master/shims/MergeIntoMaterializeSourceShims.scala @@ -0,0 +1,26 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.databricks.sql.transaction.tahoe.commands.merge + +object MergeIntoMaterializeSourceShims { + + /** In Spark 4.0+ we could check on error class, which is more stable. 
*/ + def mergeMaterializedSourceRddBlockLostError(e: SparkException, rddId: Int): Boolean = { + e.getErrorClass == "CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND" && + e.getMessageParameters.get("rddId").contains(s"rdd_${rddId}") + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala index 13ee66cd8e7..ddaf195f831 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala @@ -163,8 +163,8 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils { // SparkCoreErrors.checkpointRDDBlockIdNotFoundError from LocalCheckpointRDD.compute. case s: SparkException if materializedSourceRDD.nonEmpty && - s.getMessage.matches( - mergeMaterializedSourceRddBlockLostErrorRegex(materializedSourceRDD.get.id)) => + MergeIntoMaterializeSourceShims.mergeMaterializedSourceRddBlockLostError( + materializedSourceRDD.get.id)) => log.warn("Materialized Merge source RDD block lost. Merge needs to be restarted. 
" + s"This was attempt number $attempt.") if (!isLastAttempt) { @@ -401,10 +401,6 @@ object MergeIntoMaterializeSource { MergeIntoMaterializeSourceReason.MATERIALIZED_REASONS.contains(materializeReason)) } - // This depends on SparkCoreErrors.checkpointRDDBlockIdNotFoundError msg - def mergeMaterializedSourceRddBlockLostErrorRegex(rddId: Int): String = - s"(?s).*Checkpoint block rdd_${rddId}_[0-9]+ not found!.*" - /** * @return The columns of the source plan that are used in this MERGE */ From cba75cac06aabbb24a792aa1f32ee1d6dff7eada Mon Sep 17 00:00:00 2001 From: Paddy Xu Date: Fri, 3 Oct 2025 10:18:15 +0000 Subject: [PATCH 2/4] OOPS --- .../sql/delta/commands/merge/MergeIntoMaterializeSource.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala index b4b936408e6..5141744be49 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala @@ -177,7 +177,7 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils { case s: SparkException if materializedSourceRDD.nonEmpty && MergeIntoMaterializeSourceShims.mergeMaterializedSourceRddBlockLostError( - materializedSourceRDD.get.id)) => + materializedSourceRDD.get.id) => logWarning(log"Materialized ${MDC(DeltaLogKeys.OPERATION, operation)} source RDD block " + log"lost. ${MDC(DeltaLogKeys.OPERATION, operation)} needs to be restarted. 
" + log"This was attempt number ${MDC(DeltaLogKeys.ATTEMPT, attempt)}.") From 577ddb79114ab045afa86e78992d54a347f6ed45 Mon Sep 17 00:00:00 2001 From: Paddy Xu Date: Fri, 3 Oct 2025 12:27:49 +0000 Subject: [PATCH 3/4] fix tests --- .../shims/MergeIntoMaterializeSourceShims.scala | 6 ++++-- .../shims/MergeIntoMaterializeSourceShims.scala | 4 +++- .../delta/commands/merge/MergeIntoMaterializeSource.scala | 1 + .../spark/sql/delta/MergeIntoMaterializeSourceSuite.scala | 7 ++++--- 4 files changed, 12 insertions(+), 6 deletions(-) diff --git a/spark/src/main/scala-spark-3.5/shims/MergeIntoMaterializeSourceShims.scala b/spark/src/main/scala-spark-3.5/shims/MergeIntoMaterializeSourceShims.scala index e29ccd3e7ed..1e0658a47c0 100644 --- a/spark/src/main/scala-spark-3.5/shims/MergeIntoMaterializeSourceShims.scala +++ b/spark/src/main/scala-spark-3.5/shims/MergeIntoMaterializeSourceShims.scala @@ -14,12 +14,14 @@ * limitations under the License. */ -package com.databricks.sql.transaction.tahoe.commands.merge +package org.apache.spark.sql.delta.commands.merge + +import org.apache.spark.SparkException object MergeIntoMaterializeSourceShims { /** In Spark 3.5 we can only check for the error message :( */ def mergeMaterializedSourceRddBlockLostError(e: SparkException, rddId: Int): Boolean = { - s.getMessage.matches(s"(?s).*Checkpoint block rdd_${rddId}_[0-9]+ not found!.*") + e.getMessage.matches(s"(?s).*Checkpoint block rdd_${rddId}_[0-9]+ not found!.*") } } diff --git a/spark/src/main/scala-spark-master/shims/MergeIntoMaterializeSourceShims.scala b/spark/src/main/scala-spark-master/shims/MergeIntoMaterializeSourceShims.scala index 8a2435ecb88..1cbbf0f0edf 100644 --- a/spark/src/main/scala-spark-master/shims/MergeIntoMaterializeSourceShims.scala +++ b/spark/src/main/scala-spark-master/shims/MergeIntoMaterializeSourceShims.scala @@ -14,7 +14,9 @@ * limitations under the License. 
*/ -package com.databricks.sql.transaction.tahoe.commands.merge +package org.apache.spark.sql.delta.commands.merge + +import org.apache.spark.SparkException object MergeIntoMaterializeSourceShims { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala index 5141744be49..919d3e9f5de 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala @@ -177,6 +177,7 @@ trait MergeIntoMaterializeSource extends DeltaLogging with DeltaSparkPlanUtils { case s: SparkException if materializedSourceRDD.nonEmpty && MergeIntoMaterializeSourceShims.mergeMaterializedSourceRddBlockLostError( + s, materializedSourceRDD.get.id) => logWarning(log"Materialized ${MDC(DeltaLogKeys.OPERATION, operation)} source RDD block " + log"lost. ${MDC(DeltaLogKeys.OPERATION, operation)} needs to be restarted. 
" + diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala index 4506d77ed2d..d9587b0c27d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoMaterializeSourceSuite.scala @@ -24,7 +24,7 @@ import scala.util.control.NonFatal import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions, UsageRecord} import org.apache.spark.sql.delta.DeltaTestUtils._ import org.apache.spark.sql.delta.commands.merge.{MergeIntoMaterializeSourceError, MergeIntoMaterializeSourceErrorType, MergeIntoMaterializeSourceReason, MergeStats} -import org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.mergeMaterializedSourceRddBlockLostErrorRegex +import org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSourceShims import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaSQLTestUtils @@ -176,8 +176,9 @@ trait MergeIntoMaterializeSourceErrorTests extends MergeIntoMaterializeSourceMix } assert(ex.isInstanceOf[SparkException], ex) assert( - ex.getMessage().matches(mergeMaterializedSourceRddBlockLostErrorRegex(rdd.id)), - s"RDD id ${rdd.id}: Message: ${ex.getMessage}") + MergeIntoMaterializeSourceShims.mergeMaterializedSourceRddBlockLostError( + ex.asInstanceOf[SparkException], + rdd.id)) } for { From 6b12b47decc5793f9dcd99e7af4f0f0074087c5a Mon Sep 17 00:00:00 2001 From: Paddy Xu Date: Fri, 3 Oct 2025 14:12:18 +0000 Subject: [PATCH 4/4] fix tests --- .../shims/MergeIntoMaterializeSourceShims.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/scala-spark-master/shims/MergeIntoMaterializeSourceShims.scala b/spark/src/main/scala-spark-master/shims/MergeIntoMaterializeSourceShims.scala index 
1cbbf0f0edf..b9452385643 100644 --- a/spark/src/main/scala-spark-master/shims/MergeIntoMaterializeSourceShims.scala +++ b/spark/src/main/scala-spark-master/shims/MergeIntoMaterializeSourceShims.scala @@ -23,6 +23,6 @@ object MergeIntoMaterializeSourceShims { /** In Spark 4.0+ we could check on error class, which is more stable. */ def mergeMaterializedSourceRddBlockLostError(e: SparkException, rddId: Int): Boolean = { e.getErrorClass == "CHECKPOINT_RDD_BLOCK_ID_NOT_FOUND" && - e.getMessageParameters.get("rddId").contains(s"rdd_${rddId}") + Option(e.getMessageParameters.get("rddBlockId")).exists(_.contains(s"rdd_${rddId}_")) } }