From 82b2d2d65807f70a84be4bed4cb4e865f3d26fa8 Mon Sep 17 00:00:00 2001 From: Daniel Tu Date: Fri, 13 Dec 2024 00:30:55 -0800 Subject: [PATCH] refactor get target commit process --- .../xtable/delta/DeltaConversionTarget.java | 6 -- .../xtable/hudi/HudiConversionTarget.java | 83 ++++++++----------- .../iceberg/IcebergConversionTarget.java | 9 +- .../hudi/ITHudiConversionSourceTarget.java | 10 +-- .../xtable/iceberg/TestIcebergSync.java | 7 +- 5 files changed, 47 insertions(+), 68 deletions(-) diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java index 00f645167..fa3f7a8fb 100644 --- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java @@ -223,7 +223,6 @@ public String getTableFormat() { @Override public Optional getTargetCommitIdentifier(String sourceIdentifier) { - long sourceIdentifierVal = Long.parseLong(sourceIdentifier); Snapshot currentSnapshot = deltaLog.currentSnapshot().snapshot(); Iterator>> versionIterator = @@ -266,11 +265,6 @@ public Optional getTargetCommitIdentifier(String sourceIdentifier) { if (sourceIdentifier.equals(metadata.getSourceIdentifier())) { return Optional.of(String.valueOf(targetVersion)); } - - // Stop if greater than sourceIdentifier since we're iterating from oldest to newest - if (Long.parseLong(metadata.getSourceIdentifier()) > sourceIdentifierVal) { - return Optional.empty(); - } } catch (Exception e) { log.warn("Failed to parse commit metadata for commit: {}", targetVersion, e); } diff --git a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java index d3ae487df..c39ffd639 100644 --- a/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/hudi/HudiConversionTarget.java @@ -287,28 +287,7 @@ public Optional getTableMetadata() { .filterCompletedInstants() .lastInstant() .toJavaOptional() - .map( - instant -> { - try { - if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { - return HoodieReplaceCommitMetadata.fromBytes( - client.getActiveTimeline().getInstantDetails(instant).get(), - HoodieReplaceCommitMetadata.class) - .getExtraMetadata(); - } else { - return HoodieCommitMetadata.fromBytes( - client.getActiveTimeline().getInstantDetails(instant).get(), - HoodieCommitMetadata.class) - .getExtraMetadata(); - } - } catch (IOException ex) { - throw new ReadException("Unable to read Hudi commit metadata", ex); - } - }) - .flatMap( - metadata -> - TableSyncMetadata.fromJson( - metadata.get(TableSyncMetadata.XTABLE_METADATA)))); + .flatMap(instant -> getMetadata(instant, client))); } @Override @@ -326,40 +305,19 @@ public Optional getTargetCommitIdentifier(String sourceIdentifier) { Optional getTargetCommitIdentifier( String sourceIdentifier, HoodieTableMetaClient metaClient) { - long sourceIdentifierVal = Long.parseLong(sourceIdentifier); - HoodieTimeline completedTimeline = metaClient.getActiveTimeline().filterCompletedInstants(); + HoodieTimeline commitTimeline = metaClient.getCommitsTimeline(); - for (HoodieInstant instant : completedTimeline.getInstants()) { + for (HoodieInstant instant : commitTimeline.getInstants()) { try { - Option instantDetails = metaClient.getActiveTimeline().getInstantDetails(instant); - if (!instantDetails.isPresent()) { - continue; - } - - HoodieCommitMetadata commitMetadata = - HoodieCommitMetadata.fromBytes(instantDetails.get(), HoodieCommitMetadata.class); - String sourceMetadataJson = - commitMetadata.getExtraMetadata().get(TableSyncMetadata.XTABLE_METADATA); - if (sourceMetadataJson == null) { - continue; - } - - Optional optionalMetadata = - TableSyncMetadata.fromJson(sourceMetadataJson); + Optional optionalMetadata = getMetadata(instant, metaClient); if (!optionalMetadata.isPresent()) { return Optional.empty(); } TableSyncMetadata metadata = optionalMetadata.get(); if (sourceIdentifier.equals(metadata.getSourceIdentifier())) { - return Optional.of(sourceIdentifier); - } - - long curSourceIdentifierVal = Long.parseLong(metadata.getSourceIdentifier()); - // Stop if greater than sourceIdentifier since we're iterating from oldest to newest - if (curSourceIdentifierVal > sourceIdentifierVal) { - return Optional.empty(); + return Optional.of(String.valueOf(instant.getTimestamp())); } } catch (Exception e) { log.warn("Failed to parse commit metadata for instant: {}", instant, e); @@ -373,6 +331,37 @@ private HoodieTableMetaClient getMetaClient() { () -> new IllegalStateException("beginSync must be called before calling this method")); } + private Optional getMetadata( + HoodieInstant instant, HoodieTableMetaClient metaClient) { + try { + // Get instant details + Option instantDetails = metaClient.getActiveTimeline().getInstantDetails(instant); + if (!instantDetails.isPresent()) { + return Optional.empty(); + } + + // Check action and parse the appropriate metadata + Map extraMetadata; + if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { + HoodieReplaceCommitMetadata replaceCommitMetadata = + HoodieReplaceCommitMetadata.fromBytes( + instantDetails.get(), HoodieReplaceCommitMetadata.class); + extraMetadata = replaceCommitMetadata.getExtraMetadata(); + } else { + HoodieCommitMetadata commitMetadata = + HoodieCommitMetadata.fromBytes(instantDetails.get(), HoodieCommitMetadata.class); + extraMetadata = commitMetadata.getExtraMetadata(); + } + + // Extract and transform to TableSyncMetadata + String sourceMetadataJson = extraMetadata.get(TableSyncMetadata.XTABLE_METADATA); + return TableSyncMetadata.fromJson(sourceMetadataJson); + } catch (Exception e) { + log.warn("Failed to parse commit metadata for instant: {}", instant, e); + return Optional.empty(); + } + } + static class CommitState { private HoodieTableMetaClient metaClient; @Getter private final String instantTime; diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java index f476436df..c7e5721bb 100644 --- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java +++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionTarget.java @@ -253,7 +253,6 @@ public String getTableFormat() { @Override public Optional getTargetCommitIdentifier(String sourceIdentifier) { - long sourceIdentifierVal = Long.parseLong(sourceIdentifier); for (Snapshot snapshot : table.snapshots()) { Map summary = snapshot.summary(); String sourceMetadataJson = summary.get(TableSyncMetadata.XTABLE_METADATA); @@ -270,13 +269,7 @@ public Optional getTargetCommitIdentifier(String sourceIdentifier) { TableSyncMetadata metadata = optionalMetadata.get(); if (sourceIdentifier.equals(metadata.getSourceIdentifier())) { - return Optional.of(metadata.getSourceIdentifier()); - } - - long curSourceIdentifierVal = Long.parseLong(metadata.getSourceIdentifier()); - // Stop if greater than sourceIdentifier since we're iterating from oldest to newest - if (curSourceIdentifierVal > sourceIdentifierVal) { - return Optional.empty(); + return Optional.of(String.valueOf(snapshot.snapshotId())); } } catch (Exception e) { log.warn("Failed to parse parse snapshot metadata for {}", snapshot.snapshotId(), e); diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java index 9138b51a3..2373b2215 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java @@ -22,6 +22,7 @@ import static org.apache.xtable.hudi.HudiTestUtil.getHoodieWriteConfig; import static org.apache.xtable.hudi.HudiTestUtil.initTableAndGetMetaClient; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.file.Path; @@ -312,7 +313,6 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr Optional targetIdentifier = targetClient.getTargetCommitIdentifier(latestState.getSourceIdentifier(), metaClient); assertTrue(targetIdentifier.isPresent()); - assertEquals(latestState.getSourceIdentifier(), targetIdentifier.get()); // create a new commit that removes fileName1 and adds fileName2 String fileName2 = "file_2.parquet"; @@ -336,7 +336,6 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr } Optional targetIdentifier2 = targetClient.getTargetCommitIdentifier("1", metaClient); assertTrue(targetIdentifier2.isPresent()); - assertEquals("1", targetIdentifier2.get()); // create a new commit that removes fileName2 and adds fileName3 String fileName3 = "file_3.parquet"; @@ -386,15 +385,16 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr } Optional targetIdentifier3 = targetClient.getTargetCommitIdentifier("2", metaClient); assertTrue(targetIdentifier3.isPresent()); - assertEquals("2", targetIdentifier3.get()); Optional targetIdentifier4 = targetClient.getTargetCommitIdentifier("3", metaClient); assertTrue(targetIdentifier4.isPresent()); - assertEquals("3", targetIdentifier4.get()); Optional targetIdentifier5 = targetClient.getTargetCommitIdentifier("4", metaClient); assertTrue(targetIdentifier5.isPresent()); - assertEquals("4", targetIdentifier5.get()); + + // Case that return empty target identifier + Optional targetIdentifier6 = targetClient.getTargetCommitIdentifier("5", metaClient); + assertFalse(targetIdentifier6.isPresent()); // the first commit to the timeline should be archived assertEquals( diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java index 18599d03f..4a379c617 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java @@ -248,7 +248,6 @@ public void testCreateSnapshotControlFlow() throws Exception { conversionTarget.getTargetCommitIdentifier(snapshot1.getSourceIdentifier()); validateIcebergTable(tableName, table1, Sets.newHashSet(dataFile1, dataFile2), null); assertTrue(targetIdentifier1.isPresent()); - assertEquals("0", targetIdentifier1.get()); TableFormatSync.getInstance() .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2); @@ -256,7 +255,11 @@ public void testCreateSnapshotControlFlow() throws Exception { conversionTarget.getTargetCommitIdentifier(snapshot2.getSourceIdentifier()); validateIcebergTable(tableName, table2, Sets.newHashSet(dataFile2, dataFile3), null); assertTrue(targetIdentifier2.isPresent()); - assertEquals("1", targetIdentifier2.get()); + + // Case that return empty target identifier + Optional targetIdentifier3 = + conversionTarget.getTargetCommitIdentifier("3"); + assertFalse(targetIdentifier3.isPresent()); ArgumentCaptor transactionArgumentCaptor = ArgumentCaptor.forClass(Transaction.class);