@@ -34,6 +34,7 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
@@ -160,12 +161,13 @@ byte[] writeToManifest(
writeResults.forEach(w -> builder.add(w.writeResult()));
WriteResult result = builder.build();

+Table table = catalog.loadTable(TableIdentifier.parse(key.tableName()));
Contributor:

We should cache, or make the info part of the Committable.

Committable seems like a better choice, as the serialized manifest file is different for the two versions.

Contributor Author:

Got it. Let me try it.

DeltaManifests deltaManifests =
FlinkManifestUtil.writeCompletedFiles(
result,
() -> outputFileFactory(key.tableName()).create(checkpointId),
spec(key.tableName(), key.specId()),
-2);
+TableUtil.formatVersion(table));

return SimpleVersionedSerialization.writeVersionAndSerialize(
DeltaManifestsSerializer.INSTANCE, deltaManifests);
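
The review thread above suggests avoiding the per-checkpoint catalog.loadTable() call by either caching the format version or carrying it on the Committable. Below is a minimal sketch of the caching variant, assuming the format version can be memoized per table name; the FormatVersionCache class and its method names are hypothetical and not part of this PR.

import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

// Hypothetical helper (illustration only): memoizes the Iceberg format version per
// table name so writeToManifest() does not need a catalog round-trip on every checkpoint.
class FormatVersionCache {
  private final Catalog catalog;
  private final Map<String, Integer> cache = Maps.newConcurrentMap();

  FormatVersionCache(Catalog catalog) {
    this.catalog = catalog;
  }

  int formatVersion(String tableName) {
    // Load the table only on the first lookup for a given table name.
    return cache.computeIfAbsent(
        tableName,
        name -> {
          Table table = catalog.loadTable(TableIdentifier.parse(name));
          return TableUtil.formatVersion(table);
        });
  }
}

The Committable route the reviewer prefers would avoid even this first lookup and keep the format version next to the serialized manifest, which differs between the two versions.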
@@ -32,7 +32,6 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.Table;
-import org.apache.iceberg.TableUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkSchemaUtil;
@@ -118,10 +117,6 @@ public void write(DynamicRecordInternal element, Context context)
!equalityFieldIds.isEmpty(),
"Equality field columns shouldn't be empty when configuring to use UPSERT data.");

-Preconditions.checkArgument(
-    !(TableUtil.formatVersion(table) > 2),
-    "Dynamic Sink writer does not support upsert mode in tables (V3+)");

if (!table.spec().isUnpartitioned()) {
for (PartitionField partitionField : table.spec().fields()) {
Preconditions.checkState(
@@ -56,6 +56,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
@@ -73,6 +74,7 @@
import org.apache.iceberg.inmemory.InMemoryInputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -522,6 +524,75 @@ void testUpsert() throws Exception {
}
}

@Test
void testUpsertV3() throws Exception {
ImmutableMap<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "3");
CATALOG_EXTENSION
.catalog()
.createTable(
TableIdentifier.of(DATABASE, "t1"),
SimpleDataUtil.SCHEMA,
PartitionSpec.unpartitioned(),
null,
properties);

List<DynamicIcebergDataImpl> rows =
Lists.newArrayList(
// Insert one row
new DynamicIcebergDataImpl(
SimpleDataUtil.SCHEMA,
"t1",
"main",
PartitionSpec.unpartitioned(),
true,
Sets.newHashSet("id"),
false),
// Remaining rows are duplicates
new DynamicIcebergDataImpl(
SimpleDataUtil.SCHEMA,
"t1",
"main",
PartitionSpec.unpartitioned(),
true,
Sets.newHashSet("id"),
true),
new DynamicIcebergDataImpl(
SimpleDataUtil.SCHEMA,
"t1",
"main",
PartitionSpec.unpartitioned(),
true,
Sets.newHashSet("id"),
true),
new DynamicIcebergDataImpl(
SimpleDataUtil.SCHEMA,
"t1",
"main",
PartitionSpec.unpartitioned(),
true,
Sets.newHashSet("id"),
true));

executeDynamicSink(rows, env, true, 1, null);

try (CloseableIterable<Record> iterable =
IcebergGenerics.read(
CATALOG_EXTENSION.catalog().loadTable(TableIdentifier.of("default", "t1")))
.build()) {
List<Record> records = Lists.newArrayList();
for (Record record : iterable) {
records.add(record);
}

assertThat(records).hasSize(1);
Record actual = records.get(0);
DynamicIcebergDataImpl input = rows.get(0);
assertThat(actual.get(0)).isEqualTo(input.rowProvided.getField(0));
assertThat(actual.get(1)).isEqualTo(input.rowProvided.getField(1));
// There is an additional _pos field which gets added
}
}

@Test
void testCommitFailedBeforeOrAfterCommit() throws Exception {
// Configure a Restart strategy to allow recovery