diff --git a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala
index ef56058e0b9..d3ae26a47fe 100644
--- a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala
+++ b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiConversionTransaction.scala
@@ -86,9 +86,10 @@ class HudiConversionTransaction(
   private var metaClient = providedMetaClient
   private val instantTime = convertInstantToCommit(
     Instant.ofEpochMilli(postCommitSnapshot.timestamp))
-  private var writeStatuses: util.List[WriteStatus] = Collections.emptyList[WriteStatus]
+  private var writeStatuses: util.List[WriteStatus] =
+    new util.ArrayList[WriteStatus]()
   private var partitionToReplacedFileIds: util.Map[String, util.List[String]] =
-    Collections.emptyMap[String, util.List[String]]
+    new util.HashMap[String, util.List[String]]()
   private val version = postCommitSnapshot.version
 
   /** Tracks if this transaction has already committed. You can only commit once. */
@@ -101,7 +102,7 @@ class HudiConversionTransaction(
   def setCommitFileUpdates(actions: scala.collection.Seq[Action]): Unit = {
     // for all removed files, group by partition path and then map to
     // the file group ID (name in this case)
-    partitionToReplacedFileIds = actions
+    val newPartitionToReplacedFileIds = actions
       .map(_.wrap)
       .filter(action => action.remove != null)
       .map(_.remove)
@@ -111,8 +112,9 @@
         (partitionPath, path.getName)})
       .groupBy(_._1).map(v => (v._1, v._2.map(_._2).asJava))
       .asJava
+    partitionToReplacedFileIds.putAll(newPartitionToReplacedFileIds)
     // Convert the AddFiles to write statuses for the commit
-    writeStatuses = actions
+    val newWriteStatuses = actions
       .map(_.wrap)
       .filter(action => action.add != null)
       .map(_.add)
@@ -120,6 +122,7 @@
         convertAddFile(add, tablePath, instantTime)
       })
       .asJava
+    writeStatuses.addAll(newWriteStatuses)
   }
 
   def commit(): Unit = {
diff --git a/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala b/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala
index dcb51da5136..111fd632a4e 100644
--- a/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala
+++ b/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.avro.SchemaConverters
 import org.apache.spark.sql.delta.DeltaOperations.Truncate
 import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaUnsupportedOperationException, OptimisticTransaction}
 import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, RemoveFile}
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{ManualClock, Utils}
 import org.scalatest.concurrent.Eventually
@@ -215,6 +216,25 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
     verifyFilesAndSchemaMatch()
   }
 
+  test("all batches of actions are converted") {
+    withSQLConf(
+      DeltaSQLConf.HUDI_MAX_COMMITS_TO_CONVERT.key -> "3"
+    ) {
+      _sparkSession.sql(
+        s"""CREATE TABLE `$testTableName` (col1 INT)
+           | USING DELTA
+           |LOCATION '$testTablePath'""".stripMargin)
+      for (i <- 1 to 10) {
+        _sparkSession.sql(s"INSERT INTO `$testTableName` VALUES ($i)")
+      }
+      _sparkSession.sql(
+        s"""ALTER TABLE `$testTableName` SET TBLPROPERTIES (
+           |  'delta.universalFormat.enabledFormats' = 'hudi'
+           |)""".stripMargin)
+      verifyFilesAndSchemaMatch()
+    }
+  }
+
   def buildHudiMetaClient(): HoodieTableMetaClient = {
     val hadoopConf: Configuration = _sparkSession.sparkContext.hadoopConfiguration
     val storageConf : StorageConfiguration[_] = new HadoopStorageConfiguration(hadoopConf)