diff --git a/docs/content/migration/iceberg-compatibility.md b/docs/content/migration/iceberg-compatibility.md index 01a03a45264d..8e4d3c90176b 100644 --- a/docs/content/migration/iceberg-compatibility.md +++ b/docs/content/migration/iceberg-compatibility.md @@ -373,7 +373,7 @@ you also need to set some (or all) of the following table options when creating
metadata.iceberg.manifest-compression
- gzip + snappy String Compression for Iceberg manifest files. diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java index 4b59e29c8c33..55fbab5158fa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergOptions.java @@ -74,7 +74,7 @@ public class IcebergOptions { key("metadata.iceberg.manifest-compression") .stringType() .defaultValue( - "gzip") // some Iceberg reader cannot support zstd, for example DuckDB + "snappy") // some Iceberg readers do not support zstd, for example DuckDB .withDescription("Compression for Iceberg manifest files."); public static final ConfigOption MANIFEST_LEGACY_VERSION = diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java index 57484a1f3ff9..5955da6220f8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java @@ -18,7 +18,6 @@ package org.apache.paimon.iceberg.manifest; -import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FormatReaderFactory; @@ -111,7 +110,7 @@ public static IcebergManifestFile create(FileStoreTable table, IcebergPathFactor } public List rollingWrite( - Iterator entries, long sequenceNumber) throws IOException { + Iterator entries, long sequenceNumber) { RollingFileWriter writer = new RollingFileWriter<>( () -> createWriter(sequenceNumber), targetFileSize.getBytes()); @@ -127,10 +126,7 @@ public List rollingWrite( public SingleFileWriter createWriter( long sequenceNumber) { return new IcebergManifestEntryWriter( - writerFactory, 
pathFactory.newPath(), - CoreOptions.FILE_COMPRESSION.defaultValue(), - sequenceNumber); + writerFactory, pathFactory.newPath(), compression, sequenceNumber); } private class IcebergManifestEntryWriter diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java index b069ac031d38..e5b550ff94c4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergCompatibilityTest.java @@ -28,6 +28,7 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.disk.IOManagerImpl; +import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.iceberg.manifest.IcebergManifestFile; @@ -281,9 +282,10 @@ public void testIcebergSnapshotExpire() throws Exception { write.write(GenericRow.of(2, 20)); commit.commit(1, write.prepareCommit(false, 1)); assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(1L); + FileIO fileIO = table.fileIO(); IcebergMetadata metadata = IcebergMetadata.fromPath( - table.fileIO(), new Path(table.location(), "metadata/v1.metadata.json")); + fileIO, new Path(table.location(), "metadata/v1.metadata.json")); assertThat(metadata.snapshots()).hasSize(1); assertThat(metadata.currentSnapshotId()).isEqualTo(1); @@ -294,7 +296,7 @@ public void testIcebergSnapshotExpire() throws Exception { assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(3L); metadata = IcebergMetadata.fromPath( - table.fileIO(), new Path(table.location(), "metadata/v3.metadata.json")); + fileIO, new Path(table.location(), "metadata/v3.metadata.json")); assertThat(metadata.snapshots()).hasSize(3); assertThat(metadata.currentSnapshotId()).isEqualTo(3); @@ -304,15 +306,25 @@ public void testIcebergSnapshotExpire() throws Exception { 
IcebergPathFactory pathFactory = new IcebergPathFactory(new Path(table.location(), "metadata")); IcebergManifestList manifestList = IcebergManifestList.create(table, pathFactory); - assertThat(manifestList.compression()).isEqualTo("gzip"); + assertThat(manifestList.compression()).isEqualTo("snappy"); IcebergManifestFile manifestFile = IcebergManifestFile.create(table, pathFactory); - assertThat(manifestFile.compression()).isEqualTo("gzip"); + assertThat(manifestFile.compression()).isEqualTo("snappy"); Set usingManifests = new HashSet<>(); String manifestListFile = new Path(metadata.currentSnapshot().manifestList()).getName(); + + assertThat(fileIO.readFileUtf8(new Path(pathFactory.metadataDirectory(), manifestListFile))) + .contains("snappy"); + for (IcebergManifestFileMeta fileMeta : manifestList.read(manifestListFile)) { usingManifests.add(fileMeta.manifestPath()); + assertThat( + fileIO.readFileUtf8( + new Path( + pathFactory.metadataDirectory(), + fileMeta.manifestPath()))) + .contains("snappy"); } IcebergManifestList legacyManifestList = @@ -345,7 +357,7 @@ public void testIcebergSnapshotExpire() throws Exception { assertThat(table.snapshotManager().latestSnapshotId()).isEqualTo(5L); metadata = IcebergMetadata.fromPath( - table.fileIO(), new Path(table.location(), "metadata/v5.metadata.json")); + fileIO, new Path(table.location(), "metadata/v5.metadata.json")); assertThat(metadata.snapshots()).hasSize(3); assertThat(metadata.currentSnapshotId()).isEqualTo(5); @@ -358,7 +370,7 @@ public void testIcebergSnapshotExpire() throws Exception { } for (String path : unusedFiles) { - assertThat(table.fileIO().exists(new Path(path))).isFalse(); + assertThat(fileIO.exists(new Path(path))).isFalse(); } // Test all existing Iceberg snapshots are valid. 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java index 63a51c0a13a9..fcce9ae50530 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroFileFormat.java @@ -105,7 +105,7 @@ private CodecFactory createCodecFactory(String compression) { if (compression.equalsIgnoreCase("zstd")) { return CodecFactory.zstandardCodec(zstdLevel); } - return CodecFactory.fromString(options.get(AVRO_OUTPUT_CODEC)); + return CodecFactory.fromString(compression); } /** A {@link FormatWriterFactory} to write {@link InternalRow}. */ diff --git a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java index 3f6486baaef2..9c0dbb43fe62 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFileFormatTest.java @@ -198,4 +198,16 @@ private void checkException() throws IOException { .isInstanceOf(IOException.class) .hasMessageContaining("Artificial exception"); } + + @Test + void testCompression() throws IOException { + RowType rowType = DataTypes.ROW(DataTypes.INT().notNull()); + AvroFileFormat format = new AvroFileFormat(new FormatContext(new Options(), 1024, 1024)); + LocalFileIO localFileIO = LocalFileIO.create(); + Path file = new Path(new Path(tempPath.toUri()), UUID.randomUUID().toString()); + try (PositionOutputStream out = localFileIO.newOutputStream(file, false)) { + assertThatThrownBy(() -> format.createWriterFactory(rowType).create(out, "unsupported")) + .hasMessageContaining("Unrecognized codec: unsupported"); + } + } }