Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Support restore/rollback sync during conversion (1/2) #569

Open
wants to merge 22 commits into
base: main
Choose a base branch
from

Conversation

danielhumanmod
Copy link
Contributor

@danielhumanmod danielhumanmod commented Oct 27, 2024

Important Read

What is the purpose of the pull request

Previously, if a rollback/restore occurred in the source table, XTable would reflect it as file changes (added or deleted) in the target table. In this PR, we aim to improve this by issuing a rollback command in the target tables, ensuring more consistent histories between the source and target. This approach is also more efficient, as it allows us to restore directly to a specific version/snapshot instead of computing a large diff against the table’s current state.

Brief change log

This is the first part of this enhancement (1/2), focusing primarily on supporting commit-level information for all target table formats and the ability to locate certain target commit with given source identifier.

  1. Add a source identifier in target transaction
  • snapshot ID in Iceberg, version ID in Delta, and instant timestamp in Hudi
  1. Locate target commit with given source identifier

Blank diagram

Additional Info

Fallback scenarios

Fallback will happen when a rollback or restore is detected in the source table, but the corresponding commit is not found in the target table. We will still leverage the rollback information from the source, but this round of sync will be treated as file changes in the target table, following the previous behavior.

Here’s an example:

Iceberg (Source)          Delta (Target)  
┌────────────┐      ┌─────────────────────┐
│ Snapshot 0 │ ◀  ▶ │ Version 0 (Synced)  │  
│ Snapshot 1 │ ◀  ▶ │ Version 1 (Synced)  │  
│ Snapshot 2 │ ◀  ▶ │ Version 2 (Synced)  │  
│ Snapshot 3 │ ◀  ▶ │ Version 3 (Synced)  │  
│ Snapshot 4 │      │                     │  
│ Snapshot 5 │ ◀  ▶ │ Version 4 (Synced)  │
└────────────┘      └─────────────────────┘  
  • If rollback to Snapshot 0, 1, 2, 3, 5, can find a corresponding commit in target
  • If rollback to snapshot 4, can not find a corresponding commit in target. This makes it impossible to issue a rollback command, but we can still make use of the info of source snapshot to issue a file changes

In this case, we can not guarantee complete metadata consistency between the source and target, but it helps reduce some computation.

Verify this pull request

This pull request is already covered by existing tests, all existing tests should pass

@danielhumanmod danielhumanmod marked this pull request as draft October 27, 2024 23:22
@danielhumanmod danielhumanmod changed the title [Enhancement] Support restore/rollback sync during conversion (1/3) [Enhancement] Support restore/rollback sync during conversion (1/2) Oct 28, 2024
@@ -47,9 +49,20 @@ public IncrementalTableChanges extractTableChanges(
commitsBacklog.getCommitsToProcess().stream()
.map(conversionSource::getTableChangeForCommit)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we'll want the identifier on the commit level, right?

Copy link
Contributor Author

@danielhumanmod danielhumanmod Nov 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we'll want the identifier on the commit level, right?

Thanks for the response @the-other-tim-brown.

Yes, ideally, every commit in source table should directly map to one in target table. However, based on my understanding of how XTable works, this isn’t guaranteed. Instead, the mapping (Source -> Target) is more like a N:1 mapping, which means:

  • Every commit in the target table has a corresponding mapping in the source table.
  • Not every commit in the source table has a one-to-one mapping in the target table.

The reason is, between each sync(), there could be multiple changes on source, and all these changes will sync as only one commit in target, just like this example

Iceberg (Source)          Delta (Target)  
┌────────────┐      ┌─────────────────────┐
│ Snapshot 0 │ ◀  ▶ │ Version 0 (Synced)  │  (can map to snapshot 0)
│ Snapshot 1 │      │                     │  
│ Snapshot 2 │      │                     │  
│ Snapshot 3 │      │                     │  
│ Snapshot 4 │      │                     │  
│ Snapshot 5 │ ◀  ▶ │ Version 1 (Synced)  │ (can map to snapshot 5)
└────────────┘      └─────────────────────┘  

Given this, I’ve chosen to use the information from the latest commit in the source table as the source identifier.
But my understanding might be wrong, appreciate if there is any feedback or suggestion :)

Copy link
Contributor

@the-other-tim-brown the-other-tim-brown Nov 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true for snapshot sync but with incremental sync, there are multiple commits all synced to the target as their own commits. One thing we should confirm is whether we are able to track at a per commit level in each target. I am unsure if the metadata history is tracked in Iceberg and Delta. It is tracked in the Hudi target

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out—it’s an important detail! I’ll investigate further into the ability to track commit-level information in Iceberg and Delta. I will get back to you once I figure it out

@danielhumanmod
Copy link
Contributor Author

