Skip to content

Commit

Permalink
detect rollback from source and target table
Browse files Browse the repository at this point in the history
  • Loading branch information
danielhumanmod committed Oct 27, 2024
1 parent e0a8710 commit 9bc8256
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class TableSyncMetadata {
private static final int CURRENT_VERSION = 0;
private static final String DEFAULT_IDENTIFIER = "";
private static final ObjectMapper MAPPER =
new ObjectMapper()
.registerModule(new JavaTimeModule())
Expand All @@ -56,6 +57,13 @@ public class TableSyncMetadata {
int version;
String sourceIdentifier;

public static TableSyncMetadata of(
Instant lastInstantSynced,
List<Instant> instantsToConsiderForNextSync) {
return new TableSyncMetadata(
lastInstantSynced, instantsToConsiderForNextSync, CURRENT_VERSION, DEFAULT_IDENTIFIER);
}

public static TableSyncMetadata of(
Instant lastInstantSynced,
List<Instant> instantsToConsiderForNextSync,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

import java.io.Closeable;
import java.time.Instant;
import java.util.Optional;

import org.apache.xtable.model.CommitsBacklog;
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.metadata.TableSyncMetadata;

/**
* A client that provides the major functionality for extracting the state at a given instant in a
Expand Down Expand Up @@ -65,6 +67,15 @@ public interface ConversionSource<COMMIT> extends Closeable {
*/
CommitsBacklog<COMMIT> getCommitsBacklog(InstantsForIncrementalSync instantsForIncrementalSync);

/**
* Extracts the rollback snapshot as {@link InternalSnapshot} from the changes since the last sync
* of the source table.
*
* @param lastSyncMetadata The last sync metadata
* @return {@link InternalSnapshot} represent the rollback snapshot
*/
Optional<InternalSnapshot> getRollbackSnapshot(TableSyncMetadata lastSyncMetadata);

/**
* Determines whether an incremental sync is safe from a given instant. This method checks for a
* couple of things: the existence of a commit at or before the provided instant and whether the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.xtable.spi.extractor;

import java.util.Iterator;
import java.util.Optional;

import lombok.AllArgsConstructor;
import lombok.Getter;
Expand All @@ -28,6 +29,7 @@
import org.apache.xtable.model.InstantsForIncrementalSync;
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.metadata.TableSyncMetadata;

@AllArgsConstructor(staticName = "of")
@Getter
Expand Down Expand Up @@ -55,4 +57,8 @@ public IncrementalTableChanges extractTableChanges(
.sourceIdentifier(conversionSource.getCommitIdentifier(lastCommit))
.build();
}

public Optional<InternalSnapshot> extractRollbackSnapshot(TableSyncMetadata lastSyncMetadata) {
return conversionSource.getRollbackSnapshot(lastSyncMetadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,7 @@ public interface ConversionTarget {

/** Initializes the client with provided configuration */
void init(TargetTable targetTable, Configuration configuration);

/** Return the commit identifier from target table */
Optional<String> getTargetCommitIdentifier(String sourceIdentifier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.spark.sql.SparkSession;

import org.apache.spark.sql.delta.DeltaHistory;
import org.apache.spark.sql.delta.DeltaHistoryManager;
import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.Snapshot;
Expand All @@ -39,6 +40,7 @@
import org.apache.spark.sql.delta.actions.RemoveFile;

import scala.Option;
import scala.collection.Seq;

import io.delta.tables.DeltaTable;

Expand All @@ -48,6 +50,7 @@
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.DataFilesDiff;
import org.apache.xtable.model.storage.FileFormat;
Expand Down Expand Up @@ -159,6 +162,36 @@ public boolean isIncrementalSyncSafeFrom(Instant instant) {
return deltaCommitInstant.equals(instant) || deltaCommitInstant.isBefore(instant);
}

/**
* Extracts the rollback snapshot through the following steps: 1. Retrieve all commits made after
* the last sync 2. Identify the latest restore log among these commits 3. Return the {@link
* InternalSnapshot} from the latest restore log
*/
@Override
public Optional<InternalSnapshot> getRollbackSnapshot(TableSyncMetadata lastSyncMetadata) {
DeltaHistoryManager.Commit deltaCommitAtLastSyncInstant =
deltaLog
.history()
.getActiveCommitAtTime(
Timestamp.from(lastSyncMetadata.getLastInstantSynced()), true, false, true);
long lastSyncVersion = deltaCommitAtLastSyncInstant.getVersion();
Seq<DeltaHistory> deltaCommits = deltaLog.history().getHistory(lastSyncVersion, null);
// Using find to get first "RESTORE" operation from history (latest to oldest)
Option<DeltaHistory> restoreLog =
deltaCommits.find(history -> "RESTORE".equals(history.operation()));
if (restoreLog.isDefined()) {
DeltaHistory unwrapRestoreLog = restoreLog.get();
InternalTable table = getTable(unwrapRestoreLog.getVersion());
Snapshot snapshot = deltaLog.getSnapshotAt(lastSyncVersion, Option.empty());
return Optional.of(
InternalSnapshot.builder()
.table(table)
.partitionedDataFiles(getInternalDataFiles(snapshot, table.getReadSchema()))
.build());
}
return Optional.empty();
}

@Override
public String getCommitIdentifier(Long commit) {
return String.valueOf(commit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.DeltaOperations;
import org.apache.spark.sql.delta.OptimisticTransaction;
import org.apache.spark.sql.delta.Snapshot;
import org.apache.spark.sql.delta.actions.Action;
import org.apache.spark.sql.delta.actions.AddFile;
import org.apache.spark.sql.delta.actions.Format;
Expand Down Expand Up @@ -215,6 +216,28 @@ public String getTableFormat() {
return TableFormat.DELTA;
}

@Override
public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
Snapshot currentSnapshot = deltaLog.currentSnapshot().snapshot();

// Iterate backward from the current version
for (long version = currentSnapshot.version(); version >= 0; version--) {
Snapshot snapshot = deltaLog.getSnapshotAt(version, null);
Optional<TableSyncMetadata> metadata =
TableSyncMetadata.fromJson(
snapshot
.metadata()
.configuration()
.getOrElse(TableSyncMetadata.XTABLE_METADATA, () -> null));
if (metadata.isPresent()
&& String.valueOf(metadata.get().getSourceIdentifier()).equals(sourceIdentifier)) {
return Optional.of(String.valueOf(snapshot.version()));
}
}

return Optional.empty();
}

@EqualsAndHashCode
@ToString
private class TransactionState {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import lombok.Builder;
Expand All @@ -49,6 +50,7 @@
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.spi.extractor.ConversionSource;

public class HudiConversionSource implements ConversionSource<HoodieInstant> {
Expand Down Expand Up @@ -155,6 +157,39 @@ public String getCommitIdentifier(HoodieInstant commit) {
return commit.getTimestamp() + "_" + commit.getAction();
}

@Override
public Optional<InternalSnapshot> getRollbackSnapshot(TableSyncMetadata lastSyncMetadata) {
String lastSyncCommit =
HudiInstantUtils.convertInstantToCommit(lastSyncMetadata.getLastInstantSynced());
HoodieTimeline pendingTimeline = getCompletedCommits().findInstantsAfter(lastSyncCommit);
List<HoodieInstant> instantsAfterLastSync = pendingTimeline.getInstants();
Optional<HoodieInstant> rollbackInstant =
instantsAfterLastSync.stream()
.filter(instant -> "rollback".equalsIgnoreCase(instant.getAction()))
.findFirst();
if (rollbackInstant.isPresent()) {
String rollbackTimestamp = rollbackInstant.get().getTimestamp();
List<HoodieInstant> pendingInstants =
pendingTimeline.findInstantsBeforeOrEquals(rollbackTimestamp).getInstants();
InternalTable table = getTable(rollbackInstant.get());
return Optional.of(
InternalSnapshot.builder()
.table(table)
.partitionedDataFiles(dataFileExtractor.getFilesCurrentState(table))
.pendingCommits(
pendingInstants.stream()
.map(
hoodieInstant ->
HudiInstantUtils.parseFromInstantTime(hoodieInstant.getTimestamp()))
.collect(CustomCollectors.toList(pendingInstants.size())))
.sourceIdentifier(
rollbackInstant.get().getTimestamp() + "_" + rollbackInstant.get().getAction())
.build());
}

return Optional.empty();
}

private boolean doesCommitExistsAsOfInstant(Instant instant) {
HoodieInstant hoodieInstant = getCommitAtInstant(instant);
return hoodieInstant != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,35 @@ public String getTableFormat() {
return TableFormat.HUDI;
}

@Override
public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
if (!metaClient.isPresent()) {
return Optional.empty();
}
HoodieTimeline completedTimeline =
metaClient.get().getActiveTimeline().filterCompletedInstants();
for (HoodieInstant instant : completedTimeline.getInstants()) {
Option<byte[]> instantDetails =
metaClient.get().getActiveTimeline().getInstantDetails(instant);
if (!instantDetails.isPresent()) {
continue;
}
try {
HoodieCommitMetadata commitMetadata =
HoodieCommitMetadata.fromBytes(instantDetails.get(), HoodieCommitMetadata.class);
String metadataJson =
commitMetadata.getExtraMetadata().get(TableSyncMetadata.XTABLE_METADATA);
Optional<TableSyncMetadata> xTableMetadata = TableSyncMetadata.fromJson(metadataJson);
if (xTableMetadata.isPresent()
&& xTableMetadata.get().getSourceIdentifier().equals(sourceIdentifier)) {
return Optional.of(sourceIdentifier);
}
} catch (IOException ignored) {
}
}
return Optional.empty();
}

private HoodieTableMetaClient getMetaClient() {
return metaClient.orElseThrow(
() -> new IllegalStateException("beginSync must be called before calling this method"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand All @@ -37,6 +38,7 @@

import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.HistoryEntry;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
Expand All @@ -55,6 +57,7 @@
import org.apache.xtable.model.InternalSnapshot;
import org.apache.xtable.model.InternalTable;
import org.apache.xtable.model.TableChange;
import org.apache.xtable.model.metadata.TableSyncMetadata;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.stat.PartitionValue;
Expand Down Expand Up @@ -258,6 +261,23 @@ public boolean isIncrementalSyncSafeFrom(Instant instant) {
return true;
}

/*
* Extract rollback snapshot by checking if the current snapshot
* matches any entry in the table's history.
*/
@Override
public Optional<InternalSnapshot> getRollbackSnapshot(TableSyncMetadata lastSyncMetadata) {
Table iceTable = getSourceTable();
long currentSnapshotId = iceTable.currentSnapshot().snapshotId();
for (HistoryEntry historyEntry : iceTable.history()) {
if (historyEntry.snapshotId() == currentSnapshotId) {
return Optional.of(getCurrentSnapshot());
}
}

return Optional.empty();
}

@Override
public String getCommitIdentifier(Snapshot commit) {
return String.valueOf(commit.snapshotId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,19 @@ public String getTableFormat() {
return TableFormat.ICEBERG;
}

@Override
public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
for (Snapshot snapshot : table.snapshots()) {
Optional<TableSyncMetadata> metadata =
TableSyncMetadata.fromJson(snapshot.summary().get(TableSyncMetadata.XTABLE_METADATA));
if (metadata.isPresent()
&& String.valueOf(metadata.get().getSourceIdentifier()).equals(sourceIdentifier)) {
return Optional.of(String.valueOf(snapshot.snapshotId()));
}
}
return Optional.empty();
}

private void rollbackCorruptCommits() {
if (table == null) {
// there is no existing table so exit early
Expand Down

0 comments on commit 9bc8256

Please sign in to comment.