Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Support restore/rollback sync during conversion (1/2) #569

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,6 @@ public class InternalSnapshot {
List<PartitionFileGroup> partitionedDataFiles;
// pending commits before latest commit on the table.
@Builder.Default List<Instant> pendingCommits = Collections.emptyList();
// commit identifier in source table
@Builder.Default String sourceIdentifier = "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,7 @@ public class TableChange {

/** The {@link InternalTable} at the commit time to which this table change belongs. */
InternalTable tableAsOfChange;

// Commit identifier in source table
@Builder.Default String sourceIdentifier = "";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may be safer without a default and require non-null, what do you think?

Similar note for InternalSnapshot

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,20 @@ public class TableSyncMetadata {
Instant lastInstantSynced;
List<Instant> instantsToConsiderForNextSync;
int version;
String sourceTableFormat;
String sourceIdentifier;

public static TableSyncMetadata of(
Instant lastInstantSynced, List<Instant> instantsToConsiderForNextSync) {
return new TableSyncMetadata(lastInstantSynced, instantsToConsiderForNextSync, CURRENT_VERSION);
Instant lastInstantSynced,
List<Instant> instantsToConsiderForNextSync,
String sourceTableFormat,
String sourceIdentifier) {
return new TableSyncMetadata(
lastInstantSynced,
instantsToConsiderForNextSync,
CURRENT_VERSION,
sourceTableFormat,
sourceIdentifier);
}

public String toJson() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,13 @@ public interface ConversionSource<COMMIT> extends Closeable {
* false.
*/
boolean isIncrementalSyncSafeFrom(Instant instant);

/**
* Extract the identifier of the provided commit, the identifier defined as 1. Snapshot ID in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use <ul><li> tags for formatting the docs with the list if you want

* Iceberg 2. Version ID in Delta 3. timestamp in Hudi
*
* @param commit The provided commit
* @return the string version of commit identifier
*/
String getCommitIdentifier(COMMIT commit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,7 @@ public interface ConversionTarget {

/** Initializes the client with provided configuration */
void init(TargetTable targetTable, Configuration configuration);

/** Return the commit identifier from target table */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you be more specific here? I am assuming this is the latest commit identifier but it should be more specific if the table can have more than one over time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you be more specific here? I am assuming this is the latest commit identifier but it should be more specific if the table can have more than one over time.

Do you mean the snapshot sync scenario? Because In incremental sync, we will ensure target and source commit has 1:1 mapping. However, in snapshot sync, we will use the commit id of current (latest) snapshot even though the changes might contains more than one commit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To ensure we’re aligned, the context of this function is:

It is designed to retrieve the corresponding target commit based on a given source identifier during a sync operation. Here’s the scenario:

Assume:

  • Last completed sync (1 snapshot, 1 incremental) is source commit 3
  • We’re starting the new sync process from source commit 4

Source table commit history:
• 1 (UPDATE)
• 2 (UPDATE)
• 3 (UPDATE)
• 4 (ROLLBACK to 2)
• 5 (UPDATE)
• 6 (UPDATE)
Target table commit history (mapped by source identifiers):
• 1 (mapped to source id 2)
• 2 (mapped to source id 3)

When syncing the ROLLBACK operation (source commit 4), we need to identify the corresponding target commit that aligns with the source identifier 2 (which is 1 in target).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense to me but the java doc can be a bit more clear. It should also include context on when the Optional will be empty

Optional<String> getTargetCommitIdentifier(String sourceIdentifier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public Map<String, SyncResult> syncSnapshot(
internalTable,
target -> target.syncFilesForSnapshot(snapshot.getPartitionedDataFiles()),
startTime,
snapshot.getPendingCommits()));
snapshot.getPendingCommits(),
snapshot.getSourceIdentifier()));
} catch (Exception e) {
log.error("Failed to sync snapshot", e);
results.put(
Expand Down Expand Up @@ -121,7 +122,8 @@ public Map<String, List<SyncResult>> syncChanges(
change.getTableAsOfChange(),
target -> target.syncFilesForDiff(change.getFilesDiff()),
startTime,
changes.getPendingCommits()));
changes.getPendingCommits(),
change.getSourceIdentifier()));
} catch (Exception e) {
log.error("Failed to sync table changes", e);
resultsForFormat.add(buildResultForError(SyncMode.INCREMENTAL, startTime, e));
Expand Down Expand Up @@ -149,19 +151,26 @@ private SyncResult getSyncResult(
InternalTable tableState,
SyncFiles fileSyncMethod,
Instant startTime,
List<Instant> pendingCommits) {
List<Instant> pendingCommits,
String sourceIdentifier) {
// initialize the sync
conversionTarget.beginSync(tableState);
// Persist the latest commit time in table properties for incremental syncs
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here We need to move the Metadata set operation earlier because it will be required during the sync() operation in Iceberg (Delta and Hudi only need it in completeSync()

// Syncing metadata must precede the following steps to ensure that the metadata is available
// before committing
TableSyncMetadata latestState =
TableSyncMetadata.of(
tableState.getLatestCommitTime(),
pendingCommits,
tableState.getTableFormat(),
sourceIdentifier);
conversionTarget.syncMetadata(latestState);
// sync schema updates
conversionTarget.syncSchema(tableState.getReadSchema());
// sync partition updates
conversionTarget.syncPartitionSpec(tableState.getPartitioningFields());
// Update the files in the target table
fileSyncMethod.sync(conversionTarget);
// Persist the latest commit time in table properties for incremental syncs.
TableSyncMetadata latestState =
TableSyncMetadata.of(tableState.getLatestCommitTime(), pendingCommits);
conversionTarget.syncMetadata(latestState);
conversionTarget.completeSync();

return SyncResult.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,17 @@ private static Stream<Arguments> provideMetadataAndJson() {
Instant.parse("2020-07-04T10:15:30.00Z"),
Arrays.asList(
Instant.parse("2020-08-21T11:15:30.00Z"),
Instant.parse("2024-01-21T12:15:30.00Z"))),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[\"2020-08-21T11:15:30Z\",\"2024-01-21T12:15:30Z\"],\"version\":0}"),
Instant.parse("2024-01-21T12:15:30.00Z")),
"TEST",
"0"),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[\"2020-08-21T11:15:30Z\",\"2024-01-21T12:15:30Z\"],\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"),
Arguments.of(
TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), Collections.emptyList()),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0}"),
TableSyncMetadata.of(
Instant.parse("2020-07-04T10:15:30.00Z"), Collections.emptyList(), "TEST", "0"),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"),
Arguments.of(
TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), null),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0}"));
TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), null, "TEST", "0"),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ void syncSnapshotWithFailureForOneFormat() {
.table(startingTableState)
.partitionedDataFiles(fileGroups)
.pendingCommits(pendingCommitInstants)
.sourceIdentifier("0")
.build();
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
when(mockConversionTarget2.getTableFormat()).thenReturn(TableFormat.DELTA);
Expand Down Expand Up @@ -106,7 +107,10 @@ void syncSnapshotWithFailureForOneFormat() {
failureResult.getStatus());

verifyBaseConversionTargetCalls(
mockConversionTarget2, startingTableState, pendingCommitInstants);
mockConversionTarget2,
startingTableState,
pendingCommitInstants,
snapshot.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForSnapshot(fileGroups);
verify(mockConversionTarget2).completeSync();
verify(mockConversionTarget1, never()).completeSync();
Expand All @@ -124,15 +128,27 @@ void syncChangesWithFailureForOneFormat() {
InternalTable tableState1 = getTableState(1);
DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
TableChange tableChange1 =
TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build();
TableChange.builder()
.tableAsOfChange(tableState1)
.filesDiff(dataFilesDiff1)
.sourceIdentifier("0")
.build();
InternalTable tableState2 = getTableState(2);
DataFilesDiff dataFilesDiff2 = getFilesDiff(2);
TableChange tableChange2 =
TableChange.builder().tableAsOfChange(tableState2).filesDiff(dataFilesDiff2).build();
TableChange.builder()
.tableAsOfChange(tableState2)
.filesDiff(dataFilesDiff2)
.sourceIdentifier("1")
.build();
InternalTable tableState3 = getTableState(3);
DataFilesDiff dataFilesDiff3 = getFilesDiff(3);
TableChange tableChange3 =
TableChange.builder().tableAsOfChange(tableState3).filesDiff(dataFilesDiff3).build();
TableChange.builder()
.tableAsOfChange(tableState3)
.filesDiff(dataFilesDiff3)
.sourceIdentifier("2")
.build();

List<Instant> pendingCommitInstants = Collections.singletonList(Instant.now());
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
Expand All @@ -151,10 +167,12 @@ void syncChangesWithFailureForOneFormat() {
Map<ConversionTarget, TableSyncMetadata> conversionTargetWithMetadata = new HashMap<>();
conversionTargetWithMetadata.put(
mockConversionTarget1,
TableSyncMetadata.of(Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList()));
TableSyncMetadata.of(
Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), "TEST", "0"));
conversionTargetWithMetadata.put(
mockConversionTarget2,
TableSyncMetadata.of(Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList()));
TableSyncMetadata.of(
Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), "TEST", "1"));

