PARQUET-2336: Add caching key to CodecFactory #1134

Merged 4 commits on Sep 18, 2023
@@ -173,10 +173,10 @@ public BytesInput compress(BytesInput bytes) throws IOException {
         // null compressor for non-native gzip
         compressor.reset();
       }
-      CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
-      bytes.writeAllTo(cos);
-      cos.finish();
-      cos.close();
+      try (CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor)) {
+        bytes.writeAllTo(cos);
+        cos.finish();
+      }
       compressedBytes = BytesInput.from(compressedOutBuffer);
     }
     return compressedBytes;
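
Why the try-with-resources rewrite matters (a gloss; the PR itself doesn't spell this out): in the old shape, any exception thrown while writing skips both finish() and close(), leaking the stream.

    // Old shape, annotated (same code as the removed lines above):
    CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
    bytes.writeAllTo(cos); // may throw IOException
    cos.finish();
    cos.close();           // never reached on exception -> stream leaks
    // The try-with-resources form closes cos on every path, including errors.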
@@ -234,7 +234,8 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) {
     if (codecClassName == null) {
       return null;
     }
-    CompressionCodec codec = CODEC_BY_NAME.get(codecClassName);
+    String codecCacheKey = this.cacheKey(codecName);
+    CompressionCodec codec = CODEC_BY_NAME.get(codecCacheKey);
Contributor: Since CODEC_BY_NAME is protected, I think this could break something that is relying on the cache, although I'm not sure why someone would access it directly. Maybe that visibility is an accident?

Member: If that is a concern, we can cache the old key (w/o level) as well.
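
A minimal sketch of that suggestion, reusing codecClassName and codecCacheKey from the hunk above (hypothetical; shown only to illustrate the idea, not what the PR merged):

    // Register the codec under both keys so legacy lookups by plain
    // class name keep hitting the cache:
    CODEC_BY_NAME.put(codecCacheKey, codec);  // new level-aware key
    CODEC_BY_NAME.put(codecClassName, codec); // old key, kept for compatibility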

Contributor Author: Personally, I'm not too worried about this; I don't see anyone doing it. At least nobody in the Apache org: https://github.com/search?q=org%3Aapache%20CODEC_BY_NAME&type=code :)
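
For concreteness, the kind of external code the thread is worried about would be a subclass reading the protected cache directly, along these lines (entirely hypothetical; as the search above suggests, no real project was found doing this; assumes org.apache.parquet.hadoop.CodecFactory and the usual Hadoop/Parquet imports):

    // Hypothetical subclass poking at the protected CODEC_BY_NAME cache.
    class LegacyPeekingCodecFactory extends CodecFactory {
      LegacyPeekingCodecFactory(Configuration conf, int pageSize) {
        super(conf, pageSize);
      }

      CompressionCodec peek(CompressionCodecName name) {
        // Before this PR the cache key was the codec class name; after it,
        // the key may carry a ":<level>" suffix, so this lookup can miss.
        return CODEC_BY_NAME.get(name.getHadoopCompressionCodecClassName());
      }
    }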

     if (codec != null) {
       return codec;
     }
@@ -248,13 +249,32 @@ protected CompressionCodec getCodec(CompressionCodecName codecName) {
         codecClass = configuration.getClassLoader().loadClass(codecClassName);
       }
       codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration);
-      CODEC_BY_NAME.put(codecClassName, codec);
+      CODEC_BY_NAME.put(codecCacheKey, codec);
       return codec;
     } catch (ClassNotFoundException e) {
       throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
     }
   }
 
+  private String cacheKey(CompressionCodecName codecName) {
+    String level = null;
+    switch (codecName) {
+      case GZIP:
+        level = configuration.get("zlib.compress.level");
+        break;
+      case BROTLI:
+        level = configuration.get("compression.brotli.quality");
+        break;
+      case ZSTD:
+        level = configuration.get("parquet.compression.codec.zstd.level");
+        break;
+      default:
+        // compression level is not supported; ignore it
+    }
+    String codecClass = codecName.getHadoopCompressionCodecClassName();
+    return level == null ? codecClass : codecClass + ":" + level;
+  }
+
   @Override
   public void release() {
     for (BytesCompressor compressor : compressors.values()) {
In the test file:

@@ -23,6 +23,7 @@
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
@@ -174,5 +175,55 @@ public void compressionCodecs() {
       }
     }
   }
 
+  static class PublicCodecFactory extends CodecFactory {
+    // To make getCodec public
+
+    public PublicCodecFactory(Configuration configuration, int pageSize) {
+      super(configuration, pageSize);
+    }
+
+    public org.apache.hadoop.io.compress.CompressionCodec getCodec(CompressionCodecName name) {
+      return super.getCodec(name);
+    }
+  }
+
+  @Test
+  public void cachingKeysGzip() {
+    Configuration config_zlib_2 = new Configuration();
+    config_zlib_2.set("zlib.compress.level", "2");
+
+    Configuration config_zlib_5 = new Configuration();
+    config_zlib_5.set("zlib.compress.level", "5");
+
+    final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zlib_2, pageSize);
+    final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zlib_5, pageSize);
+
+    CompressionCodec codec_2_1 = codecFactory_2.getCodec(CompressionCodecName.GZIP);
+    CompressionCodec codec_2_2 = codecFactory_2.getCodec(CompressionCodecName.GZIP);
+    CompressionCodec codec_5_1 = codecFactory_5.getCodec(CompressionCodecName.GZIP);
+
+    Assert.assertEquals(codec_2_1, codec_2_2);
+    Assert.assertNotEquals(codec_2_1, codec_5_1);
+  }
+
+  @Test
+  public void cachingKeysZstd() {
+    Configuration config_zstd_2 = new Configuration();
+    config_zstd_2.set("parquet.compression.codec.zstd.level", "2");
+
+    Configuration config_zstd_5 = new Configuration();
+    config_zstd_5.set("parquet.compression.codec.zstd.level", "5");
+
+    final CodecFactory codecFactory_2 = new PublicCodecFactory(config_zstd_2, pageSize);
+    final CodecFactory codecFactory_5 = new PublicCodecFactory(config_zstd_5, pageSize);
+
+    CompressionCodec codec_2_1 = codecFactory_2.getCodec(CompressionCodecName.ZSTD);
+    CompressionCodec codec_2_2 = codecFactory_2.getCodec(CompressionCodecName.ZSTD);
+    CompressionCodec codec_5_1 = codecFactory_5.getCodec(CompressionCodecName.ZSTD);
+
+    Assert.assertEquals(codec_2_1, codec_2_2);
+    Assert.assertNotEquals(codec_2_1, codec_5_1);
+  }
 }