
PARQUET-1643 Use airlift codecs for LZ4, LZ0, GZIP #671

Open

samarthjain wants to merge 1 commit into master from airliftcodecs

Conversation

samarthjain

No description provided.

@RyanSkraba
Contributor

RyanSkraba commented Aug 29, 2019

Neat! I was just poking around the codecs code so this is really interesting and timely.

I'm currently looking at how to run the parquet-benchmarks project... I'll see if I can get a clean run on master and your branch for LZ4 and GZIP to compare. (It looks like LZO benchmarks are disabled on master.)

Edit: There are no LZ4 benchmarks currently in the parquet-benchmarks module, and it looks like the run scripts need a bit of clean-up and attention! In the meantime, I managed a single, not very clean run of the WriteBenchmarks.write1MRowsDefaultBlockAndPageSizeGZIP with and without the change. No improvement or regression noted!

@samarthjain
Author

samarthjain commented Aug 29, 2019

Benchmark results with the patch applied:

Benchmark                                                                Mode  Cnt   Score   Error  Units
ReadBenchmarks.read1MRowsBS256MPS4MUncompressed                         thrpt   25   0.947 ± 0.011  ops/s
ReadBenchmarks.read1MRowsBS256MPS8MUncompressed                         thrpt   25   0.952 ± 0.010  ops/s
ReadBenchmarks.read1MRowsBS512MPS4MUncompressed                         thrpt   25   0.938 ± 0.015  ops/s
ReadBenchmarks.read1MRowsBS512MPS8MUncompressed                         thrpt   25   0.960 ± 0.012  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeGZIP                    thrpt   25   0.725 ± 0.007  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeSNAPPY                  thrpt   25   0.902 ± 0.005  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeUncompressed            thrpt   25   0.940 ± 0.010  ops/s
PageChecksumReadBenchmarks.read100KRowsGzipWithVerification                ss    5   0.502 ± 0.169   s/op
PageChecksumReadBenchmarks.read100KRowsGzipWithoutVerification             ss    5   0.562 ± 0.299   s/op
PageChecksumReadBenchmarks.read100KRowsSnappyWithVerification              ss    5   0.649 ± 0.975   s/op
PageChecksumReadBenchmarks.read100KRowsSnappyWithoutVerification           ss    5   0.519 ± 0.095   s/op
PageChecksumReadBenchmarks.read100KRowsUncompressedWithVerification        ss    5   0.531 ± 0.205   s/op
PageChecksumReadBenchmarks.read100KRowsUncompressedWithoutVerification     ss    5   0.495 ± 0.182   s/op
PageChecksumReadBenchmarks.read10MRowsGzipWithVerification                 ss    5  13.505 ± 2.291   s/op
PageChecksumReadBenchmarks.read10MRowsGzipWithoutVerification              ss    5  13.529 ± 2.485   s/op
PageChecksumReadBenchmarks.read10MRowsSnappyWithVerification               ss    5  10.781 ± 1.075   s/op
PageChecksumReadBenchmarks.read10MRowsSnappyWithoutVerification            ss    5  10.711 ± 1.377   s/op
PageChecksumReadBenchmarks.read10MRowsUncompressedWithVerification         ss    5  10.822 ± 0.898   s/op
PageChecksumReadBenchmarks.read10MRowsUncompressedWithoutVerification      ss    5  10.497 ± 0.961   s/op
PageChecksumReadBenchmarks.read1MRowsGzipWithVerification                  ss    5   1.946 ± 1.070   s/op
PageChecksumReadBenchmarks.read1MRowsGzipWithoutVerification               ss    5   1.778 ± 0.684   s/op
PageChecksumReadBenchmarks.read1MRowsSnappyWithVerification                ss    5   1.817 ± 1.941   s/op
PageChecksumReadBenchmarks.read1MRowsSnappyWithoutVerification             ss    5   1.851 ± 1.808   s/op
PageChecksumReadBenchmarks.read1MRowsUncompressedWithVerification          ss    5   1.570 ± 0.242   s/op
PageChecksumReadBenchmarks.read1MRowsUncompressedWithoutVerification       ss    5   1.766 ± 1.573   s/op

@samarthjain samarthjain reopened this Aug 29, 2019
@samarthjain
Author

samarthjain commented Aug 30, 2019

Benchmark results on master branch:

Benchmark                                                                Mode  Cnt   Score    Error  Units
ReadBenchmarks.read1MRowsBS256MPS4MUncompressed                         thrpt   25   0.952 ±  0.008  ops/s
ReadBenchmarks.read1MRowsBS256MPS8MUncompressed                         thrpt   25   0.947 ±  0.008  ops/s
ReadBenchmarks.read1MRowsBS512MPS4MUncompressed                         thrpt   25   0.957 ±  0.010  ops/s
ReadBenchmarks.read1MRowsBS512MPS8MUncompressed                         thrpt   25   0.956 ±  0.009  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeGZIP                    thrpt   25   0.731 ±  0.007  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeSNAPPY                  thrpt   25   0.897 ±  0.008  ops/s
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeUncompressed            thrpt   25   0.935 ±  0.013  ops/s
PageChecksumReadBenchmarks.read100KRowsGzipWithVerification                ss    5   0.525 ±  0.079   s/op
PageChecksumReadBenchmarks.read100KRowsGzipWithoutVerification             ss    5   0.483 ±  0.093   s/op
PageChecksumReadBenchmarks.read100KRowsSnappyWithVerification              ss    5   0.545 ±  0.408   s/op
PageChecksumReadBenchmarks.read100KRowsSnappyWithoutVerification           ss    5   0.517 ±  0.133   s/op
PageChecksumReadBenchmarks.read100KRowsUncompressedWithVerification        ss    5   0.501 ±  0.213   s/op
PageChecksumReadBenchmarks.read100KRowsUncompressedWithoutVerification     ss    5   0.506 ±  0.385   s/op
PageChecksumReadBenchmarks.read10MRowsGzipWithVerification                 ss    5  14.217 ± 10.173   s/op
PageChecksumReadBenchmarks.read10MRowsGzipWithoutVerification              ss    5  13.189 ±  1.396   s/op
PageChecksumReadBenchmarks.read10MRowsSnappyWithVerification               ss    5  11.369 ±  1.966   s/op
PageChecksumReadBenchmarks.read10MRowsSnappyWithoutVerification            ss    5  10.964 ±  3.167   s/op
PageChecksumReadBenchmarks.read10MRowsUncompressedWithVerification         ss    5  11.147 ±  2.056   s/op
PageChecksumReadBenchmarks.read10MRowsUncompressedWithoutVerification      ss    5  10.554 ±  1.415   s/op
PageChecksumReadBenchmarks.read1MRowsGzipWithVerification                  ss    5   1.745 ±  0.482   s/op
PageChecksumReadBenchmarks.read1MRowsGzipWithoutVerification               ss    5   1.788 ±  0.417   s/op
PageChecksumReadBenchmarks.read1MRowsSnappyWithVerification                ss    5   1.935 ±  1.977   s/op
PageChecksumReadBenchmarks.read1MRowsSnappyWithoutVerification             ss    5   1.505 ±  0.172   s/op
PageChecksumReadBenchmarks.read1MRowsUncompressedWithVerification          ss    5   1.790 ±  1.657   s/op
PageChecksumReadBenchmarks.read1MRowsUncompressedWithoutVerification       ss    5   1.751 ±  1.790   s/op

@samarthjain
Author

Benchmark Name                                                   Master  Airlift Codecs
ReadBenchmarks.read1MRowsBS256MPS4MUncompressed                   0.952   0.947
ReadBenchmarks.read1MRowsBS256MPS8MUncompressed                   0.947   0.952
ReadBenchmarks.read1MRowsBS512MPS4MUncompressed                   0.957   0.938
ReadBenchmarks.read1MRowsBS512MPS8MUncompressed                   0.956   0.960
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeGZIP              0.731   0.725
PageChecksumReadBenchmarks.read100KRowsGzipWithVerification       0.525   0.502
PageChecksumReadBenchmarks.read100KRowsGzipWithoutVerification    0.483   0.562
PageChecksumReadBenchmarks.read10MRowsGzipWithVerification       14.217  13.505
PageChecksumReadBenchmarks.read10MRowsGzipWithoutVerification    13.189  13.529
PageChecksumReadBenchmarks.read1MRowsGzipWithVerification         1.745   1.946
PageChecksumReadBenchmarks.read1MRowsGzipWithoutVerification      1.788   1.778

Pruned results for comparing GZIP perf. I don't see any significant speedup or regression.

Considering these compressors/decompressors don't use native resources, it would be cheap to create a compressor/decompressor for each page. This in turn allows pages to be read concurrently (including pre-fetching), removes the need to pool the de/compressor instances, and makes the overall code simpler.
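To illustrate the idea, a rough sketch (not the parquet-mr API — ConcurrentPageReader and the Supplier wiring are hypothetical; only BytesInput and BytesInputDecompressor are real Parquet types):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;

// Hypothetical sketch: pure-Java codecs hold no native state, so creating a
// fresh decompressor per page is cheap and no pooling or locking is needed.
class ConcurrentPageReader {
  static List<Future<BytesInput>> decompressAll(
      ExecutorService pool,
      List<BytesInput> compressedPages,
      List<Integer> uncompressedSizes,
      Supplier<BytesInputDecompressor> newDecompressor) {
    List<Future<BytesInput>> results = new ArrayList<>();
    for (int i = 0; i < compressedPages.size(); i++) {
      final BytesInput page = compressedPages.get(i);
      final int size = uncompressedSizes.get(i);
      // One throwaway decompressor per task: pages decompress in parallel.
      results.add(pool.submit(() -> newDecompressor.get().decompress(page, size)));
    }
    return results;
  }
}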

@samarthjain samarthjain changed the title PARQUET-1643 Use airlift codecs for LZ4, LZ0 and GZIP PARQUET-1643 Use airlift codecs for LZ4, LZ0, GZIP and SNAPPY Sep 2, 2019
@@ -68,7 +67,7 @@ public ParquetRecordWriter(
MessageType schema,
Map<String, String> extraMetaData,
int blockSize, int pageSize,
BytesCompressor compressor,
BytesInputCompressor compressor,
Contributor

Semantic versioning check failed on these removed constructors. Though BytesCompressor is marked as deprecated, I think you should still use it here instead of BytesInputCompressor, so that this PR can be released in a 1.x release.

Author

Ah! Thanks for pointing that out, @nandorKollar . Apparently, this and the other constructor taking BytesInputCompressor aren't used (at least I didn't find any references to them within the parquet project). I wonder if the semantic versioning check would be OK with getting rid of them. Going to try that.

Author

Turns out removing the constructors isn't allowed either, even though they aren't called anywhere in the code. Switching the type back to BytesCompressor did the trick.

@nandorKollar
Contributor

@samarthjain why did you remove Snappy support?

Contributor

@gszadovszky gszadovszky left a comment

Could you also check TestDirectCodecFactory to see whether we can run unit tests for LZO and LZ4?

@@ -27,7 +27,7 @@
import java.util.Map;
import java.util.Set;
import java.util.zip.CRC32;

import org.apache.parquet.bytes.ByteBufferAllocator;
Contributor

Please avoid rearranging the imports. It makes merges unnecessarily cumbersome. It is fine to remove unused imports, but those which are still used should not be rearranged.



import java.lang.reflect.Method;
import java.lang.reflect.InvocationTargetException;
import java.io.IOException;
Contributor

Same as before, revert rearranging.

import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.apache.parquet.Preconditions.checkNotNull;

import java.io.IOException;
Contributor

Same as before.

@@ -68,7 +67,7 @@ public ParquetRecordWriter(
MessageType schema,
Map<String, String> extraMetaData,
int blockSize, int pageSize,
BytesCompressor compressor,
CodecFactory.BytesCompressor compressor,
Contributor

Please revert this change too, and add the relevant import as before.

@@ -107,7 +106,7 @@ public ParquetRecordWriter(
MessageType schema,
Map<String, String> extraMetaData,
long blockSize, int pageSize,
BytesCompressor compressor,
CodecFactory.BytesCompressor compressor,
Contributor

Same as the previous.

@Override
public BytesInput compress(BytesInput bytes) throws IOException {
compressedOutBuffer.reset();
CompressionOutputStream cos = hadoopCodec.createOutputStream(compressedOutBuffer, compressor);
Contributor

Please use try-with-resources here to close the stream even when an exception happens. Also, I don't think calling finish() is required here, since close() on the stream calls it as its first statement.
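A minimal sketch of the suggested shape, assuming the fields from the surrounding diff (hadoopCodec, compressor, and compressedOutBuffer as a ByteArrayOutputStream, as in CodecFactory):

@Override
public BytesInput compress(BytesInput bytes) throws IOException {
  compressedOutBuffer.reset();
  // try-with-resources closes the stream even if writeAllTo throws;
  // close() calls finish() first, so no explicit finish() is needed.
  try (CompressionOutputStream cos =
      hadoopCodec.createOutputStream(compressedOutBuffer, compressor)) {
    bytes.writeAllTo(cos);
  }
  return BytesInput.from(compressedOutBuffer);
}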

@samarthjain samarthjain changed the title PARQUET-1643 Use airlift codecs for LZ4, LZ0, GZIP and SNAPPY PARQUET-1643 Use airlift codecs for LZ4, LZ0, GZIP Oct 16, 2019
@samarthjain
Author

> @samarthjain why did you remove Snappy support?

@nandorKollar - it looks like Parquet has its own implementation for Snappy which, from what I can tell, doesn't depend on native code. Also, adding Snappy support via the airlift compressor was causing the Snappy tests to fail. So I dropped support for it and updated the PR title to reflect that.

@samarthjain
Author

@nandorKollar - I just pushed a commit to address changes you requested. Sorry for the delay. I had to punt working on this for various reasons.

@samarthjain samarthjain force-pushed the airliftcodecs branch 4 times, most recently from f5c76a6 to 4feb369 on February 26, 2020 19:50
@nandorKollar
Contributor

@samarthjain thanks for addressing my comments, and sorry for the late reply. I have two additional questions. I'm wondering if we might want to introduce a new configuration option to turn Airlift codecs on and off, so that in case something is wrong with Airlift, clients can still fall back to the original implementation. Not sure if it's worth the effort, @gszadovszky what do you think?

I also noticed that in other codecs we use org.apache.hadoop.io.compress.CodecPool; should we consider using it for Airlift compressors too? We could address this in a separate ticket though.
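For reference, the usual CodecPool pattern looks roughly like this (a sketch; whether pooling pays off for pure-Java Airlift compressors is exactly the open question):

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Compressor;

// Borrow a compressor from the shared Hadoop pool and always return it.
void compressWithPool(CompressionCodec codec, OutputStream out) throws IOException {
  Compressor compressor = CodecPool.getCompressor(codec);
  try (OutputStream cos = codec.createOutputStream(out, compressor)) {
    // ... write page bytes to cos ...
  } finally {
    CodecPool.returnCompressor(compressor);
  }
}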

@gszadovszky
Contributor

Without reviewing this change, and without knowing too much about Airlift, I would say the configuration might make sense. Meanwhile, the main purpose of using a pure Java compression codec over the ones provided by Hadoop is to be independent from Hadoop. Even though our code currently relies heavily on Hadoop (the whole read/write is implemented in parquet-hadoop), the target is to make parquet-mr work without Hadoop and its dependencies. So, I would suggest introducing new features in a way that either does not depend on Hadoop or makes it easy to remove the Hadoop dependencies later.

@samarthjain
Author

samarthjain commented Apr 9, 2020

@nandorKollar - I am not exactly sure where to add this configuration, which I was thinking of naming parquet.airlift.compressors.enable.

We want both ParquetReadOptions (with the config defined in ParquetInputFormat ) and ParquetRecordWriter to be able to use the config for instantiating the correct (de)compressor. Does that mean we need separate compression related configs for read and write?

For compressor:
In ParquetRecordWriter here:
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordWriter.java#L150

For decompressor:
In ParquetReadOptions here:
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java#L302

so that the correct decompressor can be used by the ParquetFileReader over here:
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1036
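As a sketch of how the flag could be consulted where the codec factory is chosen (the property name and AirliftCompressorCodecFactory are hypothetical; HadoopCodecs.newFactory is the existing default path), assuming conf is a Hadoop Configuration:

// Hypothetical sketch: gate Airlift codecs behind a single boolean property.
public static final String AIRLIFT_COMPRESSORS_ENABLED = "parquet.airlift.compressors.enable";

boolean useAirlift = conf.getBoolean(AIRLIFT_COMPRESSORS_ENABLED, false);
CompressionCodecFactory codecFactory = useAirlift
    ? new AirliftCompressorCodecFactory(conf)        // hypothetical Airlift-backed factory
    : HadoopCodecs.newFactory(conf, 0);              // existing Hadoop-backed factory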

@dbtsai
Member

dbtsai commented Apr 30, 2020

@samarthjain thanks for the work. I am looking to deploy zstd parquet into prod, but that requires a new Hadoop with native library support, which is not practical in many prod use-cases.

Since airlift is a pure Java implementation, what are the performance implications for zstd? I saw there is a benchmark for GZIP, but I don't see benchmarks for the other codecs.

Also, should we consider using zstd-jni, a Java library that packages native implementations of zstd for different platforms in a jar?

@samarthjain
Author

Force-pushed a new commit that makes it configurable whether to use Airlift-based compressors or not. Also added tests and GZIP benchmarks for the Airlift compressors. Benchmark results show no performance improvements or regressions when using Airlift GZIP vs plain GZIP.

Benchmark                                                                          Cnt     Score     Error
PageChecksumReadBenchmarks.read10MRowsAirliftGzipWithVerification                    3     6.431 ±    0.741
PageChecksumReadBenchmarks.read10MRowsAirliftGzipWithoutVerification                 3     6.605 ±    0.709
PageChecksumReadBenchmarks.read10MRowsGzipWithVerification                           3     6.468 ±    0.700
PageChecksumReadBenchmarks.read10MRowsGzipWithoutVerification                        3     6.583 ±    1.538

PageChecksumWriteBenchmarks.write10MRowsAirliftGzipWithChecksums                     3    36.333 ±    0.510
PageChecksumWriteBenchmarks.write10MRowsAirliftGzipWithoutChecksums                  3    36.069 ±    1.096
PageChecksumWriteBenchmarks.write10MRowsGzipWithChecksums                            3    36.141 ±    1.095
PageChecksumWriteBenchmarks.write10MRowsGzipWithoutChecksums                         3    36.174 ±    5.125


ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeAirliftGZIP                          3     0.898 ±    1.254
ReadBenchmarks.read1MRowsDefaultBlockAndPageSizeGZIP                                 3     0.891 ±    1.201

@samarthjain
Author

@dbtsai

> Since airlift is a pure Java implementation, what are the performance implications for zstd? I saw there is a benchmark for GZIP, but I don't see benchmarks for the other codecs.

It looks like the zstd Airlift implementation doesn't implement the Hadoop APIs. It can be integrated within Parquet, but that will take some work, definitely worthy of another PR.
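For the curious, aircompressor's zstd can already be driven directly through its own interfaces; a self-contained sketch round-tripping a buffer with io.airlift.compress.zstd, outside the Hadoop codec API:

import java.nio.charset.StandardCharsets;

import io.airlift.compress.zstd.ZstdCompressor;
import io.airlift.compress.zstd.ZstdDecompressor;

byte[] input = "example page bytes".getBytes(StandardCharsets.UTF_8);

// Compress into a buffer sized by the codec's worst-case bound.
ZstdCompressor compressor = new ZstdCompressor();
byte[] compressed = new byte[compressor.maxCompressedLength(input.length)];
int compressedSize = compressor.compress(input, 0, input.length, compressed, 0, compressed.length);

// Decompress back into a buffer of the known uncompressed size.
ZstdDecompressor decompressor = new ZstdDecompressor();
byte[] restored = new byte[input.length];
decompressor.decompress(compressed, 0, compressedSize, restored, 0, restored.length);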

@samarthjain
Author

@nandorKollar, @rdblue, @danielcweeks - if you have cycles, could you please take a look at this PR?
