Skip to content

Commit

Permalink
refactor get target commit process
Browse files Browse the repository at this point in the history
  • Loading branch information
danielhumanmod committed Dec 13, 2024
1 parent 0af0a82 commit 82b2d2d
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ public String getTableFormat() {

@Override
public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
long sourceIdentifierVal = Long.parseLong(sourceIdentifier);
Snapshot currentSnapshot = deltaLog.currentSnapshot().snapshot();

Iterator<Tuple2<Object, Seq<Action>>> versionIterator =
Expand Down Expand Up @@ -266,11 +265,6 @@ public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
if (sourceIdentifier.equals(metadata.getSourceIdentifier())) {
return Optional.of(String.valueOf(targetVersion));
}

// Stop if greater than sourceIdentifier since we're iterating from oldest to newest
if (Long.parseLong(metadata.getSourceIdentifier()) > sourceIdentifierVal) {
return Optional.empty();
}
} catch (Exception e) {
log.warn("Failed to parse commit metadata for commit: {}", targetVersion, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,28 +287,7 @@ public Optional<TableSyncMetadata> getTableMetadata() {
.filterCompletedInstants()
.lastInstant()
.toJavaOptional()
.map(
instant -> {
try {
if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
return HoodieReplaceCommitMetadata.fromBytes(
client.getActiveTimeline().getInstantDetails(instant).get(),
HoodieReplaceCommitMetadata.class)
.getExtraMetadata();
} else {
return HoodieCommitMetadata.fromBytes(
client.getActiveTimeline().getInstantDetails(instant).get(),
HoodieCommitMetadata.class)
.getExtraMetadata();
}
} catch (IOException ex) {
throw new ReadException("Unable to read Hudi commit metadata", ex);
}
})
.flatMap(
metadata ->
TableSyncMetadata.fromJson(
metadata.get(TableSyncMetadata.XTABLE_METADATA))));
.flatMap(instant -> getMetadata(instant, client)));
}

