diff --git a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java index 646513a9d..a44dbaf58 100644 --- a/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java +++ b/xtable-api/src/main/java/org/apache/xtable/spi/sync/ConversionTarget.java @@ -91,6 +91,13 @@ public interface ConversionTarget { /** Initializes the client with provided configuration */ void init(TargetTable targetTable, Configuration configuration); - /** Return the commit identifier from target table */ + /** + * Retrieves the commit identifier from the target table that corresponds to a given source table + * commit identifier + * + * @param sourceIdentifier the unique identifier of the source table commit + * @return an {@link Optional} containing the target commit identifier if a corresponding commit + * exists, or an empty {@link Optional} if no match is found + */ Optional<String> getTargetCommitIdentifier(String sourceIdentifier); } diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java index b061b819e..f770594ea 100644 --- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java @@ -19,6 +19,7 @@ package org.apache.xtable.delta; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; @@ -165,17 +166,20 @@ public void testCreateSnapshotControlFlow() throws Exception { TableFormatSync.getInstance() .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1); - Optional<String> targetIdentifier1 = conversionTarget.getTargetCommitIdentifier("0"); + 
Optional<String> targetIdentifier1 = conversionTarget.getTargetCommitIdentifier(snapshot1.getSourceIdentifier()); validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2)), null); assertTrue(targetIdentifier1.isPresent()); assertEquals("0", targetIdentifier1.get()); TableFormatSync.getInstance() .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2); - Optional<String> targetIdentifier2 = conversionTarget.getTargetCommitIdentifier("1"); + Optional<String> targetIdentifier2 = conversionTarget.getTargetCommitIdentifier(snapshot2.getSourceIdentifier()); validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile2, dataFile3)), null); assertTrue(targetIdentifier2.isPresent()); assertEquals("1", targetIdentifier2.get()); + + Optional<String> emptyTargetIdentifier = conversionTarget.getTargetCommitIdentifier("3"); + assertFalse(emptyTargetIdentifier.isPresent()); } @Test @@ -221,7 +225,7 @@ public void testPrimitiveFieldPartitioning() throws Exception { EqualTo equalToExpr = new EqualTo(new Column("string_field", new StringType()), Literal.of("warning")); - InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); TableFormatSync.getInstance() .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1); @@ -301,7 +305,7 @@ public void testMultipleFieldPartitioning() throws Exception { EqualTo equalToExpr2 = new EqualTo(new Column("int_field", new IntegerType()), Literal.of(20)); And CombinedExpr = new And(equalToExpr1, equalToExpr2); - InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); TableFormatSync.getInstance() .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1); validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile2)), CombinedExpr); @@ -344,7 +348,7 @@ public void 
testTimestampPartitioning(PartitionTransformType transformType) thro InternalDataFile dataFile2 = getDataFile(2, partitionValues1, basePath); InternalDataFile dataFile3 = getDataFile(3, partitionValues2, basePath); - InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); TableFormatSync.getInstance() .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1); @@ -415,10 +419,6 @@ private void validateDeltaTable( internalDataFiles.size(), count, "Number of files from DeltaScan don't match expectation"); } - private InternalSnapshot buildSnapshot(InternalTable table, InternalDataFile... dataFiles) { - return buildSnapshot(table, "", dataFiles); - } - private InternalSnapshot buildSnapshot( InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) { return InternalSnapshot.builder() diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java index 2373b2215..8e7235fc5 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/ITHudiConversionSourceTarget.java @@ -287,7 +287,6 @@ void archiveTimelineAndCleanMetadataTableAfterMultipleCommits(String partitionPa .range(Range.scalar("partitionPath")) .build())) .build()); - // sync snapshot and metadata InternalTable initialState = getState(Instant.now().minus(24, ChronoUnit.HOURS)); HudiConversionTarget targetClient = getTargetClient(); @@ -393,8 +392,8 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr assertTrue(targetIdentifier5.isPresent()); // Case that return empty target identifier - Optional<String> targetIdentifier6 = targetClient.getTargetCommitIdentifier("5", metaClient); - assertFalse(targetIdentifier6.isPresent()); + Optional<String> 
emptyTargetIdentifier = targetClient.getTargetCommitIdentifier("5", metaClient); + assertFalse(emptyTargetIdentifier.isPresent()); // the first commit to the timeline should be archived assertEquals( diff --git a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSourceTarget.java b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSourceTarget.java index a7838f826..9aec9be5c 100644 --- a/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSourceTarget.java +++ b/xtable-core/src/test/java/org/apache/xtable/hudi/TestHudiConversionSourceTarget.java @@ -249,6 +249,7 @@ void syncFilesForSnapshot() { void beginSyncForExistingTable() { HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class); HudiConversionTarget targetClient = getTargetClient(mockMetaClient); + targetClient.beginSync(TABLE); // verify meta client timeline refreshed verify(mockMetaClient).reloadActiveTimeline(); diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java index 9ab4ba9e4..71c6afd34 100644 --- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java +++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSync.java @@ -257,8 +257,8 @@ public void testCreateSnapshotControlFlow() throws Exception { assertTrue(targetIdentifier2.isPresent()); // Case that return empty target identifier - Optional<String> targetIdentifier3 = conversionTarget.getTargetCommitIdentifier("3"); - assertFalse(targetIdentifier3.isPresent()); + Optional<String> emptyTargetIdentifier = conversionTarget.getTargetCommitIdentifier("3"); + assertFalse(emptyTargetIdentifier.isPresent()); ArgumentCaptor<Transaction> transactionArgumentCaptor = ArgumentCaptor.forClass(Transaction.class); @@ -332,9 +332,9 @@ public void testIncompleteWriteRollback() throws Exception { InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList()); InternalDataFile dataFile3 = 
getDataFile(3, Collections.emptyList()); InternalDataFile dataFile4 = getDataFile(4, Collections.emptyList()); - InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2); - InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3); - InternalSnapshot snapshot3 = buildSnapshot(table2, dataFile3, dataFile4); + InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2); + InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3); + InternalSnapshot snapshot3 = buildSnapshot(table2, "2", dataFile3, dataFile4); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2); ArgumentCaptor<Schema> partitionSpecSchemaArgumentCaptor = @@ -408,7 +408,7 @@ public void testTimestampPartitioning() throws Exception { InternalDataFile dataFile1 = getDataFile(1, partitionValues1); InternalDataFile dataFile2 = getDataFile(2, partitionValues1); InternalDataFile dataFile3 = getDataFile(3, partitionValues2); - InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)) .thenReturn(icebergSchema) @@ -471,7 +471,7 @@ public void testDatePartitioning() throws Exception { InternalDataFile dataFile1 = getDataFile(1, partitionValues1); InternalDataFile dataFile2 = getDataFile(2, partitionValues1); InternalDataFile dataFile3 = getDataFile(3, partitionValues2); - InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); PartitionSpec partitionSpec = @@ -525,7 +525,7 @@ public void testNumericFieldPartitioning() throws Exception { InternalDataFile dataFile1 = getDataFile(1, partitionValues1); 
InternalDataFile dataFile2 = getDataFile(2, partitionValues1); InternalDataFile dataFile3 = getDataFile(3, partitionValues2); - InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); PartitionSpec partitionSpec = @@ -600,7 +600,7 @@ public void testMultipleFieldPartitioning() throws Exception { InternalDataFile dataFile1 = getDataFile(1, partitionValues1); InternalDataFile dataFile2 = getDataFile(2, partitionValues2); InternalDataFile dataFile3 = getDataFile(3, partitionValues3); - InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); PartitionSpec partitionSpec = @@ -666,7 +666,7 @@ public void testNestedFieldPartitioning() throws Exception { InternalDataFile dataFile1 = getDataFile(1, partitionValues1); InternalDataFile dataFile2 = getDataFile(2, partitionValues1); InternalDataFile dataFile3 = getDataFile(3, partitionValues2); - InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3); + InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3); when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema); PartitionSpec partitionSpec = @@ -691,10 +691,6 @@ public void testNestedFieldPartitioning() throws Exception { Expressions.equal(partitionField.getSourceField().getPath(), "value1")); } - private InternalSnapshot buildSnapshot(InternalTable table, InternalDataFile... dataFiles) { - return buildSnapshot(table, "", dataFiles); - } - private InternalSnapshot buildSnapshot( InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) { return InternalSnapshot.builder()