[SPARK-47618][CORE] Use Magic Committer for all S3 buckets by default #45740

Closed · wants to merge 1 commit
14 changes: 4 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -424,7 +424,7 @@ class SparkContext(config: SparkConf) extends Logging {
       throw new SparkException("An application name must be set in your configuration")
     }
     // This should be set as early as possible.
-    SparkContext.fillMissingMagicCommitterConfsIfNeeded(_conf)
+    SparkContext.enableMagicCommitterIfNeeded(_conf)
 
     SparkContext.supplementJavaModuleOptions(_conf)
     SparkContext.supplementJavaIPv6Options(_conf)
@@ -3378,16 +3378,10 @@ object SparkContext extends Logging {
   }
 
   /**
-   * This is a helper function to complete the missing S3A magic committer configurations
-   * based on a single conf: `spark.hadoop.fs.s3a.bucket.<bucket>.committer.magic.enabled`
+   * Enable Magic Committer by default for all S3 buckets if hadoop-cloud module exists.
    */
-  private def fillMissingMagicCommitterConfsIfNeeded(conf: SparkConf): Unit = {
-    val magicCommitterConfs = conf
-      .getAllWithPrefix("spark.hadoop.fs.s3a.bucket.")
-      .filter(_._1.endsWith(".committer.magic.enabled"))
-      .filter(_._2.equalsIgnoreCase("true"))
-    if (magicCommitterConfs.nonEmpty) {
-      // Try to enable S3 magic committer if missing
+  private def enableMagicCommitterIfNeeded(conf: SparkConf): Unit = {
+    if (Utils.classIsLoadable("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")) {
       conf.setIfMissing("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
       if (conf.get("spark.hadoop.fs.s3a.committer.magic.enabled").equals("true")) {
         conf.setIfMissing("spark.hadoop.fs.s3a.committer.name", "magic")
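A note on the helper above: because it only calls `setIfMissing`, a value the user has set explicitly is never overwritten. A minimal sketch of that interaction, outside of SparkContext (the app name is illustrative):

```scala
import org.apache.spark.SparkConf

// Minimal sketch: setIfMissing never overwrites a value the user already set,
// so an explicit opt-out survives the new default.
val conf = new SparkConf()
  .setAppName("opt-out-demo") // illustrative name
  .setMaster("local")
  .set("spark.hadoop.fs.s3a.committer.magic.enabled", "false") // user opt-out

conf.setIfMissing("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
assert(conf.get("spark.hadoop.fs.s3a.committer.magic.enabled") == "false")
```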
47 changes: 0 additions & 47 deletions core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -1270,53 +1270,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
   }
 
-  test("SPARK-35383: Fill missing S3A magic committer configs if needed") {
-    val c1 = new SparkConf().setAppName("s3a-test").setMaster("local")
-    sc = new SparkContext(c1)
-    assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
-
-    resetSparkContext()
-    val c2 = c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "false")
-    sc = new SparkContext(c2)
-    assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
-
-    resetSparkContext()
-    val c3 = c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
-    sc = new SparkContext(c3)
-    Seq(
-      "spark.hadoop.fs.s3a.committer.magic.enabled" -> "true",
-      "spark.hadoop.fs.s3a.committer.name" -> "magic",
-      "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" ->
-        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
-      "spark.sql.parquet.output.committer.class" ->
-        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
-      "spark.sql.sources.commitProtocolClass" ->
-        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
-    ).foreach { case (k, v) =>
-      assert(v == sc.getConf.get(k))
-    }
-
-    // Respect a user configuration
-    resetSparkContext()
-    val c4 = c1.clone
-      .set("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
-      .set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
-    sc = new SparkContext(c4)
-    Seq(
-      "spark.hadoop.fs.s3a.committer.magic.enabled" -> "false",
-      "spark.hadoop.fs.s3a.committer.name" -> null,
-      "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" -> null,
-      "spark.sql.parquet.output.committer.class" -> null,
-      "spark.sql.sources.commitProtocolClass" -> null
-    ).foreach { case (k, v) =>
-      if (v == null) {
-        assert(!sc.getConf.contains(k))
-      } else {
-        assert(v == sc.getConf.get(k))
-      }
-    }
-  }
-
   test("SPARK-35691: addFile/addJar/addDirectory should put CanonicalFile") {
     withTempDir { dir =>
       try {
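With the bucket-specific trigger removed, the new default could be checked with a plain local context. A hypothetical sketch, not part of this PR, assuming the spark-hadoop-cloud module (and thus `PathOutputCommitProtocol`) is on the classpath:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: with hadoop-cloud available and nothing configured, the magic
// committer settings should be filled in during SparkContext initialization.
val sc = new SparkContext(new SparkConf().setAppName("magic-default").setMaster("local"))
try {
  assert(sc.getConf.get("spark.hadoop.fs.s3a.committer.magic.enabled") == "true")
  assert(sc.getConf.get("spark.hadoop.fs.s3a.committer.name") == "magic")
} finally {
  sc.stop()
}
```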
2 changes: 2 additions & 0 deletions docs/core-migration-guide.md
@@ -24,6 +24,8 @@ license: |
 
 ## Upgrading from Core 3.5 to 4.0
 
+- Since Spark 4.0, Spark uses Apache Hadoop Magic Committer for all S3 buckets by default. To restore the behavior before Spark 4.0, you can set `spark.hadoop.fs.s3a.committer.magic.enabled=false`.
+
 - Since Spark 4.0, Spark migrated all its internal reference of servlet API from `javax` to `jakarta`
 
 - Since Spark 4.0, Spark will roll event logs to archive them incrementally. To restore the behavior before Spark 4.0, you can set `spark.eventLog.rolling.enabled` to `false`.
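As code, the opt-out from the new migration note could look like the following when building a session (a hedged example; the application name is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Restore the pre-4.0 behavior: disable the S3A Magic Committer default.
val spark = SparkSession.builder()
  .appName("legacy-committer") // illustrative name
  .config("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
  .getOrCreate()
```

The same key can also be passed at submit time as a `--conf` option.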