From ab31f8fb646dcaa0f04c1d82d90851cef164781c Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Tue, 26 Nov 2024 15:40:25 +0000
Subject: [PATCH 1/2] GH-3078: Use Hadoop FileSystem.openFile() to open files

* Open files with FileSystem.openFile(), passing in the file status
* And a read policy of "parquet, vector, random, adaptive"
---
 .../parquet/hadoop/util/HadoopInputFile.java  | 54 ++++++++++++++++++-
 .../hadoop/util/wrapped/io/FutureIO.java      | 23 ++++++++
 2 files changed, 76 insertions(+), 1 deletion(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
index 48c79fa5eb..8d1199d192 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
@@ -19,8 +19,13 @@
 package org.apache.parquet.hadoop.util;
 
+import static org.apache.parquet.hadoop.util.wrapped.io.FutureIO.awaitFuture;
+
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -29,6 +34,24 @@
 
 public class HadoopInputFile implements InputFile {
 
+  /**
+   * openFile() option name for setting the read policy: {@value}.
+   */
+  private static final String OPENFILE_READ_POLICY_KEY = "fs.option.openfile.read.policy";
+
+  /**
+   * Read policy when opening Parquet files: {@value}.
+   *

Policy-aware stores pick the first policy they recognize in the list.
+   * Everything recognizes "random";
+   * "vector" arrived with Hadoop 3.4.0, while "parquet" arrived with Hadoop 3.4.1.
+   * "parquet" means "this is a Parquet file, so be clever about footers, prefetch,
+   * and expect vector and/or random IO".
+   *

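+   * An illustrative sketch (not part of this change) of how such a policy
+   * list is passed down when opening a file; the key and value match the
+   * constants used in {@link #newStream()} below:
+   * <pre>
+   *   fs.openFile(path)
+   *       .opt("fs.option.openfile.read.policy", "parquet, vector, random, adaptive")
+   *       .build();
+   * </pre>
+   *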
In Hadoop 3.4.1, "parquet" and "vector" are both mapped to "random" for the
+   * S3A connector, but as the ABFS and GCS connectors perform footer caching, they
+   * may use it as a hint to say "fetch the footer and keep it in memory".
+   */
+  private static final String PARQUET_READ_POLICY = "parquet, vector, random, adaptive";
+
   private final FileSystem fs;
   private final FileStatus stat;
   private final Configuration conf;
@@ -70,9 +93,38 @@ public long getLength() {
     return stat.getLen();
   }
 
+  /**
+   * Open the file.
+   *

Uses {@code FileSystem.openFile()} so that
+   * the existing FileStatus can be passed down: this saves a HEAD request on
+   * cloud storage, and is ignored everywhere else.
+   *
+   * @return the input stream.
+   *
+   * @throws InterruptedIOException future was interrupted
+   * @throws IOException if something went wrong
+   * @throws RuntimeException any nested RTE thrown
+   */
   @Override
   public SeekableInputStream newStream() throws IOException {
-    return HadoopStreams.wrap(fs.open(stat.getPath()));
+    FSDataInputStream stream;
+    try {
+      // this method is async so that implementations may do async HEAD
+      // requests. Not done in S3A/ABFS when a file status is passed down (as is done here)
+      final CompletableFuture<FSDataInputStream> future = fs.openFile(stat.getPath())
+          .withFileStatus(stat)
+          .opt(OPENFILE_READ_POLICY_KEY, PARQUET_READ_POLICY)
+          .build();
+      stream = awaitFuture(future);
+    } catch (RuntimeException e) {
+      // S3A < 3.3.5 would raise an illegal path exception if the openFile path didn't
+      // equal the path in the FileStatus; Hive virtual FS could create this condition.
+      // As the path to open is derived from stat.getPath(), this condition seems
+      // near-impossible to create, but is handled here for due diligence.
+      stream = fs.open(stat.getPath());
+    }
+
+    return HadoopStreams.wrap(stream);
   }
 
   @Override
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java
index 47f4e959b4..23533c75bf 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/wrapped/io/FutureIO.java
@@ -70,6 +70,29 @@ public static <T> T awaitFuture(final Future<T> future, final long timeout, fina
     }
   }
 
+  /**
+   * Given a future, evaluate it.
+   *

+ * Any exception generated in the future is + * extracted and rethrown. + *

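+   * A usage sketch; {@code fs} and {@code path} are assumed to be in scope,
+   * and {@code openFile()} builders return futures of this kind:
+   * <pre>
+   *   FSDataInputStream in = awaitFuture(fs.openFile(path).build());
+   * </pre>
+   *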
+   * @param future future to evaluate
+   * @param <T> type of the result.
+   * @return the result, if all went well.
+   * @throws InterruptedIOException future was interrupted
+   * @throws IOException if something went wrong
+   * @throws RuntimeException any nested RTE thrown
+   */
+  public static <T> T awaitFuture(final Future<T> future)
+      throws InterruptedIOException, IOException, RuntimeException {
+    try {
+      return future.get();
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException) new InterruptedIOException(e.toString()).initCause(e);
+    } catch (ExecutionException e) {
+      throw unwrapInnerException(e);
+    }
+  }
+
   /**
    * From the inner cause of an execution exception, extract the inner cause
    * to an IOException, raising Errors immediately.

From 79a4c04eeaa8e0a4344bc66d8f37abd177383ba9 Mon Sep 17 00:00:00 2001
From: Steve Loughran
Date: Mon, 2 Dec 2024 19:57:29 +0000
Subject: [PATCH 2/2] GH-3078. openFile() invocation/failure tests.

The builder API is a real PITA to mock, even though it works great in
application code.

Uses the o.a.h.fs.impl implementation; I've created HADOOP-19355 to mark
FutureDataInputStreamBuilderImpl as VisibleForTesting and make it clear
that this use is allowed.
---
 .../parquet/hadoop/util/HadoopInputFile.java  |  11 +-
 .../hadoop/fs/FileSystemTestBinder.java       |  77 +++++
 .../hadoop/util/TestHadoopOpenFile.java       | 283 ++++++++++++++++++
 3 files changed, 369 insertions(+), 2 deletions(-)
 create mode 100644 parquet-hadoop/src/test/java/org/apache/hadoop/fs/FileSystemTestBinder.java
 create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoopOpenFile.java

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
index 8d1199d192..fa6a7bdf42 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopInputFile.java
@@ -110,7 +110,7 @@ public SeekableInputStream newStream() throws IOException {
     FSDataInputStream stream;
     try {
       // this method is async so that implementations may do async HEAD
-      // requests. Not done in S3A/ABFS when a file status is passed down (as is done here)
+      // requests, such as S3A/ABFS when a file status is passed down.
       final CompletableFuture<FSDataInputStream> future = fs.openFile(stat.getPath())
           .withFileStatus(stat)
           .opt(OPENFILE_READ_POLICY_KEY, PARQUET_READ_POLICY)
@@ -121,7 +121,14 @@ public SeekableInputStream newStream() throws IOException {
       // equal the path in the FileStatus; Hive virtual FS could create this condition.
       // As the path to open is derived from stat.getPath(), this condition seems
       // near-impossible to create, but is handled here for due diligence.
-      stream = fs.open(stat.getPath());
+      try {
+        stream = fs.open(stat.getPath());
+      } catch (IOException | RuntimeException ex) {
+        // a failure on this attempt attaches the failure of the openFile() call,
+        // so the original stack trace is preserved.
+        ex.addSuppressed(e);
+        throw ex;
+      }
     }
 
     return HadoopStreams.wrap(stream);
diff --git a/parquet-hadoop/src/test/java/org/apache/hadoop/fs/FileSystemTestBinder.java b/parquet-hadoop/src/test/java/org/apache/hadoop/fs/FileSystemTestBinder.java
new file mode 100644
index 0000000000..2d087aa2d6
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/hadoop/fs/FileSystemTestBinder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Based on {@code org.apache.hadoop.fs.FileSystemTestHelper},
+ * this class exports the package-private {@code FileSystem}
+ * methods which can be used to push FS instances into the
+ * map of URI -> FS instance.
+ *

This makes it easy to add instances of mocked filesystems
+ * to the map; they will then be picked up by any
+ * code retrieving an FS instance for that URI.
+ *

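+ * A typical use in a test (a sketch only; {@code mock()} is Mockito's):
+ * <pre>
+ *   FileSystem mockFs = mock(FileSystem.class);
+ *   FileSystemTestBinder.addFileSystemForTesting(new URI("mock://path/"), mockFs);
+ * </pre>
+ *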
The API is stable and used elsewhere. What is important
+ * is to remove FS instances after each test case.
+ * {@link #cleanFilesystemCache()} cleans the entire cache
+ * and should be used in teardown methods.
+ */
+public final class FileSystemTestBinder {
+
+  /**
+   * Empty configuration.
+   * Part of the FileSystem method signatures, but not used behind them.
+   */
+  public static final Configuration CONF = new Configuration(false);
+
+  /**
+   * Inject a filesystem into the cache.
+   * @param uri filesystem URI
+   * @param fs filesystem to inject
+   * @throws UncheckedIOException on Hadoop UGI problems.
+   */
+  public static void addFileSystemForTesting(URI uri, FileSystem fs) {
+    try {
+      FileSystem.addFileSystemForTesting(uri, CONF, fs);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Clean up the filesystem cache.
+   * This swallows any IOE nominally raised in the process, to ensure
+   * it can safely be invoked in teardowns.
+   */
+  public static void cleanFilesystemCache() {
+    try {
+      FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
+    } catch (IOException ignored) {
+      // Ignore the exception: if getCurrentUser() fails, then it would
+      // have been impossible to add mock instances to a per-user cache.
+    }
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoopOpenFile.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoopOpenFile.java
new file mode 100644
index 0000000000..bf29541cd5
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoopOpenFile.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.parquet.hadoop.util; + +import static org.apache.hadoop.fs.FileSystemTestBinder.addFileSystemForTesting; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestBinder; +import org.apache.hadoop.fs.FutureDataInputStreamBuilder; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl; +import org.apache.parquet.io.SeekableInputStream; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +/** + * Test suite to validate behavior of opening files through + * use of FileSystem.openFile(), especially fallback + * to the original FileSystem.open() method when + * openFile() raises an IllegalArgumentException or other RTE. + *

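+ * The production pattern under test, abridged from
+ * HadoopInputFile#newStream() (a sketch, not the exact code):
+ * <pre>
+ *   try {
+ *     stream = awaitFuture(fs.openFile(path).withFileStatus(stat).build());
+ *   } catch (RuntimeException e) {
+ *     stream = fs.open(path); // fall back to the classic call
+ *   }
+ * </pre>
+ *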
These tests use classes in the package {@code org.apache.hadoop.fs.impl}.
+ * Although an implementation package, it is tagged as `LimitedPrivate("Filesystems")`;
+ * it is already used outside the Hadoop codebase (e.g. Google GCS).
+ */
+public class TestHadoopOpenFile {
+
+  private static final int FIRST = MockHadoopInputStream.TEST_ARRAY[0];
+  private URI fsUri;
+  private FileStatus status;
+  private Path path;
+  private Configuration conf;
+  private IOException fileNotFound;
+  private IllegalArgumentException illegal;
+
+  @Before
+  public void setUp() throws URISyntaxException {
+    // the scheme "mock:" is used not only to be confident that our injected
+    // instance is picked up, but also to ensure that there will be no
+    // contamination of any real scheme, such as file:
+    fsUri = new URI("mock://path/");
+    path = new Path("mock://path/file");
+
+    conf = new Configuration(false);
+    fileNotFound = new FileNotFoundException("file not found");
+    illegal = new IllegalArgumentException("illegal");
+    status = new FileStatus(10, false, 1, 1, 0, path);
+  }
+
+  /**
+   * Clean up the entire FS cache for the current user.
+   */
+  @After
+  public void tearDown() {
+    FileSystemTestBinder.cleanFilesystemCache();
+  }
+
+  /**
+   * The healthy path for opening a file.
+   */
+  @Test
+  public void testOpenFileGoodPath() throws Throwable {
+    final FileSystem mockFS = prepareMockFS();
+    final FSDataInputStream in = new FSDataInputStream(new MockHadoopInputStream());
+    final StubOpenFileBuilder opener = new StubOpenFileBuilder(mockFS, path, CompletableFuture.completedFuture(in));
+    doReturn(opener).when(mockFS).openFile(path);
+
+    // this looks up the FS binding via the status file path.
+    openAndRead(fileFromStatus());
+
+    // The fallback call of open(path) never took place.
+    Mockito.verify(mockFS, never()).open(path);
+  }
+
+  /**
+   * The openFile() call raises a RuntimeException, which is caught;
+   * the classic open() call is then invoked.
+   */
+  @Test
+  public void testOpenFileEarlyFailure() throws Throwable {
+    final FileSystem mockFS = prepareMockFS();
+    final FSDataInputStream in = new FSDataInputStream(new MockHadoopInputStream());
+
+    Mockito.doThrow(illegal).when(mockFS).openFile(path);
+    doReturn(in).when(mockFS).open(path);
+
+    // this looks up the FS binding via the status file path.
+    openAndRead(fileFromStatus());
+  }
+
+  /**
+   * openFile() fails during the completable future's execution with an
+   * RTE raised.
+   * Again, this triggers the fallback to open().
+   */
+  @Test
+  public void testOpenFileLateFailure() throws Throwable {
+    final FileSystem mockFS = prepareMockFS();
+    final FSDataInputStream in = new FSDataInputStream(new MockHadoopInputStream());
+
+    final StubOpenFileBuilder opener = new StubOpenFileBuilder(
+        mockFS, path, CompletableFuture.completedFuture(null).thenApply((f) -> {
+          throw illegal;
+        }));
+    doReturn(opener).when(mockFS).openFile(path);
+    doReturn(in).when(mockFS).open(path);
+
+    openAndRead(fileFromStatus());
+  }
+
+  /**
+   * Open a stream, read the first byte, and assert that it matches
+   * what is expected.
+   *
+   * @param inputFile input file
+   *
+   * @throws IOException failure to open
+   */
+  private static void openAndRead(final HadoopInputFile inputFile) throws IOException {
+    try (SeekableInputStream stream = inputFile.newStream()) {
+      Assert.assertEquals("byte read", FIRST, stream.read());
+    }
+  }
+
+  /**
+   * If openFile() raises an IOException within the future,
+   * then it is thrown and the classic open() call is never invoked.
+ */ + @Test + public void testOpenFileRaisesIOException() throws Throwable { + final FileSystem mockFS = prepareMockFS(); + + final StubOpenFileBuilder opener = new StubOpenFileBuilder( + mockFS, path, CompletableFuture.completedFuture(null).thenApply((f) -> { + // throw a wrapped IOE + throw new UncheckedIOException(fileNotFound); + })); + doReturn(opener).when(mockFS).openFile(path); + + final HadoopInputFile inputFile = fileFromStatus(); + Assert.assertThrows(FileNotFoundException.class, inputFile::newStream); + Mockito.verify(mockFS, never()).open(path); + } + + /** + * If openFile() raises a RuntimeException, this it is caught and the. + * classic open() call invoked. + * If that call raises an IOE. + * Outcome: the IOE is thrown but the caught RTE is added to the + * suppressed list. + */ + @Test + public void testOpenFileDoubleFailure() throws Throwable { + final FileSystem mockFS = prepareMockFS(); + + Mockito.doThrow(illegal).when(mockFS).openFile(path); + Mockito.doThrow(fileNotFound).when(mockFS).open(path); + + // this looks up the FS binding via the status file path. + final HadoopInputFile inputFile = fileFromStatus(); + + final FileNotFoundException caught = Assert.assertThrows(FileNotFoundException.class, inputFile::newStream); + Assert.assertSame(fileNotFound, caught); + final Throwable[] suppressed = caught.getSuppressed(); + Assert.assertEquals("number of suppressed exceptions", 1, suppressed.length); + Assert.assertSame(illegal, suppressed[0]); + } + + /** + * The handling of a double RTE is the same as the case of + * the sequence of: RTE, IOE. + */ + @Test + public void testOpenFileDoubleRTE() throws Throwable { + final FileSystem mockFS = prepareMockFS(); + + Mockito.doThrow(illegal).when(mockFS).openFile(path); + NullPointerException npe = new NullPointerException("null"); + Mockito.doThrow(npe).when(mockFS).open(path); + + // this looks up the FS binding via the status file path. + final HadoopInputFile inputFile = fileFromStatus(); + + final NullPointerException caught = Assert.assertThrows(NullPointerException.class, inputFile::newStream); + Assert.assertSame(npe, caught); + final Throwable[] suppressed = caught.getSuppressed(); + Assert.assertEquals("number of suppressed exceptions", 1, suppressed.length); + Assert.assertSame(illegal, suppressed[0]); + } + + /** + * Create a mock FileSystem with the foundational operations + * mocked. The FS is added as the binding for the mock URI. + * + * @return a mock FileSystem + * + * @throws IOException stub signature only. + */ + private FileSystem prepareMockFS() throws IOException { + final FileSystem mockFS = mock(FileSystem.class); + doNothing().when(mockFS).close(); + doReturn(conf).when(mockFS).getConf(); + doReturn(status).when(mockFS).getFileStatus(path); + + // register the FS instance under the mock URI + addFileSystemForTesting(fsUri, mockFS); + return mockFS; + } + + /** + * Build an input file from the status field. + * @return an input file. + * @throws IOException failure to create the associated filesystem. + */ + private HadoopInputFile fileFromStatus() throws IOException { + return HadoopInputFile.fromStatus(status, conf); + } + + /** + * Stub implementation of {@link FutureDataInputStreamBuilder}. + * Trying to mock the interface is troublesome as the interface has added + * some new methods over time, instead this uses the base implementation class + * within o.a.h.fs.impl. 
+ */ + private static final class StubOpenFileBuilder extends FutureDataInputStreamBuilderImpl { + + /** + * Operation to invoke to build the result. + */ + private final CompletableFuture result; + + /** + * Create the builder. The FS and path must be non-null. + * + * @param fileSystem fs + * @param path path to open + * @param result builder result. + */ + private StubOpenFileBuilder( + final FileSystem fileSystem, Path path, final CompletableFuture result) { + super(fileSystem, path); + this.result = result; + } + + @Override + public CompletableFuture build() + throws IllegalArgumentException, UnsupportedOperationException { + return result; + } + } +}