
Commit

[SPARK-46759][SQL][AVRO] Codec xz and zstandard support compression level for avro files

### What changes were proposed in this pull request?

This PR introduces two keys of the form 'spark.sql.avro.$codecName.level', just like the existing 'spark.sql.avro.deflate.level', for the zstandard and xz codecs. With this patch, users can tune the trade-off between speed and compression ratio when writing AVRO compressed with zstd or xz.
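A minimal usage sketch (assuming a running SparkSession `spark` with the Avro data source on the classpath; paths are illustrative, not part of this patch):

```scala
// Hedged sketch: tuning the new per-codec compression levels at write time.
spark.conf.set("spark.sql.avro.compression.codec", "zstandard")
spark.conf.set("spark.sql.avro.zstandard.level", "9")  // favor compression ratio
spark.range(1000).write.format("avro").save("/tmp/events_avro")

spark.conf.set("spark.sql.avro.compression.codec", "xz")
spark.conf.set("spark.sql.avro.xz.level", "1")         // favor speed
spark.range(1000).write.format("avro").save("/tmp/events_avro_xz")
```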

### Why are the changes needed?

Avro supports compression levels for the deflate, xz and zstd codecs, but Spark has so far only supported a level for deflate.

### Does this PR introduce _any_ user-facing change?

Yes, new configurations are added: `spark.sql.avro.xz.level` and `spark.sql.avro.zstandard.level`.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#44786 from yaooqinn/SPARK-46759.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
yaooqinn authored and dongjoon-hyun committed Jan 18, 2024
1 parent c040824 commit 01bb1b1
Showing 5 changed files with 81 additions and 16 deletions.
@@ -22,29 +22,43 @@
import java.util.Map;
import java.util.stream.Collectors;

-import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.*;

/**
* A mapper class from Spark-supported Avro compression codec names to Avro compression codecs.
*/
public enum AvroCompressionCodec {
-UNCOMPRESSED(DataFileConstants.NULL_CODEC),
-DEFLATE(DataFileConstants.DEFLATE_CODEC),
-SNAPPY(DataFileConstants.SNAPPY_CODEC),
-BZIP2(DataFileConstants.BZIP2_CODEC),
-XZ(DataFileConstants.XZ_CODEC),
-ZSTANDARD(DataFileConstants.ZSTANDARD_CODEC);
+UNCOMPRESSED(DataFileConstants.NULL_CODEC, false, -1),
+DEFLATE(DataFileConstants.DEFLATE_CODEC, true, CodecFactory.DEFAULT_DEFLATE_LEVEL),
+SNAPPY(DataFileConstants.SNAPPY_CODEC, false, -1),
+BZIP2(DataFileConstants.BZIP2_CODEC, false, -1),
+XZ(DataFileConstants.XZ_CODEC, true, CodecFactory.DEFAULT_XZ_LEVEL),
+ZSTANDARD(DataFileConstants.ZSTANDARD_CODEC, true, CodecFactory.DEFAULT_ZSTANDARD_LEVEL);

private final String codecName;
+private final boolean supportCompressionLevel;
+private final int defaultCompressionLevel;

-AvroCompressionCodec(String codecName) {
+AvroCompressionCodec(
+String codecName,
+boolean supportCompressionLevel, int defaultCompressionLevel) {
this.codecName = codecName;
+this.supportCompressionLevel = supportCompressionLevel;
+this.defaultCompressionLevel = defaultCompressionLevel;
}

public String getCodecName() {
return this.codecName;
}

+public boolean getSupportCompressionLevel() {
+return this.supportCompressionLevel;
+}
+
+public int getDefaultCompressionLevel() {
+return this.defaultCompressionLevel;
+}

private static final Map<String, String> codecNameMap =
Arrays.stream(AvroCompressionCodec.values()).collect(
Collectors.toMap(codec -> codec.name(), codec -> codec.name().toLowerCase(Locale.ROOT)));
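A small sketch of how the new accessors can be queried, assuming the enum's package is `org.apache.spark.sql.avro` (as the test suite below suggests):

```scala
import org.apache.spark.sql.avro.AvroCompressionCodec

// Only deflate, xz and zstandard advertise a tunable level; the defaults
// come from the Avro CodecFactory constants wired in above.
AvroCompressionCodec.values().foreach { codec =>
  if (codec.getSupportCompressionLevel) {
    println(s"${codec.getCodecName}: default level ${codec.getDefaultCompressionLevel}")
  } else {
    println(s"${codec.getCodecName}: no tunable level")
  }
}
```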
@@ -24,7 +24,7 @@ import scala.jdk.CollectionConverters._
import org.apache.avro.Schema
import org.apache.avro.file.{DataFileReader, FileReader}
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
-import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
+import org.apache.avro.mapred.FsInput
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
@@ -110,10 +110,12 @@ private[sql] object AvroUtils extends Logging {
case compressed =>
job.getConfiguration.setBoolean("mapred.output.compress", true)
job.getConfiguration.set(AvroJob.CONF_OUTPUT_CODEC, compressed.getCodecName)
-if (compressed == DEFLATE) {
-val deflateLevel = sqlConf.avroDeflateLevel
-logInfo(s"Compressing Avro output using the $codecName codec at level $deflateLevel")
-job.getConfiguration.setInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, deflateLevel)
+if (compressed.getSupportCompressionLevel) {
+val level = sqlConf.getConfString(s"spark.sql.avro.$codecName.level",
+compressed.getDefaultCompressionLevel.toString)
+logInfo(s"Compressing Avro output using the $codecName codec at level $level")
+val s = if (compressed == ZSTANDARD) "zstd" else codecName
+job.getConfiguration.setInt(s"avro.mapred.$s.level", level.toInt)
} else {
logInfo(s"Compressing Avro output using the $codecName codec")
}
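The property-name translation in the block above is easy to miss: ZSTANDARD's Avro codec name is "zstandard", while Avro's mapred property uses the short form "zstd". A sketch of the mapping (the `avroLevelKey` helper is illustrative, not in the patch):

```scala
def avroLevelKey(codecName: String): String = {
  // ZSTANDARD is the only codec whose Spark-side name differs from its
  // "avro.mapred.*.level" property segment.
  val short = if (codecName == "zstandard") "zstd" else codecName
  s"avro.mapred.$short.level"
}

assert(avroLevelKey("deflate") == "avro.mapred.deflate.level") // AvroOutputFormat.DEFLATE_LEVEL_KEY
assert(avroLevelKey("xz") == "avro.mapred.xz.level")
assert(avroLevelKey("zstandard") == "avro.mapred.zstd.level")
```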
@@ -20,6 +20,7 @@ package org.apache.spark.sql.avro
import java.util.Locale

import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.FileSourceCodecSuite
import org.apache.spark.sql.internal.SQLConf

@@ -58,4 +59,20 @@ class AvroCodecSuite extends FileSourceCodecSuite {
parameters = Map("codecName" -> "unsupported")
)
}

test("SPARK-46759: compression level support for zstandard codec") {
Seq("9", "1").foreach { level =>
withSQLConf(
(SQLConf.AVRO_COMPRESSION_CODEC.key -> "zstandard"),
(SQLConf.AVRO_ZSTANDARD_LEVEL.key -> level)) {
withTable("avro_t") {
sql(
s"""CREATE TABLE avro_t
|USING $format
|AS SELECT 1 as id""".stripMargin)
checkAnswer(spark.table("avro_t"), Seq(Row(1)))
}
}
}
}
}
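The test checks round-trip correctness; to confirm the codec actually recorded in a produced file, one could read its metadata with Avro's public API. A hedged sketch, with a hypothetical part-file path (Spark's real part-file names include a task UUID):

```scala
import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

// Read the "avro.codec" entry written into the Avro file header.
val reader = new DataFileReader[GenericRecord](
  new File("/tmp/avro_t/part-00000.avro"),   // illustrative path
  new GenericDatumReader[GenericRecord]())
println(reader.getMetaString("avro.codec"))  // expected: "zstandard"
reader.close()
```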
18 changes: 18 additions & 0 deletions docs/sql-data-sources-avro.md
@@ -362,6 +362,24 @@ Configuration of Avro can be done via `spark.conf.set` or by running `SET key=value`
</td>
<td>2.4.0</td>
</tr>
<tr>
<td>spark.sql.avro.xz.level</td>
<td>6</td>
<td>
Compression level for the xz codec used in writing of AVRO files. Valid value must be in
the range of from 1 to 9 inclusive. The default value is 6 in the current implementation.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td>spark.sql.avro.zstandard.level</td>
<td>3</td>
<td>
Compression level for the zstandard codec used in writing of AVRO files.
The default value is 3 in the current implementation.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td>spark.sql.avro.datetimeRebaseModeInRead</td>
<td><code>EXCEPTION</code></td>
@@ -3625,7 +3625,23 @@ object SQLConf {
.version("2.4.0")
.intConf
.checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
-.createWithDefault(Deflater.DEFAULT_COMPRESSION)
+.createOptional

val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.zx.level")
.doc("Compression level for the xz codec used in writing of AVRO files. " +
"Valid value must be in the range of from 1 to 9 inclusive " +
"The default value is 6.")
.version("4.0.0")
.intConf
.checkValue(v => v > 0 && v <= 9, "The value must be in the range of from 1 to 9 inclusive.")
.createOptional

val AVRO_ZSTANDARD_LEVEL = buildConf("spark.sql.avro.zstandard.level")
.doc("Compression level for the zstandard codec used in writing of AVRO files. " +
"The default value is 3.")
.version("4.0.0")
.intConf
.createOptional

val LEGACY_SIZE_OF_NULL = buildConf("spark.sql.legacy.sizeOfNull")
.internal()
@@ -5421,8 +5437,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def avroCompressionCodec: String = getConf(SQLConf.AVRO_COMPRESSION_CODEC)

-def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)

def replaceDatabricksSparkAvroEnabled: Boolean =
getConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED)

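Taken together, the writer resolves an effective level roughly as sketched below; the `effectiveLevel` helper is illustrative, while `getConfString` and the enum accessors are APIs this patch actually touches:

```scala
import org.apache.spark.sql.avro.AvroCompressionCodec
import org.apache.spark.sql.internal.SQLConf

// An optional per-codec level falls back to the codec's built-in default
// (the Avro CodecFactory constants) when the user has not set the conf.
def effectiveLevel(sqlConf: SQLConf, codec: AvroCompressionCodec): Option[Int] = {
  if (!codec.getSupportCompressionLevel) {
    None // snappy, bzip2 and uncompressed take no level
  } else {
    val key = s"spark.sql.avro.${codec.getCodecName}.level"
    Some(sqlConf.getConfString(key, codec.getDefaultCompressionLevel.toString).toInt)
  }
}
```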