Map<String, List<SyncResult>> result =
TableFormatSync.getInstance()
Expand Down Expand Up @@ -194,13 +212,29 @@ void syncChangesWithFailureForOneFormat() {
assertSyncResultTimes(successResults.get(i), start);
}

verifyBaseConversionTargetCalls(mockConversionTarget1, tableState1, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget1,
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1);
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState1, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1);
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState2, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState2,
pendingCommitInstants,
tableChange2.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2);
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState3, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState3,
pendingCommitInstants,
tableChange3.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3);
verify(mockConversionTarget1, times(1)).completeSync();
verify(mockConversionTarget2, times(3)).completeSync();
Expand All @@ -212,15 +246,27 @@ void syncChangesWithDifferentFormatsAndMetadata() {
InternalTable tableState1 = getTableState(1);
DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
TableChange tableChange1 =
TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build();
TableChange.builder()
.tableAsOfChange(tableState1)
.filesDiff(dataFilesDiff1)
.sourceIdentifier("0")
.build();
InternalTable tableState2 = getTableState(2);
DataFilesDiff dataFilesDiff2 = getFilesDiff(2);
TableChange tableChange2 =
TableChange.builder().tableAsOfChange(tableState2).filesDiff(dataFilesDiff2).build();
TableChange.builder()
.tableAsOfChange(tableState2)
.filesDiff(dataFilesDiff2)
.sourceIdentifier("1")
.build();
InternalTable tableState3 = getTableState(3);
DataFilesDiff dataFilesDiff3 = getFilesDiff(3);
TableChange tableChange3 =
TableChange.builder().tableAsOfChange(tableState3).filesDiff(dataFilesDiff3).build();
TableChange.builder()
.tableAsOfChange(tableState3)
.filesDiff(dataFilesDiff3)
.sourceIdentifier("2")
.build();

List<Instant> pendingCommitInstants = Collections.singletonList(Instant.now());
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
Expand All @@ -240,12 +286,17 @@ void syncChangesWithDifferentFormatsAndMetadata() {
mockConversionTarget1,
TableSyncMetadata.of(
tableChange2.getTableAsOfChange().getLatestCommitTime(),
Collections.singletonList(tableChange1.getTableAsOfChange().getLatestCommitTime())));
Collections.singletonList(tableChange1.getTableAsOfChange().getLatestCommitTime()),
"TEST",
tableChange2.getSourceIdentifier()));
// mockConversionTarget2 will have synced the first table change previously
conversionTargetWithMetadata.put(
mockConversionTarget2,
TableSyncMetadata.of(
tableChange1.getTableAsOfChange().getLatestCommitTime(), Collections.emptyList()));
tableChange1.getTableAsOfChange().getLatestCommitTime(),
Collections.emptyList(),
"TEST",
tableChange1.getSourceIdentifier()));

