From b4871dae5c71120411e3f256b3e197624810b499 Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Fri, 22 Nov 2024 21:10:30 -0800
Subject: [PATCH] Removed WriteStatus usages across Metadata Writer flows
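
The metadata table writer now derives the records it needs from
HoodieCommitMetadata alone, so the HoodieData<WriteStatus> argument,
which many call sites passed as an empty placeholder
(context.emptyHoodieData()), is dropped from commitStats(), commit(),
writeTableMetadata() and completeTableService(), and
HoodieTableMetadataWriter#updateFromWriteStatuses is renamed to
update().

Call sites migrate as in this minimal before/after sketch (both
signatures are taken from this patch; the surrounding variable names
are illustrative):

    // before: a HoodieData<WriteStatus> had to be threaded through,
    // even when it was empty
    metadataWriter.updateFromWriteStatuses(commitMetadata, writeStatuses, instantTime);

    // after: the commit metadata alone drives the metadata table update
    metadataWriter.update(commitMetadata, instantTime);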
---
 .../org/apache/hudi/client/BaseHoodieClient.java  |  6 ++----
 .../hudi/client/BaseHoodieTableServiceClient.java | 11 +++++------
 .../apache/hudi/client/BaseHoodieWriteClient.java | 15 +++++++--------
 .../metadata/HoodieBackedTableMetadataWriter.java |  4 +---
 .../hudi/metadata/HoodieTableMetadataWriter.java  |  8 +++-----
 .../hudi/table/action/BaseActionExecutor.java     |  6 ++----
 .../action/commit/BaseCommitActionExecutor.java   |  4 ++--
 .../index/WriteStatBasedIndexingCatchupTask.java  |  2 +-
 .../common/testutils/HoodieMetadataTestTable.java | 12 ++++++------
 .../client/HoodieFlinkTableServiceClient.java     |  7 +++----
 .../hudi/client/HoodieFlinkWriteClient.java       | 12 +++++-------
 .../commit/BaseFlinkCommitActionExecutor.java     |  3 +--
 .../apache/hudi/client/HoodieJavaWriteClient.java |  2 +-
 .../commit/BaseJavaCommitActionExecutor.java      |  3 +--
 .../apache/hudi/client/SparkRDDWriteClient.java   |  2 +-
 .../SparkBootstrapCommitActionExecutor.java       |  2 +-
 .../commit/BaseSparkCommitActionExecutor.java     |  2 +-
 .../hudi/client/TestSparkRDDWriteClient.java      |  2 +-
 .../functional/TestConsistentBucketIndex.java     |  2 +-
 .../hudi/io/TestHoodieTimelineArchiver.java       |  2 +-
 ...TestHoodieSparkMergeOnReadTableCompaction.java |  4 ++--
 .../TestSparkNonBlockingConcurrencyControl.java   | 11 +----------
 .../hudi/testutils/HoodieCleanerTestBase.java     |  2 +-
 .../hudi/metadata/BaseFileRecordParsingUtils.java |  2 --
 .../hudi/metadata/HoodieTableMetadataUtil.java    |  4 ++--
 .../sink/clustering/ClusteringCommitSink.java     |  3 +--
 .../internal/DataSourceInternalWriterHelper.java  |  2 +-
 .../TestSparkConsistentBucketClustering.java      |  4 ++--
 .../TestSparkSortAndSizeClustering.java           |  2 +-
 29 files changed, 57 insertions(+), 84 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index 7651057c05f7a..c711ac5a388cf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -24,7 +24,6 @@
 import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.client.utils.TransactionUtils;
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -278,14 +277,13 @@ protected void finalizeWrite(HoodieTable table, String instantTime, List<Hoodie
-  protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) {
+  protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieCommitMetadata metadata) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
     Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(instantTime);
     if (metadataWriterOpt.isPresent()) {
       try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) {
-        metadataWriter.updateFromWriteStatuses(metadata, writeStatuses, instantTime);
+        metadataWriter.update(metadata, instantTime);
       } catch (Exception e) {
         if (e instanceof HoodieException) {
           throw (HoodieException) e;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index fa43da009ede1..21cb613429922 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -341,7 +341,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
     this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
     finalizeWrite(table, compactionCommitTime, writeStats);
     // commit to data table after committing to metadata table.
-    writeTableMetadata(table, compactionCommitTime, metadata, context.emptyHoodieData());
+    writeTableMetadata(table, compactionCommitTime, metadata);
     LOG.info("Committing Compaction {}", compactionCommitTime);
     LOG.debug("Compaction {} finished with result: {}", compactionCommitTime, metadata);
     CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
@@ -404,7 +404,7 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
     preCommit(metadata);
     finalizeWrite(table, logCompactionCommitTime, writeStats);
     // commit to data table after committing to metadata table.
-    writeTableMetadata(table, logCompactionCommitTime, metadata, context.emptyHoodieData());
+    writeTableMetadata(table, logCompactionCommitTime, metadata);
     LOG.info("Committing Log Compaction {}", logCompactionCommitTime);
     LOG.debug("Log Compaction {} finished with result {}", logCompactionCommitTime, metadata);
     CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
@@ -490,7 +490,7 @@ public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldCo
 
     // TODO : Where is shouldComplete used ?
     if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
-      completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant, Option.ofNullable(convertToWriteStatus(writeMetadata)));
+      completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
     }
     return clusteringMetadata;
   }
@@ -526,8 +526,7 @@ public HoodieWriteMetadata<O> managePartitionTTL(String instantTime) {
 
   private void completeClustering(HoodieReplaceCommitMetadata metadata,
                                   HoodieTable table,
-                                  String clusteringCommitTime,
-                                  Option<HoodieData<WriteStatus>> writeStatuses) {
+                                  String clusteringCommitTime) {
     List<HoodieWriteStat> writeStats = metadata.getWriteStats();
     handleWriteErrors(writeStats, TableServiceType.CLUSTER);
     final HoodieInstant clusteringInstant = ClusteringUtils.getInflightClusteringInstant(clusteringCommitTime,
@@ -542,7 +541,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
       preCommit(metadata);
     }
     // Update table's metadata (table)
-    writeTableMetadata(table, clusteringInstant.requestedTime(), metadata, writeStatuses.orElseGet(context::emptyHoodieData));
+    writeTableMetadata(table, clusteringInstant.requestedTime(), metadata);
 
     LOG.info("Committing Clustering {}", clusteringCommitTime);
     LOG.debug("Clustering {} finished with result {}", clusteringCommitTime, metadata);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 07df2e8b7f115..ee27e454f0a93 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -34,7 +34,6 @@
 import org.apache.hudi.common.HoodiePendingRollbackInfo;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -216,12 +215,12 @@ public abstract boolean commit(String instantTime, O writeStatuses, Option<Map<
                                  Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
                                  Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc);
 
-  public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStatuses, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
                              String commitActionType) {
-    return commitStats(instantTime, writeStatuses, stats, extraMetadata, commitActionType, Collections.emptyMap(), Option.empty());
+    return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap(), Option.empty());
   }
 
-  public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStatuses, List<HoodieWriteStat> stats,
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
                              Option<Map<String, String>> extraMetadata, String commitActionType,
                              Map<String, List<String>> partitionToReplaceFileIds,
                              Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
@@ -243,7 +242,7 @@ public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStat
       if (extraPreCommitFunc.isPresent()) {
         extraPreCommitFunc.get().accept(table.getMetaClient(), metadata);
       }
-      commit(table, commitActionType, instantTime, metadata, stats, writeStatuses);
+      commit(table, commitActionType, instantTime, metadata, stats);
       postCommit(table, metadata, instantTime, extraMetadata);
       LOG.info("Committed " + instantTime);
     } catch (IOException e) {
@@ -282,7 +281,7 @@ public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStat
   }
 
   protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata,
-                        List<HoodieWriteStat> stats, HoodieData<WriteStatus> writeStatuses) throws IOException {
+                        List<HoodieWriteStat> stats) throws IOException {
     LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     // Finalize write
@@ -293,7 +292,7 @@ protected void commit(HoodieTable table, String commitActionType, String instant
       saveInternalSchema(table, instantTime, metadata);
     }
     // update Metadata table
-    writeTableMetadata(table, instantTime, metadata, writeStatuses);
+    writeTableMetadata(table, instantTime, metadata);
     activeTimeline.saveAsComplete(false, table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, commitActionType, instantTime),
         serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), metadata));
   }
@@ -1611,7 +1610,7 @@ public void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient me
     // try to save history schemas
     FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
     schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
-    commitStats(instantTime, context.emptyHoodieData(), Collections.emptyList(), Option.of(extraMeta), commitActionType);
+    commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
   }
 
   private InternalSchema getInternalSchema(TableSchemaResolver schemaUtil) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 3697e4617a55d..5fe297fad4e53 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -25,7 +25,6 @@
 import org.apache.hudi.avro.model.HoodieRestorePlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.BaseHoodieWriteClient;
-import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.EngineType;
@@ -1043,12 +1042,11 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<Hoo
-  public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatus, String instantTime) {
+  public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
     processAndCommit(instantTime, () -> {
       Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
           HoodieTableMetadataUtil.convertMetadataToRecords(
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
index 7578949a8e027..119c97f0fceee 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataWriter.java
@@ -22,7 +22,6 @@
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -57,16 +56,15 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {
 
   /**
    * Update the metadata table due to a COMMIT operation.
-   *
    * @param commitMetadata commit metadata of the operation of interest.
    * @param instantTime instant time of the commit.
    */
-  void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatuses, String instantTime);
+  void update(HoodieCommitMetadata commitMetadata, String instantTime);
 
   /**
    * Update the metadata table due to a COMMIT or REPLACECOMMIT operation.
-   * As compared to {@link #updateFromWriteStatuses(HoodieCommitMetadata, HoodieData, String)}, this method
-   * directly updates metadata with the given records, instead of first converting {@link WriteStatus} to {@link HoodieRecord}.
+   * As compared to {@link #update(HoodieCommitMetadata, String)}, this method
+   * directly updates metadata with the given records, instead of generating HoodieRecords based on HoodieCommitMetadata.
    *
    * @param commitMetadata commit metadata of the operation of interest.
    * @param records records to update metadata with.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index c4ca56778328f..5940129c91d70 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -21,8 +21,6 @@
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -72,7 +70,7 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config,
    *
    * @param metadata commit metadata of interest.
    */
-  protected final void writeTableMetadata(HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatus, String actionType) {
+  protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) {
     // Recreate MDT for insert_overwrite_table operation.
     if (table.getConfig().isMetadataTableEnabled()
         && WriteOperationType.INSERT_OVERWRITE_TABLE == metadata.getOperationType()) {
@@ -83,7 +81,7 @@ protected final void writeTableMetadata(HoodieCommitMetadata metadata, HoodieDat
       Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(instantTime);
       if (metadataWriterOpt.isPresent()) {
         try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) {
-          metadataWriter.updateFromWriteStatuses(metadata, writeStatus, instantTime);
+          metadataWriter.update(metadata, instantTime);
         } catch (Exception e) {
           if (e instanceof HoodieException) {
             throw (HoodieException) e;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index ea20983819f4e..055fd7e10bac2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -211,7 +211,7 @@ protected void autoCommit(HoodieWriteMetadata<O> result) {
 
   protected abstract void commit(HoodieWriteMetadata<O> result);
 
-  protected void commit(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata<O> result, List<HoodieWriteStat> writeStats) {
+  protected void commit(HoodieWriteMetadata<O> result, List<HoodieWriteStat> writeStats) {
     String actionType = getCommitActionType();
     LOG.info("Committing " + instantTime + ", action Type " + actionType + ", operation Type " + operationType);
     result.setCommitted(true);
@@ -222,7 +222,7 @@ protected void commit(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieCommitMetadata metadata = result.getCommitMetadata().get();
 
-    writeTableMetadata(metadata, writeStatuses, actionType);
+    writeTableMetadata(metadata, actionType);
 
     // cannot serialize maps with null values
     metadata.getExtraMetadata().entrySet().removeIf(entry -> entry.getValue() == null);
     activeTimeline.saveAsComplete(false,
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
index 727028e1eb61d..25230049a769f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
@@ -50,6 +50,6 @@ public WriteStatBasedIndexingCatchupTask(HoodieTableMetadataWriter metadataWrite
   public void updateIndexForWriteAction(HoodieInstant instant) throws IOException {
     HoodieCommitMetadata commitMetadata = metaClient.getCommitMetadataSerDe().deserialize(instant,
         metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
-    metadataWriter.updateFromWriteStatuses(commitMetadata, engineContext.emptyHoodieData(), instant.requestedTime());
+    metadataWriter.update(commitMetadata, instant.requestedTime());
   }
 }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
index b4a2f5a2d46c8..14c4885b4d0b7 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/common/testutils/HoodieMetadataTestTable.java
@@ -92,7 +92,7 @@ public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationTy
         partitionToFilesNameLengthMap, bootstrap, true);
     if (writer != null && !createInflightCommit) {
       writer.performTableServices(Option.of(commitTime));
-      writer.updateFromWriteStatuses(commitMetadata, context.get().emptyHoodieData(), commitTime);
+      writer.update(commitMetadata, commitTime);
     }
     // DT should be committed after MDT.
     if (!createInflightCommit) {
@@ -110,7 +110,7 @@ public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationTy
   public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException {
     super.moveInflightCommitToComplete(instantTime, metadata);
     if (writer != null) {
-      writer.updateFromWriteStatuses(metadata, context.get().emptyHoodieData(), instantTime);
+      writer.update(metadata, instantTime);
     }
     return this;
   }
@@ -119,7 +119,7 @@ public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCo
   public HoodieTestTable moveInflightCompactionToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException {
     super.moveInflightCompactionToComplete(instantTime, metadata);
     if (writer != null) {
-      writer.updateFromWriteStatuses(metadata, context.get().emptyHoodieData(), instantTime);
+      writer.update(metadata, instantTime);
     }
     return this;
   }
@@ -146,7 +146,7 @@ public void repeatClean(String cleanCommitTime,
   public HoodieTestTable addCompaction(String instantTime, HoodieCommitMetadata commitMetadata) throws Exception {
     super.addCompaction(instantTime, commitMetadata);
     if (writer != null) {
-      writer.updateFromWriteStatuses(commitMetadata, context.get().emptyHoodieData(), instantTime);
+      writer.update(commitMetadata, instantTime);
     }
     return this;
   }
@@ -177,7 +177,7 @@ public HoodieTestTable addReplaceCommit(
       HoodieReplaceCommitMetadata completeReplaceMetadata) throws Exception {
     super.addReplaceCommit(instantTime, requestedReplaceMetadata, inflightReplaceMetadata, completeReplaceMetadata);
     if (writer != null) {
-      writer.updateFromWriteStatuses(completeReplaceMetadata, context.get().emptyHoodieData(), instantTime);
+      writer.update(completeReplaceMetadata, instantTime);
     }
     return this;
   }
@@ -187,7 +187,7 @@ public HoodieTestTable addCluster(String instantTime, HoodieRequestedReplaceMeta
       HoodieReplaceCommitMetadata completeReplaceMetadata) throws Exception {
     super.addCluster(instantTime, requestedReplaceMetadata, inflightReplaceMetadata, completeReplaceMetadata);
     if (writer != null) {
-      writer.updateFromWriteStatuses(completeReplaceMetadata, context.get().emptyHoodieData(), instantTime);
+      writer.update(completeReplaceMetadata, instantTime);
     }
     return this;
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
index 5fac2d9a03746..2e4dca9db039f 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkTableServiceClient.java
@@ -88,7 +88,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
       // commit to data table after committing to metadata table.
       // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
       // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-      writeTableMetadata(table, compactionCommitTime, metadata, context.emptyHoodieData());
+      writeTableMetadata(table, compactionCommitTime, metadata);
       LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
       CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
     } finally {
@@ -113,8 +113,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
   protected void completeClustering(
       HoodieReplaceCommitMetadata metadata,
       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-      String clusteringCommitTime,
-      Option<HoodieData<WriteStatus>> writeStatuses) {
+      String clusteringCommitTime) {
    this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering");
     HoodieInstant clusteringInstant = ClusteringUtils.getInflightClusteringInstant(clusteringCommitTime, table.getActiveTimeline(), table.getInstantGenerator()).get();
     List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
@@ -135,7 +134,7 @@ protected void completeClustering(
       // commit to data table after committing to metadata table.
       // We take the lock here to ensure all writes to metadata table happens within a single lock (single writer).
       // Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-      writeTableMetadata(table, clusteringCommitTime, metadata, writeStatuses.orElseGet(context::emptyHoodieData));
+      writeTableMetadata(table, clusteringCommitTime, metadata);
 
       LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
       ClusteringUtils.transitionClusteringOrReplaceInflightToComplete(
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 3bac02db25ac1..facd9d4906a41 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -109,7 +109,7 @@ public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Optio
         .values().stream()
         .map(duplicates -> duplicates.stream().reduce(WriteStatMerger::merge).get())
         .collect(Collectors.toList());
-    return commitStats(instantTime, HoodieListData.eager(writeStatuses), merged, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+    return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
   }
 
   @Override
@@ -375,9 +375,8 @@ public HoodieWriteMetadata<List<WriteStatus>> cluster(final String clusteringIns
   private void completeClustering(
       HoodieReplaceCommitMetadata metadata,
       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-      String clusteringCommitTime,
-      Option<HoodieData<WriteStatus>> writeStatuses) {
-    ((HoodieFlinkTableServiceClient) tableServiceClient).completeClustering(metadata, table, clusteringCommitTime, writeStatuses);
+      String clusteringCommitTime) {
+    ((HoodieFlinkTableServiceClient) tableServiceClient).completeClustering(metadata, table, clusteringCommitTime);
   }
 
   @Override
@@ -394,11 +393,10 @@ public void completeTableService(
       TableServiceType tableServiceType,
       HoodieCommitMetadata metadata,
       HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-      String commitInstant,
-      Option<HoodieData<WriteStatus>> writeStatuses) {
+      String commitInstant) {
     switch (tableServiceType) {
       case CLUSTER:
-        completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant, writeStatuses);
+        completeClustering((HoodieReplaceCommitMetadata) metadata, table, commitInstant);
         break;
       case COMPACT:
         completeCompaction(metadata, table, commitInstant);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index 750ef71bc37b5..96a6488194aec 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -123,8 +123,7 @@ protected String getCommitActionType() {
 
   @Override
   protected void commit(HoodieWriteMetadata<List<WriteStatus>> result) {
-    commit(HoodieListData.eager(result.getWriteStatuses()), result,
-        result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
+    commit(result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
   }
 
   protected void setCommitMetadata(HoodieWriteMetadata<List<WriteStatus>> result) {
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 968454a1b3dbb..4742c25c557dd 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -88,7 +88,7 @@ public boolean commit(String instantTime,
                         Map<String, List<String>> partitionToReplacedFileIds,
                         Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
     List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-    return commitStats(instantTime, HoodieListData.eager(writeStatuses), writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
+    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
         extraPreCommitFunc);
   }
 
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index 054f0342ccfeb..e3a388661b0a8 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -189,8 +189,7 @@ protected Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(List<Ho
 
   @Override
   protected void commit(HoodieWriteMetadata<List<WriteStatus>> result) {
-    commit(HoodieListData.eager(result.getWriteStatuses()), result,
-        result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
+    commit(result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));
   }
 
   protected void setCommitMetadata(HoodieWriteMetadata<List<WriteStatus>> result) {
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 2e639b2819297..12c9a0f80b756 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -90,7 +90,7 @@ public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Op
                         Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
     List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, HoodieJavaRDD.of(writeStatuses), writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index e7175c55d9f64..9b842064000f6 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -214,7 +214,7 @@ protected void commit(HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
       LOG.info("Finished writing bootstrap index for source " + config.getBootstrapSourceBasePath() + " in table "
           + config.getBasePath());
     }
-    commit(result.getWriteStatuses(), result, bootstrapSourceAndStats.values().stream()
+    commit(result, bootstrapSourceAndStats.values().stream()
         .flatMap(f -> f.stream().map(Pair::getValue)).collect(Collectors.toList()));
     LOG.info("Committing metadata bootstrap !!");
   }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 36902a8c3f239..94d8748166bc9 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -289,7 +289,7 @@ protected void setCommitMetadata(HoodieWriteMetadata<HoodieData<WriteStatus>> re
 
   @Override
   protected void commit(HoodieWriteMetadata<HoodieData<WriteStatus>> result) {
     context.setJobStatus(this.getClass().getSimpleName(), "Commit write status collect: " + config.getTableName());
-    commit(result.getWriteStatuses(), result, result.getWriteStats().isPresent()
+    commit(result, result.getWriteStats().isPresent()
         ? result.getWriteStats().get() : result.getWriteStatuses().map(WriteStatus::getStat).collectAsList());
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
index a299b2f337a87..303f83241f79e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
@@ -140,7 +140,7 @@ void testWriteClientReleaseResourcesShouldOnlyUnpersistRelevantRdds(
     String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(writeConfig.getBasePath());
     List<Integer> metadataTableCacheIds0 = context().getCachedDataIds(HoodieDataCacheKey.of(metadataTableBasePath, instant0));
     List<Integer> metadataTableCacheIds1 = context().getCachedDataIds(HoodieDataCacheKey.of(metadataTableBasePath, instant1));
-    writeClient.commitStats(instant1, context().parallelize(writeStatuses, 1), writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+    writeClient.commitStats(instant1, writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(), metaClient.getCommitActionType());
     writeClient.close();
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
index c790b3cebdc93..812c17d3743c4 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestConsistentBucketIndex.java
@@ -275,7 +275,7 @@ private List<WriteStatus> writeData(JavaRDD<HoodieRecord> records, String commit
     }
     org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
     if (doCommit) {
-      boolean success = writeClient.commitStats(commitTime, context.parallelize(writeStatues, 1), writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+      boolean success = writeClient.commitStats(commitTime, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
           Option.empty(), metaClient.getCommitActionType());
       Assertions.assertTrue(success);
     }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 28b8c16b2fd89..3f553831e30ea 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -431,7 +431,7 @@ private HoodieInstant commitWithMdt(String instantTime, Map
     });
     commitMeta = generateCommitMetadata(instantTime, partToFileIds);
     metadataWriter.performTableServices(Option.of(instantTime));
-    metadataWriter.updateFromWriteStatuses(commitMeta, context.emptyHoodieData(), instantTime);
+    metadataWriter.update(commitMeta, instantTime);
     metaClient.getActiveTimeline().saveAsComplete(
         INSTANT_GENERATOR.createNewInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, instantTime),
         serializeCommitMetadata(metaClient.getCommitMetadataSerDe(), commitMeta));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index ef28980d9cf95..73b9d7e8be24a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -135,7 +135,7 @@ public void testWriteDuringCompaction(String payloadClass) throws IOException {
     List<WriteStatus> writeStatuses = writeData(insertTime, 100, false);
     Assertions.assertEquals(200, readTableTotalRecordsNum());
     // commit the write. The records should be visible now even though the compaction does not complete.
-    client.commitStats(insertTime, context().parallelize(writeStatuses, 1), writeStatuses.stream().map(WriteStatus::getStat)
+    client.commitStats(insertTime, writeStatuses.stream().map(WriteStatus::getStat)
         .collect(Collectors.toList()), Option.empty(), metaClient.getCommitActionType());
     Assertions.assertEquals(300, readTableTotalRecordsNum());
     // after the compaction, total records should remain the same
@@ -207,7 +207,7 @@ private List<WriteStatus> writeData(String instant, int numRecords, boolean doCo
     org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
     if (doCommit) {
       List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-      boolean committed = client.commitStats(instant, context().parallelize(writeStatuses, 1), writeStats, Option.empty(), metaClient.getCommitActionType());
+      boolean committed = client.commitStats(instant, writeStats, Option.empty(), metaClient.getCommitActionType());
       Assertions.assertTrue(committed);
     }
     metaClient = HoodieTableMetaClient.reload(metaClient);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
index b8f55c55a7247..704d6e8420b02 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestSparkNonBlockingConcurrencyControl.java
@@ -140,7 +140,6 @@ public void testNonBlockingConcurrencyControlWithPartialUpdatePayload() throws E
     // step to commit the 1st txn
     client1.commitStats(
         insertTime1,
-        context().parallelize(writeStatuses1, 1),
         writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -148,7 +147,6 @@ public void testNonBlockingConcurrencyControlWithPartialUpdatePayload() throws E
     // step to commit the 2nd txn
     client2.commitStats(
         insertTime2,
-        context().parallelize(writeStatuses2, 1),
         writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -185,7 +183,6 @@ public void testNonBlockingConcurrencyControlWithInflightInstant() throws Except
     // step to commit the 1st txn
     client1.commitStats(
         insertTime1,
-        context().parallelize(writeStatuses1, 1),
         writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -199,7 +196,6 @@ public void testNonBlockingConcurrencyControlWithInflightInstant() throws Except
     List<WriteStatus> writeStatuses3 = writeData(client1, insertTime3, dataset3, false, WriteOperationType.INSERT);
     client1.commitStats(
         insertTime3,
-        context().parallelize(writeStatuses3, 1),
         writeStatuses3.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -233,7 +229,6 @@ public void testMultiBaseFile(boolean bulkInsertFirst) throws Exception {
     List<WriteStatus> writeStatuses0 = writeData(client0, insertTime0, dataset0, false, WriteOperationType.BULK_INSERT, true);
     client0.commitStats(
         insertTime0,
-        context().parallelize(writeStatuses0, 1),
         writeStatuses0.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -273,7 +268,6 @@ public void testMultiBaseFile(boolean bulkInsertFirst) throws Exception {
     // step to commit the 1st txn
     client1.commitStats(
         insertTime1,
-        context().parallelize(writeStatuses1, 1),
         writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -281,7 +275,6 @@ public void testMultiBaseFile(boolean bulkInsertFirst) throws Exception {
     // step to commit the 2nd txn
     client2.commitStats(
         insertTime2,
-        context().parallelize(writeStatuses2, 1),
         writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -330,7 +323,6 @@ public void testBulkInsertInMultiWriter() throws Exception {
     // step to commit the 1st txn
     client1.commitStats(
         insertTime1,
-        context().parallelize(writeStatuses1, 1),
         writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(),
         metaClient.getCommitActionType());
@@ -339,7 +331,6 @@ public void testBulkInsertInMultiWriter() throws Exception {
     assertThrows(HoodieWriteConflictException.class, () -> {
       client2.commitStats(
           insertTime2,
-          context().parallelize(writeStatuses2, 1),
           writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
           Option.empty(),
           metaClient.getCommitActionType());
@@ -552,7 +543,7 @@ private List<WriteStatus> writeData(
     org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatuses);
     if (doCommit) {
       List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-      boolean committed = client.commitStats(instant, context().parallelize(writeStatuses, 1), writeStats, Option.empty(), metaClient.getCommitActionType());
+      boolean committed = client.commitStats(instant, writeStats, Option.empty(), metaClient.getCommitActionType());
       Assertions.assertTrue(committed);
     }
     metaClient = HoodieTableMetaClient.reload(metaClient);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
index 8d12c7da48de1..0da2af6372596 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java
@@ -211,7 +211,7 @@ public void commitWithMdt(String instantTime, Map<String, List<String>> partToFi
     HoodieCommitMetadata commitMeta = generateCommitMetadata(instantTime, partToFileIds);
     try (HoodieTableMetadataWriter metadataWriter = getMetadataWriter(config)) {
       metadataWriter.performTableServices(Option.of(instantTime));
-      metadataWriter.updateFromWriteStatuses(commitMeta, context.emptyHoodieData(), instantTime);
+      metadataWriter.update(commitMeta, instantTime);
       metaClient.getActiveTimeline().saveAsComplete(
           INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, instantTime),
           serializeCommitMetadata(metaClient.getCommitMetadataSerDe(), commitMeta));
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
index cff6a79dc4348..523c072f7240d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseFileRecordParsingUtils.java
@@ -117,8 +117,6 @@ public static List<String> getRecordKeysDeletedOrUpdated(String basePath,
     }
   }
 
-
-
   private static Set<String> getRecordKeysFromBaseFile(HoodieStorage storage, String basePath, String partition, String fileName) throws IOException {
     StoragePath dataFilePath = new StoragePath(basePath, StringUtils.isNullOrEmpty(partition) ? fileName : (partition + Path.SEPARATOR) + fileName);
     FileFormatUtils fileFormatUtils = HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.PARQUET);
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index bce187a49b05c..53a7b60083021 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -885,8 +885,8 @@ static List<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext engineCont
           return BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat, storage).iterator();
         } else {
           // for logs, every entry is either an update or a delete
-         StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
-         return getRecordKeys(fullFilePath.toString(), dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize, instantTime).iterator();
+          StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
+          return getRecordKeys(fullFilePath.toString(), dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize, instantTime).iterator();
         }
       }).collectAsList();
     } catch (Exception e) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
index e773f3ed11710..0dcaa4cc9b419 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java
@@ -218,8 +218,7 @@ private void doCommit(String instant, HoodieClusteringPlan clusteringPlan, Colle
     }
     // commit the clustering
     this.table.getMetaClient().reloadActiveTimeline();
-    this.writeClient.completeTableService(
-        TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant, Option.of(HoodieListData.lazy(writeMetadata.getWriteStatuses())));
+    this.writeClient.completeTableService(TableServiceType.CLUSTER, writeMetadata.getCommitMetadata().get(), table, instant);
 
     clusteringMetrics.updateCommitMetrics(instant, writeMetadata.getCommitMetadata().get());
     // whether to clean up the input base parquet files used for clustering
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index 1c9e2bb75795d..d9db9cd51d192 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -84,7 +84,7 @@ public void onDataWriterCommit(String message) {
   public void commit(List<WriteStatus> writeStatuses) {
     try {
       List<HoodieWriteStat> writeStatList = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-      writeClient.commitStats(instantTime, writeClient.getEngineContext().parallelize(writeStatuses), writeStatList, Option.of(extraMetadata),
+      writeClient.commitStats(instantTime, writeStatList, Option.of(extraMetadata),
           CommitUtils.getCommitActionType(operationType, metaClient.getTableType()));
     } catch (Exception ioe) {
       throw new HoodieException(ioe.getMessage(), ioe);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
index 7f2663d1051c9..a96dc752ee6d0 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
@@ -305,7 +305,7 @@ public void testConcurrentWrite(boolean rowWriterEnable) throws IOException {
     List<WriteStatus> writeStatues = writeData(writeTime, 2000, false);
     // Cannot schedule clustering if there is in-flight writer
     Assertions.assertFalse(writeClient.scheduleClustering(Option.empty()).isPresent());
-    Assertions.assertTrue(writeClient.commitStats(writeTime, context.parallelize(writeStatues, 1), writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+    Assertions.assertTrue(writeClient.commitStats(writeTime, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
         Option.empty(), metaClient.getCommitActionType()));
 
     metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -341,7 +341,7 @@ private List<WriteStatus> writeData(String commitTime, int totalRecords, boolean
     List<WriteStatus> writeStatues = writeClient.upsert(writeRecords, commitTime).collect();
     org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
     if (doCommit) {
-      Assertions.assertTrue(writeClient.commitStats(commitTime, context.parallelize(writeStatues, 1), writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+      Assertions.assertTrue(writeClient.commitStats(commitTime, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
          Option.empty(), metaClient.getCommitActionType()));
     }
     metaClient = HoodieTableMetaClient.reload(metaClient);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
index bc7bc5edfc58c..96b4d4251022a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkSortAndSizeClustering.java
@@ -157,7 +157,7 @@ private List<WriteStatus> writeData(String commitTime, int totalRecords, boolean
     org.apache.hudi.testutils.Assertions.assertNoWriteErrors(writeStatues);
     if (doCommit) {
-      Assertions.assertTrue(writeClient.commitStats(commitTime, context.parallelize(writeStatues, 1), writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+      Assertions.assertTrue(writeClient.commitStats(commitTime, writeStatues.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
          Option.empty(), metaClient.getCommitActionType()));
     }