-
Notifications
You must be signed in to change notification settings - Fork 158
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Enhancement] Support restore/rollback sync during conversion (1/2) #569
base: main
Are you sure you want to change the base?
Changes from 15 commits
e0a8710
9bc8256
76e0bc0
45b279a
8ac5717
53a58b6
bb5ed78
9b8b71e
252ca2c
d9cb5fd
c8dde9e
bfbb78f
002d91b
bf5f0a0
56fe85e
a40cd6c
1e39d1a
0af0a82
82b2d2d
7ffda25
4198e5b
b181a07
4ca9c9a
9552679
5fe489e
42c804b
e470e98
2033c3a
edb99fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.xtable.model.metadata; | ||
|
||
import java.io.IOException; | ||
|
||
import lombok.Builder; | ||
import lombok.Value; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
|
||
import org.apache.xtable.model.exception.ParseException; | ||
|
||
@Value | ||
@Builder | ||
public class SourceMetadata { | ||
// The source table snapshot identifier | ||
// Snapshot ID in Iceberg, version ID in Delta, and instant <timestamp_action> in Hudi | ||
String sourceIdentifier; | ||
// The table format of the source table | ||
String tableFormat; | ||
|
||
private static final ObjectMapper MAPPER = new ObjectMapper(); | ||
|
||
public static SourceMetadata fromJson(String json) throws ParseException { | ||
try { | ||
return MAPPER.readValue(json, SourceMetadata.class); | ||
} catch (IOException e) { | ||
throw new ParseException("Failed to deserialize SourceMetadata", e); | ||
} | ||
} | ||
|
||
public String toJson() { | ||
try { | ||
return MAPPER.writeValueAsString(this); | ||
} catch (IOException e) { | ||
throw new ParseException("Failed to serialize SourceMetadata", e); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,4 +78,13 @@ public interface ConversionSource<COMMIT> extends Closeable { | |
* false. | ||
*/ | ||
boolean isIncrementalSyncSafeFrom(Instant instant); | ||
|
||
/** | ||
* Extract the identifier of the provided commit, the identifier defined as 1. Snapshot ID in | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can use |
||
* Iceberg 2. Version ID in Delta 3. timestamp in Hudi | ||
* | ||
* @param commit The provided commit | ||
* @return the string version of commit identifier | ||
*/ | ||
String getCommitIdentifier(COMMIT commit); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,6 +25,7 @@ | |
|
||
import org.apache.xtable.conversion.TargetTable; | ||
import org.apache.xtable.model.InternalTable; | ||
import org.apache.xtable.model.metadata.SourceMetadata; | ||
import org.apache.xtable.model.metadata.TableSyncMetadata; | ||
import org.apache.xtable.model.schema.InternalPartitionField; | ||
import org.apache.xtable.model.schema.InternalSchema; | ||
|
@@ -76,8 +77,9 @@ public interface ConversionTarget { | |
* Starts the sync and performs any initialization required | ||
* | ||
* @param table the table that will be synced | ||
* @param sourceMetadata The source table metadata for current round of sync | ||
*/ | ||
void beginSync(InternalTable table); | ||
void beginSync(InternalTable table, SourceMetadata sourceMetadata); | ||
|
||
/** Completes the sync and performs any cleanup required. */ | ||
void completeSync(); | ||
|
@@ -90,4 +92,7 @@ public interface ConversionTarget { | |
|
||
/** Initializes the client with provided configuration */ | ||
void init(TargetTable targetTable, Configuration configuration); | ||
|
||
/** Return the commit identifier from target table */ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you be more specific here? I am assuming this is the latest commit identifier but it should be more specific if the table can have more than one over time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Do you mean the snapshot sync scenario? Because In incremental sync, we will ensure target and source commit has 1:1 mapping. However, in snapshot sync, we will use the commit id of current (latest) snapshot even though the changes might contains more than one commit. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To ensure we’re aligned, the context of this function is: It is designed to retrieve the corresponding target commit based on a given source identifier during a sync operation. Here’s the scenario: Assume:
Source table commit history: When syncing the ROLLBACK operation (source commit 4), we need to identify the corresponding target commit that aligns with the source identifier 2 (which is 1 in target). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That makes sense to me but the java doc can be a bit more clear. It should also include context on when the Optional will be empty |
||
Optional<String> getTargetCommitIdentifier(String sourceIdentifier); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of having a separate metadata, is it possible to embed this within the metadata object that we have today?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! The current metadata is table-level information, while the new one is commit-level, but it is true that we can reuse this key.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And maybe we could consider moving the source format to table-level information and leaving the source identifier at the commit level to reduce redundancy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am wondering if all of the information should actually be stored at the commit level instead of table level. In the Hudi target, the information is already stored at the commit level since that was the intention and the format I knew best. The others, it seems like I did not add the metadata to the correct location.
The source format should be allowed to change over time for flexibility so I think it is best to keep that at the commit level as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I see your point, that's a good way. I’ll get started on it.