 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.sink2.Committer;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.flink.sink.CommitSummary;
 import org.apache.iceberg.flink.sink.DeltaManifests;
 import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
@@ -158,26 +161,36 @@ public void commit(Collection<CommitRequest<DynamicCommittable>> commitRequests)
   private static long getMaxCommittedCheckpointId(
       Table table, String flinkJobId, String operatorId, String branch) {
     Snapshot snapshot = table.snapshot(branch);
-    long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
 
     while (snapshot != null) {
-      Map<String, String> summary = snapshot.summary();
-      String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
-      String snapshotOperatorId = summary.get(OPERATOR_ID);
-      if (flinkJobId.equals(snapshotFlinkJobId)
-          && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
-        String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
-        if (value != null) {
-          lastCommittedCheckpointId = Long.parseLong(value);
-          break;
-        }
+      @Nullable
+      Long committedCheckpointId = extractCommittedCheckpointId(snapshot, flinkJobId, operatorId);
+      if (committedCheckpointId != null) {
+        return committedCheckpointId;
       }
 
       Long parentSnapshotId = snapshot.parentId();
       snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
     }
 
-    return lastCommittedCheckpointId;
+    return INITIAL_CHECKPOINT_ID;
+  }
+
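+  /**
+   * Reads the checkpoint id recorded in the snapshot summary. Returns null when the snapshot was
+   * written by a different Flink job/operator or has no {@code MAX_COMMITTED_CHECKPOINT_ID} entry.
+   */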
+  @Nullable
+  private static Long extractCommittedCheckpointId(
+      Snapshot snapshot, String flinkJobId, String operatorId) {
+    Map<String, String> summary = snapshot.summary();
+    String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
+    String snapshotOperatorId = summary.get(OPERATOR_ID);
+    if (flinkJobId.equals(snapshotFlinkJobId)
+        && (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
+      String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
+      if (value != null) {
+        return Long.parseLong(value);
+      }
+    }
+
+    return null;
   }
 
   /**
@@ -276,7 +289,17 @@ private void replacePartitions(
       String operatorId) {
     // Iceberg tables are unsorted. So the order of the append data does not matter.
     // Hence, we commit everything in one snapshot.
-    ReplacePartitions dynamicOverwrite = table.newReplacePartitions().scanManifestsWith(workerPool);
+    long checkpointId = pendingResults.lastKey();
+    ReplacePartitions dynamicOverwrite =
+        table
+            .newReplacePartitions()
+            .scanManifestsWith(workerPool)
+            .validateSnapshot(
+                new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId));
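+    // Only snapshots added after the current branch tip need the duplicate-commit check; if the
+    // branch has no snapshot yet, validation starts from the beginning of the table history.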
+    @Nullable Snapshot latestSnapshot = table.snapshot(branch);
+    if (latestSnapshot != null) {
+      dynamicOverwrite = dynamicOverwrite.validateFromSnapshot(latestSnapshot.snapshotId());
+    }
 
     for (List<WriteResult> writeResults : pendingResults.values()) {
       for (WriteResult result : writeResults) {
@@ -292,7 +315,7 @@ private void replacePartitions(
292315 "dynamic partition overwrite" ,
293316 newFlinkJobId ,
294317 operatorId ,
295- pendingResults . lastKey () );
318+ checkpointId );
296319 }
297320
298321 private void commitDeltaTxn (
@@ -306,7 +329,17 @@ private void commitDeltaTxn(
       long checkpointId = e.getKey();
       List<WriteResult> writeResults = e.getValue();
 
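+      // Apply the same duplicate-commit guard as in replacePartitions, per checkpoint.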
-      RowDelta rowDelta = table.newRowDelta().scanManifestsWith(workerPool);
+      RowDelta rowDelta =
+          table
+              .newRowDelta()
+              .scanManifestsWith(workerPool)
+              .validateSnapshot(
+                  new MaxCommittedCheckpointIdValidator(checkpointId, newFlinkJobId, operatorId));
+      @Nullable Snapshot latestSnapshot = table.snapshot(branch);
+      if (latestSnapshot != null) {
+        rowDelta = rowDelta.validateFromSnapshot(latestSnapshot.snapshotId());
+      }
+
       for (WriteResult result : writeResults) {
         // Row delta validations are not needed for streaming changes that write equality deletes.
         // Equality deletes are applied to data in all previous sequence numbers, so retries may
@@ -329,6 +362,39 @@ private void commitDeltaTxn(
     }
   }
 
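+  /**
+   * Validates that a newly appeared parent snapshot does not already carry a committed checkpoint
+   * id at or above the currently staged one, which would indicate a duplicate commit (e.g. a
+   * client-side commit failure that actually succeeded on the catalog server and was retried).
+   */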
+  static class MaxCommittedCheckpointIdValidator implements Consumer<Snapshot> {
+    private final long stagedCheckpointId;
+    private final String flinkJobId;
+    private final String flinkOperatorId;
+
+    MaxCommittedCheckpointIdValidator(
+        long stagedCheckpointId, String flinkJobId, String flinkOperatorId) {
+      this.stagedCheckpointId = stagedCheckpointId;
+      this.flinkJobId = flinkJobId;
+      this.flinkOperatorId = flinkOperatorId;
+    }
+
+    @Override
+    public void accept(Snapshot snapshot) {
+      @Nullable
+      Long checkpointId = extractCommittedCheckpointId(snapshot, flinkJobId, flinkOperatorId);
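+      // Snapshot from another job/operator, or without a checkpoint id: nothing to validate.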
+      if (checkpointId == null) {
+        return;
+      }
+
+      ValidationException.check(
+          checkpointId < stagedCheckpointId,
+          "The new parent snapshot '%s' has '%s'='%s' >= '%s', the checkpoint id of the currently staged committable."
+              + "\nThis can happen, for example, with the REST catalog: a previous commit request failed"
+              + " on the Flink side but succeeded on the server, and the Flink job retried it with a new request."
+              + "\nFlink should retry on this exception, and the committer should skip the duplicate request during the next retry.",
+          snapshot.snapshotId(),
+          MAX_COMMITTED_CHECKPOINT_ID,
+          checkpointId,
+          stagedCheckpointId);
+    }
+  }
+
   @VisibleForTesting
   void commitOperation(
       Table table,