Skip to content

Commit

Permalink
Add a source identifier in target table transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
danielhumanmod committed Oct 27, 2024
1 parent 6816a83 commit e0a8710
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ public class IncrementalTableChanges {
Iterator<TableChange> tableChanges;
// pending commits before latest commit(write) on the table.
@Builder.Default List<Instant> pendingCommits = Collections.emptyList();
@Builder.Default String sourceIdentifier = "";
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
public class InternalSnapshot {
// The instant of the Snapshot
String version;
// The source table snapshot identifier
// Snapshot ID in Iceberg, version ID in Delta, and instant <timestamp_action> in Hudi
String sourceIdentifier;
// Table reference
InternalTable table;
// Data files grouped by partition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,14 @@ public class TableSyncMetadata {
Instant lastInstantSynced;
List<Instant> instantsToConsiderForNextSync;
int version;
String sourceIdentifier;

public static TableSyncMetadata of(
Instant lastInstantSynced, List<Instant> instantsToConsiderForNextSync) {
return new TableSyncMetadata(lastInstantSynced, instantsToConsiderForNextSync, CURRENT_VERSION);
Instant lastInstantSynced,
List<Instant> instantsToConsiderForNextSync,
String sourceIdentifier) {
return new TableSyncMetadata(
lastInstantSynced, instantsToConsiderForNextSync, CURRENT_VERSION, sourceIdentifier);
}

public String toJson() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,13 @@ public interface ConversionSource<COMMIT> extends Closeable {
* false.
*/
boolean isIncrementalSyncSafeFrom(Instant instant);

/**
* Extract the identifier of the provided commit, the identifier defined as 1. Snapshot ID in
* Iceberg 2. Version ID in Delta 3. <timestamp_action> in Hudi
*
* @param commit The provided commit
* @return the string version of commit identifier
*/
String getCommitIdentifier(COMMIT commit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,12 @@ public IncrementalTableChanges extractTableChanges(
commitsBacklog.getCommitsToProcess().stream()
.map(conversionSource::getTableChangeForCommit)
.iterator();
COMMIT lastCommit =
commitsBacklog.getCommitsToProcess().get(commitsBacklog.getCommitsToProcess().size() - 1);
return IncrementalTableChanges.builder()
.tableChanges(tableChangeIterator)
.pendingCommits(commitsBacklog.getInFlightInstants())
.sourceIdentifier(conversionSource.getCommitIdentifier(lastCommit))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public Map<String, SyncResult> syncSnapshot(
internalTable,
target -> target.syncFilesForSnapshot(snapshot.getPartitionedDataFiles()),
startTime,
snapshot.getPendingCommits()));
snapshot.getPendingCommits(),
snapshot.getSourceIdentifier()));
} catch (Exception e) {
log.error("Failed to sync snapshot", e);
results.put(
Expand Down Expand Up @@ -121,7 +122,8 @@ public Map<String, List<SyncResult>> syncChanges(
change.getTableAsOfChange(),
target -> target.syncFilesForDiff(change.getFilesDiff()),
startTime,
changes.getPendingCommits()));
changes.getPendingCommits(),
changes.getSourceIdentifier()));
} catch (Exception e) {
log.error("Failed to sync table changes", e);
resultsForFormat.add(buildResultForError(SyncMode.INCREMENTAL, startTime, e));
Expand Down Expand Up @@ -149,7 +151,8 @@ private SyncResult getSyncResult(
InternalTable tableState,
SyncFiles fileSyncMethod,
Instant startTime,
List<Instant> pendingCommits) {
List<Instant> pendingCommits,
String sourceIdentifier) {
// initialize the sync
conversionTarget.beginSync(tableState);
// sync schema updates
Expand All @@ -160,7 +163,7 @@ private SyncResult getSyncResult(
fileSyncMethod.sync(conversionTarget);
// Persist the latest commit time in table properties for incremental syncs.
TableSyncMetadata latestState =
TableSyncMetadata.of(tableState.getLatestCommitTime(), pendingCommits);
TableSyncMetadata.of(tableState.getLatestCommitTime(), pendingCommits, sourceIdentifier);
conversionTarget.syncMetadata(latestState);
conversionTarget.completeSync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public InternalSnapshot getCurrentSnapshot() {
return InternalSnapshot.builder()
.table(table)
.partitionedDataFiles(getInternalDataFiles(snapshot, table.getReadSchema()))
.sourceIdentifier(String.valueOf(snapshot.version()))
.build();
}

Expand Down Expand Up @@ -158,6 +159,11 @@ public boolean isIncrementalSyncSafeFrom(Instant instant) {
return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant);
}

@Override
public String getCommitIdentifier(Long commit) {
return String.valueOf(commit);
}

private DeltaIncrementalChangesState getChangesState() {
return deltaIncrementalChangesState.orElseThrow(
() -> new IllegalStateException("DeltaIncrementalChangesState is not initialized"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public InternalSnapshot getCurrentSnapshot() {
.findInstantsBefore(latestCommit.getTimestamp())
.getInstants();
InternalTable table = getTable(latestCommit);
HoodieInstant lastPendingInstant = pendingInstants.get(pendingInstants.size() - 1);
return InternalSnapshot.builder()
.table(table)
.partitionedDataFiles(dataFileExtractor.getFilesCurrentState(table))
Expand All @@ -101,6 +102,7 @@ public InternalSnapshot getCurrentSnapshot() {
hoodieInstant ->
HudiInstantUtils.parseFromInstantTime(hoodieInstant.getTimestamp()))
.collect(CustomCollectors.toList(pendingInstants.size())))
.sourceIdentifier(lastPendingInstant.getTimestamp() + "_" + latestCommit.getAction())
.build();
}

Expand Down Expand Up @@ -148,6 +150,11 @@ public boolean isIncrementalSyncSafeFrom(Instant instant) {
return doesCommitExistsAsOfInstant(instant) && !isAffectedByCleanupProcess(instant);
}

@Override
public String getCommitIdentifier(HoodieInstant commit) {
return commit.getTimestamp() + "_" + commit.getAction();
}

private boolean doesCommitExistsAsOfInstant(Instant instant) {
HoodieInstant hoodieInstant = getCommitAtInstant(instant);
return hoodieInstant != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public InternalSnapshot getCurrentSnapshot() {
.version(String.valueOf(currentSnapshot.snapshotId()))
.table(irTable)
.partitionedDataFiles(partitionedDataFiles)
.sourceIdentifier(String.valueOf(currentSnapshot.snapshotId()))
.build();
}

Expand Down Expand Up @@ -257,6 +258,11 @@ public boolean isIncrementalSyncSafeFrom(Instant instant) {
return true;
}

@Override
public String getCommitIdentifier(Snapshot commit) {
return String.valueOf(commit.snapshotId());
}

@Override
public void close() {
getTableOps().close();
Expand Down

0 comments on commit e0a8710

Please sign in to comment.