Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PARQUET-1643 Use airlift codecs for LZ4, LZ0, GZIP #671

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class BenchmarkFiles {
// public final Path parquetFile_1M_LZO = new Path("target/tests/ParquetBenchmarks/PARQUET-1M-LZO");
public static final Path file_1M_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-SNAPPY");
public static final Path file_1M_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-GZIP");
public static final Path file_1M_AIRLIFT_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-AIRLIFT-GZIP");

// Page checksum files
public static final Path file_100K_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-UNCOMPRESSED");
Expand All @@ -53,7 +54,11 @@ public class BenchmarkFiles {
public static final Path file_1M_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-GZIP");
public static final Path file_1M_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-GZIP");
public static final Path file_10M_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-GZIP");
public static final Path file_10M_CHECKSUMS_AIRLIFT_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-AIRLIFT" +
"-ZIP");
public static final Path file_10M_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-GZIP");
public static final Path file_10M_NOCHECKSUMS_AIRLIFT_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-AIRLIFT" +
"-NOCHECKSUMS-GZIP");

public static final Path file_100K_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-SNAPPY");
public static final Path file_100K_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-SNAPPY");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public void generateAll() {
// generateData(parquetFile_1M_LZO, configuration, PARQUET_2_0, BLOCK_SIZE_DEFAULT, PAGE_SIZE_DEFAULT, FIXED_LEN_BYTEARRAY_SIZE, LZO, ONE_MILLION);
generateData(file_1M_SNAPPY, configuration, PARQUET_2_0, BLOCK_SIZE_DEFAULT, PAGE_SIZE_DEFAULT, FIXED_LEN_BYTEARRAY_SIZE, SNAPPY, ONE_MILLION);
generateData(file_1M_GZIP, configuration, PARQUET_2_0, BLOCK_SIZE_DEFAULT, PAGE_SIZE_DEFAULT, FIXED_LEN_BYTEARRAY_SIZE, GZIP, ONE_MILLION);
generateData(file_1M_AIRLIFT_GZIP, configuration, PARQUET_2_0, BLOCK_SIZE_DEFAULT, PAGE_SIZE_DEFAULT, FIXED_LEN_BYTEARRAY_SIZE, GZIP, ONE_MILLION);
}
catch (IOException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.io.IOException;
import java.util.Random;

import static org.apache.parquet.benchmarks.BenchmarkUtils.deleteIfExists;
import static org.apache.parquet.benchmarks.BenchmarkUtils.exists;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.*;

Expand All @@ -51,8 +50,9 @@ public class PageChecksumDataGenerator extends DataGenerator {
" }" +
"}");

public void generateData(Path outFile, int nRows, boolean writeChecksums,
CompressionCodecName compression) throws IOException {
public void generateData(
Path outFile, int nRows, boolean writeChecksums,
CompressionCodecName compression, boolean useAirlift) throws IOException {
if (exists(configuration, outFile)) {
System.out.println("File already exists " + outFile);
return;
Expand All @@ -64,6 +64,7 @@ public void generateData(Path outFile, int nRows, boolean writeChecksums,
.withCompressionCodec(compression)
.withDictionaryEncoding(true)
.withType(SCHEMA)
.withAirliftCompressorsEnabled(useAirlift)
.withPageWriteChecksumEnabled(writeChecksums)
.build();

Expand All @@ -90,15 +91,16 @@ public void generateAll() {
try {
// No need to generate the non-checksum versions, as the files generated here are only used in
// the read benchmarks
generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED);
generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP);
generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY);
generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED);
generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP);
generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY);
generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED);
generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP);
generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY);
generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED, false);
generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP, false);
generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY, false);
generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED, false);
generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP, false);
generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY, false);
generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED, false);
generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP, false);
generateData(file_10M_CHECKSUMS_AIRLIFT_GZIP, 10 * ONE_MILLION, true, GZIP, false);
generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY, false);
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_UNCOMPRESSED;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_GZIP;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_SNAPPY;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_AIRLIFT_GZIP;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_UNCOMPRESSED;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_GZIP;
import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_SNAPPY;
Expand All @@ -60,11 +61,12 @@ public void setup() {
pageChecksumDataGenerator.generateAll();
}

private void readFile(Path file, int nRows, boolean verifyChecksums, Blackhole blackhole)
private void readFile(Path file, int nRows, boolean verifyChecksums, Blackhole blackhole, boolean useAirlift)
throws IOException {
try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
.withConf(configuration)
.usePageChecksumVerification(verifyChecksums)
.useAirliftCompressors(useAirlift)
.build()) {
for (int i = 0; i < nRows; i++) {
Group group = reader.read();
Expand All @@ -84,113 +86,125 @@ private void readFile(Path file, int nRows, boolean verifyChecksums, Blackhole b
@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, blackhole);
readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, blackhole);
readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, false, blackhole);
readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsGzipWithVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, blackhole);
readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, false, blackhole);
readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read100KRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, blackhole);
readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, blackhole, false);
}

// 1M rows, uncompressed, GZIP, Snappy

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, blackhole);
readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, blackhole);
readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, false, blackhole);
readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsGzipWithVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, blackhole);
readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, false, blackhole);
readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read1MRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, blackhole);
readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, blackhole, false);
}

// 10M rows, uncompressed, GZIP, Snappy

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, blackhole);
readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, blackhole);
readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, false, blackhole);
readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsAirliftGzipWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_AIRLIFT_GZIP, 10 * ONE_MILLION, false, blackhole, true);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsGzipWithVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, blackhole);
readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsAirliftGzipWithVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_AIRLIFT_GZIP, 10 * ONE_MILLION, true, blackhole, true);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, blackhole);
readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, blackhole, false);
}

@Benchmark
@BenchmarkMode(Mode.SingleShotTime)
public void read10MRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, blackhole);
readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, blackhole, false);
}

}
Loading