Map<String, List<SyncResult>> result =
TableFormatSync.getInstance()
Expand Down Expand Up @@ -280,15 +331,31 @@ void syncChangesWithDifferentFormatsAndMetadata() {
}

// conversionTarget1 syncs table changes 1 and 3
verifyBaseConversionTargetCalls(mockConversionTarget1, tableState1, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget1,
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1);
verifyBaseConversionTargetCalls(mockConversionTarget1, tableState3, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget1,
tableState3,
pendingCommitInstants,
tableChange3.getSourceIdentifier());
verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff3);
verify(mockConversionTarget1, times(2)).completeSync();
// conversionTarget2 syncs table changes 2 and 3
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState2, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState2,
pendingCommitInstants,
tableChange2.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2);
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState3, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState3,
pendingCommitInstants,
tableChange3.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3);
verify(mockConversionTarget2, times(2)).completeSync();
}
Expand All @@ -299,7 +366,11 @@ void syncChangesOneFormatWithNoRequiredChanges() {
InternalTable tableState1 = getTableState(1);
DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
TableChange tableChange1 =
TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build();
TableChange.builder()
.tableAsOfChange(tableState1)
.filesDiff(dataFilesDiff1)
.sourceIdentifier("0")
.build();

List<Instant> pendingCommitInstants = Collections.emptyList();
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
Expand All @@ -315,11 +386,13 @@ void syncChangesOneFormatWithNoRequiredChanges() {
Map<ConversionTarget, TableSyncMetadata> conversionTargetWithMetadata = new HashMap<>();
// mockConversionTarget1 will have nothing to sync
conversionTargetWithMetadata.put(
mockConversionTarget1, TableSyncMetadata.of(Instant.now(), Collections.emptyList()));
mockConversionTarget1,
TableSyncMetadata.of(Instant.now(), Collections.emptyList(), "TEST", "0"));
// mockConversionTarget2 will have synced the first table change previously
conversionTargetWithMetadata.put(
mockConversionTarget2,
TableSyncMetadata.of(Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList()));
TableSyncMetadata.of(
Instant.now().minus(1, ChronoUnit.HOURS), Collections.emptyList(), "TEST", "1"));

