-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Validate concurrent commits in DynamicIcebergSink to prevent commit duplication #14517
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Change-Id: Id5fb471eb4234168093fbbe5e7ccf3607794ddc9
36f03a9 to
90877bf
Compare
|
@pvary, I updated this PR with the new API that was recently merged. |
| lastCommittedCheckpointId = Long.parseLong(value); | ||
| break; | ||
| } | ||
| @Nullable |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this annotation doesn't add much info, since we immediately check the null value after the line.
Please remove it.
| } | ||
|
|
||
| @Nullable | ||
| private static Long extractCommittedCheckpointId( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we change the getMaxCommittedCheckpointId to use Iterable<Snapshot> baseSnapshots instead of the table parameter? In this case we can reuse the same method, and we don't need to write some spagetti code.
And in the original place we could just add
table.snapshot(branch) != null
? SnapshotUtil.ancestorsOf(table.snapshot(branch).snapshotId(), table::snapshot)
: List.of();
| public String errorMessage() { | ||
| if (errorMessage == null) { | ||
| return SnapshotAncestryValidator.super.errorMessage(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: newlines after blocks. See: https://iceberg.apache.org/contribute/#block-spacing
This PR addresses the issue in #14425 by validating that no concurrent commit has moved the
flink.max-committed-checkpoint-idinDynamicIcebergSinkthat would not be seen by the committer due to the table refreshing during the commit process.