Add airlift compression codec based compressors and decompressors
samarthjain committed Sep 2, 2019
1 parent 62ce7a4 commit b9a7dbe
Showing 12 changed files with 236 additions and 33 deletions.
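In short: the commit reverts the CompressionCodecName enum to the stock Hadoop codec class names and instead introduces AirliftCompressor, AirliftDecompressor, and AirliftCompressorCodecFactory, which adapt the pure-Java aircompressor (io.airlift.compress) codecs to Parquet's CompressionCodecFactory interfaces. CodecFactory then routes GZIP, SNAPPY, LZO, and LZ4 through these wrappers, and the writer-side plumbing moves from CodecFactory.BytesCompressor to the codec-agnostic CompressionCodecFactory.BytesInputCompressor.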
CompressionCodecName.java
@@ -26,10 +26,10 @@
 public enum CompressionCodecName {
   UNCOMPRESSED(null, CompressionCodec.UNCOMPRESSED, ""),
   SNAPPY("org.apache.parquet.hadoop.codec.SnappyCodec", CompressionCodec.SNAPPY, ".snappy"),
-  GZIP("io.airlift.compress.gzip.JdkGzipCodec", CompressionCodec.GZIP, ".gz"),
-  LZO("io.airlift.compress.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
+  GZIP("org.apache.hadoop.io.compress.GzipCodec", CompressionCodec.GZIP, ".gz"),
+  LZO("com.hadoop.compression.lzo.LzoCodec", CompressionCodec.LZO, ".lzo"),
   BROTLI("org.apache.hadoop.io.compress.BrotliCodec", CompressionCodec.BROTLI, ".br"),
-  LZ4("io.airlift.compress.lz4.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
+  LZ4("org.apache.hadoop.io.compress.Lz4Codec", CompressionCodec.LZ4, ".lz4"),
   ZSTD("org.apache.hadoop.io.compress.ZStandardCodec", CompressionCodec.ZSTD, ".zstd");

   public static CompressionCodecName fromConf(String name) {
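(The -/+ pairs above revert the enum to the standard Hadoop codec class names; the io.airlift class names previously listed here are instead wired in through the new AirliftCompressorCodecFactory below.)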
AirliftCompressor.java (new file)
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class AirliftCompressor implements CompressionCodecFactory.BytesInputCompressor {
  private final CompressionCodecName codecName;
  private final Compressor compressor;
  private final ByteArrayOutputStream compressedOutBuffer;
  private final CompressionCodec codec;

  AirliftCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) {
    this.codecName = codecName;
    this.codec = codec;
    this.compressor = codec.createCompressor();
    this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
  }

  @Override
  public BytesInput compress(BytesInput bytes) throws IOException {
    compressedOutBuffer.reset();
    CompressionOutputStream cos = codec.createOutputStream(compressedOutBuffer, compressor);
    bytes.writeAllTo(cos);
    cos.finish();
    cos.close();
    return BytesInput.from(compressedOutBuffer);
  }

  @Override
  public CompressionCodecName getCodecName() {
    // Report the codec this compressor was created for.
    return codecName;
  }

  @Override
  public void release() {}
}
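Note that compress() resets and reuses a single output buffer sized to the page size, so the returned BytesInput is only valid until the next compress() call on the same instance; callers that need to keep the compressed bytes around should copy them first (e.g. via BytesInput.copy).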
AirliftCompressorCodecFactory.java (new file)
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

import io.airlift.compress.gzip.JdkGzipCodec;
import io.airlift.compress.lz4.Lz4Codec;
import io.airlift.compress.lzo.LzoCodec;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.codec.SnappyCodec;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class AirliftCompressorCodecFactory implements CompressionCodecFactory {

  private final int pageSize;

  AirliftCompressorCodecFactory(int pageSize) {
    this.pageSize = pageSize;
  }

  @Override
  public BytesInputCompressor getCompressor(CompressionCodecName codecName) {
    switch (codecName.getParquetCompressionCodec()) {
      case GZIP:
        return new AirliftCompressor(codecName, new JdkGzipCodec(), pageSize);
      case SNAPPY:
        return new AirliftCompressor(codecName, new SnappyCodec(), pageSize);
      case LZO:
        return new AirliftCompressor(codecName, new LzoCodec(), pageSize);
      case LZ4:
        return new AirliftCompressor(codecName, new Lz4Codec(), pageSize);
      default:
        throw new IllegalArgumentException("Codec not supported in AirliftCompressorCodecFactory: " + codecName);
    }
  }

  @Override
  public BytesInputDecompressor getDecompressor(CompressionCodecName codecName) {
    switch (codecName.getParquetCompressionCodec()) {
      case GZIP:
        return new AirliftDecompressor(new JdkGzipCodec());
      case SNAPPY:
        return new AirliftDecompressor(new SnappyCodec());
      case LZO:
        return new AirliftDecompressor(new LzoCodec());
      case LZ4:
        return new AirliftDecompressor(new Lz4Codec());
      default:
        throw new IllegalArgumentException("Codec not supported in AirliftCompressorCodecFactory: " + codecName);
    }
  }

  @Override
  public void release() {}

  static boolean isSupported(CompressionCodecName codecName) {
    switch (codecName.getParquetCompressionCodec()) {
      case GZIP:
      case SNAPPY:
      case LZO:
      case LZ4:
        return true;
      default:
        return false;
    }
  }
}
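A minimal usage sketch, not part of the commit: since the factory and the AirliftCompressor constructor are package-private, it assumes the code lives in org.apache.parquet.hadoop; the 64 KiB page-size hint and the class name AirliftFactoryExample are illustrative choices.

package org.apache.parquet.hadoop;

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class AirliftFactoryExample {
  public static void main(String[] args) throws Exception {
    // The page size is only a hint for the initial output buffer capacity.
    CompressionCodecFactory factory = new AirliftCompressorCodecFactory(64 * 1024);
    BytesInputCompressor gzip = factory.getCompressor(CompressionCodecName.GZIP);

    // Compress 1 KiB of zeros; highly compressible, so size() shrinks sharply.
    BytesInput compressed = gzip.compress(BytesInput.from(new byte[1024]));
    System.out.println("1024 -> " + compressed.size() + " bytes");
  }
}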
AirliftDecompressor.java (new file)
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.compression.CompressionCodecFactory;

public class AirliftDecompressor implements CompressionCodecFactory.BytesInputDecompressor {
  private final CompressionCodec codec;
  private final Decompressor decompressor;

  public AirliftDecompressor(CompressionCodec codec) {
    this.codec = codec;
    this.decompressor = codec.createDecompressor();
  }

  @Override
  public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
    decompressor.reset();
    InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
    return BytesInput.from(is, uncompressedSize);
  }

  @Override
  public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
      throws IOException {
    ByteBuffer decompressed = decompress(BytesInput.from(input), uncompressedSize).toByteBuffer();
    output.put(decompressed);
  }

  @Override
  public void release() {}
}
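And a round-trip sketch under the same assumptions (package-private access, hypothetical class name, LZ4 chosen arbitrarily): compress with AirliftCompressor, copy the result out of the shared buffer, and decompress it back.

package org.apache.parquet.hadoop;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import io.airlift.compress.lz4.Lz4Codec;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class AirliftRoundTrip {
  public static void main(String[] args) throws Exception {
    byte[] page = "some repetitive page data, some repetitive page data"
        .getBytes(StandardCharsets.UTF_8);

    Lz4Codec codec = new Lz4Codec();
    AirliftCompressor compressor = new AirliftCompressor(CompressionCodecName.LZ4, codec, 64 * 1024);
    AirliftDecompressor decompressor = new AirliftDecompressor(codec);

    // Copy before the next compress() call reuses the shared output buffer.
    BytesInput compressed = BytesInput.copy(compressor.compress(BytesInput.from(page)));
    byte[] restored = decompressor.decompress(compressed, page.length).toByteArray();

    System.out.println(Arrays.equals(page, restored)); // expect: true
  }
}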

CodecFactory.java
@@ -183,7 +183,10 @@ public CompressionCodecName getCodecName() {
   }

   @Override
-  public BytesCompressor getCompressor(CompressionCodecName codecName) {
+  public BytesInputCompressor getCompressor(CompressionCodecName codecName) {
+    if (AirliftCompressorCodecFactory.isSupported(codecName)) {
+      return new AirliftCompressorCodecFactory(pageSize).getCompressor(codecName);
+    }
     BytesCompressor comp = compressors.get(codecName);
     if (comp == null) {
       comp = createCompressor(codecName);
@@ -193,7 +196,10 @@ public BytesCompressor getCompressor(CompressionCodecName codecName) {
   }

   @Override
-  public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
+  public BytesInputDecompressor getDecompressor(CompressionCodecName codecName) {
+    if (AirliftCompressorCodecFactory.isSupported(codecName)) {
+      return new AirliftCompressorCodecFactory(pageSize).getDecompressor(codecName);
+    }
     BytesDecompressor decomp = decompressors.get(codecName);
     if (decomp == null) {
       decomp = createDecompressor(codecName);
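With this change, CodecFactory first asks AirliftCompressorCodecFactory.isSupported() and hands out the airlift-backed implementations for GZIP, SNAPPY, LZO, and LZ4; everything else (e.g. ZSTD and BROTLI) still goes through the cached Hadoop-native compressors and decompressors below.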
ColumnChunkPageWriteStore.java
@@ -27,7 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.zip.CRC32;
-
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.ConcatenatingByteArrayCollector;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -37,13 +37,12 @@
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
 import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
 import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -55,7 +54,7 @@ class ColumnChunkPageWriteStore implements PageWriteStore {
   private static final class ColumnChunkPageWriter implements PageWriter {

     private final ColumnDescriptor path;
-    private final BytesCompressor compressor;
+    private final BytesInputCompressor compressor;

     private final ByteArrayOutputStream tempOutputStream = new ByteArrayOutputStream();
     private final ConcatenatingByteArrayCollector buf;
@@ -80,7 +79,7 @@ private static final class ColumnChunkPageWriter implements PageWriter {
     boolean pageWriteChecksumEnabled;

     private ColumnChunkPageWriter(ColumnDescriptor path,
-                                  BytesCompressor compressor,
+                                  BytesInputCompressor compressor,
                                   ByteBufferAllocator allocator,
                                   int columnIndexTruncateLength,
                                   boolean pageWriteChecksumEnabled) {
@@ -294,13 +293,14 @@ public String memUsageString(String prefix) {
   private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<ColumnDescriptor, ColumnChunkPageWriter>();
   private final MessageType schema;

-  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator,
+  public ColumnChunkPageWriteStore(BytesInputCompressor compressor, MessageType schema, ByteBufferAllocator allocator,
                                    int columnIndexTruncateLength) {
     this(compressor, schema, allocator, columnIndexTruncateLength,
         ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
   }

-  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator,
+  public ColumnChunkPageWriteStore(
+      BytesInputCompressor compressor, MessageType schema, ByteBufferAllocator allocator,
       int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) {
     this.schema = schema;
     for (ColumnDescriptor path : schema.getColumns()) {
InternalParquetRecordWriter.java
@@ -18,17 +18,12 @@
  */
 package org.apache.parquet.hadoop;

-import static java.lang.Math.max;
-import static java.lang.Math.min;
-import static org.apache.parquet.Preconditions.checkNotNull;
-
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.api.WriteSupport.FinalizedWriteContext;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -39,6 +34,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static org.apache.parquet.Preconditions.checkNotNull;
+
 class InternalParquetRecordWriter<T> {
   private static final Logger LOG = LoggerFactory.getLogger(InternalParquetRecordWriter.class);

@@ -52,7 +51,7 @@ class InternalParquetRecordWriter<T> {
   private final long rowGroupSize;
   private long rowGroupSizeThreshold;
   private long nextRowGroupSize;
-  private final BytesCompressor compressor;
+  private final BytesInputCompressor compressor;
   private final boolean validating;
   private final ParquetProperties props;

@@ -80,7 +79,7 @@ public InternalParquetRecordWriter(
       MessageType schema,
       Map<String, String> extraMetaData,
       long rowGroupSize,
-      BytesCompressor compressor,
+      BytesInputCompressor compressor,
       boolean validating,
       ParquetProperties props) {
     this.parquetFileWriter = parquetFileWriter;
ParquetRecordWriter.java
@@ -20,14 +20,13 @@

 import java.io.IOException;
 import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
-import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
@@ -45,7 +44,7 @@ public class ParquetRecordWriter<T> extends RecordWriter<Void, T> {

   private final InternalParquetRecordWriter<T> internalWriter;
   private final MemoryManager memoryManager;
-  private final CodecFactory codecFactory;
+  private final CompressionCodecFactory codecFactory;

   /**
    *
@@ -68,7 +67,7 @@ public ParquetRecordWriter(
       MessageType schema,
       Map<String, String> extraMetaData,
       int blockSize, int pageSize,
-      BytesCompressor compressor,
+      BytesInputCompressor compressor,
       int dictionaryPageSize,
       boolean enableDictionary,
       boolean validating,
@@ -107,7 +106,7 @@ public ParquetRecordWriter(
       MessageType schema,
       Map<String, String> extraMetaData,
       long blockSize, int pageSize,
-      BytesCompressor compressor,
+      BytesInputCompressor compressor,
       int dictionaryPageSize,
       boolean enableDictionary,
       boolean validating,
… (4 more changed files in this commit are not shown here)