Skip to content

Commit

Permalink
PARQUET-1643 Use airlift codecs for LZ4, LZO, GZIP
Browse files Browse the repository at this point in the history
  • Loading branch information
samarthjain committed Feb 26, 2020
1 parent 806037c commit 4feb369
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 17 deletions.
5 changes: 5 additions & 0 deletions parquet-hadoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>aircompressor</artifactId>
<version>0.16</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

/**
 * A {@link CodecFactory.BytesCompressor} backed by a pure-Java airlift
 * (aircompressor) Hadoop-compatible {@link CompressionCodec}.
 *
 * <p>Not thread-safe: the internal output buffer and {@link Compressor} are
 * reused across calls to {@link #compress(BytesInput)}.
 */
public class AirliftCompressor extends CodecFactory.BytesCompressor {
  private final Compressor compressor;
  private final ByteArrayOutputStream compressedOutBuffer;
  private final CompressionCodec hadoopCodec;
  private final CompressionCodecName parquetCodecName;

  /**
   * @param parquetCodecName the Parquet-level codec name reported by {@link #getCodecName()}
   * @param hadoopCodec the airlift codec implementation doing the actual compression
   * @param pageSize initial capacity hint for the reusable output buffer
   */
  AirliftCompressor(CompressionCodecName parquetCodecName, CompressionCodec hadoopCodec, int pageSize) {
    this.parquetCodecName = parquetCodecName;
    this.hadoopCodec = hadoopCodec;
    this.compressor = hadoopCodec.createCompressor();
    this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
  }

  @Override
  public BytesInput compress(BytesInput bytes) throws IOException {
    compressedOutBuffer.reset();
    try (CompressionOutputStream cos = hadoopCodec.createOutputStream(compressedOutBuffer, compressor)) {
      bytes.writeAllTo(cos);
    }
    // Build the result only AFTER the stream is closed: codecs such as GZIP flush
    // buffered data and emit their trailer on close(). Returning from inside the
    // try block would capture the buffer before close() had run.
    // NOTE(review): the returned BytesInput appears to share compressedOutBuffer,
    // so it is presumably invalidated by the next compress() call — callers should
    // consume or copy it first; confirm against BytesInput.from semantics.
    return BytesInput.from(compressedOutBuffer);
  }

  @Override
  public CompressionCodecName getCodecName() {
    return parquetCodecName;
  }

