Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8564] Removing WriteStatus references in Hoodie Metadata writer flow #12321

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
Expand Down Expand Up @@ -269,14 +268,13 @@ protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieW
* @param table {@link HoodieTable} of interest.
* @param instantTime instant time of the commit.
* @param metadata instance of {@link HoodieCommitMetadata}.
* @param writeStatuses Write statuses of the commit
*/
protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) {
protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieCommitMetadata metadata) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(instantTime);
if (metadataWriterOpt.isPresent()) {
try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) {
metadataWriter.updateFromWriteStatuses(metadata, writeStatuses, instantTime);
metadataWriter.update(metadata, instantTime);
} catch (Exception e) {
if (e instanceof HoodieException) {
throw (HoodieException) e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
import org.apache.hudi.client.timeline.TimelineArchivers;
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
Expand Down Expand Up @@ -341,7 +340,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
finalizeWrite(table, compactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
writeTableMetadata(table, compactionCommitTime, metadata, context.emptyHoodieData());
nsivabalan marked this conversation as resolved.
Show resolved Hide resolved
writeTableMetadata(table, compactionCommitTime, metadata);
LOG.info("Committing Compaction {}", compactionCommitTime);
LOG.debug("Compaction {} finished with result: {}", compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
Expand Down Expand Up @@ -404,7 +403,7 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
preCommit(metadata);
finalizeWrite(table, logCompactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
writeTableMetadata(table, logCompactionCommitTime, metadata, context.emptyHoodieData());
writeTableMetadata(table, logCompactionCommitTime, metadata);
LOG.info("Committing Log Compaction {}", logCompactionCommitTime);
LOG.debug("Log Compaction {} finished with result {}", logCompactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
Expand Down Expand Up @@ -490,7 +489,7 @@ public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldCo

// TODO : Where is shouldComplete used ?
if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant, Option.ofNullable(convertToWriteStatus(writeMetadata)));
completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
}
return clusteringMetadata;
}
Expand Down Expand Up @@ -522,12 +521,9 @@ public HoodieWriteMetadata<T> managePartitionTTL(String instantTime) {

protected abstract HoodieWriteMetadata<O> convertToOutputMetadata(HoodieWriteMetadata<T> writeMetadata);

protected abstract HoodieData<WriteStatus> convertToWriteStatus(HoodieWriteMetadata<T> writeMetadata);

private void completeClustering(HoodieReplaceCommitMetadata metadata,
HoodieTable table,
String clusteringCommitTime,
Option<HoodieData<WriteStatus>> writeStatuses) {
String clusteringCommitTime) {
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
handleWriteErrors(writeStats, TableServiceType.CLUSTER);
final HoodieInstant clusteringInstant = ClusteringUtils.getInflightClusteringInstant(clusteringCommitTime,
Expand All @@ -542,7 +538,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
preCommit(metadata);
}
// Update table's metadata (table)
writeTableMetadata(table, clusteringInstant.requestedTime(), metadata, writeStatuses.orElseGet(context::emptyHoodieData));
writeTableMetadata(table, clusteringInstant.requestedTime(), metadata);

LOG.info("Committing Clustering {}", clusteringCommitTime);
LOG.debug("Clustering {} finished with result {}", clusteringCommitTime, metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
Expand Down Expand Up @@ -216,12 +215,12 @@ public abstract boolean commit(String instantTime, O writeStatuses, Option<Map<S
String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc);

public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStatuses, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
String commitActionType) {
return commitStats(instantTime, writeStatuses, stats, extraMetadata, commitActionType, Collections.emptyMap(), Option.empty());
return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap(), Option.empty());
}

public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStatuses, List<HoodieWriteStat> stats,
public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplaceFileIds,
Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
Expand All @@ -243,7 +242,7 @@ public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStat
if (extraPreCommitFunc.isPresent()) {
extraPreCommitFunc.get().accept(table.getMetaClient(), metadata);
}
commit(table, commitActionType, instantTime, metadata, stats, writeStatuses);
commit(table, commitActionType, instantTime, metadata, stats);
postCommit(table, metadata, instantTime, extraMetadata);
LOG.info("Committed " + instantTime);
} catch (IOException e) {
Expand Down Expand Up @@ -282,7 +281,7 @@ public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStat
}

protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata,
List<HoodieWriteStat> stats, HoodieData<WriteStatus> writeStatuses) throws IOException {
List<HoodieWriteStat> stats) throws IOException {
LOG.info("Committing " + instantTime + " action " + commitActionType);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Finalize write
Expand All @@ -293,7 +292,7 @@ protected void commit(HoodieTable table, String commitActionType, String instant
saveInternalSchema(table, instantTime, metadata);
}
// update Metadata table
writeTableMetadata(table, instantTime, metadata, writeStatuses);
writeTableMetadata(table, instantTime, metadata);
activeTimeline.saveAsComplete(false, table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, commitActionType, instantTime),
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), metadata));
}
Expand Down Expand Up @@ -1627,7 +1626,7 @@ public void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient me
// try to save history schemas
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
commitStats(instantTime, context.emptyHoodieData(), Collections.emptyList(), Option.of(extraMeta), commitActionType);
commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
}

private InternalSchema getInternalSchema(TableSchemaResolver schemaUtil) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
try {
String latestValidFilePath = baseFileToMerge.getFileName();
writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());
// At the moment, we only support SI for the overwrite-with-latest payload, so we don't need to embed the entire file slice here.
// HUDI-8518 will be taken up to fix this for arbitrary payloads, which may require the entire file slice to be set here.
// Note that AppendHandle already adds all log files from the current file slice to HoodieDeltaWriteStat.
writeStatus.getStat().setPrevBaseFile(latestValidFilePath);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why just store the base file? For example, if the merge handle was called during compaction, don't we need the entire slice?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And do other handles set this — yes? no? why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment, we only support SI for the overwrite-with-latest payload, so we don't need to embed the entire file slice here.
HUDI-8518 will be taken up to fix this for arbitrary payloads, which may require the entire file slice to be set here.
By the way, AppendHandle already adds all log files from the current file slice to HoodieDeltaWriteStat.


HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, instantTime,
new StoragePath(config.getBasePath()),
Expand Down
Loading
Loading