Map<String, List<SyncResult>> result =
TableFormatSync.getInstance()
Expand All @@ -338,7 +411,11 @@ void syncChangesOneFormatWithNoRequiredChanges() {
verify(mockConversionTarget1, never()).syncFilesForDiff(any());
verify(mockConversionTarget1, never()).completeSync();

verifyBaseConversionTargetCalls(mockConversionTarget2, tableState1, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1);
}

Expand Down Expand Up @@ -379,12 +456,17 @@ private DataFilesDiff getFilesDiff(int id) {
private void verifyBaseConversionTargetCalls(
ConversionTarget mockConversionTarget,
InternalTable startingTableState,
List<Instant> pendingCommitInstants) {
List<Instant> pendingCommitInstants,
String sourceIdentifier) {
verify(mockConversionTarget).beginSync(startingTableState);
verify(mockConversionTarget).syncSchema(startingTableState.getReadSchema());
verify(mockConversionTarget).syncPartitionSpec(startingTableState.getPartitioningFields());
verify(mockConversionTarget)
.syncMetadata(
TableSyncMetadata.of(startingTableState.getLatestCommitTime(), pendingCommitInstants));
TableSyncMetadata.of(
startingTableState.getLatestCommitTime(),
pendingCommitInstants,
startingTableState.getTableFormat(),
sourceIdentifier));
}
}
Loading
Loading