Removed WriteStatus usages across Metadata Writer flows
nsivabalan committed Nov 23, 2024
1 parent fedc6a5 commit b4871da
Showing 29 changed files with 57 additions and 84 deletions.
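The shape of the change is easiest to see at a call site. The sketch below is illustrative only: the two writeTableMetadata signatures are taken from the hunks that follow, while the wrapper method around them is assumed.

    // Illustrative sketch; only the writeTableMetadata signatures come from this commit.
    protected void commitToMetadataTable(HoodieTable table, String instantTime, HoodieCommitMetadata metadata) {
      // Before: callers had to hand over write statuses, often just a placeholder
      // such as context.emptyHoodieData():
      //   writeTableMetadata(table, instantTime, metadata, context.emptyHoodieData());
      // After: the metadata table is updated from the commit metadata alone.
      writeTableMetadata(table, instantTime, metadata);
    }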
@@ -24,7 +24,6 @@
import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.utils.TransactionUtils;
-import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -278,14 +277,13 @@ protected void finalizeWrite(HoodieTable table, String instantTime, List<HoodieW
* @param table {@link HoodieTable} of interest.
* @param instantTime instant time of the commit.
* @param metadata instance of {@link HoodieCommitMetadata}.
-* @param writeStatuses Write statuses of the commit
*/
-protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatuses) {
+protected void writeTableMetadata(HoodieTable table, String instantTime, HoodieCommitMetadata metadata) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(instantTime);
if (metadataWriterOpt.isPresent()) {
try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) {
-metadataWriter.updateFromWriteStatuses(metadata, writeStatuses, instantTime);
+metadataWriter.update(metadata, instantTime);
} catch (Exception e) {
if (e instanceof HoodieException) {
throw (HoodieException) e;
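The body of writeTableMetadata in the hunk above shows the pattern that every other call site below funnels into: resolve an optional HoodieTableMetadataWriter for the instant, update it inside try-with-resources so it is always closed, and rethrow HoodieExceptions as-is. A self-contained sketch of that pattern (the handling of non-HoodieException failures is assumed, since that branch is truncated in this view):

    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.exception.HoodieException;
    import org.apache.hudi.metadata.HoodieTableMetadataWriter;
    import org.apache.hudi.table.HoodieTable;

    final class MetadataTableUpdateSketch {
      // Not the actual client class; a condensed stand-in for the method shown above.
      static void updateMetadataTable(HoodieTable<?, ?, ?, ?> table, String instantTime, HoodieCommitMetadata metadata) {
        Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(instantTime);
        if (metadataWriterOpt.isPresent()) {
          try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) {
            // After this commit, the commit metadata and instant time are all that is needed.
            metadataWriter.update(metadata, instantTime);
          } catch (Exception e) {
            if (e instanceof HoodieException) {
              throw (HoodieException) e;
            }
            // Assumed: wrap other failures, mirroring the truncated branch of the original.
            throw new HoodieException("Failed to update metadata table for instant " + instantTime, e);
          }
        }
      }
    }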
@@ -341,7 +341,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
finalizeWrite(table, compactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
-writeTableMetadata(table, compactionCommitTime, metadata, context.emptyHoodieData());
+writeTableMetadata(table, compactionCommitTime, metadata);
LOG.info("Committing Compaction {}", compactionCommitTime);
LOG.debug("Compaction {} finished with result: {}", compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
@@ -404,7 +404,7 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
preCommit(metadata);
finalizeWrite(table, logCompactionCommitTime, writeStats);
// commit to data table after committing to metadata table.
-writeTableMetadata(table, logCompactionCommitTime, metadata, context.emptyHoodieData());
+writeTableMetadata(table, logCompactionCommitTime, metadata);
LOG.info("Committing Log Compaction {}", logCompactionCommitTime);
LOG.debug("Log Compaction {} finished with result {}", logCompactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightLogCompaction(table, logCompactionCommitTime, metadata);
@@ -490,7 +490,7 @@ public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldCo

// TODO : Where is shouldComplete used ?
if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
-completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant, Option.ofNullable(convertToWriteStatus(writeMetadata)));
+completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant);
}
return clusteringMetadata;
}
@@ -526,8 +526,7 @@ public HoodieWriteMetadata<T> managePartitionTTL(String instantTime) {

private void completeClustering(HoodieReplaceCommitMetadata metadata,
HoodieTable table,
-String clusteringCommitTime,
-Option<HoodieData<WriteStatus>> writeStatuses) {
+String clusteringCommitTime) {
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
handleWriteErrors(writeStats, TableServiceType.CLUSTER);
final HoodieInstant clusteringInstant = ClusteringUtils.getInflightClusteringInstant(clusteringCommitTime,
@@ -542,7 +541,7 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
preCommit(metadata);
}
// Update table's metadata (table)
-writeTableMetadata(table, clusteringInstant.requestedTime(), metadata, writeStatuses.orElseGet(context::emptyHoodieData));
+writeTableMetadata(table, clusteringInstant.requestedTime(), metadata);

LOG.info("Committing Clustering {}", clusteringCommitTime);
LOG.debug("Clustering {} finished with result {}", clusteringCommitTime, metadata);
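The completion paths above all follow the same ordering, spelled out by the "commit to data table after committing to metadata table" comments: the metadata table is updated first, and only then is the instant completed on the data-table timeline, all inside one transaction. A method-level sketch of that sequence (assumed to live in the table service client above, which supplies txnManager, finalizeWrite, and writeTableMetadata; it is not standalone-compilable):

    // Method-level sketch of the completion ordering used for compaction and clustering.
    protected void completeTableService(HoodieTable table, HoodieInstant inflightInstant,
                                        HoodieCommitMetadata metadata, List<HoodieWriteStat> writeStats) {
      String commitTime = inflightInstant.requestedTime();
      this.txnManager.beginTransaction(Option.of(inflightInstant), Option.empty());
      try {
        finalizeWrite(table, commitTime, writeStats);
        // 1. Metadata table first ...
        writeTableMetadata(table, commitTime, metadata);
        // 2. ... then the data-table timeline, e.g. CompactHelpers.getInstance()
        //    .completeInflightCompaction(table, commitTime, metadata). If the process
        //    dies in between, the data-table instant is still inflight and can be
        //    rolled back together with the corresponding metadata-table commit.
      } finally {
        this.txnManager.endTransaction(Option.of(inflightInstant));
      }
    }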
@@ -34,7 +34,6 @@
import org.apache.hudi.common.HoodiePendingRollbackInfo;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig;
-import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -216,12 +215,12 @@ public abstract boolean commit(String instantTime, O writeStatuses, Option<Map<S
String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc);

-public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStatuses, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
+public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
String commitActionType) {
-return commitStats(instantTime, writeStatuses, stats, extraMetadata, commitActionType, Collections.emptyMap(), Option.empty());
+return commitStats(instantTime, stats, extraMetadata, commitActionType, Collections.emptyMap(), Option.empty());
}

-public boolean commitStats(String instantTime, HoodieData<WriteStatus> writeStatuses, List<HoodieWriteStat> stats,
+public boolean commitStats(String instantTime, List<HoodieWriteStat> stats,
Option<Map<String, String>> extraMetadata,
String commitActionType, Map<String, List<String>> partitionToReplaceFileIds,
Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
@@ -243,7 +242,7 @@ public boolean commitStats(String instantTime, HoodieData<WriteStat
if (extraPreCommitFunc.isPresent()) {
extraPreCommitFunc.get().accept(table.getMetaClient(), metadata);
}
-commit(table, commitActionType, instantTime, metadata, stats, writeStatuses);
+commit(table, commitActionType, instantTime, metadata, stats);
postCommit(table, metadata, instantTime, extraMetadata);
LOG.info("Committed " + instantTime);
} catch (IOException e) {
@@ -282,7 +281,7 @@ public boolean commitStats(String instantTime, HoodieData<WriteStat
}

protected void commit(HoodieTable table, String commitActionType, String instantTime, HoodieCommitMetadata metadata,
-List<HoodieWriteStat> stats, HoodieData<WriteStatus> writeStatuses) throws IOException {
+List<HoodieWriteStat> stats) throws IOException {
LOG.info("Committing " + instantTime + " action " + commitActionType);
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Finalize write
@@ -293,7 +292,7 @@ protected void commit(HoodieTable table, String commitActionType, String instant
saveInternalSchema(table, instantTime, metadata);
}
// update Metadata table
-writeTableMetadata(table, instantTime, metadata, writeStatuses);
+writeTableMetadata(table, instantTime, metadata);
activeTimeline.saveAsComplete(false, table.getMetaClient().createNewInstant(HoodieInstant.State.INFLIGHT, commitActionType, instantTime),
serializeCommitMetadata(table.getMetaClient().getCommitMetadataSerDe(), metadata));
}
@@ -1611,7 +1610,7 @@ public void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient me
// try to save history schemas
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
-commitStats(instantTime, context.emptyHoodieData(), Collections.emptyList(), Option.of(extraMeta), commitActionType);
+commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
}

private InternalSchema getInternalSchema(TableSchemaResolver schemaUtil) {
@@ -25,7 +25,6 @@
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
-import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
@@ -1043,12 +1042,11 @@ public void buildMetadataPartitions(HoodieEngineContext engineContext, List<Hood

/**
* Update from {@code HoodieCommitMetadata}.
*
* @param commitMetadata {@code HoodieCommitMetadata}
* @param instantTime Timestamp at which the commit was performed
*/
@Override
-public void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatus, String instantTime) {
+public void update(HoodieCommitMetadata commitMetadata, String instantTime) {
processAndCommit(instantTime, () -> {
Map<String, HoodieData<HoodieRecord>> partitionToRecordMap =
HoodieTableMetadataUtil.convertMetadataToRecords(
@@ -22,7 +22,6 @@
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -57,16 +56,15 @@ public interface HoodieTableMetadataWriter extends Serializable, AutoCloseable {

/**
* Update the metadata table due to a COMMIT operation.
*
* @param commitMetadata commit metadata of the operation of interest.
* @param instantTime instant time of the commit.
*/
-void updateFromWriteStatuses(HoodieCommitMetadata commitMetadata, HoodieData<WriteStatus> writeStatuses, String instantTime);
+void update(HoodieCommitMetadata commitMetadata, String instantTime);

/**
* Update the metadata table due to a COMMIT or REPLACECOMMIT operation.
-* As compared to {@link #updateFromWriteStatuses(HoodieCommitMetadata, HoodieData, String)}, this method
-* directly updates metadata with the given records, instead of first converting {@link WriteStatus} to {@link HoodieRecord}.
+* As compared to {@link #update(HoodieCommitMetadata, String)}, this method
+* directly updates metadata with the given records, instead of generating HoodieRecords based on HoodieCommitMetadata.
*
* @param commitMetadata commit metadata of the operation of interest.
* @param records records to update metadata with.
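After this commit the writer interface keeps two update paths for commits: one driven purely by the commit metadata, and one that takes metadata records that have already been prepared. A trimmed sketch of how that part of the interface reads (the records-based method's exact name and parameter order are assumed; everything else is taken from the hunk above):

    import java.io.Serializable;
    import org.apache.hudi.common.data.HoodieData;
    import org.apache.hudi.common.model.HoodieCommitMetadata;
    import org.apache.hudi.common.model.HoodieRecord;

    // Trimmed stand-in for HoodieTableMetadataWriter; only the two commit-update
    // methods discussed above are sketched, all other members are elided.
    interface MetadataWriterSketch extends Serializable, AutoCloseable {

      // COMMIT path: metadata-table records are generated from the commit metadata.
      void update(HoodieCommitMetadata commitMetadata, String instantTime);

      // COMMIT / REPLACECOMMIT path: the caller supplies ready-made records instead.
      // (Assumed shape; the real method's name and parameter order may differ.)
      void update(HoodieCommitMetadata commitMetadata, HoodieData<HoodieRecord> records, String instantTime);
    }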
@@ -21,8 +21,6 @@
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
@@ -72,7 +70,7 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config,
*
* @param metadata commit metadata of interest.
*/
-protected final void writeTableMetadata(HoodieCommitMetadata metadata, HoodieData<WriteStatus> writeStatus, String actionType) {
+protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) {
// Recreate MDT for insert_overwrite_table operation.
if (table.getConfig().isMetadataTableEnabled()
&& WriteOperationType.INSERT_OVERWRITE_TABLE == metadata.getOperationType()) {
@@ -83,7 +81,7 @@ protected final void writeTableMetadata(HoodieCommitMetadata metadata, HoodieDat
Option<HoodieTableMetadataWriter> metadataWriterOpt = table.getMetadataWriter(instantTime);
if (metadataWriterOpt.isPresent()) {
try (HoodieTableMetadataWriter metadataWriter = metadataWriterOpt.get()) {
-metadataWriter.updateFromWriteStatuses(metadata, writeStatus, instantTime);
+metadataWriter.update(metadata, instantTime);
} catch (Exception e) {
if (e instanceof HoodieException) {
throw (HoodieException) e;
@@ -211,7 +211,7 @@ protected void autoCommit(HoodieWriteMetadata<O> result) {

protected abstract void commit(HoodieWriteMetadata<O> result);

-protected void commit(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata<O> result, List<HoodieWriteStat> writeStats) {
+protected void commit(HoodieWriteMetadata<O> result, List<HoodieWriteStat> writeStats) {
String actionType = getCommitActionType();
LOG.info("Committing " + instantTime + ", action Type " + actionType + ", operation Type " + operationType);
result.setCommitted(true);
@@ -222,7 +222,7 @@ protected void commit(HoodieData<WriteStatus> writeStatuses, HoodieWriteMetadata
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
HoodieCommitMetadata metadata = result.getCommitMetadata().get();

-writeTableMetadata(metadata, writeStatuses, actionType);
+writeTableMetadata(metadata, actionType);
// cannot serialize maps with null values
metadata.getExtraMetadata().entrySet().removeIf(entry -> entry.getValue() == null);
activeTimeline.saveAsComplete(false,
@@ -50,6 +50,6 @@ public WriteStatBasedIndexingCatchupTask(HoodieTableMetadataWriter metadataWrite
public void updateIndexForWriteAction(HoodieInstant instant) throws IOException {
HoodieCommitMetadata commitMetadata = metaClient.getCommitMetadataSerDe().deserialize(instant,
metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
-metadataWriter.updateFromWriteStatuses(commitMetadata, engineContext.emptyHoodieData(), instant.requestedTime());
+metadataWriter.update(commitMetadata, instant.requestedTime());
}
}
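The catch-up task above is a good illustration of why the old signature was awkward: it replays commits that already finished into the metadata table, so there never is a live WriteStatus to pass, only commit metadata re-read from the timeline. The same method with explanatory comments (metaClient and metadataWriter are fields of the task class, whose declarations are truncated in this view):

    public void updateIndexForWriteAction(HoodieInstant instant) throws IOException {
      // Re-read the commit metadata of an already-completed instant from the timeline ...
      HoodieCommitMetadata commitMetadata = metaClient.getCommitMetadataSerDe().deserialize(instant,
          metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
      // ... and replay it into the metadata table. No WriteStatus exists at this point,
      // which is exactly the case the new update(commitMetadata, instantTime) signature serves.
      metadataWriter.update(commitMetadata, instant.requestedTime());
    }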
@@ -92,7 +92,7 @@ public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationTy
partitionToFilesNameLengthMap, bootstrap, true);
if (writer != null && !createInflightCommit) {
writer.performTableServices(Option.of(commitTime));
-writer.updateFromWriteStatuses(commitMetadata, context.get().emptyHoodieData(), commitTime);
+writer.update(commitMetadata, commitTime);
}
// DT should be committed after MDT.
if (!createInflightCommit) {
@@ -110,7 +110,7 @@ public HoodieCommitMetadata doWriteOperation(String commitTime, WriteOperationTy
public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException {
super.moveInflightCommitToComplete(instantTime, metadata);
if (writer != null) {
-writer.updateFromWriteStatuses(metadata, context.get().emptyHoodieData(), instantTime);
+writer.update(metadata, instantTime);
}
return this;
}
@@ -119,7 +119,7 @@ public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCo
public HoodieTestTable moveInflightCompactionToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException {
super.moveInflightCompactionToComplete(instantTime, metadata);
if (writer != null) {
-writer.updateFromWriteStatuses(metadata, context.get().emptyHoodieData(), instantTime);
+writer.update(metadata, instantTime);
}
return this;
}
@@ -146,7 +146,7 @@ public void repeatClean(String cleanCommitTime,
public HoodieTestTable addCompaction(String instantTime, HoodieCommitMetadata commitMetadata) throws Exception {
super.addCompaction(instantTime, commitMetadata);
if (writer != null) {
-writer.updateFromWriteStatuses(commitMetadata, context.get().emptyHoodieData(), instantTime);
+writer.update(commitMetadata, instantTime);
}
return this;
}
@@ -177,7 +177,7 @@ public HoodieTestTable addReplaceCommit(
HoodieReplaceCommitMetadata completeReplaceMetadata) throws Exception {
super.addReplaceCommit(instantTime, requestedReplaceMetadata, inflightReplaceMetadata, completeReplaceMetadata);
if (writer != null) {
-writer.updateFromWriteStatuses(completeReplaceMetadata, context.get().emptyHoodieData(), instantTime);
+writer.update(completeReplaceMetadata, instantTime);
}
return this;
}
@@ -187,7 +187,7 @@ public HoodieTestTable addCluster(String instantTime, HoodieRequestedReplaceMeta
HoodieReplaceCommitMetadata completeReplaceMetadata) throws Exception {
super.addCluster(instantTime, requestedReplaceMetadata, inflightReplaceMetadata, completeReplaceMetadata);
if (writer != null) {
-writer.updateFromWriteStatuses(completeReplaceMetadata, context.get().emptyHoodieData(), instantTime);
+writer.update(completeReplaceMetadata, instantTime);
}
return this;
}
@@ -88,7 +88,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
// commit to data table after committing to metadata table.
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-writeTableMetadata(table, compactionCommitTime, metadata, context.emptyHoodieData());
+writeTableMetadata(table, compactionCommitTime, metadata);
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
} finally {
@@ -113,8 +113,7 @@ protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable tab
protected void completeClustering(
HoodieReplaceCommitMetadata metadata,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
-String clusteringCommitTime,
-Option<HoodieData<WriteStatus>> writeStatuses) {
+String clusteringCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering");
HoodieInstant clusteringInstant = ClusteringUtils.getInflightClusteringInstant(clusteringCommitTime, table.getActiveTimeline(), table.getInstantGenerator()).get();
List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
@@ -135,7 +134,7 @@ protected void completeClustering(
// commit to data table after committing to metadata table.
// We take the lock here to ensure all writes to metadata table happens within a single lock (single writer).
// Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
-writeTableMetadata(table, clusteringCommitTime, metadata, writeStatuses.orElseGet(context::emptyHoodieData));
+writeTableMetadata(table, clusteringCommitTime, metadata);

LOG.info("Committing Clustering {} finished with result {}.", clusteringCommitTime, metadata);
ClusteringUtils.transitionClusteringOrReplaceInflightToComplete(
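The last hunk also hints at why WriteStatus can be dropped across these flows: by commit time, everything the metadata writer needs is already carried by the commit metadata, for example the per-partition write stats that completeClustering flattens out of the replace-commit metadata. A self-contained sketch of that extraction:

    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
    import org.apache.hudi.common.model.HoodieWriteStat;

    final class WriteStatsFromMetadataSketch {
      // Mirrors the stream pipeline in completeClustering above: the per-partition
      // write stats stored in the replace-commit metadata are flattened into one list,
      // so no WriteStatus objects are needed when committing to the metadata table.
      static List<HoodieWriteStat> flatten(HoodieReplaceCommitMetadata metadata) {
        return metadata.getPartitionToWriteStats().entrySet().stream()
            .flatMap(entry -> entry.getValue().stream())
            .collect(Collectors.toList());
      }
    }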