Skip to content

Commit

Permalink
Add Airlift based compression for GZIP, LZO and LZ4 codecs
Browse files Browse the repository at this point in the history
  • Loading branch information
samarthjain committed Jun 29, 2020
1 parent e4988f3 commit 47871e2
Show file tree
Hide file tree
Showing 27 changed files with 648 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class BenchmarkFiles {
// public final Path parquetFile_1M_LZO = new Path("target/tests/ParquetBenchmarks/PARQUET-1M-LZO");
public static final Path file_1M_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-SNAPPY");
public static final Path file_1M_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-GZIP");
public static final Path file_1M_AIRLIFT_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-AIRLIFT-GZIP");

// Page checksum files
public static final Path file_100K_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-UNCOMPRESSED");
Expand All @@ -53,7 +54,11 @@ public class BenchmarkFiles {
public static final Path file_1M_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-GZIP");
public static final Path file_1M_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-GZIP");
public static final Path file_10M_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-GZIP");
public static final Path file_10M_CHECKSUMS_AIRLIFT_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-AIRLIFT" +
"-ZIP");
public static final Path file_10M_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-GZIP");
public static final Path file_10M_NOCHECKSUMS_AIRLIFT_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-AIRLIFT" +
"-NOCHECKSUMS-GZIP");

public static final Path file_100K_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-SNAPPY");
public static final Path file_100K_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-SNAPPY");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public void generateAll() {
// generateData(parquetFile_1M_LZO, configuration, PARQUET_2_0, BLOCK_SIZE_DEFAULT, PAGE_SIZE_DEFAULT, FIXED_LEN_BYTEARRAY_SIZE, LZO, ONE_MILLION);
generateData(file_1M_SNAPPY, configuration, PARQUET_2_0, BLOCK_SIZE_DEFAULT, PAGE_SIZE_DEFAULT, FIXED_LEN_BYTEARRAY_SIZE, SNAPPY, ONE_MILLION);
generateData(file_1M_GZIP, configuration, PARQUET_2_0, BLOCK_SIZE_DEFAULT, PAGE_SIZE_DEFAULT, FIXED_LEN_BYTEARRAY_SIZE, GZIP, ONE_MILLION);
generateData(file_1M_AIRLIFT_GZIP, configuration, PARQUET_2_0, BLOCK_SIZE_DEFAULT, PAGE_SIZE_DEFAULT, FIXED_LEN_BYTEARRAY_SIZE, GZIP, ONE_MILLION);
}
catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.io.IOException;
import java.util.Random;

import static org.apache.parquet.benchmarks.BenchmarkUtils.deleteIfExists;
import static org.apache.parquet.benchmarks.BenchmarkUtils.exists;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.*;

Expand All @@ -51,8 +50,9 @@ public class PageChecksumDataGenerator extends DataGenerator {
" }" +
"}");

public void generateData(Path outFile, int nRows, boolean writeChecksums,
CompressionCodecName compression) throws IOException {
public void generateData(
Path outFile, int nRows, boolean writeChecksums,
CompressionCodecName compression, boolean useAirlift) throws IOException {
if (exists(configuration, outFile)) {
System.out.println("File already exists " + outFile);
return;
Expand All @@ -64,6 +64,7 @@ public void generateData(Path outFile, int nRows, boolean writeChecksums,
.withCompressionCodec(compression)
.withDictionaryEncoding(true)
.withType(SCHEMA)
.withAirliftCompressorsEnabled(useAirlift)
.withPageWriteChecksumEnabled(writeChecksums)
.build();

Expand All @@ -90,15 +91,16 @@ public void generateAll() {
try {
// No need to generate the non-checksum versions, as the files generated here are only used in
// the read benchmarks
generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED);
generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP);
generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY);
generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED);
generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP);
generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY);
generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED);
generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP);
generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY);
generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED, false);
generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP, false);
generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY, false);
generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED, false);
generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP, false);
generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY, false);
generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED, false);
generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP, false);
generateData(file_10M_CHECKSUMS_AIRLIFT_GZIP, 10 * ONE_MILLION, true, GZIP, false);
generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY, false);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_UNCOMPRESSED;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_GZIP;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_SNAPPY;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_AIRLIFT_GZIP;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_UNCOMPRESSED;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_GZIP;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_SNAPPY;
Expand All @@ -60,11 +61,12 @@ public void setup() {
pageChecksumDataGenerator.generateAll();
}

private void readFile(Path file, int nRows, boolean verifyChecksums, Blackhole blackhole)
private void readFile(Path file, int nRows, boolean verifyChecksums, Blackhole blackhole, boolean useAirlift)
throws IOException {
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
.withConf(configuration)
.usePageChecksumVerification(verifyChecksums)
.useAirliftCompressors(useAirlift)
.build()) {
for (int i = 0; i < nRows; i++) {
Group group = reader.read();
Expand All @@ -84,113 +86,125 @@ private void readFile(Path file, int nRows, boolean verifyChecksums, Blackhole b
@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, blackhole);
readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, blackhole);
readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, false, blackhole);
readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsGzipWithVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, blackhole);
readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, false, blackhole);
readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, blackhole);
readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, blackhole, false);
}

// 1M rows, uncompressed, GZIP, Snappy

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, blackhole);
readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, blackhole);
readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, false, blackhole);
readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsGzipWithVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, blackhole);
readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, false, blackhole);
readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, blackhole);
readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, blackhole, false);
}

// 10M rows, uncompressed, GZIP, Snappy

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, blackhole);
readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, blackhole);
readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, false, blackhole);
readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsAirliftGzipWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_AIRLIFT_GZIP, 10 * ONE_MILLION, false, blackhole, true);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsGzipWithVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, blackhole);
readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsAirliftGzipWithVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_AIRLIFT_GZIP, 10 * ONE_MILLION, true, blackhole, true);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, blackhole);
readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, blackhole);
readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, blackhole, false);
}

}
Loading

0 comments on commit 47871e2

Please sign in to comment.