Commit ef4cd39

beliefer and yaooqinn committed
[SPARK-51269][SQL] Simplify AvroCompressionCodec by removing defaultCompressionLevel
### What changes were proposed in this pull request?

This PR proposes to let `SQLConf` manage the default value for the Avro compression level.

### Why are the changes needed?

Currently, the default value of `spark.sql.avro.deflate.level` is -1, but it is managed by the enum `AvroCompressionCodec`. The documentation of the config item `spark.sql.avro.deflate.level` contains the description `The default value is -1 which corresponds to 6 level in the current implementation.`, so users know that its default value is -1. However, if a developer uses the config item by mistake, there is no guarantee of the default value, which can cause unpredictable behavior and confuse users. We should keep the default value safely within `SQLConf`. Other config items, `spark.sql.avro.xz.level` and `spark.sql.avro.zstandard.level`, have the same issue.

### Does this PR introduce _any_ user-facing change?

'No'. New feature.

### How was this patch tested?

GA.

### Was this patch authored or co-authored using generative AI tooling?

'No'.

Closes apache#50021 from beliefer/SPARK-51269.

Lead-authored-by: beliefer <[email protected]>
Co-authored-by: Kent Yao <[email protected]>
Signed-off-by: beliefer <[email protected]>
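For context, a minimal usage sketch of the configs this commit touches (the session setup and output path are hypothetical; assumes the `spark-avro` module is on the classpath):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical local session for demonstration purposes.
val spark = SparkSession.builder()
  .appName("avro-compression-demo")
  .master("local[*]")
  .getOrCreate()

// Pick the deflate codec and set an explicit level. After this change,
// leaving spark.sql.avro.deflate.level unset falls back to the default kept
// in SQLConf (CodecFactory.DEFAULT_DEFLATE_LEVEL, i.e. -1) instead of a
// default buried in the AvroCompressionCodec enum.
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "6")

spark.range(1000).toDF("id")
  .write.format("avro")
  .save("/tmp/avro-deflate-demo")
```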
1 parent: f27464f

File tree: 3 files changed (+27, −32 lines)

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (+6, −7)

```diff
@@ -21,14 +21,14 @@ import java.util.{Locale, Properties, TimeZone}
 import java.util
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
-import java.util.zip.Deflater

 import scala.collection.immutable
 import scala.jdk.CollectionConverters._
 import scala.util.Try
 import scala.util.control.NonFatal
 import scala.util.matching.Regex

+import org.apache.avro.file.CodecFactory
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.OutputCommitter

@@ -4217,8 +4217,8 @@
       "The default value is -1 which corresponds to 6 level in the current implementation.")
     .version("2.4.0")
     .intConf
-    .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
-    .createOptional
+    .checkValues((1 to 9).toSet + CodecFactory.DEFAULT_DEFLATE_LEVEL)
+    .createWithDefault(CodecFactory.DEFAULT_DEFLATE_LEVEL)

   val AVRO_XZ_LEVEL = buildConf("spark.sql.avro.xz.level")
     .doc("Compression level for the xz codec used in writing of AVRO files. " +
@@ -4227,14 +4227,13 @@
     .version("4.0.0")
     .intConf
     .checkValue(v => v > 0 && v <= 9, "The value must be in the range of from 1 to 9 inclusive.")
-    .createOptional
+    .createWithDefault(CodecFactory.DEFAULT_XZ_LEVEL)

   val AVRO_ZSTANDARD_LEVEL = buildConf("spark.sql.avro.zstandard.level")
-    .doc("Compression level for the zstandard codec used in writing of AVRO files. " +
-      "The default value is 3.")
+    .doc("Compression level for the zstandard codec used in writing of AVRO files. ")
     .version("4.0.0")
     .intConf
-    .createOptional
+    .createWithDefault(CodecFactory.DEFAULT_ZSTANDARD_LEVEL)

   val AVRO_ZSTANDARD_BUFFER_POOL_ENABLED = buildConf("spark.sql.avro.zstandard.bufferPool.enabled")
     .doc("If true, enable buffer pool of ZSTD JNI library when writing of AVRO files")
```
sql/core/src/main/java/org/apache/spark/sql/avro/AvroCompressionCodec.java (+8, −14)

```diff
@@ -22,29 +22,27 @@
 import java.util.Map;
 import java.util.stream.Collectors;

-import org.apache.avro.file.*;
+import org.apache.avro.file.DataFileConstants;

 /**
  * A mapper class from Spark supported avro compression codecs to avro compression codecs.
  */
 public enum AvroCompressionCodec {
-  UNCOMPRESSED(DataFileConstants.NULL_CODEC, false, -1),
-  DEFLATE(DataFileConstants.DEFLATE_CODEC, true, CodecFactory.DEFAULT_DEFLATE_LEVEL),
-  SNAPPY(DataFileConstants.SNAPPY_CODEC, false, -1),
-  BZIP2(DataFileConstants.BZIP2_CODEC, false, -1),
-  XZ(DataFileConstants.XZ_CODEC, true, CodecFactory.DEFAULT_XZ_LEVEL),
-  ZSTANDARD(DataFileConstants.ZSTANDARD_CODEC, true, CodecFactory.DEFAULT_ZSTANDARD_LEVEL);
+  UNCOMPRESSED(DataFileConstants.NULL_CODEC, false),
+  DEFLATE(DataFileConstants.DEFLATE_CODEC, true),
+  SNAPPY(DataFileConstants.SNAPPY_CODEC, false),
+  BZIP2(DataFileConstants.BZIP2_CODEC, false),
+  XZ(DataFileConstants.XZ_CODEC, true),
+  ZSTANDARD(DataFileConstants.ZSTANDARD_CODEC, true);

   private final String codecName;
   private final boolean supportCompressionLevel;
-  private final int defaultCompressionLevel;

   AvroCompressionCodec(
       String codecName,
-      boolean supportCompressionLevel, int defaultCompressionLevel) {
+      boolean supportCompressionLevel) {
     this.codecName = codecName;
     this.supportCompressionLevel = supportCompressionLevel;
-    this.defaultCompressionLevel = defaultCompressionLevel;
   }

   public String getCodecName() {
@@ -55,10 +53,6 @@ public boolean getSupportCompressionLevel() {
     return this.supportCompressionLevel;
   }

-  public int getDefaultCompressionLevel() {
-    return this.defaultCompressionLevel;
-  }
-
   private static final Map<String, String> codecNameMap =
     Arrays.stream(AvroCompressionCodec.values()).collect(
       Collectors.toMap(codec -> codec.name(), codec -> codec.name().toLowerCase(Locale.ROOT)));
```
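A quick sketch of how the slimmed-down enum is consumed, using only the two accessors that remain after this commit:

```scala
import org.apache.spark.sql.avro.AvroCompressionCodec

// The enum now only knows a codec's Avro name and whether it accepts a
// compression level; the default level comes from SQLConf instead.
val codec = AvroCompressionCodec.DEFLATE
println(codec.getCodecName)               // "deflate"
println(codec.getSupportCompressionLevel) // true
// codec.getDefaultCompressionLevel       // removed by this commit
```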

sql/core/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala (+13, −11)

```diff
@@ -121,18 +121,20 @@ private[sql] object AvroUtils extends Logging {
     jobConf.setBoolean("mapreduce.output.fileoutputformat.compress", true)
     jobConf.set(AvroJob.CONF_OUTPUT_CODEC, compressed.getCodecName)
     if (compressed.getSupportCompressionLevel) {
-      val level = sqlConf.getConfString(s"spark.sql.avro.$codecName.level",
-        compressed.getDefaultCompressionLevel.toString)
-      logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec " +
-        log"at level ${MDC(CODEC_LEVEL, level)}")
-      val s = if (compressed == ZSTANDARD) {
-        val bufferPoolEnabled = sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED)
-        jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY, bufferPoolEnabled)
-        "zstd"
-      } else {
-        codecName
+      val levelAndCodecName = compressed match {
+        case DEFLATE => Some(sqlConf.getConf(SQLConf.AVRO_DEFLATE_LEVEL), codecName)
+        case XZ => Some(sqlConf.getConf(SQLConf.AVRO_XZ_LEVEL), codecName)
+        case ZSTANDARD =>
+          jobConf.setBoolean(AvroOutputFormat.ZSTD_BUFFERPOOL_KEY,
+            sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_BUFFER_POOL_ENABLED))
+          Some(sqlConf.getConf(SQLConf.AVRO_ZSTANDARD_LEVEL), "zstd")
+        case _ => None
+      }
+      levelAndCodecName.foreach { case (level, mapredCodecName) =>
+        logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} " +
+          log"codec at level ${MDC(CODEC_LEVEL, level)}")
+        jobConf.setInt(s"avro.mapred.$mapredCodecName.level", level.toInt)
       }
-      jobConf.setInt(s"avro.mapred.$s.level", level.toInt)
     } else {
       logInfo(log"Compressing Avro output using the ${MDC(CODEC_NAME, codecName)} codec")
     }
```
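The net effect on the Hadoop job configuration can be sketched standalone as follows (the hard-coded level and codec name stand in for the `sqlConf.getConf(...)` lookups in the real code):

```scala
import org.apache.hadoop.conf.Configuration

// Each level-capable codec resolves its level from SQLConf and writes a
// single avro.mapred.<name>.level key; zstandard maps to the "zstd" key.
val jobConf = new Configuration()
val levelAndCodecName: Option[(Int, String)] = Some((6, "deflate")) // e.g. DEFLATE

levelAndCodecName.foreach { case (level, mapredCodecName) =>
  jobConf.setInt(s"avro.mapred.$mapredCodecName.level", level)
}
assert(jobConf.getInt("avro.mapred.deflate.level", -1) == 6)
```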
