Skip to content

Commit

Permalink
[SPARK-34075][SQL][CORE] Hidden directories are being listed for part…
Browse files Browse the repository at this point in the history
…ition inference

### What changes were proposed in this pull request?

Fix a regression from apache#29959.

In Spark, the following file paths are considered as hidden paths and they are ignored on file reads:
1. starts with "_" and doesn't contain "="
2. starts with "."

However, after the refactoring PR apache#29959, the hidden paths are not filtered out on partition inference: https://github.com/apache/spark/pull/29959/files#r556432426

This PR is to fix the bug. To archive the goal, the method `InMemoryFileIndex.shouldFilterOut` is refactored as `HadoopFSUtils.shouldFilterOutPathName`

### Why are the changes needed?

Bugfix

### Does this PR introduce _any_ user-facing change?

Yes, it fixes a bug for reading file paths with partitions.

### How was this patch tested?

Unit test

Closes apache#31169 from gengliangwang/fileListingBug.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
  • Loading branch information
gengliangwang authored and HyukjinKwon committed Jan 14, 2021
1 parent 9e93fdb commit 467d758
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 28 deletions.
19 changes: 18 additions & 1 deletion core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -249,8 +249,11 @@ private[spark] object HadoopFSUtils extends Logging {
Array.empty[FileStatus]
}

val filteredStatuses =
statuses.filterNot(status => shouldFilterOutPathName(status.getPath.getName))

val allLeafStatuses = {
val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
val nestedFiles: Seq[FileStatus] = contextOpt match {
case Some(context) if dirs.size > parallelismThreshold =>
parallelListLeafFilesInternal(
Expand Down Expand Up @@ -350,4 +353,18 @@ private[spark] object HadoopFSUtils extends Logging {
modificationTime: Long,
accessTime: Long,
blockLocations: Array[SerializableBlockLocation])

/** Checks if we should filter out this path name. */
def shouldFilterOutPathName(pathName: String): Boolean = {
// We filter follow paths:
// 1. everything that starts with _ and ., except _common_metadata and _metadata
// because Parquet needs to find those metadata files from leaf files returned by this method.
// We should refactor this logic to not mix metadata files with data files.
// 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we
// should skip this file in case of double reading.
val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
pathName.startsWith(".") || pathName.endsWith("._COPYING_")
val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
exclude && !include
}
}
33 changes: 33 additions & 0 deletions core/src/test/scala/org/apache/spark/util/HadoopFSUtilsSuite.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import org.apache.spark.SparkFunSuite

class HadoopFSUtilsSuite extends SparkFunSuite {
test("HadoopFSUtils - file filtering") {
assert(!HadoopFSUtils.shouldFilterOutPathName("abcd"))
assert(HadoopFSUtils.shouldFilterOutPathName(".ab"))
assert(HadoopFSUtils.shouldFilterOutPathName("_cd"))
assert(!HadoopFSUtils.shouldFilterOutPathName("_metadata"))
assert(!HadoopFSUtils.shouldFilterOutPathName("_common_metadata"))
assert(HadoopFSUtils.shouldFilterOutPathName("_ab_metadata"))
assert(HadoopFSUtils.shouldFilterOutPathName("_cd_common_metadata"))
assert(HadoopFSUtils.shouldFilterOutPathName("a._COPYING_"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ import org.apache.spark.sql.sources._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.{HadoopFSUtils, ThreadUtils, Utils}

/**
* The main class responsible for representing a pluggable Data Source in Spark SQL. In addition to
Expand Down Expand Up @@ -811,7 +811,7 @@ object DataSource extends Logging {
val allPaths = globbedPaths ++ nonGlobPaths
if (checkFilesExist) {
val (filteredOut, filteredIn) = allPaths.partition { path =>
InMemoryFileIndex.shouldFilterOut(path.getName)
HadoopFSUtils.shouldFilterOutPathName(path.getName)
}
if (filteredIn.isEmpty) {
logWarning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,23 +158,10 @@ object InMemoryFileIndex extends Logging {
parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
}

/** Checks if we should filter out this path name. */
def shouldFilterOut(pathName: String): Boolean = {
// We filter follow paths:
// 1. everything that starts with _ and ., except _common_metadata and _metadata
// because Parquet needs to find those metadata files from leaf files returned by this method.
// We should refactor this logic to not mix metadata files with data files.
// 2. everything that ends with `._COPYING_`, because this is a intermediate state of file. we
// should skip this file in case of double reading.
val exclude = (pathName.startsWith("_") && !pathName.contains("=")) ||
pathName.startsWith(".") || pathName.endsWith("._COPYING_")
val include = pathName.startsWith("_common_metadata") || pathName.startsWith("_metadata")
exclude && !include
}
}

private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with Serializable {
override def accept(path: Path): Boolean = {
(filter == null || filter.accept(path)) && !InMemoryFileIndex.shouldFilterOut(path.getName)
(filter == null || filter.accept(path)) && !HadoopFSUtils.shouldFilterOutPathName(path.getName)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,17 +297,6 @@ class FileIndexSuite extends SharedSparkSession {
}
}

test("InMemoryFileIndex - file filtering") {
assert(!InMemoryFileIndex.shouldFilterOut("abcd"))
assert(InMemoryFileIndex.shouldFilterOut(".ab"))
assert(InMemoryFileIndex.shouldFilterOut("_cd"))
assert(!InMemoryFileIndex.shouldFilterOut("_metadata"))
assert(!InMemoryFileIndex.shouldFilterOut("_common_metadata"))
assert(InMemoryFileIndex.shouldFilterOut("_ab_metadata"))
assert(InMemoryFileIndex.shouldFilterOut("_cd_common_metadata"))
assert(InMemoryFileIndex.shouldFilterOut("a._COPYING_"))
}

test("SPARK-17613 - PartitioningAwareFileIndex: base path w/o '/' at end") {
class MockCatalog(
override val rootPaths: Seq[Path])
Expand Down Expand Up @@ -416,6 +405,21 @@ class FileIndexSuite extends SharedSparkSession {
fileStatusCache.putLeafFiles(new Path("/tmp", "abc"), files.toArray)
}

test("SPARK-34075: InMemoryFileIndex filters out hidden file on partition inference") {
withTempPath { path =>
spark
.range(2)
.select(col("id").as("p"), col("id"))
.write
.partitionBy("p")
.parquet(path.getAbsolutePath)
val targetPath = new File(path, "p=1")
val hiddenPath = new File(path, "_hidden_path")
targetPath.renameTo(hiddenPath)
assert(spark.read.parquet(path.getAbsolutePath).count() == 1L)
}
}

test("SPARK-20367 - properly unescape column names in inferPartitioning") {
withTempPath { path =>
val colToUnescape = "Column/#%'?"
Expand Down

0 comments on commit 467d758

Please sign in to comment.