From 23aa41e582dbab04af23b26f781d1d3d74334f92 Mon Sep 17 00:00:00 2001
From: Annie Wang <170372889+anniewang-db@users.noreply.github.com>
Date: Mon, 1 Jul 2024 14:33:56 -0700
Subject: [PATCH] [Hudi] Catch harmless HoodieExceptions when metadata
 conversion aborts (#3323)

#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [x] Other (Hudi)

## Description

This PR adds a clause to catch a few specific HoodieExceptions from the Hudi metadata conversion that are not data-corrupting and only cause the conversion to abort. This is acceptable because the conversion for this commit will simply happen in a future Delta commit instead. These exceptions can occur due to IO failures or multiple writers writing to the metadata table at the same time.

When multiple writers write to the metadata table concurrently, both writers may see a failed commit in the Hudi metadata timeline and try to roll it back. The faster writer rolls it back without issue, but the slower writer then finds that the commit no longer exists (it was already rolled back by the faster writer), which raises an error. However, since the Hudi metadata table is updated within the Hudi commit transaction, the entire transaction aborts if writing to the metadata table fails. Thus, the commit is not marked as completed and the state of the Hudi table is unchanged. The incomplete commits to both the metadata table and the table itself will be cleaned up in a later transaction. Because `lastDeltaVersionConverted` is unchanged in the table (no new data was added), the changes intended for this commit will simply be made in the next commit instead.

Similar errors can also happen after the data is already committed, when both writers try to clean up the metadata (in the function `markInstantsAsCleaned`).
Multiple writers may try to clean up the same Instant, and the slower writer will again hit an error (this is the "Error getting all file groups in pending clustering" error). Again, this does not lead to any data corruption, because only the cleanup step is aborted and the data is already committed to the table. The cleanup will simply be performed after a later commit instead.

## How was this patch tested?

Unit tests

## Does this PR introduce _any_ user-facing changes?

No

---
 .../sql/delta/hudi/HudiConversionTransaction.scala | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala
index d3ae26a47fe..98812427326 100644
--- a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala
+++ b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala
@@ -48,6 +48,7 @@ import org.apache.hudi.config.HoodieArchivalConfig
 import org.apache.hudi.config.HoodieCleanConfig
 import org.apache.hudi.config.HoodieIndexConfig
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.{HoodieException, HoodieRollbackException}
 import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY
 import org.apache.hudi.table.HoodieJavaTable
 import org.apache.hudi.table.action.clean.CleanPlanner
@@ -154,6 +155,16 @@ class HudiConversionTransaction(
       markInstantsAsCleaned(table, writeClient.getConfig, engineContext)
       runArchiver(table, writeClient.getConfig, engineContext)
     } catch {
+      case e: HoodieException if e.getMessage == "Failed to update metadata"
+        || e.getMessage == "Error getting all file groups in pending clustering"
+        || e.getMessage == "Error fetching partition paths from metadata table" =>
+        logInfo(s"[Thread=${Thread.currentThread().getName}] " +
+          s"Failed to fully update Hudi metadata table for Delta snapshot version $version. " +
+          s"This is likely due to a concurrent commit and should not lead to data corruption.")
+      case e: HoodieRollbackException =>
+        logInfo(s"[Thread=${Thread.currentThread().getName}] " +
+          s"Failed to rollback Hudi metadata table for Delta snapshot version $version. " +
+          s"This is likely due to a concurrent commit and should not lead to data corruption.")
       case NonFatal(e) =>
         recordHudiCommit(Some(e))
         throw e
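
The core pattern in the patch — catching an exception class only when its message is one of a known-benign set, and letting everything else fall through to the existing `NonFatal` handler that records and rethrows — can be sketched in isolation. Everything below (`FakeHoodieException`, `benignMessages`, `classify`) is an illustrative stand-in, not Delta or Hudi code:

```scala
import scala.util.control.NonFatal

// Illustrative stand-in for the patch's message-guarded catch; not Delta/Hudi code.
object BenignExceptionSketch {
  class FakeHoodieException(msg: String) extends RuntimeException(msg)

  // The three messages the patch treats as harmless conversion aborts.
  private val benignMessages = Set(
    "Failed to update metadata",
    "Error getting all file groups in pending clustering",
    "Error fetching partition paths from metadata table")

  // Runs `body`; swallows known-benign exceptions (the patch logs and moves on,
  // deferring the conversion to a future commit) and rethrows the rest
  // (the patch records the failed commit and propagates the error).
  def classify(body: => Unit): String =
    try { body; "ok" } catch {
      case e: FakeHoodieException if benignMessages.contains(e.getMessage) =>
        "swallowed"
      case NonFatal(e) =>
        throw e
    }
}
```

For example, `classify { throw new BenignExceptionSketch.FakeHoodieException("Failed to update metadata") }` returns `"swallowed"`, while the same exception with an unrecognized message still propagates, mirroring how unmatched errors in the patch reach `recordHudiCommit(Some(e)); throw e`.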