[Enhancement] Support restore/rollback sync during conversion (1/2) #569

Open
wants to merge 29 commits into
base: main
Changes from 14 commits
e0a8710
Add a source identifier in target table transaction
danielhumanmod Oct 27, 2024
9bc8256
detect rollback from source and target table
danielhumanmod Oct 27, 2024
76e0bc0
boundary safe
danielhumanmod Oct 28, 2024
45b279a
fix existing tests
danielhumanmod Oct 28, 2024
8ac5717
support commit level info
danielhumanmod Nov 22, 2024
53a58b6
remove redundant changes
danielhumanmod Nov 22, 2024
bb5ed78
simplify source id generation
danielhumanmod Nov 22, 2024
9b8b71e
remove rollback detection in this PR
danielhumanmod Nov 22, 2024
252ca2c
format
danielhumanmod Nov 22, 2024
d9cb5fd
avoid hard code
danielhumanmod Nov 22, 2024
c8dde9e
format
danielhumanmod Nov 22, 2024
bfbb78f
use timestamp as source identifier for Hudi
danielhumanmod Nov 22, 2024
002d91b
optimize early termination logic
danielhumanmod Nov 24, 2024
bf5f0a0
source-target identifier mapping test
danielhumanmod Nov 24, 2024
56fe85e
introduce source metadata
danielhumanmod Dec 4, 2024
a40cd6c
make some methods private
danielhumanmod Dec 11, 2024
1e39d1a
make xtable metadata commit level in delta and iceberg
danielhumanmod Dec 13, 2024
0af0a82
simplify hudi metadata operation
danielhumanmod Dec 13, 2024
82b2d2d
refactor get target commit process
danielhumanmod Dec 13, 2024
7ffda25
format
danielhumanmod Dec 13, 2024
4198e5b
improve doc and test
danielhumanmod Dec 14, 2024
b181a07
format
danielhumanmod Dec 14, 2024
4ca9c9a
Merge branch 'main' of https://github.com/danielhumanmod/incubator-xt…
danielhumanmod Jan 23, 2025
9552679
fix compile error
danielhumanmod Jan 23, 2025
5fe489e
make sourceIdentifier nonnull without default value
danielhumanmod Jan 25, 2025
42c804b
keep existing table-level metadata
danielhumanmod Jan 25, 2025
e470e98
standard way for doc
danielhumanmod Jan 25, 2025
2033c3a
complete tests after making sourceIdentifier nonnull
danielhumanmod Jan 25, 2025
edb99fd
using TimelineUtils#getCommitMetadata to extract commit metadata
danielhumanmod Jan 25, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@
@Value
@Builder
public class InternalSnapshot {
public static final String DEFAULT_IDENTIFIER = "";

// The instant of the Snapshot
String version;
// The source table snapshot identifier
// Snapshot ID in Iceberg, version ID in Delta, and instant <timestamp_action> in Hudi
@Builder.Default String sourceIdentifier = DEFAULT_IDENTIFIER;
// Table reference
InternalTable table;
// Data files grouped by partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,6 @@ public class TableChange {

/** The {@link InternalTable} at the commit time to which this table change belongs. */
InternalTable tableAsOfChange;

String sourceIdentifier;
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class TableSyncMetadata {
/** Property name for the XTABLE metadata in the table metadata/properties */
public static final String XTABLE_METADATA = "XTABLE_METADATA";

public static final String XTABLE_SOURCE_IDENTIFIER = "XTABLE_SOURCE_IDENTIFIER";

Instant lastInstantSynced;
List<Instant> instantsToConsiderForNextSync;
int version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,13 @@ public interface ConversionSource<COMMIT> extends Closeable {
* false.
*/
boolean isIncrementalSyncSafeFrom(Instant instant);

/**
* Extract the identifier of the provided commit, the identifier defined as 1. Snapshot ID in
Contributor

You can use <ul><li> tags to format the list in the docs if you want.
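
A minimal sketch of what that formatting could look like on the method under review. Only `getCommitIdentifier` and the three identifier definitions come from the diff; `CommitIdentifierSource` and `DeltaLikeSource` are hypothetical stand-ins so that the snippet compiles on its own.

```java
/** Hypothetical stand-in for ConversionSource, reduced to the method under review. */
interface CommitIdentifierSource<COMMIT> {
  /**
   * Extracts the identifier of the provided commit. The identifier is defined as:
   *
   * <ul>
   *   <li>Snapshot ID in Iceberg
   *   <li>Version ID in Delta
   *   <li>Timestamp in Hudi
   * </ul>
   *
   * @param commit the provided commit
   * @return the string version of the commit identifier
   */
  String getCommitIdentifier(COMMIT commit);
}

/** Hypothetical Delta-style source: the commit is a version number, as in the diff. */
class DeltaLikeSource implements CommitIdentifierSource<Long> {
  @Override
  public String getCommitIdentifier(Long commit) {
    // Same conversion the Delta implementation in this diff uses.
    return String.valueOf(commit);
  }
}
```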

* Iceberg 2. Version ID in Delta 3. timestamp in Hudi
*
* @param commit The provided commit
* @return the string version of commit identifier
*/
String getCommitIdentifier(COMMIT commit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ public interface ConversionTarget {
* Starts the sync and performs any initialization required
*
* @param table the table that will be synced
* @param sourceIdentifier
*/
void beginSync(InternalTable table);
void beginSync(InternalTable table, String sourceIdentifier);
Contributor

I am wondering if it makes sense to attach the source format as part of the identifier. Then we can also have some metadata about what the writer format was.

Contributor Author

> I am wondering if it makes sense to attach the source format as part of the identifier. Then we can also have some metadata about what the writer format was.

Good idea. I created a SourceMetadata object that is attached to each target table commit as commit-level information, so it will be much easier to add more information in the future.
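
To illustrate the idea of commit-level source metadata, here is a rough sketch. The `SourceMetadataSketch` class, its `sourceFormat` field, and the `XTABLE_SOURCE_FORMAT` key are assumptions made for illustration; only the `XTABLE_SOURCE_IDENTIFIER` property name appears in this diff, and the PR's actual `SourceMetadata` may look different.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical commit-level metadata describing where a target commit came from. */
final class SourceMetadataSketch {
  // Property name taken from the diff (TableSyncMetadata.XTABLE_SOURCE_IDENTIFIER).
  static final String XTABLE_SOURCE_IDENTIFIER = "XTABLE_SOURCE_IDENTIFIER";
  // Assumed key for the reviewer's suggestion; not part of the diff.
  static final String XTABLE_SOURCE_FORMAT = "XTABLE_SOURCE_FORMAT";

  final String sourceIdentifier;
  final String sourceFormat;

  SourceMetadataSketch(String sourceIdentifier, String sourceFormat) {
    this.sourceIdentifier = Objects.requireNonNull(sourceIdentifier, "sourceIdentifier");
    this.sourceFormat = Objects.requireNonNull(sourceFormat, "sourceFormat");
  }

  /** Flattens the metadata into string properties that a target commit could carry. */
  Map<String, String> toCommitProperties() {
    Map<String, String> properties = new LinkedHashMap<>();
    properties.put(XTABLE_SOURCE_IDENTIFIER, sourceIdentifier);
    properties.put(XTABLE_SOURCE_FORMAT, sourceFormat);
    return properties;
  }

  /** Rebuilds the metadata from the properties read back from a target commit. */
  static SourceMetadataSketch fromCommitProperties(Map<String, String> properties) {
    return new SourceMetadataSketch(
        properties.get(XTABLE_SOURCE_IDENTIFIER), properties.get(XTABLE_SOURCE_FORMAT));
  }
}
```

Keeping the fields in one object means a later addition, such as the writer format, only touches this class and not the `beginSync` signature.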


/** Completes the sync and performs any cleanup required. */
void completeSync();
Expand All @@ -90,4 +91,7 @@ public interface ConversionTarget {

/** Initializes the client with provided configuration */
void init(TargetTable targetTable, Configuration configuration);

/** Return the commit identifier from target table */
Contributor

Can you be more specific here? I am assuming this is the latest commit identifier but it should be more specific if the table can have more than one over time.

Contributor Author

> Can you be more specific here? I am assuming this is the latest commit identifier but it should be more specific if the table can have more than one over time.

Do you mean the snapshot sync scenario? In incremental sync, we ensure that target and source commits have a 1:1 mapping. In snapshot sync, however, we use the commit ID of the current (latest) snapshot, even though the changes might contain more than one commit.

Contributor Author

To ensure we’re aligned, the context of this function is:

It is designed to retrieve the corresponding target commit based on a given source identifier during a sync operation. Here’s the scenario:

Assume:

• Last completed sync (1 snapshot, 1 incremental) is source commit 3
• We’re starting the new sync process from source commit 4

Source table commit history:
• 1 (UPDATE)
• 2 (UPDATE)
• 3 (UPDATE)
• 4 (ROLLBACK to 2)
• 5 (UPDATE)
• 6 (UPDATE)

Target table commit history (mapped by source identifiers):
• 1 (mapped to source id 2)
• 2 (mapped to source id 3)

When syncing the ROLLBACK operation (source commit 4), we need to identify the target commit that corresponds to source identifier 2, which is commit 1 in the target table.
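
The scenario above can be sketched with an in-memory map. This is only an illustration: `TargetCommitLookupSketch`, `recordSync`, and `scenario` are hypothetical names, and a real target would read the source identifier from its own commit metadata instead of a map. Returning an empty `Optional` when no target commit carries the requested source identifier is also an assumption here.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory model of a target table's commit history. */
final class TargetCommitLookupSketch {
  // Target commit id -> source identifier recorded on that target commit, in commit order.
  private final Map<String, String> sourceIdByTargetCommit = new LinkedHashMap<>();

  /** Records that a target commit was produced by syncing the given source commit. */
  void recordSync(String targetCommitId, String sourceIdentifier) {
    sourceIdByTargetCommit.put(targetCommitId, sourceIdentifier);
  }

  /**
   * Finds the earliest target commit that was written for the given source identifier.
   * Empty when no target commit carries that identifier, for example when the source
   * commit was folded into a snapshot sync and never synced on its own.
   */
  Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
    return sourceIdByTargetCommit.entrySet().stream()
        .filter(entry -> entry.getValue().equals(sourceIdentifier))
        .map(Map.Entry::getKey)
        .findFirst();
  }

  /** Builds the target history from the scenario: target 1 -> source 2, target 2 -> source 3. */
  static TargetCommitLookupSketch scenario() {
    TargetCommitLookupSketch target = new TargetCommitLookupSketch();
    target.recordSync("1", "2");
    target.recordSync("2", "3");
    return target;
  }
}
```

With this history, looking up source identifier 2 yields target commit 1, while looking up source identifier 1 yields an empty result because no target commit was written for it.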

Contributor

That makes sense to me, but the Javadoc could be clearer. It should also explain when the Optional will be empty.

Optional<String> getTargetCommitIdentifier(String sourceIdentifier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public Map<String, SyncResult> syncSnapshot(
internalTable,
target -> target.syncFilesForSnapshot(snapshot.getPartitionedDataFiles()),
startTime,
snapshot.getPendingCommits()));
snapshot.getPendingCommits(),
snapshot.getSourceIdentifier()));
} catch (Exception e) {
log.error("Failed to sync snapshot", e);
results.put(
Expand Down Expand Up @@ -121,7 +122,8 @@ public Map<String, List<SyncResult>> syncChanges(
change.getTableAsOfChange(),
target -> target.syncFilesForDiff(change.getFilesDiff()),
startTime,
changes.getPendingCommits()));
changes.getPendingCommits(),
change.getSourceIdentifier()));
} catch (Exception e) {
log.error("Failed to sync table changes", e);
resultsForFormat.add(buildResultForError(SyncMode.INCREMENTAL, startTime, e));
Expand Down Expand Up @@ -149,9 +151,10 @@ private SyncResult getSyncResult(
InternalTable tableState,
SyncFiles fileSyncMethod,
Instant startTime,
List<Instant> pendingCommits) {
List<Instant> pendingCommits,
String sourceIdentifier) {
// initialize the sync
conversionTarget.beginSync(tableState);
conversionTarget.beginSync(tableState, sourceIdentifier);
// sync schema updates
conversionTarget.syncSchema(tableState.getReadSchema());
// sync partition updates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ void syncSnapshotWithFailureForOneFormat() {
.table(startingTableState)
.partitionedDataFiles(fileGroups)
.pendingCommits(pendingCommitInstants)
.sourceIdentifier("1")
.build();
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
when(mockConversionTarget2.getTableFormat()).thenReturn(TableFormat.DELTA);
doThrow(new RuntimeException("Failure"))
.when(mockConversionTarget1)
.beginSync(startingTableState);
.beginSync(startingTableState, snapshot.getSourceIdentifier());
Map<String, SyncResult> result =
TableFormatSync.getInstance()
.syncSnapshot(Arrays.asList(mockConversionTarget1, mockConversionTarget2), snapshot);
Expand All @@ -106,7 +107,10 @@ void syncSnapshotWithFailureForOneFormat() {
failureResult.getStatus());

verifyBaseConversionTargetCalls(
mockConversionTarget2, startingTableState, pendingCommitInstants);
mockConversionTarget2,
startingTableState,
pendingCommitInstants,
snapshot.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForSnapshot(fileGroups);
verify(mockConversionTarget2).completeSync();
verify(mockConversionTarget1, never()).completeSync();
Expand All @@ -124,22 +128,36 @@ void syncChangesWithFailureForOneFormat() {
InternalTable tableState1 = getTableState(1);
DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
TableChange tableChange1 =
TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build();
TableChange.builder()
.tableAsOfChange(tableState1)
.filesDiff(dataFilesDiff1)
.sourceIdentifier("1")
.build();
InternalTable tableState2 = getTableState(2);
DataFilesDiff dataFilesDiff2 = getFilesDiff(2);
TableChange tableChange2 =
TableChange.builder().tableAsOfChange(tableState2).filesDiff(dataFilesDiff2).build();
TableChange.builder()
.tableAsOfChange(tableState2)
.filesDiff(dataFilesDiff2)
.sourceIdentifier("2")
.build();
InternalTable tableState3 = getTableState(3);
DataFilesDiff dataFilesDiff3 = getFilesDiff(3);
TableChange tableChange3 =
TableChange.builder().tableAsOfChange(tableState3).filesDiff(dataFilesDiff3).build();
TableChange.builder()
.tableAsOfChange(tableState3)
.filesDiff(dataFilesDiff3)
.sourceIdentifier("3")
.build();

List<Instant> pendingCommitInstants = Collections.singletonList(Instant.now());
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
when(mockConversionTarget2.getTableFormat()).thenReturn(TableFormat.DELTA);
// throw exception on second change and show that first change is still returned for this format
// and other conversionTarget is not affected
doThrow(new RuntimeException("Failure")).when(mockConversionTarget1).beginSync(tableState2);
doThrow(new RuntimeException("Failure"))
.when(mockConversionTarget1)
.beginSync(tableState2, tableChange2.getSourceIdentifier());

List<TableChange> tableChanges = Arrays.asList(tableChange1, tableChange2, tableChange3);
IncrementalTableChanges incrementalTableChanges =
Expand Down Expand Up @@ -194,13 +212,29 @@ void syncChangesWithFailureForOneFormat() {
assertSyncResultTimes(successResults.get(i), start);
}

verifyBaseConversionTargetCalls(mockConversionTarget1, tableState1, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget1,
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1);
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState1, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1);
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState2, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState2,
pendingCommitInstants,
tableChange2.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2);
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState3, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState3,
pendingCommitInstants,
tableChange3.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3);
verify(mockConversionTarget1, times(1)).completeSync();
verify(mockConversionTarget2, times(3)).completeSync();
Expand Down Expand Up @@ -280,15 +314,31 @@ void syncChangesWithDifferentFormatsAndMetadata() {
}

// conversionTarget1 syncs table changes 1 and 3
verifyBaseConversionTargetCalls(mockConversionTarget1, tableState1, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget1,
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff1);
verifyBaseConversionTargetCalls(mockConversionTarget1, tableState3, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget1,
tableState3,
pendingCommitInstants,
tableChange3.getSourceIdentifier());
verify(mockConversionTarget1).syncFilesForDiff(dataFilesDiff3);
verify(mockConversionTarget1, times(2)).completeSync();
// conversionTarget2 syncs table changes 2 and 3
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState2, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState2,
pendingCommitInstants,
tableChange2.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff2);
verifyBaseConversionTargetCalls(mockConversionTarget2, tableState3, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState3,
pendingCommitInstants,
tableChange3.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff3);
verify(mockConversionTarget2, times(2)).completeSync();
}
Expand All @@ -299,7 +349,11 @@ void syncChangesOneFormatWithNoRequiredChanges() {
InternalTable tableState1 = getTableState(1);
DataFilesDiff dataFilesDiff1 = getFilesDiff(1);
TableChange tableChange1 =
TableChange.builder().tableAsOfChange(tableState1).filesDiff(dataFilesDiff1).build();
TableChange.builder()
.tableAsOfChange(tableState1)
.filesDiff(dataFilesDiff1)
.sourceIdentifier("1")
.build();

List<Instant> pendingCommitInstants = Collections.emptyList();
when(mockConversionTarget1.getTableFormat()).thenReturn(TableFormat.ICEBERG);
Expand Down Expand Up @@ -334,11 +388,15 @@ void syncChangesOneFormatWithNoRequiredChanges() {
assertSyncResultTimes(syncResult, start);
});

verify(mockConversionTarget1, never()).beginSync(any());
verify(mockConversionTarget1, never()).beginSync(any(), any());
verify(mockConversionTarget1, never()).syncFilesForDiff(any());
verify(mockConversionTarget1, never()).completeSync();

verifyBaseConversionTargetCalls(mockConversionTarget2, tableState1, pendingCommitInstants);
verifyBaseConversionTargetCalls(
mockConversionTarget2,
tableState1,
pendingCommitInstants,
tableChange1.getSourceIdentifier());
verify(mockConversionTarget2).syncFilesForDiff(dataFilesDiff1);
}

Expand Down Expand Up @@ -379,8 +437,9 @@ private DataFilesDiff getFilesDiff(int id) {
private void verifyBaseConversionTargetCalls(
ConversionTarget mockConversionTarget,
InternalTable startingTableState,
List<Instant> pendingCommitInstants) {
verify(mockConversionTarget).beginSync(startingTableState);
List<Instant> pendingCommitInstants,
String sourceIdentifier) {
verify(mockConversionTarget).beginSync(startingTableState, sourceIdentifier);
verify(mockConversionTarget).syncSchema(startingTableState.getReadSchema());
verify(mockConversionTarget).syncPartitionSpec(startingTableState.getPartitioningFields());
verify(mockConversionTarget)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public InternalSnapshot getCurrentSnapshot() {
return InternalSnapshot.builder()
.table(table)
.partitionedDataFiles(getInternalDataFiles(snapshot, table.getReadSchema()))
.sourceIdentifier(getCommitIdentifier(snapshot.version()))
.build();
}

Expand Down Expand Up @@ -125,7 +126,11 @@ public TableChange getTableChangeForCommit(Long versionNumber) {
}
DataFilesDiff dataFilesDiff =
DataFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build();
return TableChange.builder().tableAsOfChange(tableAtVersion).filesDiff(dataFilesDiff).build();
return TableChange.builder()
.tableAsOfChange(tableAtVersion)
.filesDiff(dataFilesDiff)
.sourceIdentifier(getCommitIdentifier(versionNumber))
.build();
}

@Override
Expand Down Expand Up @@ -158,6 +163,11 @@ public boolean isIncrementalSyncSafeFrom(Instant instant) {
return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant);
}

@Override
public String getCommitIdentifier(Long commit) {
return String.valueOf(commit);
}

private DeltaIncrementalChangesState getChangesState() {
return deltaIncrementalChangesState.orElseThrow(
() -> new IllegalStateException("DeltaIncrementalChangesState is not initialized"));
Expand Down