Skip to content

Commit

Permalink
improve doc and test
Browse files Browse the repository at this point in the history
  • Loading branch information
danielhumanmod committed Dec 14, 2024
1 parent 7ffda25 commit 4198e5b
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ public interface ConversionTarget {
/** Initializes the client with provided configuration */
void init(TargetTable targetTable, Configuration configuration);

/** Return the commit identifier from target table */
/**
* Retrieves the commit identifier from the target table that corresponds to a given source table
* commit identifier
*
* @param sourceIdentifier the unique identifier of the source table commit
* @return an {@link Optional} containing the target commit identifier if a corresponding commit
* exists, or an empty {@link Optional} if no match is found
*/
Optional<String> getTargetCommitIdentifier(String sourceIdentifier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.xtable.delta;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
Expand Down Expand Up @@ -165,17 +166,20 @@ public void testCreateSnapshotControlFlow() throws Exception {

TableFormatSync.getInstance()
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
Optional<String> targetIdentifier1 = conversionTarget.getTargetCommitIdentifier("0");
Optional<String> targetIdentifier1 = conversionTarget.getTargetCommitIdentifier(snapshot1.getSourceIdentifier());
validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile1, dataFile2)), null);
assertTrue(targetIdentifier1.isPresent());
assertEquals("0", targetIdentifier1.get());

TableFormatSync.getInstance()
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
Optional<String> targetIdentifier2 = conversionTarget.getTargetCommitIdentifier("1");
Optional<String> targetIdentifier2 = conversionTarget.getTargetCommitIdentifier(snapshot2.getSourceIdentifier());
validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile2, dataFile3)), null);
assertTrue(targetIdentifier2.isPresent());
assertEquals("1", targetIdentifier2.get());

Optional<String> emptyTargetIdentifier = conversionTarget.getTargetCommitIdentifier("3");
assertFalse(emptyTargetIdentifier.isPresent());
}

@Test
Expand Down Expand Up @@ -221,7 +225,7 @@ public void testPrimitiveFieldPartitioning() throws Exception {
EqualTo equalToExpr =
new EqualTo(new Column("string_field", new StringType()), Literal.of("warning"));

InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2, dataFile3);
InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3);

TableFormatSync.getInstance()
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
Expand Down Expand Up @@ -301,7 +305,7 @@ public void testMultipleFieldPartitioning() throws Exception {
EqualTo equalToExpr2 = new EqualTo(new Column("int_field", new IntegerType()), Literal.of(20));
And CombinedExpr = new And(equalToExpr1, equalToExpr2);

InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2, dataFile3);
InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3);
TableFormatSync.getInstance()
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile2)), CombinedExpr);
Expand Down Expand Up @@ -344,7 +348,7 @@ public void testTimestampPartitioning(PartitionTransformType transformType) thro
InternalDataFile dataFile2 = getDataFile(2, partitionValues1, basePath);
InternalDataFile dataFile3 = getDataFile(3, partitionValues2, basePath);

InternalSnapshot snapshot1 = buildSnapshot(table, dataFile1, dataFile2, dataFile3);
InternalSnapshot snapshot1 = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3);

TableFormatSync.getInstance()
.syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
Expand Down Expand Up @@ -415,10 +419,6 @@ private void validateDeltaTable(
internalDataFiles.size(), count, "Number of files from DeltaScan don't match expectation");
}

private InternalSnapshot buildSnapshot(InternalTable table, InternalDataFile... dataFiles) {
return buildSnapshot(table, "", dataFiles);
}

private InternalSnapshot buildSnapshot(
InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) {
return InternalSnapshot.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,6 @@ void archiveTimelineAndCleanMetadataTableAfterMultipleCommits(String partitionPa
.range(Range.scalar("partitionPath"))
.build()))
.build());

// sync snapshot and metadata
InternalTable initialState = getState(Instant.now().minus(24, ChronoUnit.HOURS));
HudiConversionTarget targetClient = getTargetClient();
Expand Down Expand Up @@ -393,8 +392,8 @@ CONTEXT, getHoodieWriteConfig(metaClient).getMetadataConfig(), tableBasePath, tr
assertTrue(targetIdentifier5.isPresent());

