From 723b7c40186be275c4de44781312b3428f7cc799 Mon Sep 17 00:00:00 2001
From: Guosmilesmile <511955993@qq.com>
Date: Fri, 24 Oct 2025 17:36:03 +0800
Subject: [PATCH 1/2] Flink: DynamicSink support dvs

---
 .../dynamic/DynamicWriteResultAggregator.java |  4 +-
 .../flink/sink/dynamic/DynamicWriter.java     |  5 --
 .../sink/dynamic/TestDynamicIcebergSink.java  | 71 +++++++++++++++++++
 3 files changed, 74 insertions(+), 6 deletions(-)

diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index 0f65f1ae52f3..b7ec12d17717 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -34,6 +34,7 @@
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.CatalogLoader;
@@ -160,12 +161,13 @@ byte[] writeToManifest(
     writeResults.forEach(w -> builder.add(w.writeResult()));
     WriteResult result = builder.build();
 
+    Table table = catalog.loadTable(TableIdentifier.parse(key.tableName()));
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
             result,
             () -> outputFileFactory(key.tableName()).create(checkpointId),
             spec(key.tableName(), key.specId()),
-            2);
+            TableUtil.formatVersion(table));
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
index 5ed9da8623e9..c2a303285801 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriter.java
@@ -32,7 +32,6 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableUtil;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -118,10 +117,6 @@ public void write(DynamicRecordInternal element, Context context)
           !equalityFieldIds.isEmpty(),
           "Equality field columns shouldn't be empty when configuring to use UPSERT data.");
 
-      Preconditions.checkArgument(
-          !(TableUtil.formatVersion(table) > 2),
-          "Dynamic Sink writer does not support upsert mode in tables (V3+)");
-
       if (!table.spec().isUnpartitioned()) {
         for (PartitionField partitionField : table.spec().fields()) {
           Preconditions.checkState(
diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
index 20fae212b48e..5dab31609e86 100644
--- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
+++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicIcebergSink.java
@@ -56,6 +56,7 @@
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.IcebergGenerics;
@@ -73,6 +74,7 @@
 import org.apache.iceberg.inmemory.InMemoryInputFile;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -522,6 +524,75 @@ void testUpsert() throws Exception {
     }
   }
 
+  @Test
+  void testUpsertV3() throws Exception {
+    ImmutableMap<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "3");
+    CATALOG_EXTENSION
+        .catalog()
+        .createTable(
+            TableIdentifier.of(DATABASE, "t1"),
+            SimpleDataUtil.SCHEMA,
+            PartitionSpec.unpartitioned(),
+            null,
+            properties);
+
+    List<DynamicIcebergDataImpl> rows =
+        Lists.newArrayList(
+            // Insert one row
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                false),
+            // Remaining rows are duplicates
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true),
+            new DynamicIcebergDataImpl(
+                SimpleDataUtil.SCHEMA,
+                "t1",
+                "main",
+                PartitionSpec.unpartitioned(),
+                true,
+                Sets.newHashSet("id"),
+                true));
+
+    executeDynamicSink(rows, env, true, 1, null);
+
+    try (CloseableIterable<Record> iterable =
+        IcebergGenerics.read(
+                CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1")))
+            .build()) {
+      List<Record> records = Lists.newArrayList();
+      for (Record record : iterable) {
+        records.add(record);
+      }
+
+      assertThat(records).hasSize(1);
+      Record actual = records.get(0);
+      DynamicIcebergDataImpl input = rows.get(0);
+      assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
+      assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
+      // There is an additional _pos field which gets added
+    }
+  }
+
   @Test
   void testCommitFailedBeforeOrAfterCommit() throws Exception {
     // Configure a Restart strategy to allow recovery
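Read together, the changes in this first patch make the delta manifest format version follow the target table: instead of the hardcoded `2`, the aggregator now resolves the version from live table metadata via `TableUtil.formatVersion`, and the writer no longer rejects upserts on format v3+ tables, whose row-level deletes are tracked as deletion vectors. A minimal sketch of that lookup, assuming only the calls already used in the diff above (the `FormatVersionLookup` wrapper class is illustrative, not part of the patch):

    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableUtil;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.TableIdentifier;

    // Illustrative wrapper around the lookup introduced above: derive the manifest
    // format version from the table itself rather than assuming format version 2.
    class FormatVersionLookup {
      private final Catalog catalog;

      FormatVersionLookup(Catalog catalog) {
        this.catalog = catalog;
      }

      int formatVersion(String tableName) {
        // Loads the table from the catalog on every call; the second patch below
        // avoids the repeated load by caching the value per table name.
        Table table = catalog.loadTable(TableIdentifier.parse(tableName));
        return TableUtil.formatVersion(table);
      }
    }
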
From d7aa22f9ad5f0770982b05c08a1e82b301bc30ea Mon Sep 17 00:00:00 2001
From: Guosmilesmile <511955993@qq.com>
Date: Fri, 7 Nov 2025 10:41:59 +0800
Subject: [PATCH 2/2] Cache format version and add doc

---
 docs/docs/flink-writes.md                     |  1 +
 .../dynamic/DynamicWriteResultAggregator.java | 16 ++++++++++++++--
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md
index 168cb2d4183b..9226c4b9b8e9 100644
--- a/docs/docs/flink-writes.md
+++ b/docs/docs/flink-writes.md
@@ -549,3 +549,4 @@ The Dynamic Iceberg Flink Sink is configured using the Builder pattern. Here are
 - **Range distribution mode**: Currently, the dynamic sink does not support the `RANGE` distribution mode, if set, it will fall back to `HASH`.
 - **Property Precedence Note**: When conflicts occur between table properties and sink properties, the sink properties will override the table properties configuration.
+- **Table Format Version upgrade**: Currently, the dynamic sink does not support upgrading the underlying table format version directly.
diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
index b7ec12d17717..9a45cd3f635e 100644
--- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
+++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicWriteResultAggregator.java
@@ -68,6 +68,7 @@ class DynamicWriteResultAggregator
   private transient Map<WriteTarget, Collection<DynamicWriteResult>> results;
   private transient Map<String, Map<Integer, PartitionSpec>> specs;
+  private transient Map<String, Integer> tableFormatVersions;
   private transient Map<String, ManifestOutputFileFactory> outputFileFactories;
   private transient String flinkJobId;
   private transient String operatorId;
@@ -96,6 +97,7 @@ public void open() throws Exception {
     this.results = Maps.newHashMap();
     this.specs = new LRUCache<>(cacheMaximumSize);
     this.outputFileFactories = new LRUCache<>(cacheMaximumSize);
+    this.tableFormatVersions = new LRUCache<>(cacheMaximumSize);
     this.catalog = catalogLoader.loadCatalog();
   }
 
@@ -161,13 +163,12 @@ byte[] writeToManifest(
     writeResults.forEach(w -> builder.add(w.writeResult()));
     WriteResult result = builder.build();
 
-    Table table = catalog.loadTable(TableIdentifier.parse(key.tableName()));
     DeltaManifests deltaManifests =
         FlinkManifestUtil.writeCompletedFiles(
             result,
             () -> outputFileFactory(key.tableName()).create(checkpointId),
             spec(key.tableName(), key.specId()),
-            TableUtil.formatVersion(table));
+            formatVersion(key.tableName()));
 
     return SimpleVersionedSerialization.writeVersionAndSerialize(
         DeltaManifestsSerializer.INSTANCE, deltaManifests);
@@ -191,6 +192,7 @@ private ManifestOutputFileFactory outputFileFactory(String tableName) {
         unused -> {
           Table table = catalog.loadTable(TableIdentifier.parse(tableName));
           specs.put(tableName, table.specs());
+          tableFormatVersions.put(tableName, TableUtil.formatVersion(table));
           // Make sure to append an identifier to avoid file clashes in case the factory was to get
           // re-created during a checkpoint, i.e. due to cache eviction.
           String fileSuffix = UUID.randomUUID().toString();
@@ -217,4 +219,14 @@ private PartitionSpec spec(String tableName, int specId) {
     Table table = catalog.loadTable(TableIdentifier.parse(tableName));
     return table.specs().get(specId);
   }
+
+  private int formatVersion(String tableName) {
+    Integer cachedFormatVersion = tableFormatVersions.get(tableName);
+    if (cachedFormatVersion != null) {
+      return cachedFormatVersion;
+    }
+
+    Table table = catalog.loadTable(TableIdentifier.parse(tableName));
+    return TableUtil.formatVersion(table);
+  }
 }
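
As the new doc bullet notes, the dynamic sink does not upgrade a table's format version on its own, so deletion-vector-based upserts require the target table to already be format v3, which is what `testUpsertV3` arranges via `TableProperties.FORMAT_VERSION`. A minimal setup sketch under that assumption (the class name, schema, and table identifier below are placeholders, not part of the patch):

    import java.util.Map;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.TableProperties;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.types.Types;

    // Placeholder setup: create the target table as format version 3 up front so the
    // dynamic sink can write deletion vectors for upserts against it.
    class CreateV3TargetTable {
      static void create(Catalog catalog) {
        Schema schema =
            new Schema(
                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                Types.NestedField.optional(2, "data", Types.StringType.get()));
        Map<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "3");

        catalog.createTable(
            TableIdentifier.of("default", "t1"),
            schema,
            PartitionSpec.unpartitioned(),
            null /* default location */,
            properties);
      }
    }

With the table at format version 3, the cached `formatVersion(...)` lookup added in the second patch returns 3 and `FlinkManifestUtil.writeCompletedFiles` is called with that version when delta manifests are written at checkpoint time.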