Throw TrinoException when user selects LZ4 for Parquet Hive/Iceberg table

Previously an unchecked exception that was not a TrinoException (java.lang.RuntimeException: Unsupported codec: LZ4) was thrown.

findepi committed Jan 30, 2024
1 parent c81f059 commit c8c14ba
Showing 10 changed files with 30 additions and 38 deletions.
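
All ten files apply one pattern: HiveCompressionCodec's Parquet mapping becomes an Optional, and each call site either unwraps it under a previously established invariant or turns an empty value into a descriptive exception. As a reading aid, here is a minimal, self-contained sketch of the enum side of that change (plain Java; the CompressionCodec enum below is a stand-in for the Parquet codec type the real class references, and the ORC/Avro constructor arguments of the real enum are omitted):

import java.util.Optional;

// Stand-in for the Parquet codec enum the real class references.
enum CompressionCodec { UNCOMPRESSED, SNAPPY, ZSTD, GZIP }

enum HiveCompressionCodec
{
    NONE(CompressionCodec.UNCOMPRESSED),
    SNAPPY(CompressionCodec.SNAPPY),
    LZ4(null), // no mapping: the native Parquet writer does not support LZ4
    ZSTD(CompressionCodec.ZSTD),
    GZIP(CompressionCodec.GZIP);

    // Formerly a plain CompressionCodec field guarded by requireNonNull.
    private final Optional<CompressionCodec> parquetCompressionCodec;

    HiveCompressionCodec(CompressionCodec parquetCompressionCodec)
    {
        // Optional.ofNullable makes "no Parquet mapping" a legal state.
        this.parquetCompressionCodec = Optional.ofNullable(parquetCompressionCodec);
    }

    public Optional<CompressionCodec> getParquetCompressionCodec()
    {
        return parquetCompressionCodec;
    }
}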
@@ -465,7 +465,8 @@ private ParquetFileWriter createParquetFileWriter(Location path)
                 .setMaxPageSize(getParquetWriterPageSize(session))
                 .setMaxPageValueCount(getParquetWriterPageValueCount(session))
                 .build();
-        CompressionCodec compressionCodec = getCompressionCodec(session).getParquetCompressionCodec();
+        CompressionCodec compressionCodec = getCompressionCodec(session).getParquetCompressionCodec()
+                .orElseThrow(); // validated on the session property level

         try {
             Closeable rollbackAction = () -> fileSystem.deleteFile(path);

@@ -362,7 +362,8 @@ private ParquetFileWriter createParquetFileWriter(Location path, List<DeltaLakeC
                 .setMaxPageSize(getParquetWriterPageSize(session))
                 .setMaxPageValueCount(getParquetWriterPageValueCount(session))
                 .build();
-        CompressionCodec compressionCodec = getCompressionCodec(session).getParquetCompressionCodec();
+        CompressionCodec compressionCodec = getCompressionCodec(session).getParquetCompressionCodec()
+                .orElseThrow(); // validated on the session property level

         try {
             Closeable rollbackAction = () -> fileSystem.deleteFile(path);

@@ -26,27 +26,27 @@ public enum HiveCompressionCodec
 {
     NONE(null, CompressionKind.NONE, CompressionCodec.UNCOMPRESSED, AvroCompressionKind.NULL),
     SNAPPY(io.trino.hive.formats.compression.CompressionKind.SNAPPY, CompressionKind.SNAPPY, CompressionCodec.SNAPPY, AvroCompressionKind.SNAPPY),
-    LZ4(io.trino.hive.formats.compression.CompressionKind.LZ4, CompressionKind.LZ4, CompressionCodec.LZ4, null),
+    LZ4(io.trino.hive.formats.compression.CompressionKind.LZ4, CompressionKind.LZ4, null, null),
     ZSTD(io.trino.hive.formats.compression.CompressionKind.ZSTD, CompressionKind.ZSTD, CompressionCodec.ZSTD, AvroCompressionKind.ZSTANDARD),
     // Using DEFLATE for GZIP for Avro for now so Avro files can be written in default configuration
     // TODO(https://github.com/trinodb/trino/issues/12580) change GZIP to be unsupported for Avro when we change Trino default compression to be storage format aware
     GZIP(io.trino.hive.formats.compression.CompressionKind.GZIP, CompressionKind.ZLIB, CompressionCodec.GZIP, AvroCompressionKind.DEFLATE);

     private final Optional<io.trino.hive.formats.compression.CompressionKind> hiveCompressionKind;
     private final CompressionKind orcCompressionKind;
-    private final CompressionCodec parquetCompressionCodec;
+    private final Optional<CompressionCodec> parquetCompressionCodec;

     private final Optional<AvroCompressionKind> avroCompressionKind;

     HiveCompressionCodec(
             @Nullable io.trino.hive.formats.compression.CompressionKind hiveCompressionKind,
             CompressionKind orcCompressionKind,
-            CompressionCodec parquetCompressionCodec,
+            @Nullable CompressionCodec parquetCompressionCodec,
             @Nullable AvroCompressionKind avroCompressionKind)
     {
         this.hiveCompressionKind = Optional.ofNullable(hiveCompressionKind);
         this.orcCompressionKind = requireNonNull(orcCompressionKind, "orcCompressionKind is null");
-        this.parquetCompressionCodec = requireNonNull(parquetCompressionCodec, "parquetCompressionCodec is null");
+        this.parquetCompressionCodec = Optional.ofNullable(parquetCompressionCodec);
         this.avroCompressionKind = Optional.ofNullable(avroCompressionKind);
     }

@@ -60,7 +60,7 @@ public CompressionKind getOrcCompressionKind()
         return orcCompressionKind;
     }

-    public CompressionCodec getParquetCompressionCodec()
+    public Optional<CompressionCodec> getParquetCompressionCodec()
     {
         return parquetCompressionCodec;
     }

@@ -41,8 +41,9 @@ public static HiveCompressionCodec selectCompressionCodec(HiveCompressionOption
         HiveCompressionCodec selectedCodec = selectCompressionCodec(compressionOption);

         // perform codec vs format validation
-        if (storageFormat == HiveStorageFormat.AVRO && selectedCodec.getAvroCompressionKind().isEmpty()) {
-            throw new TrinoException(HIVE_UNSUPPORTED_FORMAT, "Compression codec %s not supported for AVRO".formatted(selectedCodec));
+        if ((storageFormat == HiveStorageFormat.PARQUET && selectedCodec.getParquetCompressionCodec().isEmpty()) ||
+                (storageFormat == HiveStorageFormat.AVRO && selectedCodec.getAvroCompressionKind().isEmpty())) {
+            throw new TrinoException(HIVE_UNSUPPORTED_FORMAT, "Compression codec %s not supported for %s".formatted(selectedCodec, storageFormat));
         }

         return selectedCodec;

@@ -150,7 +150,9 @@ public Optional<FileWriter> createFileWriter(
                     schemaConverter.getPrimitiveTypes(),
                     parquetWriterOptions,
                     fileInputColumnIndexes,
-                    compressionCodec.getParquetCompressionCodec(),
+                    compressionCodec.getParquetCompressionCodec()
+                            // Ensured by the caller
+                            .orElseThrow(() -> new IllegalArgumentException("Unsupported compression codec for Parquet: " + compressionCodec)),
                     nodeVersion.toString(),
                     Optional.of(parquetTimeZone),
                     validationInputFactory));
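
The call sites fall into two idioms. Where an earlier check already guarantees a codec (the Hive and Delta writers above, validated in selectCompressionCodec), the Optional is unwrapped with a bare orElseThrow(), or with an IllegalArgumentException naming the broken invariant as in the hunk just above. Where no earlier check exists (the Iceberg writer below), the empty Optional becomes a user-facing TrinoException. A condensed sketch of both, mirroring the diff (the hiveCodec variable is hypothetical):

// Idiom 1: presence was validated at the session property level,
// so an empty Optional here would be a programming error.
CompressionCodec parquetCodec = hiveCodec.getParquetCompressionCodec()
        .orElseThrow(); // validated on the session property level

// Idiom 2: no earlier validation, so report a proper, user-facing error.
CompressionCodec parquetCodec2 = hiveCodec.getParquetCompressionCodec()
        .orElseThrow(() -> new TrinoException(NOT_SUPPORTED,
                "Compression codec %s not supported for Parquet".formatted(hiveCodec)));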
@@ -8077,16 +8077,9 @@ private void testCreateTableWithCompressionCodec(Session baseSession, HiveStorag
                 .build();
         String tableName = "test_table_with_compression_" + compressionCodec;
         String createTableSql = format("CREATE TABLE %s WITH (format = '%s') AS TABLE tpch.tiny.nation", tableName, storageFormat);
-        if (storageFormat == HiveStorageFormat.PARQUET && compressionCodec == HiveCompressionCodec.LZ4) {
-            // TODO (https://github.com/trinodb/trino/issues/9142) Support LZ4 compression with native Parquet writer
-            assertThatThrownBy(() -> computeActual(session, createTableSql))
-                    .hasMessage("Unsupported codec: LZ4")
-                    .isInstanceOf(QueryFailedException.class)
-                    // TODO this should be TrinoException
-                    .cause().hasToString("java.lang.RuntimeException: Unsupported codec: LZ4");
-            return;
-        }
-        if (storageFormat == HiveStorageFormat.AVRO && compressionCodec == HiveCompressionCodec.LZ4) {
+        // TODO (https://github.com/trinodb/trino/issues/9142) Support LZ4 compression with native Parquet writer
+        boolean unsupported = (storageFormat == HiveStorageFormat.PARQUET || storageFormat == HiveStorageFormat.AVRO) && compressionCodec == HiveCompressionCodec.LZ4;
+        if (unsupported) {
             assertQueryFails(session, createTableSql, "Compression codec " + compressionCodec + " not supported for " + storageFormat);
             return;
         }
@@ -8114,16 +8107,9 @@ private void testCreateTableWithEmptyBucketsAndCompressionCodec(Session baseSess
                 .build();
         String tableName = "test_table_with_compression_" + compressionCodec;
         String createTableSql = format("CREATE TABLE %s WITH (format = '%s', bucketed_by = ARRAY['regionkey'], bucket_count = 7) AS TABLE tpch.tiny.nation", tableName, storageFormat);
-        if (storageFormat == HiveStorageFormat.PARQUET && compressionCodec == HiveCompressionCodec.LZ4) {
-            // TODO (https://github.com/trinodb/trino/issues/9142) Support LZ4 compression with native Parquet writer
-            assertThatThrownBy(() -> computeActual(session, createTableSql))
-                    .hasMessage("Unsupported codec: LZ4")
-                    .isInstanceOf(QueryFailedException.class)
-                    // TODO this should be TrinoException
-                    .cause().hasToString("java.lang.RuntimeException: Unsupported codec: LZ4");
-            return;
-        }
-        if (storageFormat == HiveStorageFormat.AVRO && compressionCodec == HiveCompressionCodec.LZ4) {
+        // TODO (https://github.com/trinodb/trino/issues/9142) Support LZ4 compression with native Parquet writer
+        boolean unsupported = (storageFormat == HiveStorageFormat.PARQUET || storageFormat == HiveStorageFormat.AVRO) && compressionCodec == HiveCompressionCodec.LZ4;
+        if (unsupported) {
             assertQueryFails(session, createTableSql, "Compression codec " + compressionCodec + " not supported for " + storageFormat);
             return;
         }

@@ -1564,7 +1564,7 @@ private static void configureCompression(Configuration config, HiveCompressionCo
         }

         // For Parquet
-        config.set(ParquetOutputFormat.COMPRESSION, compressionCodec.getParquetCompressionCodec().name());
+        config.set(ParquetOutputFormat.COMPRESSION, compressionCodec.getParquetCompressionCodec().orElseThrow().name());

         // For Avro
         compressionCodec.getAvroCompressionKind().ifPresent(kind -> config.set("avro.output.codec", kind.toString()));

@@ -30,6 +30,7 @@
 import io.trino.orc.OutputStreamOrcDataSink;
 import io.trino.parquet.writer.ParquetWriterOptions;
 import io.trino.plugin.hive.FileFormatDataSourceStats;
+import io.trino.plugin.hive.HiveCompressionCodec;
 import io.trino.plugin.hive.NodeVersion;
 import io.trino.plugin.hive.orc.OrcWriterConfig;
 import io.trino.plugin.iceberg.fileio.ForwardingOutputFile;
@@ -182,6 +183,7 @@ private IcebergFileWriter createParquetWriter(
                     .setBatchSize(getParquetWriterBatchSize(session))
                     .build();

+            HiveCompressionCodec hiveCompressionCodec = getCompressionCodec(session);
             return new IcebergParquetFileWriter(
                     metricsConfig,
                     outputFile,
@@ -192,7 +194,8 @@
                     makeTypeMap(fileColumnTypes, fileColumnNames),
                     parquetWriterOptions,
                     IntStream.range(0, fileColumnNames.size()).toArray(),
-                    getCompressionCodec(session).getParquetCompressionCodec(),
+                    hiveCompressionCodec.getParquetCompressionCodec()
+                            .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Compression codec %s not supported for Parquet".formatted(hiveCompressionCodec))),
                     nodeVersion.toString());
         }
         catch (IOException e) {

@@ -137,6 +137,7 @@
 import static io.trino.testing.TestingSession.testSessionBuilder;
 import static io.trino.testing.TransactionBuilder.transaction;
 import static io.trino.testing.assertions.Assert.assertEventually;
+import static io.trino.testing.assertions.TrinoExceptionAssert.assertTrinoExceptionThrownBy;
 import static java.lang.String.format;
 import static java.lang.String.join;
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -7451,11 +7452,8 @@ private void testCreateTableWithCompressionCodec(HiveCompressionCodec compressio
         String createTableSql = format("CREATE TABLE %s AS TABLE tpch.tiny.nation", tableName);
         if (format == IcebergFileFormat.PARQUET && compressionCodec == HiveCompressionCodec.LZ4) {
             // TODO (https://github.com/trinodb/trino/issues/9142) Support LZ4 compression with native Parquet writer
-            assertThatThrownBy(() -> computeActual(session, createTableSql))
-                    .hasMessage("Unsupported codec: LZ4")
-                    .isInstanceOf(QueryFailedException.class)
-                    // TODO this should be TrinoException
-                    .cause().hasToString("java.lang.RuntimeException: Unsupported codec: LZ4");
+            assertTrinoExceptionThrownBy(() -> computeActual(session, createTableSql))
+                    .hasMessage("Compression codec LZ4 not supported for Parquet");
             return;
         }

@@ -1474,7 +1474,7 @@ public void testSparkReadingTrinoCompressedData(StorageFormat storageFormat, Str
         if (storageFormat == StorageFormat.PARQUET && "LZ4".equals(compressionCodec)) {
             // TODO (https://github.com/trinodb/trino/issues/9142) LZ4 is not supported with native Parquet writer
             assertQueryFailure(() -> onTrino().executeQuery(createTable))
-                    .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Unsupported codec: LZ4");
+                    .hasMessageMatching("\\QQuery failed (#\\E\\S+\\Q): Compression codec LZ4 not supported for Parquet");
             return;
         }
         if (storageFormat == StorageFormat.AVRO && compressionCodec.equals("LZ4")) {