// Case that returns an empty target identifier
Optional<String> targetIdentifier6 = targetClient.getTargetCommitIdentifier("5", metaClient);
assertFalse(targetIdentifier6.isPresent());
Optional<String> emptyTargetIdentifier = targetClient.getTargetCommitIdentifier("5", metaClient);
assertFalse(emptyTargetIdentifier.isPresent());

// the first commit to the timeline should be archived
assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ void syncFilesForSnapshot() {
void beginSyncForExistingTable() {
HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
HudiConversionTarget targetClient = getTargetClient(mockMetaClient);

targetClient.beginSync(TABLE);
// verify meta client timeline refreshed
verify(mockMetaClient).reloadActiveTimeline();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ public void testCreateSnapshotControlFlow() throws Exception {
assertTrue(targetIdentifier2.isPresent());

// Case that returns an empty target identifier
Optional<String> targetIdentifier3 = conversionTarget.getTargetCommitIdentifier("3");
assertFalse(targetIdentifier3.isPresent());
Optional<String> emptyTargetIdentifier = conversionTarget.getTargetCommitIdentifier("3");
assertFalse(emptyTargetIdentifier.isPresent());

ArgumentCaptor<Transaction> transactionArgumentCaptor =
ArgumentCaptor.forClass(Transaction.class);
Expand Down Expand Up @@ -332,9 +332,9 @@ public void testIncompleteWriteRollback() throws Exception {
InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList());
InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList());
InternalDataFile dataFile4 = getDataFile(4, Collections.emptyList());
InternalSnapshot snapshot1 = buildSnapshot(table1, dataFile1, dataFile2);
InternalSnapshot snapshot2 = buildSnapshot(table2, dataFile2, dataFile3);
InternalSnapshot snapshot3 = buildSnapshot(table2, dataFile3, dataFile4);
InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, dataFile2);
InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, dataFile3);
InternalSnapshot snapshot3 = buildSnapshot(table2, "2", dataFile3, dataFile4);
when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
when(mockSchemaExtractor.toIceberg(schema2)).thenReturn(icebergSchema2);
ArgumentCaptor<Schema> partitionSpecSchemaArgumentCaptor =
Expand Down Expand Up @@ -408,7 +408,7 @@ public void testTimestampPartitioning() throws Exception {
InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3);
InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3);

when(mockSchemaExtractor.toIceberg(internalSchema))
.thenReturn(icebergSchema)
Expand Down Expand Up @@ -471,7 +471,7 @@ public void testDatePartitioning() throws Exception {
InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3);
InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3);

when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
PartitionSpec partitionSpec =
Expand Down Expand Up @@ -525,7 +525,7 @@ public void testNumericFieldPartitioning() throws Exception {
InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3);
InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3);

when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
PartitionSpec partitionSpec =
Expand Down Expand Up @@ -600,7 +600,7 @@ public void testMultipleFieldPartitioning() throws Exception {
InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
InternalDataFile dataFile2 = getDataFile(2, partitionValues2);
InternalDataFile dataFile3 = getDataFile(3, partitionValues3);
InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3);
InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3);

when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
PartitionSpec partitionSpec =
Expand Down Expand Up @@ -666,7 +666,7 @@ public void testNestedFieldPartitioning() throws Exception {
InternalDataFile dataFile1 = getDataFile(1, partitionValues1);
InternalDataFile dataFile2 = getDataFile(2, partitionValues1);
InternalDataFile dataFile3 = getDataFile(3, partitionValues2);
InternalSnapshot snapshot = buildSnapshot(table, dataFile1, dataFile2, dataFile3);
InternalSnapshot snapshot = buildSnapshot(table, "0", dataFile1, dataFile2, dataFile3);

when(mockSchemaExtractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
PartitionSpec partitionSpec =
Expand All @@ -691,10 +691,6 @@ public void testNestedFieldPartitioning() throws Exception {
Expressions.equal(partitionField.getSourceField().getPath(), "value1"));
}

private InternalSnapshot buildSnapshot(InternalTable table, InternalDataFile... dataFiles) {
return buildSnapshot(table, "", dataFiles);
}

private InternalSnapshot buildSnapshot(
InternalTable table, String sourceIdentifier, InternalDataFile... dataFiles) {
return InternalSnapshot.builder()
Expand Down

0 comments on commit 4198e5b

Please sign in to comment.