  @Override
  public void release() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

import io.airlift.compress.gzip.JdkGzipCodec;
import io.airlift.compress.lz4.Lz4Codec;
import io.airlift.compress.lzo.LzoCodec;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class AirliftCompressorCodecFactory implements CompressionCodecFactory {

private final int pageSize;

AirliftCompressorCodecFactory(int pageSize) {
this.pageSize = pageSize;
}

@Override
public CodecFactory.BytesCompressor getCompressor(CompressionCodecName codecName) {
switch (codecName.getParquetCompressionCodec()) {
case GZIP:
return new AirliftCompressor(codecName, new JdkGzipCodec(), pageSize);
case LZO:
return new AirliftCompressor(codecName, new LzoCodec(), pageSize);
case LZ4:
return new AirliftCompressor(codecName, new Lz4Codec(), pageSize);
default:
throw new IllegalArgumentException("Codec not supported in AirliftCompressorCodecFactory: " + codecName);
}
}

@Override
public CodecFactory.BytesDecompressor getDecompressor(CompressionCodecName codecName) {
switch (codecName.getParquetCompressionCodec()) {
case GZIP:
return new AirliftDecompressor(new JdkGzipCodec());
case LZO:
return new AirliftDecompressor(new LzoCodec());
case LZ4:
return new AirliftDecompressor(new Lz4Codec());
default:
throw new IllegalArgumentException("Codec not supported in AirliftCompressorCodecFactory: " + codecName);
}
}

@Override
public void release() {}

static boolean isSupported(CompressionCodecName codecName) {
switch (codecName.getParquetCompressionCodec()) {
case GZIP:
case LZO:
case LZ4:
return true;
default:
return false;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory;

/**
 * A {@link CodecFactory.BytesDecompressor} backed by a pure-Java airlift
 * (aircompressor) Hadoop-compatible {@link CompressionCodec}.
 *
 * <p>Not thread-safe: the underlying {@link Decompressor} is reset and reused
 * across calls.
 */
public class AirliftDecompressor extends CodecFactory.BytesDecompressor {
  // Both fields are assigned only in the constructor; final matches the style
  // of the sibling AirliftCompressor fields.
  private final CompressionCodec codec;
  private final Decompressor decompressor;

  public AirliftDecompressor(CompressionCodec codec) {
    this.codec = codec;
    this.decompressor = codec.createDecompressor();
  }

  /**
   * Decompresses {@code bytes} into a lazily-read {@link BytesInput} of exactly
   * {@code uncompressedSize} bytes.
   */
  @Override
  public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
    decompressor.reset();
    InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
    // The stream is intentionally not closed here: BytesInput.from(is, n) reads
    // from it when the caller consumes the result.
    return BytesInput.from(is, uncompressedSize);
  }

  /**
   * Decompresses {@code input} and writes {@code uncompressedSize} bytes into
   * {@code output} at its current position.
   *
   * <p>NOTE(review): {@code compressedSize} is ignored — the whole remaining
   * {@code input} buffer is treated as compressed data; confirm callers always
   * pass a buffer sized to exactly the compressed payload.
   */
  @Override
  public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
      throws IOException {
    ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
    output.put(decompressed);
  }

  @Override
  public void release() {}
}

Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ public CompressionCodecName getCodecName() {

@Override
public BytesCompressor getCompressor(CompressionCodecName codecName) {
if (AirliftCompressorCodecFactory.isSupported(codecName)) {
return new AirliftCompressorCodecFactory(pageSize).getCompressor(codecName);
}
BytesCompressor comp = compressors.get(codecName);
if (comp == null) {
comp = createCompressor(codecName);
Expand All @@ -194,6 +197,9 @@ public BytesCompressor getCompressor(CompressionCodecName codecName) {

@Override
public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
if (AirliftCompressorCodecFactory.isSupported(codecName)) {
return new AirliftCompressorCodecFactory(pageSize).getDecompressor(codecName);
}
BytesDecompressor decomp = decompressors.get(codecName);
if (decomp == null) {
decomp = createDecompressor(codecName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
*/
package org.apache.parquet.hadoop;



import java.io.InputStream;
import java.lang.reflect.Method;
import java.lang.reflect.InvocationTargetException;
import java.io.IOException;
Expand All @@ -27,7 +26,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -149,34 +147,25 @@ public void close() {
*/
public class IndirectDecompressor extends BytesDecompressor {
private final Decompressor decompressor;
private final CompressionCodec codec;

public IndirectDecompressor(CompressionCodec codec) {
this.decompressor = DirectCodecPool.INSTANCE.codec(codec).borrowDecompressor();
this.codec = codec;
}

@Override
public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
decompressor.reset();
byte[] inputBytes = bytes.toByteArray();
decompressor.setInput(inputBytes, 0, inputBytes.length);
byte[] output = new byte[uncompressedSize];
decompressor.decompress(output, 0, uncompressedSize);
return BytesInput.from(output);
InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
return BytesInput.from(is, uncompressedSize);
}

@Override
public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
throws IOException {

decompressor.reset();
byte[] inputBytes = new byte[compressedSize];
input.position(0);
input.get(inputBytes);
decompressor.setInput(inputBytes, 0, inputBytes.length);
byte[] outputBytes = new byte[uncompressedSize];
decompressor.decompress(outputBytes, 0, uncompressedSize);
output.clear();
output.put(outputBytes);
output.put(decompress(BytesInput.from(input), uncompressedSize).toByteBuffer());
}

@Override
Expand Down

0 comments on commit 4feb369

Please sign in to comment.