danielhumanmod commented Nov 20, 2024

Hi @the-other-tim-brown, based on my investigation, both Iceberg and Delta support storing commit-level information, but we might need to adjust our current code. Here’s a summary of the findings:

  • Iceberg
    • Can use the snapshot summary to store snapshot-level information.
    • However, we currently store sync metadata in properties, which is table-level information.
  • Delta
    • Can use commit tags to store commit-level information.
    • However, we currently store sync metadata in configurations, which is table-level information.

To align with these capabilities, some code adjustments may be needed for both Iceberg and Delta. I’ll start working on a proof of concept to explore this, and will get back to you once it’s completed.

@the-other-tim-brown
Copy link
Contributor

@danielhumanmod Thanks for looking into it. I think being able to store this at a commit level better aligns with my our intentions so it would be great to fix this.

@danielhumanmod danielhumanmod marked this pull request as ready for review November 24, 2024 21:56
@danielhumanmod
Copy link
Contributor Author

@danielhumanmod Thanks for looking into it. I think being able to store this at a commit level better aligns with my our intentions so it would be great to fix this.

Hi @the-other-tim-brown, I have updated the code to support the commit level info, appreciate any review and suggestion!

@@ -90,4 +91,7 @@ public interface ConversionTarget {

/** Initializes the client with provided configuration */
void init(TargetTable targetTable, Configuration configuration);

/** Return the commit identifier from target table */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you be more specific here? I am assuming this is the latest commit identifier but it should be more specific if the table can have more than one over time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you be more specific here? I am assuming this is the latest commit identifier but it should be more specific if the table can have more than one over time.

Do you mean the snapshot sync scenario? Because In incremental sync, we will ensure target and source commit has 1:1 mapping. However, in snapshot sync, we will use the commit id of current (latest) snapshot even though the changes might contains more than one commit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To ensure we’re aligned, the context of this function is:

It is designed to retrieve the corresponding target commit based on a given source identifier during a sync operation. Here’s the scenario:

Assume:

  • Last completed sync (1 snapshot, 1 incremental) is source commit 3
  • We’re starting the new sync process from source commit 4

Source table commit history:
• 1 (UPDATE)
• 2 (UPDATE)
• 3 (UPDATE)
• 4 (ROLLBACK to 2)
• 5 (UPDATE)
• 6 (UPDATE)
Target table commit history (mapped by source identifiers):
• 1 (mapped to source id 2)
• 2 (mapped to source id 3)

When syncing the ROLLBACK operation (source commit 4), we need to identify the corresponding target commit that aligns with the source identifier 2 (which is 1 in target).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense to me but the java doc can be a bit more clear. It should also include context on when the Optional will be empty

@@ -215,13 +224,64 @@ public String getTableFormat() {
return TableFormat.DELTA;
}

@Override
public Optional<String> getTargetCommitIdentifier(String sourceIdentifier) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no commit Identifier in the latest commit, should this just return an empty Optional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no commit Identifier in the latest commit, should this just return an empty Optional?

Yes, if we can not find the corresponding target commit, we will return an empty Optional

*/
void beginSync(InternalTable table);
void beginSync(InternalTable table, String sourceIdentifier);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if it makes sense to attach the source format as part of the identifier. Then we can also have some metadata about what the writer format was.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if it makes sense to attach the source format as part of the identifier. Then we can also have some metadata about what the writer format was.

Good idea. I create a SourceMetadata as a commit-level information attached in target table commit so that It would be much easier if we want add more information in the future

@@ -51,6 +51,8 @@ public class TableSyncMetadata {
/** Property name for the XTABLE metadata in the table metadata/properties */
public static final String XTABLE_METADATA = "XTABLE_METADATA";

public static final String XTABLE_SOURCE_METADATA = "XTABLE_SOURCE_METADATA";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of having a separate metadata, is it possible to embed this within the metadata object that we have today?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! The current metadata is table-level information, while the new one is commit-level, but it is true that we can reuse this key.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And maybe we could consider moving the source format to table-level information and leaving the source identifier at the commit level to reduce redundancy.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if all of the information should actually be stored at the commit level instead of table level. In the Hudi target, the information is already stored at the commit level since that was the intention and the format I knew best. The others, it seems like I did not add the metadata to the correct location.

The source format should be allowed to change over time for flexibility so I think it is best to keep that at the commit level as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see your point, that's a good way. I’ll get started on it.

@@ -264,7 +330,8 @@ private void commitTransaction() {
transaction.updateMetadata(metadata, false);
transaction.commit(
actions,
new DeltaOperations.Update(Option.apply(Literal.fromObject("xtable-delta-sync"))));
new DeltaOperations.Update(Option.apply(Literal.fromObject("xtable-delta-sync"))),
ScalaUtils.convertJavaMapToScala(getCommitTags()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the difference between putting this info in tags compared to the metadata we are currently using? Should we consolidate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here xtable-delta-sync is the operation, and the tags is more recommended to store the metadata as k-v pairs

validateDeltaTable(basePath, new HashSet<>(Arrays.asList(dataFile2, dataFile3)), null);
assertTrue(targetIdentifier2.isPresent());
assertEquals("1", targetIdentifier2.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's have 1 test where we return an empty option as well

return getTargetCommitIdentifier(sourceIdentifier, metaClient.get());
}

public Optional<String> getTargetCommitIdentifier(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be made private or at least package private?


public Optional<String> getTargetCommitIdentifier(
String sourceIdentifier, HoodieTableMetaClient metaClient) {
long sourceIdentifierVal = Long.parseLong(sourceIdentifier);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is assuming that all sources will have identifiers that can be mapped to longs, that may not be true in the future. Can we keep this as strings throughout?

String sourceIdentifier, HoodieTableMetaClient metaClient) {
long sourceIdentifierVal = Long.parseLong(sourceIdentifier);

HoodieTimeline completedTimeline = metaClient.getActiveTimeline().filterCompletedInstants();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can filter this to the commits timeline (avoids looking at cleaner or other commit types) with getCommitsTimeline


for (HoodieInstant instant : completedTimeline.getInstants()) {
try {
Option<byte[]> instantDetails = metaClient.getActiveTimeline().getInstantDetails(instant);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a similar block of code above (line 300) that also handles parsing of the "replace commit" type which is what we'll be writing from XTable. Can you extract that into a common helper method to avoid repeated code?

SourceMetadata sourceMetadata = SourceMetadata.fromJson(sourceMetadataJson);

if (sourceIdentifier.equals(sourceMetadata.getSourceIdentifier())) {
return Optional.of(sourceIdentifier);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we return the identifier of the target?

// initialize the sync
conversionTarget.beginSync(tableState);
// Persist the latest commit time in table properties for incremental syncs
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here We need to move the Metadata set operation earlier because it will be required during the sync() operation in Iceberg (Delta and Hudi only need it in completeSync()

OverwriteFiles overwriteFiles = transaction.newOverwrite();
filesAdded.forEach(f -> overwriteFiles.addFile(getDataFile(partitionSpec, schema, f)));
filesRemoved.forEach(overwriteFiles::deleteFile);
overwriteFiles.set(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Attach XTable metadata as Iceberg snapshot summary (commit level)

@danielhumanmod
Copy link
Contributor Author

Thanks for the suggestions @the-other-tim-brown ! I have updated the code in latest commit, including changes:

  • All XTABLE_METADATA are commit level across 3 table format
  • Adjust getTargetCommitIdentifier method
  • Refactor metadata retrieval logic in `HudiConversionTarget'
  • Added test cases for empty target commit identifier.

Appreciate any further suggestion in advanced!

Copy link
Contributor

@the-other-tim-brown the-other-tim-brown left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@danielhumanmod apologies for the delay on this review. I had lost track of this change. One concern I have is how do we migrate existing tables. Should we support the old way of reading the commit metadata as a fallback so we can easily migrate users without requiring them to do a full snapshot sync?

@@ -78,4 +78,13 @@ public interface ConversionSource<COMMIT> extends Closeable {
* false.
*/
boolean isIncrementalSyncSafeFrom(Instant instant);

/**
* Extract the identifier of the provided commit, the identifier defined as 1. Snapshot ID in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use <ul><li> tags for formatting the docs with the list if you want


private Map<String, String> getCommitTags() {
Map<String, String> tags = new HashMap<>();
tags.put(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: we can use Collections.singletonMap here


// Check action and parse the appropriate metadata
Map<String, String> extraMetadata;
if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just discovered there is a TimelineUtils#getCommitMetadata that we can use to handle this logic for us. https://github.com/apache/hudi/blob/release-0.14.0/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java#L271C38-L271C55

The HoodieReplaceCommitMetadata extends HoodieCommitMetadata so it will work for our use case where we just want to inspect the metadata


TableSyncMetadata metadata = optionalMetadata.get();
if (sourceIdentifier.equals(metadata.getSourceIdentifier())) {
return Optional.of(String.valueOf(instant.getTimestamp()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getTimestamp is already returning a string, can we remove the String.valueOf?

@@ -36,4 +36,7 @@ public class TableChange {

/** The {@link InternalTable} at the commit time to which this table change belongs. */
InternalTable tableAsOfChange;

// Commit identifier in source table
@Builder.Default String sourceIdentifier = "";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may be safer without a default and require non-null, what do you think?

Similar note for InternalSnapshot

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants