diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 30d772bd62d77..f381912f224e2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -424,7 +424,7 @@ class SparkContext(config: SparkConf) extends Logging {
       throw new SparkException("An application name must be set in your configuration")
     }
     // This should be set as early as possible.
-    SparkContext.fillMissingMagicCommitterConfsIfNeeded(_conf)
+    SparkContext.enableMagicCommitterIfNeeded(_conf)
     SparkContext.supplementJavaModuleOptions(_conf)
     SparkContext.supplementJavaIPv6Options(_conf)
 
@@ -3378,16 +3378,10 @@ object SparkContext extends Logging {
   }
 
   /**
-   * This is a helper function to complete the missing S3A magic committer configurations
-   * based on a single conf: `spark.hadoop.fs.s3a.bucket.<bucket>.committer.magic.enabled`
+   * Enable Magic Committer by default for all S3 buckets if the hadoop-cloud module exists.
    */
-  private def fillMissingMagicCommitterConfsIfNeeded(conf: SparkConf): Unit = {
-    val magicCommitterConfs = conf
-      .getAllWithPrefix("spark.hadoop.fs.s3a.bucket.")
-      .filter(_._1.endsWith(".committer.magic.enabled"))
-      .filter(_._2.equalsIgnoreCase("true"))
-    if (magicCommitterConfs.nonEmpty) {
-      // Try to enable S3 magic committer if missing
+  private def enableMagicCommitterIfNeeded(conf: SparkConf): Unit = {
+    if (Utils.classIsLoadable("org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")) {
       conf.setIfMissing("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
       if (conf.get("spark.hadoop.fs.s3a.committer.magic.enabled").equals("true")) {
         conf.setIfMissing("spark.hadoop.fs.s3a.committer.name", "magic")
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index dd42549e46d93..164136cc9deb1 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -1270,53 +1270,6 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
   }
 
-  test("SPARK-35383: Fill missing S3A magic committer configs if needed") {
-    val c1 = new SparkConf().setAppName("s3a-test").setMaster("local")
-    sc = new SparkContext(c1)
-    assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
-
-    resetSparkContext()
-    val c2 = c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "false")
-    sc = new SparkContext(c2)
-    assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
-
-    resetSparkContext()
-    val c3 = c1.clone.set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
-    sc = new SparkContext(c3)
-    Seq(
-      "spark.hadoop.fs.s3a.committer.magic.enabled" -> "true",
-      "spark.hadoop.fs.s3a.committer.name" -> "magic",
-      "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" ->
-        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory",
-      "spark.sql.parquet.output.committer.class" ->
-        "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter",
-      "spark.sql.sources.commitProtocolClass" ->
-        "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol"
-    ).foreach { case (k, v) =>
-      assert(v == sc.getConf.get(k))
-    }
-
-    // Respect a user configuration
-    resetSparkContext()
-    val c4 = c1.clone
-      .set("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
-      .set("spark.hadoop.fs.s3a.bucket.mybucket.committer.magic.enabled", "true")
-    sc = new SparkContext(c4)
-    Seq(
-      "spark.hadoop.fs.s3a.committer.magic.enabled" -> "false",
-      "spark.hadoop.fs.s3a.committer.name" -> null,
-      "spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a" -> null,
-      "spark.sql.parquet.output.committer.class" -> null,
-      "spark.sql.sources.commitProtocolClass" -> null
-    ).foreach { case (k, v) =>
-      if (v == null) {
-        assert(!sc.getConf.contains(k))
-      } else {
-        assert(v == sc.getConf.get(k))
-      }
-    }
-  }
-
   test("SPARK-35691: addFile/addJar/addDirectory should put CanonicalFile") {
     withTempDir { dir =>
       try {
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 7f92fa14f170a..e7b5abed149f6 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -24,6 +24,8 @@ license: |
 
 ## Upgrading from Core 3.5 to 4.0
 
+- Since Spark 4.0, Spark uses Apache Hadoop Magic Committer for all S3 buckets by default. To restore the behavior before Spark 4.0, you can set `spark.hadoop.fs.s3a.committer.magic.enabled=false`.
+
 - Since Spark 4.0, Spark migrated all its internal reference of servlet API from `javax` to `jakarta`
 
 - Since Spark 4.0, Spark will roll event logs to archive them incrementally. To restore the behavior before Spark 4.0, you can set `spark.eventLog.rolling.enabled` to `false`.
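
To illustrate the migration note above, here is a minimal sketch (not part of the patch) of how an application could opt back out of the new default. The app name, master, and object name are placeholders for illustration only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: restoring the pre-4.0 committer behavior.
object LegacyCommitterExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("legacy-committer-app") // placeholder
      .setMaster("local[*]")              // placeholder
      // Opt out of the new default. Because enableMagicCommitterIfNeeded()
      // only calls setIfMissing() and then checks this value, an explicit
      // "false" here keeps the related committer configurations unset.
      .set("spark.hadoop.fs.s3a.committer.magic.enabled", "false")

    val sc = new SparkContext(conf)
    try {
      // With the opt-out in place, no committer name was filled in.
      assert(!sc.getConf.contains("spark.hadoop.fs.s3a.committer.name"))
      // ... jobs writing to s3a:// paths then use the classic committer ...
    } finally {
      sc.stop()
    }
  }
}
```

Conversely, with the hadoop-cloud module on the classpath and no explicit setting, `SparkContext` now fills in `spark.hadoop.fs.s3a.committer.name=magic` and the related committer classes automatically.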