Commit 3018c49

[HUDI-8564] Removing WriteStatus references in Hoodie Metadata writer flow (#12321)

- Fixing record-level index (RLI) and secondary index (SI) record generation in the metadata table (MDT) so that it no longer relies on RDD<WriteStatus>, which has edge cases with respect to Spark task retries. This patch instead does on-demand reads from base and log files to generate the RLI and SI records for the MDT (see the retry sketch below).

---------

Co-authored-by: vinoth chandar <[email protected]>
nsivabalan and vinothchandar authored Nov 28, 2024
1 parent 77f6869 commit 3018c49
Showing 39 changed files with 1,037 additions and 184 deletions.
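
Why the RDD<WriteStatus> dependency was risky, in a nutshell: an RDD is lazy and may be recomputed, so a retried Spark task re-runs its map function, and index records derived from in-memory WriteStatus objects can disagree with the files the first attempt actually committed. Below is a minimal self-contained sketch of that failure mode (illustrative only, not Hudi code; the class name and values are hypothetical):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical sketch: two evaluations of the same lazy RDD can disagree
// when the map function is non-deterministic, which mirrors how a Spark
// task retry can produce WriteStatus objects that differ from the attempt
// whose files actually landed on storage.
public class RddRetrySketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("rdd-retry-sketch").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      JavaRDD<String> writeStatuses = jsc.parallelize(Arrays.asList("file-1", "file-2"))
          // per-attempt value: differs every time the lambda re-executes
          .map(fileId -> fileId + "@attempt-" + System.nanoTime());
      System.out.println(writeStatuses.collect()); // first evaluation
      System.out.println(writeStatuses.collect()); // recomputed: different values
    }
  }
}

The patch sidesteps this class of problem by re-reading the committed base and log files, which are immutable once the commit lands, to generate the RLI and SI records.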
@@ -24,7 +24,6 @@
 import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.client.utils.TransactionUtils;
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -269,14 +268,13 @@ protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieW
    * @param table {@link HoodieTable} of interest.
    * @param instantTime instant time of the commit.
    * @param metadata instance of {@link HoodieCommitMetadata}.
-   * @param writeStatuses Write statuses of the commit
    */
-  protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) {
+  protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieCommitMetadata metadata) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
     Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(instantTime);
     if (metadataWriterOpt.isPresent()) {
       try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) {
-        metadataWriter.updateFromWriteStatuses(metadata, writeStatuses, instantTime);
+        metadataWriter.update(metadata, instantTime);
       } catch (Exception e) {
         if (e instanceof HoodieException) {
           throw (HoodieException) e;
@@ -31,7 +31,6 @@
 import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
 import org.apache.hudi.client.timeline.TimelineArchivers;
 import org.apache.hudi.common.HoodiePendingRollbackInfo;
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -341,7 +340,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
     this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
     finalizeWrite(table, compactionCommitTime, writeStats);
     // commit to data table after committing to metadata table.
-    writeTableMetadata(table, compactionCommitTime, metadata, context.emptyHoodieData());
+    writeTableMetadata(table, compactionCommitTime, metadata);
     LOG.info("Committing Compaction {}", compactionCommitTime);
     LOG.debug("Compaction {} finished with result: {}", compactionCommitTime, metadata);
     CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
@@ -404,7 +403,7 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
     preCommit(metadata);
     finalizeWrite(table, logCompactionCommitTime, writeStats);
     // commit to data table after committing to metadata table.
-    writeTableMetadata(table, logCompactionCommitTime, metadata, context.emptyHoodieData());
+    writeTableMetadata(table, logCompactionCommitTime, metadata);
     LOG.info("Committing Log Compaction {}", logCompactionCommitTime);
     LOG.debug("Log Compaction {} finished with result {}", logCompactionCommitTime, metadata);
     CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
@@ -490,7 +489,7 @@ public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldCo

     // TODO : Where is shouldComplete used ?
     if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
-      completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant, Option.ofNullable(convertToWriteStatus(writeMetadata)));
+      completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
     }
     return clusteringMetadata;
   }
@@ -522,12 +521,9 @@ public HoodieWriteMetadata<T> managePartitionTTL(String instantTime) {

   protected abstract HoodieWriteMetadata<O> convertToOutputMetadata(HoodieWriteMetadata<T> writeMetadata);

-  protected abstract HoodieData<WriteStatus> convertToWriteStatus(HoodieWriteMetadata<T> writeMetadata);
-
   private void completeClustering(HoodieReplaceCommitMetadata metadata,
                                   HoodieTable table,
-                                  String clusteringCommitTime,
-                                  Option<HoodieData<WriteStatus>> writeStatuses) {
+                                  String clusteringCommitTime) {
     List<HoodieWriteStat> writeStats = metadata.getWriteStats();
     handleWriteErrors(writeStats, TableServiceType.CLUSTER);
     final HoodieInstant clusteringInstant = ClusteringUtils.getInflightClusteringInstant(clusteringCommitTime,
@@ -542,7 +538,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
       preCommit(metadata);
     }
     // Update table's metadata (table)
-    writeTableMetadata(table, clusteringInstant.requestedTime(), metadata, writeStatuses.orElseGet(context::emptyHoodieData));
+    writeTableMetadata(table, clusteringInstant.requestedTime(), metadata);

     LOG.info("Committing Clustering {}", clusteringCommitTime);
     LOG.debug("Clustering {} finished with result {}", clusteringCommitTime, metadata);
@@ -34,7 +34,6 @@
 import org.apache.hudi.common.HoodiePendingRollbackInfo;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -216,12 +215,12 @@ public abstract boolean commit(String instantTime, O writeStatuses, Option<Map<S
                                  String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
                                  Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc);

-  public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStatuses, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
                              String commitActionType) {
-    return commitStats(instantTime, writeStatuses, stats, extraMetadata, commitActionType, Collections.emptyMap(), Option.empty());
+    return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap(), Option.empty());
   }

-  public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStatuses, List<HoodieWriteStat> stats,
+  public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
                              Option<Map<String, String>> extraMetadata,
                              String commitActionType, Map<String, List<String>> partitionToReplaceFileIds,
                              Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
@@ -243,7 +242,7 @@ public boolean commitStats(String instantTime, HoodieData<WriteStat> writeStat
       if (extraPreCommitFunc.isPresent()) {
         extraPreCommitFunc.get().accept(table.getMetaClient(), metadata);
       }
-      commit(table, commitActionType, instantTime, metadata, stats, writeStatuses);
+      commit(table, commitActionType, instantTime, metadata, stats);
       postCommit(table, metadata, instantTime, extraMetadata);
       LOG.info("Committed " + instantTime);
     } catch (IOException e) {
@@ -282,7 +281,7 @@ public boolean commitStats(String instantTime, HoodieData<WriteStat> writeStat
   }

   protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata,
-                        List<HoodieWriteStat> stats, HoodieData<WriteStatus> writeStatuses) throws IOException {
+                        List<HoodieWriteStat> stats) throws IOException {
     LOG.info("Committing " + instantTime + " action " + commitActionType);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     // Finalize write
@@ -293,7 +292,7 @@ protected void commit(HoodieTable table, String commitActionType, String instant
       saveInternalSchema(table, instantTime, metadata);
     }
     // update Metadata table
-    writeTableMetadata(table, instantTime, metadata, writeStatuses);
+    writeTableMetadata(table, instantTime, metadata);
     activeTimeline.saveAsComplete(false, table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, commitActionType, instantTime),
         serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), metadata));
   }
@@ -1627,7 +1626,7 @@ public void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient me
     // try to save history schemas
     FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
     schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
-    commitStats(instantTime, context.emptyHoodieData(), Collections.emptyList(), Option.of(extraMeta), commitActionType);
+    commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
   }

   private InternalSchema getInternalSchema(TableSchemaResolver schemaUtil) {
@@ -171,6 +171,10 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
     try {
       String latestValidFilePath = baseFileToMerge.getFileName();
       writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
+      // At the moment, we only support SI for overwrite with latest payload. So, we don't need to embed entire file slice here.
+      // HUDI-8518 will be taken up to fix it for any payload during which we might require entire file slice to be set here.
+      // Already AppendHandle adds all logs file from current file slice to HoodieDeltaWriteStat.
+      writeStatus.getStat().setPrevBaseFile(latestValidFilePath);

       HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime,
           new StoragePath(config.getBasePath()),
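
The setPrevBaseFile hook added above is what enables the on-demand reads described in the commit message: with the previous base file recorded on the write stat, the metadata writer can load record keys from the old and new files and diff them. A minimal sketch of that diffing step (deriveRliUpdates, the key sets, and the file id are hypothetical stand-ins, not actual Hudi APIs):

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of deriving record-level-index (RLI) updates from
// record keys read on demand out of the previous and new base files.
public class RliDiffSketch {
  // Maps each live key to the new file id; keys that disappeared map to
  // null, standing in for an RLI delete record.
  static Map<String, String> deriveRliUpdates(Set<String> keysBefore, Set<String> keysAfter, String newFileId) {
    Map<String, String> updates = new HashMap<>();
    for (String key : keysAfter) {
      updates.put(key, newFileId);
    }
    for (String key : keysBefore) {
      if (!keysAfter.contains(key)) {
        updates.put(key, null); // key no longer present: emit a delete
      }
    }
    return updates;
  }

  public static void main(String[] args) {
    Set<String> before = new HashSet<>(Arrays.asList("k1", "k2"));
    Set<String> after = new HashSet<>(Arrays.asList("k2", "k3"));
    // Prints k1 -> null (delete), k2/k3 -> base-file-v2 (order may vary)
    System.out.println(deriveRliUpdates(before, after, "base-file-v2"));
  }
}

Because the committed files are immutable, this derivation yields the same result no matter how many times a task is retried, unlike results captured from a recomputed RDD<WriteStatus>.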