From 467d7589737d2430d09f1ffbd33bf801d179f990 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Thu, 14 Jan 2021 09:39:38 +0900
Subject: [PATCH] [SPARK-34075][SQL][CORE] Hidden directories are being listed for partition inference

### What changes were proposed in this pull request?

Fix a regression from https://github.com/apache/spark/pull/29959.

In Spark, the following file paths are considered hidden and are ignored on file reads:

1. paths that start with "_" and don't contain "="
2. paths that start with "."

However, after the refactoring PR https://github.com/apache/spark/pull/29959, hidden paths are no longer filtered out during partition inference: https://github.com/apache/spark/pull/29959/files#r556432426

This PR fixes the bug. To achieve this, the method `InMemoryFileIndex.shouldFilterOut` is refactored into `HadoopFSUtils.shouldFilterOutPathName`.
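To make the two rules concrete, here is a minimal sketch. The helper name `isHiddenPathName` is made up for illustration; the patch itself implements the check as `HadoopFSUtils.shouldFilterOutPathName`, which additionally carves out `_metadata`, `_common_metadata`, and `._COPYING_`, as shown in the diff below.

```scala
// Minimal sketch of the two hidden-path rules above; isHiddenPathName is a
// hypothetical name, not part of the patch.
def isHiddenPathName(name: String): Boolean =
  (name.startsWith("_") && !name.contains("=")) || name.startsWith(".")

isHiddenPathName("_SUCCESS") // true: starts with "_" and contains no "="
isHiddenPathName("_col=1")   // false: "=" marks a partition directory
isHiddenPathName(".ab")      // true: starts with "."
isHiddenPathName("abcd")     // false: an ordinary file name
```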
### Why are the changes needed?

Bugfix.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes a bug in reading partitioned file paths.

### How was this patch tested?

Unit test

Closes #31169 from gengliangwang/fileListingBug.

Authored-by: Gengliang Wang
Signed-off-by: HyukjinKwon
---
 .../org/apache/spark/util/HadoopFSUtils.scala |  19 ++++++++++-
 .../spark/util/HadoopFSUtilsSuite.scala       |  33 +++++++++++++++++++
 .../execution/datasources/DataSource.scala    |   4 +--
 .../datasources/InMemoryFileIndex.scala       |  15 +--------
 .../datasources/FileIndexSuite.scala          |  26 ++++++++-------
 5 files changed, 69 insertions(+), 28 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/util/HadoopFSUtilsSuite.scala

diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index 4af48d5b9125c..60a73adc8582e 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -249,8 +249,11 @@ private[spark] object HadoopFSUtils extends Logging {
         Array.empty[FileStatus]
     }
 
+    val filteredStatuses =
+      statuses.filterNot(status => shouldFilterOutPathName(status.getPath.getName))
+
     val allLeafStatuses = {
-      val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
+      val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
       val nestedFiles: Seq[FileStatus] = contextOpt match {
         case Some(context) if dirs.size > parallelismThreshold =>
           parallelListLeafFilesInternal(
@@ -350,4 +353,18 @@ private[spark] object HadoopFSUtils extends Logging {
       modificationTime: Long,
       accessTime: Long,
       blockLocations: Array[SerializableBlockLocation])
+
+  /** Checks if we should filter out this path name. */
+  def shouldFilterOutPathName(pathName: String): Boolean = {
+    // We filter out the following paths:
+    // 1. everything that starts with _ or ., except _common_metadata and _metadata,
+    //    because Parquet needs to find those metadata files among the leaf files returned by this
+    //    method. We should refactor this logic to not mix metadata files with data files.
+    // 2. everything that ends with `._COPYING_`, because this is an intermediate state of a file.
+    //    We should skip such files to avoid reading them twice.
+    val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
+      pathName.startsWith(".") || pathName.endsWith("._COPYING_")
+    val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
+    exclude && !include
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/util/HadoopFSUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/HadoopFSUtilsSuite.scala
new file mode 100644
index 0000000000000..ba91eabc1cab1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/HadoopFSUtilsSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.SparkFunSuite
+
+class HadoopFSUtilsSuite extends SparkFunSuite {
+  test("HadoopFSUtils - file filtering") {
+    assert(!HadoopFSUtils.shouldFilterOutPathName("abcd"))
+    assert(HadoopFSUtils.shouldFilterOutPathName(".ab"))
+    assert(HadoopFSUtils.shouldFilterOutPathName("_cd"))
+    assert(!HadoopFSUtils.shouldFilterOutPathName("_metadata"))
+    assert(!HadoopFSUtils.shouldFilterOutPathName("_common_metadata"))
+    assert(HadoopFSUtils.shouldFilterOutPathName("_ab_metadata"))
+    assert(HadoopFSUtils.shouldFilterOutPathName("_cd_common_metadata"))
+    assert(HadoopFSUtils.shouldFilterOutPathName("a._COPYING_"))
+  }
+}
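A hedged usage sketch of the new shared predicate, mirroring the `filterNot` call added to `listLeafFiles` above. The directory path is invented, and since `HadoopFSUtils` is `private[spark]`, the snippet assumes it lives under an `org.apache.spark` package:

```scala
package org.apache.spark.example

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.util.HadoopFSUtils

object ListVisibleFiles {
  def main(args: Array[String]): Unit = {
    // "/tmp/data" is an arbitrary example directory.
    val fs = FileSystem.get(new Configuration())
    val statuses = fs.listStatus(new Path("/tmp/data"))
    // Drop hidden entries (e.g. _SUCCESS, .hidden, a._COPYING_) up front,
    // exactly as listLeafFiles now does before partitioning into dirs/files.
    val visible =
      statuses.filterNot(s => HadoopFSUtils.shouldFilterOutPathName(s.getPath.getName))
    visible.foreach(s => println(s.getPath))
  }
}
```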
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 53fa32480bf00..ea0bc4fcd451b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -50,7 +50,7 @@ import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
 import org.apache.spark.sql.util.SchemaUtils
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{HadoopFSUtils, ThreadUtils, Utils}
 
 /**
  * The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
@@ -811,7 +811,7 @@ object DataSource extends Logging {
     val allPaths = globbedPaths ++ nonGlobPaths
     if (checkFilesExist) {
       val (filteredOut, filteredIn) = allPaths.partition { path =>
-        InMemoryFileIndex.shouldFilterOut(path.getName)
+        HadoopFSUtils.shouldFilterOutPathName(path.getName)
       }
       if (filteredIn.isEmpty) {
         logWarning(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 21275951b5603..6c3deee2c3173 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -158,23 +158,10 @@ object InMemoryFileIndex extends Logging {
       parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
   }
 
-  /** Checks if we should filter out this path name. */
-  def shouldFilterOut(pathName: String): Boolean = {
-    // We filter follow paths:
-    // 1. everything that starts with _ and ., except _common_metadata and _metadata
-    // because Parquet needs to find those metadata files from leaf files returned by this method.
-    // We should refactor this logic to not mix metadata files with data files.
-    // 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we
-    // should skip this file in case of double reading.
-    val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
-      pathName.startsWith(".") || pathName.endsWith("._COPYING_")
-    val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
-    exclude && !include
-  }
 }
 
 private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with Serializable {
   override def accept(path: Path): Boolean = {
-    (filter == null || filter.accept(path)) && !InMemoryFileIndex.shouldFilterOut(path.getName)
+    (filter == null || filter.accept(path)) && !HadoopFSUtils.shouldFilterOutPathName(path.getName)
   }
 }
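As a small illustration of the `checkFilesExist` branch in `DataSource` above, here is a sketch of how the renamed predicate splits user-supplied paths. The example paths are invented, and the same `private[spark]` caveat applies:

```scala
import org.apache.hadoop.fs.Path

import org.apache.spark.util.HadoopFSUtils

val allPaths = Seq(
  new Path("/data/part-00000.parquet"), // kept: ordinary data file
  new Path("/data/_SUCCESS"),           // filtered out: starts with "_", no "="
  new Path("/data/a._COPYING_"))        // filtered out: in-flight copy
val (filteredOut, filteredIn) = allPaths.partition { path =>
  HadoopFSUtils.shouldFilterOutPathName(path.getName)
}
assert(filteredIn.map(_.getName) == Seq("part-00000.parquet"))
assert(filteredOut.map(_.getName) == Seq("_SUCCESS", "a._COPYING_"))
```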
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 02be8c9221704..fcaf8df4f9a02 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -297,17 +297,6 @@ class FileIndexSuite extends SharedSparkSession {
     }
   }
 
-  test("InMemoryFileIndex - file filtering") {
-    assert(!InMemoryFileIndex.shouldFilterOut("abcd"))
-    assert(InMemoryFileIndex.shouldFilterOut(".ab"))
-    assert(InMemoryFileIndex.shouldFilterOut("_cd"))
-    assert(!InMemoryFileIndex.shouldFilterOut("_metadata"))
-    assert(!InMemoryFileIndex.shouldFilterOut("_common_metadata"))
-    assert(InMemoryFileIndex.shouldFilterOut("_ab_metadata"))
-    assert(InMemoryFileIndex.shouldFilterOut("_cd_common_metadata"))
-    assert(InMemoryFileIndex.shouldFilterOut("a._COPYING_"))
-  }
-
   test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
     class MockCatalog(
       override val rootPaths: Seq[Path])
@@ -416,6 +405,21 @@ class FileIndexSuite extends SharedSparkSession {
     fileStatusCache.putLeafFiles(new Path("/tmp", "abc"), files.toArray)
   }
 
+  test("SPARK-34075: InMemoryFileIndex filters out hidden file on partition inference") {
+    withTempPath { path =>
+      spark
+        .range(2)
+        .select(col("id").as("p"), col("id"))
+        .write
+        .partitionBy("p")
+        .parquet(path.getAbsolutePath)
+      val targetPath = new File(path, "p=1")
+      val hiddenPath = new File(path, "_hidden_path")
+      targetPath.renameTo(hiddenPath)
+      assert(spark.read.parquet(path.getAbsolutePath).count() == 1L)
+    }
+  }
+
   test("SPARK-20367 - properly unescape column names in inferPartitioning") {
     withTempPath { path =>
       val colToUnescape = "Column/#%'?"