Skip to content

Commit

Permalink
ORC: Add compression properties (apache#4273)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhongyujiang authored Mar 30, 2022
1 parent 8321176 commit aea336e
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 26 deletions.
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,14 @@ private TableProperties() {
public static final String DELETE_ORC_WRITE_BATCH_SIZE = "write.delete.orc.vectorized.batch-size";
public static final int ORC_WRITE_BATCH_SIZE_DEFAULT = 1024;

// Compression codec for ORC data files (e.g. zlib, snappy, zstd); default is zlib.
public static final String ORC_COMPRESSION = "write.orc.compression-codec";
// Codec for ORC delete files; when unset, the data-file codec above is used.
public static final String DELETE_ORC_COMPRESSION = "write.delete.orc.compression-codec";
public static final String ORC_COMPRESSION_DEFAULT = "zlib";

// ORC compression strategy ("speed" or "compression"); default favors write speed.
public static final String ORC_COMPRESSION_STRATEGY = "write.orc.compression-strategy";
// Strategy for ORC delete files; when unset, the data-file strategy above is used.
public static final String DELETE_ORC_COMPRESSION_STRATEGY = "write.delete.orc.compression-strategy";
public static final String ORC_COMPRESSION_STRATEGY_DEFAULT = "speed";

public static final String SPLIT_SIZE = "read.split.target-size";
public static final long SPLIT_SIZE_DEFAULT = 128 * 1024 * 1024; // 128 MB

Expand Down
2 changes: 2 additions & 0 deletions docs/tables/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ Iceberg tables support table properties to configure table behavior, like the de
| write.avro.compression-level | null | Avro compression level |
| write.orc.stripe-size-bytes | 67108864 (64 MB) | Define the default ORC stripe size, in bytes |
| write.orc.block-size-bytes | 268435456 (256 MB) | Define the default file system block size for ORC files |
| write.orc.compression-codec | zlib | ORC compression codec: zstd, lz4, lzo, zlib, snappy, none |
| write.orc.compression-strategy | speed | ORC compression strategy: speed, compression |
| write.location-provider.impl | null | Optional custom implementation for LocationProvider |
| write.metadata.compression-codec | none | Metadata compression codec; none or gzip |
| write.metadata.metrics.default | truncate(16) | Default metrics mode for all columns in the table; none, counts, truncate(length), or full |
Expand Down
102 changes: 84 additions & 18 deletions orc/src/main/java/org/apache/iceberg/orc/ORC.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -58,13 +59,31 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcFile.CompressionStrategy;
import org.apache.orc.OrcFile.ReaderOptions;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import static org.apache.iceberg.TableProperties.DELETE_ORC_BLOCK_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.DELETE_ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.DELETE_ORC_STRIPE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.DELETE_ORC_WRITE_BATCH_SIZE;
import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION_STRATEGY_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_STRIPE_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.ORC_STRIPE_SIZE_BYTES_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_WRITE_BATCH_SIZE;
import static org.apache.iceberg.TableProperties.ORC_WRITE_BATCH_SIZE_DEFAULT;

@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
public class ORC {

Expand Down Expand Up @@ -173,16 +192,18 @@ public <D> FileAppender<D> build() {
}

// for compatibility
if (conf.get(VECTOR_ROW_BATCH_SIZE) != null && config.get(TableProperties.ORC_WRITE_BATCH_SIZE) == null) {
config.put(TableProperties.ORC_WRITE_BATCH_SIZE, conf.get(VECTOR_ROW_BATCH_SIZE));
if (conf.get(VECTOR_ROW_BATCH_SIZE) != null && config.get(ORC_WRITE_BATCH_SIZE) == null) {
config.put(ORC_WRITE_BATCH_SIZE, conf.get(VECTOR_ROW_BATCH_SIZE));
}

// Map Iceberg properties to pass down to the ORC writer
Context context = createContextFunc.apply(config);
conf.setLong(OrcConf.STRIPE_SIZE.getAttribute(), context.stripeSize());
conf.setLong(OrcConf.BLOCK_SIZE.getAttribute(), context.blockSize());

conf.setBoolean(OrcConf.OVERWRITE_OUTPUT_FILE.getAttribute(), overwrite);
OrcConf.STRIPE_SIZE.setLong(conf, context.stripeSize());
OrcConf.BLOCK_SIZE.setLong(conf, context.blockSize());
OrcConf.COMPRESS.setString(conf, context.compressionKind().name());
OrcConf.COMPRESSION_STRATEGY.setString(conf, context.compressionStrategy().name());
OrcConf.OVERWRITE_OUTPUT_FILE.setBoolean(conf, overwrite);

return new OrcFileAppender<>(schema,
this.file, createWriterFunc, conf, metadata,
Expand All @@ -193,6 +214,8 @@ private static class Context {
private final long stripeSize;
private final long blockSize;
private final int vectorizedRowBatchSize;
private final CompressionKind compressionKind;
private final CompressionStrategy compressionStrategy;

public long stripeSize() {
return stripeSize;
Expand All @@ -206,43 +229,86 @@ public int vectorizedRowBatchSize() {
return vectorizedRowBatchSize;
}

private Context(long stripeSize, long blockSize, int vectorizedRowBatchSize) {
// Returns the ORC compression codec resolved from table properties.
public CompressionKind compressionKind() {
return compressionKind;
}

// Returns the ORC compression strategy resolved from table properties.
public CompressionStrategy compressionStrategy() {
return compressionStrategy;
}

// Holds resolved ORC writer settings; instances are created via the
// dataContext/deleteContext factory methods, which validate the values.
private Context(long stripeSize, long blockSize, int vectorizedRowBatchSize,
CompressionKind compressionKind, CompressionStrategy compressionStrategy) {
this.stripeSize = stripeSize;
this.blockSize = blockSize;
this.vectorizedRowBatchSize = vectorizedRowBatchSize;
this.compressionKind = compressionKind;
this.compressionStrategy = compressionStrategy;
}

static Context dataContext(Map<String, String> config) {
long stripeSize = PropertyUtil.propertyAsLong(config, OrcConf.STRIPE_SIZE.getAttribute(),
TableProperties.ORC_STRIPE_SIZE_BYTES_DEFAULT);
stripeSize = PropertyUtil.propertyAsLong(config, TableProperties.ORC_STRIPE_SIZE_BYTES, stripeSize);
ORC_STRIPE_SIZE_BYTES_DEFAULT);
stripeSize = PropertyUtil.propertyAsLong(config, ORC_STRIPE_SIZE_BYTES, stripeSize);
Preconditions.checkArgument(stripeSize > 0, "Stripe size must be > 0");

long blockSize = PropertyUtil.propertyAsLong(config, OrcConf.BLOCK_SIZE.getAttribute(),
TableProperties.ORC_BLOCK_SIZE_BYTES_DEFAULT);
blockSize = PropertyUtil.propertyAsLong(config, TableProperties.ORC_BLOCK_SIZE_BYTES, blockSize);
ORC_BLOCK_SIZE_BYTES_DEFAULT);
blockSize = PropertyUtil.propertyAsLong(config, ORC_BLOCK_SIZE_BYTES, blockSize);
Preconditions.checkArgument(blockSize > 0, "Block size must be > 0");

int vectorizedRowBatchSize = PropertyUtil.propertyAsInt(config,
TableProperties.ORC_WRITE_BATCH_SIZE, TableProperties.ORC_WRITE_BATCH_SIZE_DEFAULT);
ORC_WRITE_BATCH_SIZE, ORC_WRITE_BATCH_SIZE_DEFAULT);
Preconditions.checkArgument(vectorizedRowBatchSize > 0, "VectorizedRow batch size must be > 0");

return new Context(stripeSize, blockSize, vectorizedRowBatchSize);
String codecAsString = PropertyUtil.propertyAsString(config, OrcConf.COMPRESS.getAttribute(),
ORC_COMPRESSION_DEFAULT);
codecAsString = PropertyUtil.propertyAsString(config, ORC_COMPRESSION, codecAsString);
CompressionKind compressionKind = toCompressionKind(codecAsString);

String strategyAsString = PropertyUtil.propertyAsString(config, OrcConf.COMPRESSION_STRATEGY.getAttribute(),
ORC_COMPRESSION_STRATEGY_DEFAULT);
strategyAsString = PropertyUtil.propertyAsString(config, ORC_COMPRESSION_STRATEGY, strategyAsString);
CompressionStrategy compressionStrategy = toCompressionStrategy(strategyAsString);

return new Context(stripeSize, blockSize, vectorizedRowBatchSize, compressionKind, compressionStrategy);
}

static Context deleteContext(Map<String, String> config) {
Context dataContext = dataContext(config);

long stripeSize = PropertyUtil.propertyAsLong(config,
TableProperties.DELETE_ORC_STRIPE_SIZE_BYTES, dataContext.stripeSize());
long stripeSize = PropertyUtil.propertyAsLong(config, DELETE_ORC_STRIPE_SIZE_BYTES, dataContext.stripeSize());

long blockSize = PropertyUtil.propertyAsLong(config,
TableProperties.DELETE_ORC_BLOCK_SIZE_BYTES, dataContext.blockSize());
long blockSize = PropertyUtil.propertyAsLong(config, DELETE_ORC_BLOCK_SIZE_BYTES, dataContext.blockSize());

int vectorizedRowBatchSize = PropertyUtil.propertyAsInt(config,
TableProperties.DELETE_ORC_WRITE_BATCH_SIZE, dataContext.vectorizedRowBatchSize());
DELETE_ORC_WRITE_BATCH_SIZE, dataContext.vectorizedRowBatchSize());

String codecAsString = config.get(DELETE_ORC_COMPRESSION);
CompressionKind compressionKind = codecAsString != null ? toCompressionKind(codecAsString) :
dataContext.compressionKind();

String strategyAsString = config.get(DELETE_ORC_COMPRESSION_STRATEGY);
CompressionStrategy compressionStrategy =
strategyAsString != null ? toCompressionStrategy(strategyAsString) : dataContext.compressionStrategy();

return new Context(stripeSize, blockSize, vectorizedRowBatchSize, compressionKind, compressionStrategy);
}

/**
 * Parses a codec name (case-insensitive) into an ORC {@link CompressionKind}.
 *
 * @param codecAsString codec name, e.g. "zlib", "snappy", "zstd"
 * @return the matching CompressionKind
 * @throws IllegalArgumentException if the name matches no CompressionKind value
 */
private static CompressionKind toCompressionKind(String codecAsString) {
  try {
    // Locale.ENGLISH avoids locale-sensitive casing surprises (e.g. Turkish 'i').
    return CompressionKind.valueOf(codecAsString.toUpperCase(Locale.ENGLISH));
  } catch (IllegalArgumentException e) {
    // Preserve the original exception as the cause instead of discarding it.
    throw new IllegalArgumentException("Unsupported compression codec: " + codecAsString, e);
  }
}

return new Context(stripeSize, blockSize, vectorizedRowBatchSize);
/**
 * Parses a strategy name (case-insensitive) into an ORC {@link CompressionStrategy}.
 *
 * @param strategyAsString strategy name: "speed" or "compression"
 * @return the matching CompressionStrategy
 * @throws IllegalArgumentException if the name matches no CompressionStrategy value
 */
private static CompressionStrategy toCompressionStrategy(String strategyAsString) {
  try {
    // Locale.ENGLISH avoids locale-sensitive casing surprises (e.g. Turkish 'i').
    return CompressionStrategy.valueOf(strategyAsString.toUpperCase(Locale.ENGLISH));
  } catch (IllegalArgumentException e) {
    // Preserve the original exception as the cause instead of discarding it.
    throw new IllegalArgumentException("Unsupported compression strategy: " + strategyAsString, e);
  }
}
}
}
Expand Down
39 changes: 31 additions & 8 deletions orc/src/test/java/org/apache/iceberg/orc/TestTableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg.orc;

import java.io.File;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
Expand All @@ -35,7 +36,9 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.apache.orc.CompressionKind;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile.CompressionStrategy;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
Expand All @@ -53,12 +56,20 @@ public class TestTableProperties {

@Test
public void testOrcTableProperties() throws Exception {
Long stripeSizeBytes = 32L * 1024 * 1024;
Long blockSizeBytes = 128L * 1024 * 1024;
Random random = new Random();
int numOfCodecs = CompressionKind.values().length;
int numOfStrategies = CompressionStrategy.values().length;

long stripeSizeBytes = 32L * 1024 * 1024;
long blockSizeBytes = 128L * 1024 * 1024;
String codecAsString = CompressionKind.values()[random.nextInt(numOfCodecs)].name();
String strategyAsString = CompressionStrategy.values()[random.nextInt(numOfStrategies)].name();

ImmutableMap<String, String> properties = ImmutableMap.of(
TableProperties.ORC_STRIPE_SIZE_BYTES, String.valueOf(stripeSizeBytes),
TableProperties.ORC_BLOCK_SIZE_BYTES, String.valueOf(blockSizeBytes),
TableProperties.ORC_COMPRESSION, codecAsString,
TableProperties.ORC_COMPRESSION_STRATEGY, strategyAsString,
TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.name());

File folder = TEMPORARY_FOLDER.newFolder();
Expand All @@ -82,19 +93,29 @@ public void testOrcTableProperties() throws Exception {
DynFields.builder().hiddenImpl(writer.getClass(), "conf").build(writer);

Configuration configuration = confField.get();
Assert.assertEquals(String.valueOf(blockSizeBytes), configuration.get(OrcConf.BLOCK_SIZE.getAttribute()));
Assert.assertEquals(String.valueOf(stripeSizeBytes), configuration.get(OrcConf.STRIPE_SIZE.getAttribute()));
Assert.assertEquals(blockSizeBytes, OrcConf.BLOCK_SIZE.getLong(configuration));
Assert.assertEquals(stripeSizeBytes, OrcConf.STRIPE_SIZE.getLong(configuration));
Assert.assertEquals(codecAsString, OrcConf.COMPRESS.getString(configuration));
Assert.assertEquals(strategyAsString, OrcConf.COMPRESSION_STRATEGY.getString(configuration));
Assert.assertEquals(FileFormat.ORC.name(), configuration.get(TableProperties.DEFAULT_FILE_FORMAT));
}

@Test
public void testOrcTableDeleteProperties() throws Exception {
Long stripeSizeBytes = 32L * 1024 * 1024;
Long blockSizeBytes = 128L * 1024 * 1024;
Random random = new Random();
int numOfCodecs = CompressionKind.values().length;
int numOfStrategies = CompressionStrategy.values().length;

long stripeSizeBytes = 32L * 1024 * 1024;
long blockSizeBytes = 128L * 1024 * 1024;
String codecAsString = CompressionKind.values()[random.nextInt(numOfCodecs)].name();
String strategyAsString = CompressionStrategy.values()[random.nextInt(numOfStrategies)].name();

ImmutableMap<String, String> properties = ImmutableMap.of(
TableProperties.DELETE_ORC_STRIPE_SIZE_BYTES, String.valueOf(stripeSizeBytes),
TableProperties.DELETE_ORC_BLOCK_SIZE_BYTES, String.valueOf(blockSizeBytes),
TableProperties.DELETE_ORC_COMPRESSION, codecAsString,
TableProperties.DELETE_ORC_COMPRESSION_STRATEGY, strategyAsString,
TableProperties.DEFAULT_FILE_FORMAT, FileFormat.ORC.name());

File folder = TEMPORARY_FOLDER.newFolder();
Expand Down Expand Up @@ -123,8 +144,10 @@ public void testOrcTableDeleteProperties() throws Exception {
DynFields.builder().hiddenImpl(orcFileAppender.getClass(), "conf").build(orcFileAppender);

Configuration configuration = confField.get();
Assert.assertEquals(String.valueOf(blockSizeBytes), configuration.get(OrcConf.BLOCK_SIZE.getAttribute()));
Assert.assertEquals(String.valueOf(stripeSizeBytes), configuration.get(OrcConf.STRIPE_SIZE.getAttribute()));
Assert.assertEquals(blockSizeBytes, OrcConf.BLOCK_SIZE.getLong(configuration));
Assert.assertEquals(stripeSizeBytes, OrcConf.STRIPE_SIZE.getLong(configuration));
Assert.assertEquals(codecAsString, OrcConf.COMPRESS.getString(configuration));
Assert.assertEquals(strategyAsString, OrcConf.COMPRESSION_STRATEGY.getString(configuration));
Assert.assertEquals(FileFormat.ORC.name(), configuration.get(TableProperties.DEFAULT_FILE_FORMAT));
}
}

0 comments on commit aea336e

Please sign in to comment.