From 47871d8adb82f47240347934156b2abee5426e15 Mon Sep 17 00:00:00 2001
From: Annie Wang <170372889+anniewang-db@users.noreply.github.com>
Date: Thu, 27 Jun 2024 09:37:35 -0700
Subject: [PATCH] [Hudi] setCommitFileUpdates bug fix (#3309)

#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [x] Other (Hudi read support)

## Description

This PR fixes a bug in the Delta->Hudi metadata conversion logic. When the
number of actions to convert exceeds the action batch size (which the user
can configure), the previous code incorrectly converted only the last batch
instead of all batches.

## How was this patch tested?

Unit test

## Does this PR introduce _any_ user-facing changes?

No
---
 .../hudi/HudiConversionTransaction.scala     | 11 ++++++----
 .../sql/delta/hudi/ConvertToHudiSuite.scala  | 20 +++++++++++++++++++
 2 files changed, 27 insertions(+), 4 deletions(-)

diff --git a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala
index ef56058e0b9..d3ae26a47fe 100644
--- a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala
+++ b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala
@@ -86,9 +86,10 @@ class HudiConversionTransaction(
   private var metaClient = providedMetaClient
   private val instantTime = convertInstantToCommit(
     Instant.ofEpochMilli(postCommitSnapshot.timestamp))
-  private var writeStatuses: util.List[WriteStatus] = Collections.emptyList[WriteStatus]
+  private var writeStatuses: util.List[WriteStatus] =
+    new util.ArrayList[WriteStatus]()
   private var partitionToReplacedFileIds: util.Map[String, util.List[String]] =
-    Collections.emptyMap[String, util.List[String]]
+    new util.HashMap[String, util.List[String]]()
   private val version = postCommitSnapshot.version
   /** Tracks if this transaction has already committed. You can only commit once.
    */
@@ -101,7 +102,7 @@ class HudiConversionTransaction(
   def setCommitFileUpdates(actions: scala.collection.Seq[Action]): Unit = {
     // for all removed files, group by partition path and then map to
     // the file group ID (name in this case)
-    partitionToReplacedFileIds = actions
+    val newPartitionToReplacedFileIds = actions
       .map(_.wrap)
       .filter(action => action.remove != null)
       .map(_.remove)
@@ -111,8 +112,9 @@ class HudiConversionTransaction(
       (partitionPath, path.getName)})
       .groupBy(_._1).map(v => (v._1, v._2.map(_._2).asJava))
       .asJava
+    partitionToReplacedFileIds.putAll(newPartitionToReplacedFileIds)
     // Convert the AddFiles to write statuses for the commit
-    writeStatuses = actions
+    val newWriteStatuses = actions
       .map(_.wrap)
       .filter(action => action.add != null)
       .map(_.add)
@@ -120,6 +122,7 @@ class HudiConversionTransaction(
         convertAddFile(add, tablePath, instantTime)
       })
       .asJava
+    writeStatuses.addAll(newWriteStatuses)
   }
 
   def commit(): Unit = {
diff --git a/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala b/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala
index dcb51da5136..111fd632a4e 100644
--- a/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala
+++ b/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.delta.DeltaOperations.Truncate
 import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaUnsupportedOperationException, OptimisticTransaction}
 import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, RemoveFile}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{ManualClock, Utils}
 import org.scalatest.concurrent.Eventually
@@ -215,6 +216,25 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
     verifyFilesAndSchemaMatch()
   }
 
+  test("all batches of actions are converted") {
+    withSQLConf(
+      DeltaSQLConf.HUDI_MAX_COMMITS_TO_CONVERT.key -> "3"
+    ) {
+      _sparkSession.sql(
+        s"""CREATE TABLE `$testTableName` (col1 INT)
+           | USING DELTA
+           |LOCATION '$testTablePath'""".stripMargin)
+      for (i <- 1 to 10) {
+        _sparkSession.sql(s"INSERT INTO `$testTableName` VALUES ($i)")
+      }
+      _sparkSession.sql(
+        s"""ALTER TABLE `$testTableName` SET TBLPROPERTIES (
+           |  'delta.universalFormat.enabledFormats' = 'hudi'
+           |)""".stripMargin)
+      verifyFilesAndSchemaMatch()
+    }
+  }
+
   def buildHudiMetaClient(): HoodieTableMetaClient = {
     val hadoopConf: Configuration = _sparkSession.sparkContext.hadoopConfiguration
     val storageConf : StorageConfiguration[_] = new HadoopStorageConfiguration(hadoopConf)
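For readers skimming the patch, the fix boils down to accumulating converted results across batches instead of overwriting a field once per batch. Below is a minimal, self-contained sketch (not part of the patch) contrasting the two patterns; the object name `BatchAccumulationSketch`, the string "actions", and `batchSize` are illustrative stand-ins, not Delta or Hudi APIs.

```scala
import java.util

// Minimal model of the bug: reassigning a variable on every batch keeps only
// the last batch, while appending to a mutable collection keeps all of them.
object BatchAccumulationSketch {
  def main(args: Array[String]): Unit = {
    val actions = (1 to 10).map(i => s"add-file-$i") // stand-in for Delta actions
    val batchSize = 3                                // stand-in for the batch size

    // Buggy pattern: the result variable is overwritten per batch.
    var overwritten: util.List[String] = util.Collections.emptyList[String]
    actions.grouped(batchSize).foreach { batch =>
      val converted = new util.ArrayList[String]()
      batch.foreach(converted.add)
      overwritten = converted                        // earlier batches are dropped here
    }

    // Fixed pattern: start from a mutable list and append every batch.
    val accumulated = new util.ArrayList[String]()
    actions.grouped(batchSize).foreach { batch =>
      val converted = new util.ArrayList[String]()
      batch.foreach(converted.add)
      accumulated.addAll(converted)                  // all batches are retained
    }

    println(s"overwriting keeps ${overwritten.size} of ${actions.size} actions")
    println(s"accumulating keeps ${accumulated.size} of ${actions.size} actions")
  }
}
```

With 10 actions and a batch size of 3, the overwriting variant ends up holding only the final batch (1 element), while the accumulating variant holds all 10, which mirrors why the patched `setCommitFileUpdates` switches to mutable collections plus `putAll`/`addAll`.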