@Override
Expand All @@ -326,40 +305,19 @@ public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {

Optional<String> getTargetCommitIdentifier(
String sourceIdentifier, HoodieTableMetaClient metaClient) {
long sourceIdentifierVal = Long.parseLong(sourceIdentifier);

HoodieTimeline completedTimeline = metaClient.getActiveTimeline().filterCompletedInstants();
HoodieTimeline commitTimeline = metaClient.getCommitsTimeline();

for (HoodieInstant instant : completedTimeline.getInstants()) {
for (HoodieInstant instant : commitTimeline.getInstants()) {
try {
Option<byte[]> instantDetails = metaClient.getActiveTimeline().getInstantDetails(instant);
if (!instantDetails.isPresent()) {
continue;
}

HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(instantDetails.get(), HoodieCommitMetadata.class);
String sourceMetadataJson =
commitMetadata.getExtraMetadata().get(TableSyncMetadata.XTABLE_METADATA);
if (sourceMetadataJson == null) {
continue;
}

Optional<TableSyncMetadata> optionalMetadata =
TableSyncMetadata.fromJson(sourceMetadataJson);
Optional<TableSyncMetadata> optionalMetadata = getMetadata(instant, metaClient);
if (!optionalMetadata.isPresent()) {
return Optional.empty();
}

TableSyncMetadata metadata = optionalMetadata.get();
if (sourceIdentifier.equals(metadata.getSourceIdentifier())) {
return Optional.of(sourceIdentifier);
}

long curSourceIdentifierVal = Long.parseLong(metadata.getSourceIdentifier());
// Stop if greater than sourceIdentifier since we're iterating from oldest to newest
if (curSourceIdentifierVal > sourceIdentifierVal) {
return Optional.empty();
return Optional.of(String.valueOf(instant.getTimestamp()));
}
} catch (Exception e) {
log.warn("Failed to parse commit metadata for instant: {}", instant, e);
Expand All @@ -373,6 +331,37 @@ private HoodieTableMetaClient getMetaClient() {
() -> new IllegalStateException("beginSync must be called before calling this method"));
}

private Optional<TableSyncMetadata> getMetadata(
HoodieInstant instant, HoodieTableMetaClient metaClient) {
try {
// Get instant details
Option<byte[]> instantDetails = metaClient.getActiveTimeline().getInstantDetails(instant);
if (!instantDetails.isPresent()) {
return Optional.empty();
}

// Check action and parse the appropriate metadata
Map<String, String> extraMetadata;
if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
HoodieReplaceCommitMetadata replaceCommitMetadata =
HoodieReplaceCommitMetadata.fromBytes(
instantDetails.get(), HoodieReplaceCommitMetadata.class);
extraMetadata = replaceCommitMetadata.getExtraMetadata();
} else {
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(instantDetails.get(), HoodieCommitMetadata.class);
extraMetadata = commitMetadata.getExtraMetadata();
}

// Extract and transform to TableSyncMetadata
String sourceMetadataJson = extraMetadata.get(TableSyncMetadata.XTABLE_METADATA);
return TableSyncMetadata.fromJson(sourceMetadataJson);
} catch (Exception e) {
log.warn("Failed to parse commit metadata for instant: {}", instant, e);
return Optional.empty();
}
}

static class CommitState {
private HoodieTableMetaClient metaClient;
@Getter private final String instantTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ public String getTableFormat() {

@Override
public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
long sourceIdentifierVal = Long.parseLong(sourceIdentifier);
for (Snapshot snapshot : table.snapshots()) {
Map<String, String> summary = snapshot.summary();
String sourceMetadataJson = summary.get(TableSyncMetadata.XTABLE_METADATA);
Expand All @@ -270,13 +269,7 @@ public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {

TableSyncMetadata metadata = optionalMetadata.get();
if (sourceIdentifier.equals(metadata.getSourceIdentifier())) {
return Optional.of(metadata.getSourceIdentifier());
}

long curSourceIdentifierVal = Long.parseLong(metadata.getSourceIdentifier());
// Stop if greater than sourceIdentifier since we're iterating from oldest to newest
if (curSourceIdentifierVal > sourceIdentifierVal) {
return Optional.empty();
return Optional.of(String.valueOf(snapshot.snapshotId()));
}
} catch (Exception e) {
log.warn("Failed to parse parse snapshot metadata for {}", snapshot.snapshotId(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.xtable.hudi.HudiTestUtil.getHoodieWriteConfig;
import static org.apache.xtable.hudi.HudiTestUtil.initTableAndGetMetaClient;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.nio.file.Path;
Expand Down Expand Up @@ -312,7 +313,6 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr
Optional<String> targetIdentifier =
targetClient.getTargetCommitIdentifier(latestState.getSourceIdentifier(), metaClient);
assertTrue(targetIdentifier.isPresent());
assertEquals(latestState.getSourceIdentifier(), targetIdentifier.get());

// create a new commit that removes fileName1 and adds fileName2
String fileName2 = "file_2.parquet";
Expand All @@ -336,7 +336,6 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr
}
Optional<String> targetIdentifier2 = targetClient.getTargetCommitIdentifier("1", metaClient);
assertTrue(targetIdentifier2.isPresent());
assertEquals("1", targetIdentifier2.get());

// create a new commit that removes fileName2 and adds fileName3
String fileName3 = "file_3.parquet";
Expand Down Expand Up @@ -386,15 +385,16 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr
}
Optional<String> targetIdentifier3 = targetClient.getTargetCommitIdentifier("2", metaClient);
assertTrue(targetIdentifier3.isPresent());
assertEquals("2", targetIdentifier3.get());

Optional<String> targetIdentifier4 = targetClient.getTargetCommitIdentifier("3", metaClient);
assertTrue(targetIdentifier4.isPresent());
assertEquals("3", targetIdentifier4.get());

Optional<String> targetIdentifier5 = targetClient.getTargetCommitIdentifier("4", metaClient);
assertTrue(targetIdentifier5.isPresent());
assertEquals("4", targetIdentifier5.get());

// Case that return empty target identifier
Optional<String> targetIdentifier6 = targetClient.getTargetCommitIdentifier("5", metaClient);
assertFalse(targetIdentifier6.isPresent());

// the first commit to the timeline should be archived
assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,15 +248,18 @@ public void testCreateSnapshotControlFlow() throws Exception {
conversionTarget.getTargetCommitIdentifier(snapshot1.getSourceIdentifier());
validateIcebergTable(tableName, table1, Sets.newHashSet(dataFile1, dataFile2), null);
assertTrue(targetIdentifier1.isPresent());
assertEquals("0", targetIdentifier1.get());

TableFormatSync.getInstance()
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
Optional<String> targetIdentifier2 =
conversionTarget.getTargetCommitIdentifier(snapshot2.getSourceIdentifier());
validateIcebergTable(tableName, table2, Sets.newHashSet(dataFile2, dataFile3), null);
assertTrue(targetIdentifier2.isPresent());
assertEquals("1", targetIdentifier2.get());

// Case that return empty target identifier
Optional<String> targetIdentifier3 =
conversionTarget.getTargetCommitIdentifier("3");
assertFalse(targetIdentifier3.isPresent());

ArgumentCaptor<Transaction> transactionArgumentCaptor =
ArgumentCaptor.forClass(Transaction.class);
Expand Down

0 comments on commit 82b2d2d

Please sign in to comment.