diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java index 24da8220ca..3bb082e598 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java @@ -39,6 +39,7 @@ public class BenchmarkFiles { // public final Path parquetFile_1M_LZO = new Path("target/tests/ParquetBenchmarks/PARQUET-1M-LZO"); public static final Path file_1M_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-SNAPPY"); public static final Path file_1M_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-GZIP"); + public static final Path file_1M_AIRLIFT_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-AIRLIFT-GZIP"); // Page checksum files public static final Path file_100K_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-UNCOMPRESSED"); @@ -53,7 +54,11 @@ public class BenchmarkFiles { public static final Path file_1M_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-GZIP"); public static final Path file_1M_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-GZIP"); public static final Path file_10M_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-GZIP"); + public static final Path file_10M_CHECKSUMS_AIRLIFT_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-AIRLIFT" + + "-GZIP"); public static final Path file_10M_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-GZIP"); + public static final Path file_10M_NOCHECKSUMS_AIRLIFT_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-AIRLIFT" + + "-NOCHECKSUMS-GZIP"); public static final Path file_100K_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-SNAPPY"); public static final Path file_100K_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-SNAPPY"); diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java index 3b5db686fa..3018131594 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/DataGenerator.java @@ -59,6 +59,7 @@ public void generateAll() { // generateData(parquetFile_1M_LZO, configuration, PARQUET_2_0, BLOCK_SIZE_DEFAULT, PAGE_SIZE_DEFAULT, FIXED_LEN_BYTEARRAY_SIZE, LZO, ONE_MILLION); generateData(file_1M_SNAPPY, configuration, PARQUET_2_0, BLOCK_SIZE_DEFAULT, PAGE_SIZE_DEFAULT, FIXED_LEN_BYTEARRAY_SIZE, SNAPPY, ONE_MILLION); generateData(file_1M_GZIP, configuration, PARQUET_2_0, BLOCK_SIZE_DEFAULT, PAGE_SIZE_DEFAULT, FIXED_LEN_BYTEARRAY_SIZE, GZIP, ONE_MILLION); + generateData(file_1M_AIRLIFT_GZIP, configuration, PARQUET_2_0, BLOCK_SIZE_DEFAULT, PAGE_SIZE_DEFAULT, FIXED_LEN_BYTEARRAY_SIZE, GZIP, ONE_MILLION); } catch (IOException e) { throw new RuntimeException(e); diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java index 49ebdce8e3..916d85a41d 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java @@ -36,7 +36,6 @@ import java.io.IOException; import java.util.Random; -import static 
org.apache.parquet.benchmarks.BenchmarkUtils.deleteIfExists; import static org.apache.parquet.benchmarks.BenchmarkUtils.exists; import static org.apache.parquet.hadoop.metadata.CompressionCodecName.*; @@ -51,8 +50,9 @@ public class PageChecksumDataGenerator extends DataGenerator { " }" + "}"); - public void generateData(Path outFile, int nRows, boolean writeChecksums, - CompressionCodecName compression) throws IOException { + public void generateData( + Path outFile, int nRows, boolean writeChecksums, + CompressionCodecName compression, boolean useAirlift) throws IOException { if (exists(configuration, outFile)) { System.out.println("File already exists " + outFile); return; @@ -64,6 +64,7 @@ public void generateData(Path outFile, int nRows, boolean writeChecksums, .withCompressionCodec(compression) .withDictionaryEncoding(true) .withType(SCHEMA) + .withAirliftCompressorsEnabled(useAirlift) .withPageWriteChecksumEnabled(writeChecksums) .build(); @@ -90,15 +91,16 @@ public void generateAll() { try { // No need to generate the non-checksum versions, as the files generated here are only used in // the read benchmarks - generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED); - generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP); - generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY); - generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED); - generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP); - generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY); - generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED); - generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP); - generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY); + generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED, false); + generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP, false); + generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY, false); + generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED, false); + generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP, false); + generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY, false); + generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED, false); + generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP, false); + generateData(file_10M_CHECKSUMS_AIRLIFT_GZIP, 10 * ONE_MILLION, true, GZIP, false); + generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY, false); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java index be2ebe40f7..fa288b69c7 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java @@ -37,6 +37,7 @@ import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_UNCOMPRESSED; import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_GZIP; import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_SNAPPY; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_AIRLIFT_GZIP; import static 
org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_UNCOMPRESSED; import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_GZIP; import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_SNAPPY; @@ -60,11 +61,12 @@ public void setup() { pageChecksumDataGenerator.generateAll(); } - private void readFile(Path file, int nRows, boolean verifyChecksums, Blackhole blackhole) + private void readFile(Path file, int nRows, boolean verifyChecksums, Blackhole blackhole, boolean useAirlift) throws IOException { try (ParquetReader reader = ParquetReader.builder(new GroupReadSupport(), file) .withConf(configuration) .usePageChecksumVerification(verifyChecksums) + .useAirliftCompressors(useAirlift) .build()) { for (int i = 0; i < nRows; i++) { Group group = reader.read(); @@ -84,37 +86,37 @@ private void readFile(Path file, int nRows, boolean verifyChecksums, Blackhole b @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read100KRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException { - readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, blackhole); + readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, blackhole, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read100KRowsUncompressedWithVerification(Blackhole blackhole) throws IOException { - readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, blackhole); + readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, blackhole, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read100KRowsGzipWithoutVerification(Blackhole blackhole) throws IOException { - readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, false, blackhole); + readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, false, blackhole, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read100KRowsGzipWithVerification(Blackhole blackhole) throws IOException { - readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, blackhole); + readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, blackhole, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read100KRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException { - readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, false, blackhole); + readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, false, blackhole, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read100KRowsSnappyWithVerification(Blackhole blackhole) throws IOException { - readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, blackhole); + readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, blackhole, false); } // 1M rows, uncompressed, GZIP, Snappy @@ -122,37 +124,37 @@ public void read100KRowsSnappyWithVerification(Blackhole blackhole) throws IOExc @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read1MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException { - readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, blackhole); + readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, blackhole, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read1MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException { - readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, blackhole); + readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, blackhole, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read1MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException { - 
readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, false, blackhole); + readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, false, blackhole, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read1MRowsGzipWithVerification(Blackhole blackhole) throws IOException { - readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, blackhole); + readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, blackhole, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read1MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException { - readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, false, blackhole); + readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, false, blackhole, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read1MRowsSnappyWithVerification(Blackhole blackhole) throws IOException { - readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, blackhole); + readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, blackhole, false); } // 10M rows, uncompressed, GZIP, Snappy @@ -160,37 +162,49 @@ public void read1MRowsSnappyWithVerification(Blackhole blackhole) throws IOExcep @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read10MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException { - readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, blackhole); + readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, blackhole, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read10MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException { - readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, blackhole); + readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, blackhole, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read10MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException { - readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, false, blackhole); + readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, false, blackhole, false); + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + public void read10MRowsAirliftGzipWithoutVerification(Blackhole blackhole) throws IOException { + readFile(file_10M_CHECKSUMS_AIRLIFT_GZIP, 10 * ONE_MILLION, false, blackhole, true); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read10MRowsGzipWithVerification(Blackhole blackhole) throws IOException { - readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, blackhole); + readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, blackhole, false); + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + public void read10MRowsAirliftGzipWithVerification(Blackhole blackhole) throws IOException { + readFile(file_10M_CHECKSUMS_AIRLIFT_GZIP, 10 * ONE_MILLION, true, blackhole, true); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read10MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException { - readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, blackhole); + readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, blackhole, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void read10MRowsSnappyWithVerification(Blackhole blackhole) throws IOException { - readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, blackhole); + readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, blackhole, false); } } diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java 
b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java index e892d53a76..53f77b8649 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java @@ -34,6 +34,7 @@ import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_NOCHECKSUMS_GZIP; import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_SNAPPY; import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_NOCHECKSUMS_SNAPPY; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_AIRLIFT_GZIP; import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_UNCOMPRESSED; import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_NOCHECKSUMS_UNCOMPRESSED; import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_GZIP; @@ -43,6 +44,7 @@ import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_UNCOMPRESSED; import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_NOCHECKSUMS_UNCOMPRESSED; import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_GZIP; +import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_NOCHECKSUMS_AIRLIFT_GZIP; import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_NOCHECKSUMS_GZIP; import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_SNAPPY; import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_NOCHECKSUMS_SNAPPY; @@ -66,37 +68,37 @@ public void setup() { @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write100KRowsUncompressedWithoutChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, UNCOMPRESSED); + pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, UNCOMPRESSED, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write100KRowsUncompressedWithChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED); + pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write100KRowsGzipWithoutChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_GZIP, 100 * ONE_K, false, GZIP); + pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_GZIP, 100 * ONE_K, false, GZIP, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write100KRowsGzipWithChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP); + pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write100KRowsSnappyWithoutChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_SNAPPY, 100 * ONE_K, false, SNAPPY); + pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_SNAPPY, 100 * ONE_K, false, SNAPPY, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write100KRowsSnappyWithChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY); + 
pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY, false); } // 1M rows, uncompressed, GZIP, Snappy @@ -104,37 +106,37 @@ public void write100KRowsSnappyWithChecksums() throws IOException { @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write1MRowsUncompressedWithoutChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, UNCOMPRESSED); + pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, UNCOMPRESSED, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write1MRowsUncompressedWithChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED); + pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write1MRowsGzipWithoutChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_GZIP, ONE_MILLION, false, GZIP); + pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_GZIP, ONE_MILLION, false, GZIP, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write1MRowsGzipWithChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP); + pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write1MRowsSnappyWithoutChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_SNAPPY, ONE_MILLION, false, SNAPPY); + pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_SNAPPY, ONE_MILLION, false, SNAPPY, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write1MRowsSnappyWithChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY); + pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY, false); } // 10M rows, uncompressed, GZIP, Snappy @@ -142,37 +144,50 @@ public void write1MRowsSnappyWithChecksums() throws IOException { @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write10MRowsUncompressedWithoutChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, UNCOMPRESSED); + pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, UNCOMPRESSED, + false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write10MRowsUncompressedWithChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED); + pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write10MRowsGzipWithoutChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_GZIP, 10 * ONE_MILLION, false, GZIP); + pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_GZIP, 10 * ONE_MILLION, false, GZIP, false); + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + public void write10MRowsAirliftGzipWithoutChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_AIRLIFT_GZIP, 
10 * ONE_MILLION, false, GZIP, true); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write10MRowsGzipWithChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP); + pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP, false); + } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + public void write10MRowsAirliftGzipWithChecksums() throws IOException { + pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_AIRLIFT_GZIP, 10 * ONE_MILLION, true, GZIP, true); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write10MRowsSnappyWithoutChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, SNAPPY); + pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, SNAPPY, false); } @Benchmark @BenchmarkMode(Mode.SingleShotTime) public void write10MRowsSnappyWithChecksums() throws IOException { - pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY); + pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY, false); } } diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ReadBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ReadBenchmarks.java index e74204a69d..583a701eec 100644 --- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ReadBenchmarks.java +++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/ReadBenchmarks.java @@ -128,4 +128,12 @@ public void read1MRowsDefaultBlockAndPageSizeGZIP(Blackhole blackhole) { read(file_1M_GZIP, ONE_MILLION, blackhole); } + + @Benchmark + @BenchmarkMode(Mode.SingleShotTime) + public void read1MRowsDefaultBlockAndPageSizeAirliftGZIP(Blackhole blackhole) + throws IOException + { + read(file_1M_AIRLIFT_GZIP, ONE_MILLION, blackhole); + } } diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java index cae68107da..644e71f3a3 100644 --- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java +++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/TransCompressionCommand.java @@ -57,9 +57,12 @@ public TransCompressionCommand(Logger console) { @Parameter(description = "") String output; - @Parameter(description = " bloomFilterNDVs; private final int maxBloomFilterBytes; private final ColumnProperty bloomFilterEnabled; + private final boolean useAirliftCompressors; private final int pageRowCountLimit; private final boolean pageWriteChecksumEnabled; private final boolean enableByteStreamSplit; @@ -127,6 +129,7 @@ private ParquetProperties(Builder builder) { this.pageRowCountLimit = builder.pageRowCountLimit; this.pageWriteChecksumEnabled = builder.pageWriteChecksumEnabled; this.enableByteStreamSplit = builder.enableByteStreamSplit; + this.useAirliftCompressors = builder.useAirliftCompressors; } public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) { @@ -262,6 +265,10 @@ public boolean isBloomFilterEnabled(ColumnDescriptor column) { return bloomFilterEnabled.getValue(column); } + public boolean useAirliftCompressors() { + return useAirliftCompressors; + } + public int getMaxBloomFilterBytes() { return maxBloomFilterBytes; } @@ -286,6 +293,7 @@ public String toString() { + 
"Truncate length for column indexes is: " + getColumnIndexTruncateLength() + '\n' + "Truncate length for statistics min/max is: " + getStatisticsTruncateLength() + '\n' + "Bloom filter enabled: " + bloomFilterEnabled + '\n' + + "Airlift compressors enabled: " + useAirliftCompressors + '\n' + "Max Bloom filter size for a column is " + getMaxBloomFilterBytes() + '\n' + "Bloom filter expected number of distinct values are: " + bloomFilterNDVs + '\n' + "Page row count limit to " + getPageRowCountLimit() + '\n' @@ -310,6 +318,7 @@ public static class Builder { private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT; private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED; private boolean enableByteStreamSplit = DEFAULT_IS_BYTE_STREAM_SPLIT_ENABLED; + private boolean useAirliftCompressors = DEFAULT_AIRLIFT_COMPRESSORS_ENABLED; private Builder() { enableDict = ColumnProperty.builder().withDefaultValue(DEFAULT_IS_DICTIONARY_ENABLED); @@ -333,6 +342,7 @@ private Builder(ParquetProperties toCopy) { this.bloomFilterEnabled = ColumnProperty.builder(toCopy.bloomFilterEnabled); this.maxBloomFilterBytes = toCopy.maxBloomFilterBytes; this.enableByteStreamSplit = toCopy.enableByteStreamSplit; + this.useAirliftCompressors = toCopy.useAirliftCompressors; } /** @@ -484,6 +494,11 @@ public Builder withBloomFilterEnabled(boolean enabled) { return this; } + public Builder withAirliftCompressorsEnabled(boolean useAirliftCompressors) { + this.useAirliftCompressors = useAirliftCompressors; + return this; + } + /** * Enable or disable the bloom filter for the specified column. * One may either disable bloom filters for all columns by invoking {@link #withBloomFilterEnabled(boolean)} with a diff --git a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java index 5b1b657230..099213ad4b 100644 --- a/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java +++ b/parquet-common/src/main/java/org/apache/parquet/compression/CompressionCodecFactory.java @@ -32,6 +32,8 @@ public interface CompressionCodecFactory { void release(); + CompressionCodecFactory withAirliftCompressors(boolean useAirliftCompressors); + interface BytesInputCompressor { BytesInput compress(BytesInput bytes) throws IOException; CompressionCodecName getCodecName(); diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml index af2ac170df..32e63b87aa 100644 --- a/parquet-hadoop/pom.xml +++ b/parquet-hadoop/pom.xml @@ -127,6 +127,11 @@ ${slf4j.version} test + + io.airlift + aircompressor + 0.16 + diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java index b16a8c4ffa..35884db7a7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java @@ -24,6 +24,7 @@ import org.apache.parquet.compression.CompressionCodecFactory; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter; +import org.apache.parquet.hadoop.ParquetInputFormat; import org.apache.parquet.hadoop.util.HadoopCodecs; import java.util.Map; @@ -31,6 +32,7 @@ import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED; import static 
org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.BLOOM_FILTERING_ENABLED; +import static org.apache.parquet.hadoop.ParquetInputFormat.AIRLIFT_COMPRESSORS_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter; import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED; import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED; @@ -49,6 +51,7 @@ private HadoopReadOptions(boolean useSignedStringMinMax, boolean useColumnIndexFilter, boolean usePageChecksumVerification, boolean useBloomFilter, + boolean useAirliftCompressors, FilterCompat.Filter recordFilter, MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -57,9 +60,9 @@ private HadoopReadOptions(boolean useSignedStringMinMax, Map properties, Configuration conf) { super( - useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter, - usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter, codecFactory, allocator, - maxAllocationSize, properties + useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter, + usePageChecksumVerification, useBloomFilter, useAirliftCompressors, recordFilter, metadataFilter, + codecFactory, allocator, maxAllocationSize, properties ); this.conf = conf; } @@ -94,7 +97,8 @@ public Builder(Configuration conf) { usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED, usePageChecksumVerification)); useBloomFilter(conf.getBoolean(BLOOM_FILTERING_ENABLED, true)); - withCodecFactory(HadoopCodecs.newFactory(conf, 0)); + withCodecFactory(HadoopCodecs.newFactory(conf, 0, + conf.getBoolean(AIRLIFT_COMPRESSORS_ENABLED, useAirliftCompressors))); withRecordFilter(getFilter(conf)); withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608)); String badRecordThresh = conf.get(BAD_RECORD_THRESHOLD_CONF_KEY); @@ -107,8 +111,8 @@ public Builder(Configuration conf) { public ParquetReadOptions build() { return new HadoopReadOptions( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, - useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter, - codecFactory, allocator, maxAllocationSize, properties, conf); + useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, useAirliftCompressors, recordFilter, + metadataFilter, codecFactory, allocator, maxAllocationSize, properties, conf); } } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java index 25b1b43744..99ad4f5808 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java @@ -43,6 +43,7 @@ public class ParquetReadOptions { private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false; private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true; + private static final boolean AIRLIFT_COMPRESSORS_ENABLED_DEFAULT = false; private final boolean useSignedStringMinMax; private final boolean useStatsFilter; @@ -51,6 +52,7 @@ public class ParquetReadOptions { private final boolean useColumnIndexFilter; private final boolean usePageChecksumVerification; private final boolean useBloomFilter; + private final boolean 
useAirliftCompressors; private final FilterCompat.Filter recordFilter; private final ParquetMetadataConverter.MetadataFilter metadataFilter; private final CompressionCodecFactory codecFactory; @@ -65,6 +67,7 @@ public class ParquetReadOptions { boolean useColumnIndexFilter, boolean usePageChecksumVerification, boolean useBloomFilter, + boolean useAirliftCompressors, FilterCompat.Filter recordFilter, ParquetMetadataConverter.MetadataFilter metadataFilter, CompressionCodecFactory codecFactory, @@ -78,9 +81,14 @@ public class ParquetReadOptions { this.useColumnIndexFilter = useColumnIndexFilter; this.usePageChecksumVerification = usePageChecksumVerification; this.useBloomFilter = useBloomFilter; + this.useAirliftCompressors = useAirliftCompressors; this.recordFilter = recordFilter; this.metadataFilter = metadataFilter; - this.codecFactory = codecFactory; + if (codecFactory != null) { + this.codecFactory = codecFactory.withAirliftCompressors(useAirliftCompressors); + } else { + this.codecFactory = null; + } this.allocator = allocator; this.maxAllocationSize = maxAllocationSize; this.properties = Collections.unmodifiableMap(properties); @@ -110,6 +118,8 @@ public boolean useBloomFilter() { return useBloomFilter; } + public boolean useAirliftCompressors() { return useAirliftCompressors; } + public boolean usePageChecksumVerification() { return usePageChecksumVerification; } @@ -160,10 +170,11 @@ public static class Builder { protected boolean useColumnIndexFilter = COLUMN_INDEX_FILTERING_ENABLED_DEFAULT; protected boolean usePageChecksumVerification = PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT; protected boolean useBloomFilter = BLOOM_FILTER_ENABLED_DEFAULT; + protected boolean useAirliftCompressors = AIRLIFT_COMPRESSORS_ENABLED_DEFAULT; protected FilterCompat.Filter recordFilter = null; protected ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER; // the page size parameter isn't used when only using the codec factory to get decompressors - protected CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0); + protected CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0, useAirliftCompressors); protected ByteBufferAllocator allocator = new HeapByteBufferAllocator(); protected int maxAllocationSize = ALLOCATION_SIZE_DEFAULT; protected Map properties = new HashMap<>(); @@ -262,6 +273,11 @@ public Builder withCodecFactory(CompressionCodecFactory codecFactory) { return this; } + public Builder useAirliftCompressors(boolean useAirliftCompressors) { + this.useAirliftCompressors = useAirliftCompressors; + return this; + } + public Builder withAllocator(ByteBufferAllocator allocator) { this.allocator = allocator; return this; @@ -291,6 +307,7 @@ public Builder copy(ParquetReadOptions options) { withMetadataFilter(options.metadataFilter); withCodecFactory(options.codecFactory); withAllocator(options.allocator); + useAirliftCompressors(options.useAirliftCompressors); withPageChecksumVerification(options.usePageChecksumVerification); for (Map.Entry keyValue : options.properties.entrySet()) { set(keyValue.getKey(), keyValue.getValue()); @@ -301,8 +318,8 @@ public Builder copy(ParquetReadOptions options) { public ParquetReadOptions build() { return new ParquetReadOptions( useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, - useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, recordFilter, metadataFilter, - codecFactory, allocator, maxAllocationSize, properties); + useColumnIndexFilter, usePageChecksumVerification, useBloomFilter, 
useAirliftCompressors, + recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties); } } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/AirliftCompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/AirliftCompressor.java new file mode 100644 index 0000000000..04598ef419 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/AirliftCompressor.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionOutputStream; +import org.apache.hadoop.io.compress.Compressor; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +public class AirliftCompressor extends CodecFactory.BytesCompressor { + private final Compressor compressor; + private final ByteArrayOutputStream compressedOutBuffer; + private final CompressionCodec hadoopCodec; + private final CompressionCodecName parquetCodecName; + + AirliftCompressor(CompressionCodecName parquetCodecName, CompressionCodec hadoopCodec, int pageSize) { + this.parquetCodecName = parquetCodecName; + this.hadoopCodec = hadoopCodec; + this.compressor = hadoopCodec.createCompressor(); + this.compressedOutBuffer = new ByteArrayOutputStream(pageSize); + } + + @Override + public BytesInput compress(BytesInput bytes) throws IOException { + compressedOutBuffer.reset(); + try (CompressionOutputStream cos = hadoopCodec.createOutputStream(compressedOutBuffer, compressor)) { + bytes.writeAllTo(cos); + return BytesInput.from(compressedOutBuffer); + } + } + + @Override + public CompressionCodecName getCodecName() { + return parquetCodecName; + } + + @Override + public void release() {} +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/AirliftCompressorCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/AirliftCompressorCodecFactory.java new file mode 100644 index 0000000000..d6c15b0a15 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/AirliftCompressorCodecFactory.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import io.airlift.compress.gzip.JdkGzipCodec; +import io.airlift.compress.lz4.Lz4Codec; +import io.airlift.compress.lzo.LzoCodec; +import org.apache.parquet.compression.CompressionCodecFactory; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +public class AirliftCompressorCodecFactory implements CompressionCodecFactory { + + private final int pageSize; + + AirliftCompressorCodecFactory(int pageSize) { + this.pageSize = pageSize; + } + + @Override + public CodecFactory.BytesCompressor getCompressor(CompressionCodecName codecName) { + switch (codecName.getParquetCompressionCodec()) { + case GZIP: + return new AirliftCompressor(codecName, new JdkGzipCodec(), pageSize); + case LZO: + return new AirliftCompressor(codecName, new LzoCodec(), pageSize); + case LZ4: + return new AirliftCompressor(codecName, new Lz4Codec(), pageSize); + default: + throw new IllegalArgumentException("Codec not supported in AirliftCompressorCodecFactory: " + codecName); + } + } + + @Override + public CodecFactory.BytesDecompressor getDecompressor(CompressionCodecName codecName) { + switch (codecName.getParquetCompressionCodec()) { + case GZIP: + return new AirliftDecompressor(new JdkGzipCodec()); + case LZO: + return new AirliftDecompressor(new LzoCodec()); + case LZ4: + return new AirliftDecompressor(new Lz4Codec()); + default: + throw new IllegalArgumentException("Codec not supported in AirliftCompressorCodecFactory: " + codecName); + } + } + + @Override + public void release() {} + + @Override + public CompressionCodecFactory withAirliftCompressors(boolean useAirliftCompressors) { + return new AirliftCompressorCodecFactory(pageSize); + } + + static boolean isSupported(CompressionCodecName codecName) { + switch (codecName.getParquetCompressionCodec()) { + case GZIP: + case LZO: + case LZ4: + return true; + default: + return false; + } + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/AirliftDecompressor.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/AirliftDecompressor.java new file mode 100644 index 0000000000..b6798676ee --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/AirliftDecompressor.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.parquet.bytes.BytesInput; +import org.apache.parquet.compression.CompressionCodecFactory; + +public class AirliftDecompressor extends CodecFactory.BytesDecompressor { + private CompressionCodec codec; + private final Decompressor decompressor; + + public AirliftDecompressor(CompressionCodec codec) { + this.codec = codec; + decompressor = codec.createDecompressor(); + } + + @Override + public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { + decompressor.reset(); + InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); + return BytesInput.from(is, uncompressedSize); + } + + @Override + public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) + throws IOException { + ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer(); + output.put(decompressed); + } + + @Override + public void release() {} +} + diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java index f0e7af35c5..7cb0380db1 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java @@ -49,6 +49,7 @@ public class CodecFactory implements CompressionCodecFactory { protected final Configuration configuration; protected final int pageSize; + private final boolean useAirliftCompressors; /** * Create a new codec factory. @@ -60,8 +61,23 @@ public class CodecFactory implements CompressionCodecFactory { * decompressors this parameter has no impact on the function of the factory */ public CodecFactory(Configuration configuration, int pageSize) { + this(configuration, pageSize, false); + } + + /** + * Create a new codec factory. + * + * @param configuration used to pass compression codec configuration information + * @param pageSize the expected page size, does not set a hard limit, currently just used to set the + * initial size of the output stream used when compressing a buffer. 
If this factory is + * only used to construct decompressors this parameter has no impact on the function of + * the factory + * @param useAirliftCompressors whether to use Airlift based compressors and decompressors + */ + public CodecFactory(Configuration configuration, int pageSize, boolean useAirliftCompressors) { this.configuration = configuration; this.pageSize = pageSize; + this.useAirliftCompressors = useAirliftCompressors; } /** @@ -186,6 +202,9 @@ public CompressionCodecName getCodecName() { @Override public BytesCompressor getCompressor(CompressionCodecName codecName) { + if (useAirliftCompressors && AirliftCompressorCodecFactory.isSupported(codecName)) { + return new AirliftCompressorCodecFactory(pageSize).getCompressor(codecName); + } BytesCompressor comp = compressors.get(codecName); if (comp == null) { comp = createCompressor(codecName); @@ -196,6 +215,9 @@ public BytesCompressor getCompressor(CompressionCodecName codecName) { @Override public BytesDecompressor getDecompressor(CompressionCodecName codecName) { + if (useAirliftCompressors && AirliftCompressorCodecFactory.isSupported(codecName)) { + return new AirliftCompressorCodecFactory(pageSize).getDecompressor(codecName); + } BytesDecompressor decomp = decompressors.get(codecName); if (decomp == null) { decomp = createDecompressor(codecName); @@ -256,6 +278,11 @@ public void release() { decompressors.clear(); } + @Override + public CompressionCodecFactory withAirliftCompressors(boolean useAirlift) { + return new CodecFactory(configuration, pageSize, useAirlift); + } + /** * @deprecated will be removed in 2.0.0; use CompressionCodecFactory.BytesInputCompressor instead. */ diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java index d5f13e286d..49e76672b8 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java @@ -17,8 +17,7 @@ */ package org.apache.parquet.hadoop; - - +import java.io.InputStream; import java.lang.reflect.Method; import java.lang.reflect.InvocationTargetException; import java.io.IOException; @@ -27,13 +26,13 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; - import org.apache.commons.pool.BasePoolableObjectFactory; import org.apache.commons.pool.impl.GenericObjectPool; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.Compressor; import org.apache.hadoop.io.compress.Decompressor; +import org.apache.parquet.compression.CompressionCodecFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.xerial.snappy.Snappy; @@ -139,6 +138,13 @@ protected BytesDecompressor createDecompressor(final CompressionCodecName codecN } } + @Override + public CompressionCodecFactory withAirliftCompressors(boolean useAirliftCompressors) { + // Airlift based compressors cannot be created using DirectCodecFactory. + // So returning the instance itself instead of creating a new one. 
+ return this; + } + public void close() { release(); } @@ -149,34 +155,25 @@ public void close() { */ public class IndirectDecompressor extends BytesDecompressor { private final Decompressor decompressor; + private final CompressionCodec codec; public IndirectDecompressor(CompressionCodec codec) { this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor(); + this.codec = codec; } @Override public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException { decompressor.reset(); - byte[] inputBytes = bytes.toByteArray(); - decompressor.setInput(inputBytes, 0, inputBytes.length); - byte[] output = new byte[uncompressedSize]; - decompressor.decompress(output, 0, uncompressedSize); - return BytesInput.from(output); + InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor); + return BytesInput.from(is, uncompressedSize); } @Override public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException { - decompressor.reset(); - byte[] inputBytes = new byte[compressedSize]; - input.position(0); - input.get(inputBytes); - decompressor.setInput(inputBytes, 0, inputBytes.length); - byte[] outputBytes = new byte[uncompressedSize]; - decompressor.decompress(outputBytes, 0, uncompressedSize); - output.clear(); - output.put(outputBytes); + output.put(decompress(BytesInput.from(input), uncompressedSize).toByteBuffer()); } @Override diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java index f46f18211a..2dd9167a19 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java @@ -144,6 +144,11 @@ public class ParquetInputFormat extends FileInputFormat { */ public static final String BLOOM_FILTERING_ENABLED = "parquet.filter.bloom.enabled"; + /** + * key to configure whether airlift compressors are enabled + */ + public static final String AIRLIFT_COMPRESSORS_ENABLED = "parquet.airlift.compressors.enabled"; + /** * key to turn on or off task side metadata loading (default true) * if true then metadata is read on the task side and some tasks may finish immediately. 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index 1f7b6148c5..0169a9b33c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -18,6 +18,7 @@ */ package org.apache.parquet.hadoop; +import static org.apache.parquet.column.ParquetProperties.DEFAULT_AIRLIFT_COMPRESSORS_ENABLED; import static org.apache.parquet.column.ParquetProperties.DEFAULT_BLOOM_FILTER_ENABLED; import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE; import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration; @@ -151,6 +152,7 @@ public static enum JobSummaryLevel { public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes"; public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit"; public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled"; + public static final String AIRLIFT_COMPRESSORS_ENABLED = "parquet.enable.airlift.compressors"; public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { String level = conf.get(JOB_SUMMARY_LEVEL); @@ -224,6 +226,11 @@ public static int getBloomFilterMaxBytes(Configuration conf) { public static boolean getBloomFilterEnabled(Configuration conf) { return conf.getBoolean(BLOOM_FILTER_ENABLED, DEFAULT_BLOOM_FILTER_ENABLED); } + + public static boolean getAirliftCompressorsEnabled(Configuration conf) { + return conf.getBoolean(AIRLIFT_COMPRESSORS_ENABLED, DEFAULT_AIRLIFT_COMPRESSORS_ENABLED); + } + public static int getBlockSize(JobContext jobContext) { return getBlockSize(getConfiguration(jobContext)); } @@ -449,6 +456,7 @@ public RecordWriter getRecordWriter(Configuration conf, Path file, Comp .withStatisticsTruncateLength(getStatisticsTruncateLength(conf)) .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf)) .withBloomFilterEnabled(getBloomFilterEnabled(conf)) + .withAirliftCompressorsEnabled(getAirliftCompressorsEnabled(conf)) .withPageRowCountLimit(getPageRowCountLimit(conf)) .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf)); new ColumnConfigParser() diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java index 71fde69064..aa7fbab153 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java @@ -163,6 +163,11 @@ public void close() throws IOException { } } + // Visible for testing purposes only + ParquetReadOptions readOptions() { + return options; + } + public static Builder read(InputFile file) throws IOException { return new Builder<>(file); } @@ -294,6 +299,11 @@ public Builder useBloomFilter() { return this; } + public Builder useAirliftCompressors(boolean useAirliftCompressors) { + optionsBuilder.useAirliftCompressors(useAirliftCompressors); + return this; + } + public Builder usePageChecksumVerification() { optionsBuilder.usePageChecksumVerification(); return this; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java index a6dabc4a08..f574e8c811 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java +++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java @@ -147,7 +147,7 @@ public ParquetRecordWriter( ParquetProperties props, MemoryManager memoryManager, Configuration conf) { - this.codecFactory = new CodecFactory(conf, props.getPageSizeThreshold()); + this.codecFactory = new CodecFactory(conf, props.getPageSizeThreshold(), props.useAirliftCompressors()); internalWriter = new InternalParquetRecordWriter(w, writeSupport, schema, extraMetaData, blockSize, codecFactory.getCompressor(codec), validating, props); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java index c6b2828401..001be8e02a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java @@ -26,6 +26,7 @@ import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.compression.CompressionCodecFactory; import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; @@ -283,7 +284,8 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport writeSupport encodingProps.getPageWriteChecksumEnabled()); fileWriter.start(); - this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold()); + this.codecFactory = + new CodecFactory(conf, encodingProps.getPageSizeThreshold(), encodingProps.useAirliftCompressors()); CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName); this.writer = new InternalParquetRecordWriter( fileWriter, @@ -592,6 +594,17 @@ public SELF withBloomFilterEnabled(String columnPath, boolean enabled) { return self(); } + /** + * Enable or disable using Airlift based compressors and decompressors. + * + * @param useAirliftCompressors whether to enable + * @return this builder for method chaining + */ + public SELF withAirliftCompressorsEnabled(boolean useAirliftCompressors) { + encodingPropsBuilder.withAirliftCompressorsEnabled(useAirliftCompressors); + return self(); + } + /** * Set a property that will be available to the read path. For writers that use a Hadoop * configuration, this is the recommended way to add configuration values. 
@@ -624,4 +637,9 @@ mode, getWriteSupport(conf), codecName, } } } + + // Visible for testing purposes only + CompressionCodecFactory getCodecFactory() { + return codecFactory; + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java index 922699f486..91db85b826 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/CompressionConverter.java @@ -69,7 +69,7 @@ public CompressionConverter() { } public void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta, MessageType schema, - String createdBy, CompressionCodecName codecName) throws IOException { + String createdBy, CompressionCodecName codecName, boolean useAirliftCompressors) throws IOException { int blockIndex = 0; PageReadStore store = reader.readNextRowGroup(); while (store != null) { @@ -83,7 +83,7 @@ public void processBlocks(TransParquetFileReader reader, ParquetFileWriter write ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, createdBy); ColumnDescriptor columnDescriptor = descriptorsMap.get(chunk.getPath()); writer.startColumn(columnDescriptor, crstore.getColumnReader(columnDescriptor).getTotalValueCount(), codecName); - processChunk(reader, writer, chunk, createdBy, codecName); + processChunk(reader, writer, chunk, createdBy, codecName, useAirliftCompressors); writer.endColumn(); } writer.endBlock(); @@ -93,8 +93,8 @@ public void processBlocks(TransParquetFileReader reader, ParquetFileWriter write } private void processChunk(TransParquetFileReader reader, ParquetFileWriter writer, ColumnChunkMetaData chunk, - String createdBy, CompressionCodecName codecName) throws IOException { - CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0); + String createdBy, CompressionCodecName codecName, boolean useAirliftCompressors) throws IOException { + CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0, useAirliftCompressors); CompressionCodecFactory.BytesInputDecompressor decompressor = codecFactory.getDecompressor(chunk.getCodec()); CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(codecName); ColumnIndex columnIndex = reader.readColumnIndex(chunk); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java index a46c8db216..402d7013c1 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopCodecs.java @@ -25,12 +25,23 @@ import org.apache.parquet.hadoop.CodecFactory; public class HadoopCodecs { + + @Deprecated public static CompressionCodecFactory newFactory(int sizeHint) { - return new CodecFactory(new Configuration(), sizeHint); + return new CodecFactory(new Configuration(), sizeHint, false); } + @Deprecated public static CompressionCodecFactory newFactory(Configuration conf, int sizeHint) { - return new CodecFactory(conf, sizeHint); + return new CodecFactory(conf, sizeHint, false); + } + + public static CompressionCodecFactory newFactory(int sizeHint, boolean useAirliftCompressors) { + return new CodecFactory(new Configuration(), sizeHint, useAirliftCompressors); + } + + public static CompressionCodecFactory newFactory(Configuration 
conf, int sizeHint, boolean useAirliftCompressors) { + return new CodecFactory(conf, sizeHint, useAirliftCompressors); } public static CompressionCodecFactory newDirectFactory(Configuration conf, ByteBufferAllocator allocator, int sizeHint) { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestAirliftCompressors.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestAirliftCompressors.java new file mode 100644 index 0000000000..789718793e --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestAirliftCompressors.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import java.io.File; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.GroupFactory; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +public class TestAirliftCompressors { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testParquetReadWriteAirliftCompressorOption() throws Exception { + Configuration conf = new Configuration(); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + MessageType schema = Types.buildMessage(). 
+ required(BINARY).as(stringType()).named("name").named("msg"); + GroupWriteSupport.setSchema(schema, conf); + + try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path) + .withPageRowCountLimit(10) + .withConf(conf) + .withDictionaryEncoding(false) + .withAirliftCompressorsEnabled(true) + .withCompressionCodec(CompressionCodecName.GZIP) + .build()) { + assertTrue( + "AirliftCompressor instance should have been used", + writer.getCodecFactory().getCompressor(CompressionCodecName.GZIP) instanceof AirliftCompressor); + } + + file = temp.newFile(); + file.delete(); + path = new Path(file.getAbsolutePath()); + // verify that the default write option is to not use airlift compressors + try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path) + .withPageRowCountLimit(10) + .withConf(conf) + .withDictionaryEncoding(false) + .withCompressionCodec(CompressionCodecName.GZIP) + .build()) { + assertFalse( + "Non-airlift compressor instance should have been used", + writer.getCodecFactory().getCompressor(CompressionCodecName.GZIP) instanceof AirliftCompressor); + } + + try (ParquetReader<Group> reader = + ParquetReader.builder(new GroupReadSupport(), path).withConf(conf).useAirliftCompressors(true).build()) { + assertTrue("Use airlift compressors should have been true", reader.readOptions().useAirliftCompressors()); + assertTrue( + "AirliftCompressor instance should have been used", + reader.readOptions() + .getCodecFactory() + .getDecompressor(CompressionCodecName.GZIP) instanceof AirliftDecompressor); + } + + // verify that the default read option is to not use airlift compressors + try (ParquetReader<Group> reader = + ParquetReader.builder(new GroupReadSupport(), path).withConf(conf).build()) { + assertFalse("Use airlift compressors should have been false", reader.readOptions().useAirliftCompressors()); + assertFalse( + "Non-airlift decompressor instance should have been used", + reader.readOptions() + .getCodecFactory() + .getDecompressor(CompressionCodecName.GZIP) instanceof AirliftDecompressor); + } + } + + @Test + public void testWriteReadCompatibilityForAirliftAndRegularCompressors() throws Exception { + boolean[][] writeReadModes = new boolean[][] { + // each pair is {writeAirlift, readAirlift} + {true, true}, + {true, false}, + {false, true} + }; + CompressionCodecName[] codecs = {CompressionCodecName.GZIP, CompressionCodecName.LZ4, CompressionCodecName.LZO}; + for (CompressionCodecName codec : codecs) { + for (boolean[] writeReadMode : writeReadModes) { + if ((!writeReadMode[0] || !writeReadMode[1]) && + (codec.equals(CompressionCodecName.LZ4) || codec.equals(CompressionCodecName.LZO))) { + // TODO: Add tests for non-airlift LZ4 and LZO. + // For some reason, non-airlift reads and writes run into codec class not found exceptions + continue; + } + testWriteReadCompatibilityForAirliftAndRegularCompressors(codec, writeReadMode[0], writeReadMode[1]); + } + } + } + + private void testWriteReadCompatibilityForAirliftAndRegularCompressors( + CompressionCodecName codecName, + boolean writeAirlift, + boolean readAirlift) throws Exception { + MessageType schema = Types.buildMessage().
+ required(BINARY).as(stringType()).named("name").named("msg"); + String[] testNames = {"airlift", "parquet"}; + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + GroupFactory factory = new SimpleGroupFactory(schema); + File file = temp.newFile(); + file.delete(); + Path path = new Path(file.getAbsolutePath()); + + try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path) + .withPageRowCountLimit(10) + .withConf(conf) + .withDictionaryEncoding(false) + .withAirliftCompressorsEnabled(writeAirlift) + .withCompressionCodec(codecName) + .build()) { + for (String testName : testNames) { + writer.write(factory.newGroup().append("name", testName)); + } + } + + try (ParquetReader<Group> reader = + ParquetReader.builder(new GroupReadSupport(), path).withConf(conf).useAirliftCompressors(readAirlift).build()) { + Group group = reader.read(); + assertEquals("airlift", group.getBinary("name", 0).toStringUsingUTF8()); + group = reader.read(); + assertEquals("parquet", group.getBinary("name", 0).toStringUsingUTF8()); + assertNull("reader should have returned null since only two values were written to the file", reader.read()); + } + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java index fefa5e4f08..1b913b9c2c 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/CompressionConveterTest.java @@ -118,7 +118,8 @@ private void convertCompression(Configuration conf, String inputFile, String out writer.start(); try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) { - compressionConverter.processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName); + compressionConverter.processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), + codecName, false); } finally { writer.end(metaData.getFileMetaData().getKeyValueMetaData()); } diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java index 1348a63ff5..2692579554 100644 --- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java +++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/TransCompressionCommand.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.parquet.HadoopReadOptions; +import org.apache.parquet.Preconditions; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetFileWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -41,21 +42,24 @@ public class TransCompressionCommand extends ArgsOnlyCommand { "<input> <output> <codec>", "where <input> is the source parquet file", - " <output> is the destination parquet file," + - " <codec> is the codec name in the case sensitive format to be translated to, e.g. SNAPPY, GZIP, ZSTD, LZO, LZ4, BROTLI, UNCOMPRESSED" + " <output> is the destination parquet file", + " <codec> is the codec name in the case sensitive format to be translated to, e.g. 
SNAPPY, GZIP, " + + "ZSTD, LZO, LZ4, BROTLI, UNCOMPRESSED", + " <use_airlift> optional flag (case insensitive true or false) to determine whether to use Airlift" + + " based compressors for GZIP, LZO or LZ4" }; private Configuration conf; private CompressionConverter compressionConverter; public TransCompressionCommand() { - super(3, 3); + super(3, 4); this.conf = new Configuration(); compressionConverter = new CompressionConverter(); } public TransCompressionCommand(Configuration conf) { - super(3, 3); + super(3, 4); this.conf = conf; compressionConverter = new CompressionConverter(); } @@ -78,15 +82,36 @@ public void execute(CommandLine options) throws Exception { Path outPath = new Path(args.get(1)); CompressionCodecName codecName = CompressionCodecName.valueOf(args.get(2)); + boolean useAirliftCompressors = false; + if (args.size() > 3) { + useAirliftCompressors = validateAirliftCompressorArg(args, codecName); + } + ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER); MessageType schema = metaData.getFileMetaData().getSchema(); ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, ParquetFileWriter.Mode.CREATE); writer.start(); try (TransParquetFileReader reader = new TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) { - compressionConverter.processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), codecName); + compressionConverter.processBlocks(reader, writer, metaData, schema, metaData.getFileMetaData().getCreatedBy(), + codecName, useAirliftCompressors); } finally { writer.end(metaData.getFileMetaData().getKeyValueMetaData()); } } + + private static boolean validateAirliftCompressorArg(List<String> args, CompressionCodecName codecName) { + String useAirliftArg = args.get(3); + Preconditions.checkArgument( + useAirliftArg.equalsIgnoreCase("true") || useAirliftArg.equalsIgnoreCase("false"), + "Illegal argument - valid values are true or false (case insensitive)"); + boolean useAirliftCompressors = Boolean.parseBoolean(useAirliftArg); + if (useAirliftCompressors) { + Preconditions.checkArgument( + codecName == CompressionCodecName.GZIP || codecName == CompressionCodecName.LZO + || codecName == CompressionCodecName.LZ4, + "Airlift compressors are not supported for codec " + codecName); + return true; + } + return false; + } }
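For completeness, a hedged read-path sketch that mirrors the new test: it opens a file with the useAirliftCompressors read option added by this change and prints the first record. The class name and input path are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class AirliftReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/airlift-gzip-example.parquet"); // illustrative input location

    // useAirliftCompressors(true) makes the read options hand out Airlift decompressors for the
    // codecs this change covers (GZIP, LZO, LZ4); files written with the regular Hadoop codecs
    // remain readable, as exercised by the compatibility test above.
    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
        .withConf(conf)
        .useAirliftCompressors(true)
        .build()) {
      Group group = reader.read();
      if (group != null) {
        System.out.println(group.getBinary("name", 0).toStringUsingUTF8());
      }
    }
  }
}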
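And a hedged sketch of the updated CompressionConverter API, following the same call sequence as TransCompressionCommand.execute above. The file paths, target codec, and class name are assumptions; the trailing useAirliftCompressors argument to processBlocks is the one introduced by this change.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.CompressionConverter;
import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;

public class AirliftTranscodeExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path inPath = new Path("/tmp/input.parquet");        // illustrative source file
    Path outPath = new Path("/tmp/output-gzip.parquet"); // illustrative destination file

    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
    MessageType schema = metaData.getFileMetaData().getSchema();
    ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, ParquetFileWriter.Mode.CREATE);
    writer.start();

    CompressionConverter converter = new CompressionConverter();
    try (TransParquetFileReader reader = new TransParquetFileReader(
        HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build())) {
      // The trailing boolean is the new flag: true routes recompression through Airlift.
      converter.processBlocks(reader, writer, metaData, schema,
          metaData.getFileMetaData().getCreatedBy(), CompressionCodecName.GZIP, true);
    } finally {
      writer.end(metaData.getFileMetaData().getKeyValueMetaData());
    }
  }
}

Through parquet-tools, the same path is exercised by passing the new optional fourth argument to this command, e.g. in.parquet out.parquet GZIP true.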