Skip to content

Commit

Permalink
GH-3078: Use Hadoop FileSystem.openFile() to open files (#3079)
Browse files Browse the repository at this point in the history
  • Loading branch information
steveloughran authored Dec 4, 2024
1 parent 0ddffb2 commit f4a3e8b
Show file tree
Hide file tree
Showing 4 changed files with 443 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@

package org.apache.parquet.hadoop.util;

import static org.apache.parquet.hadoop.util.wrapped.io.FutureIO.awaitFuture;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand All @@ -29,6 +34,24 @@

public class HadoopInputFile implements InputFile {

/**
 * openFile() option name for setting the read policy: {@value}.
 */
private static final String OPENFILE_READ_POLICY_KEY = "fs.option.openfile.read.policy";

/**
 * Read policy when opening parquet files: {@value}.
 * <p>Policy-aware stores pick the first policy they recognize in the list.
 * Everything recognizes "random";
 * "vector" came in with Hadoop 3.4.0, while "parquet" came with Hadoop 3.4.1.
 * "parquet" means "this is a Parquet file, so be clever about footers, prefetch,
 * and expect vector and/or random IO".
 * <p>In Hadoop 3.4.1, "parquet" and "vector" are both mapped to "random" for the
 * S3A connector, but as the ABFS and GCS connectors do footer caching, they
 * may use it as a hint to say "fetch the footer and keep it in memory".
 */
private static final String PARQUET_READ_POLICY = "parquet, vector, random, adaptive";

// Filesystem through which the file is opened.
private final FileSystem fs;
// Status of the file; its path and length are read here, and the whole
// status is passed down in openFile() to save a HEAD request on cloud stores.
private final FileStatus stat;
// Hadoop configuration associated with this InputFile.
private final Configuration conf;
Expand Down Expand Up @@ -70,9 +93,45 @@ public long getLength() {
return stat.getLen();
}

/**
 * Open the file.
 * <p>Uses {@code FileSystem.openFile()} so that
 * the existing FileStatus can be passed down: this saves a HEAD request on
 * cloud storage and is ignored everywhere else.
 *
 * @return the input stream.
 *
 * @throws InterruptedIOException future was interrupted
 * @throws IOException if something went wrong
 * @throws RuntimeException any nested RTE thrown
 */
@Override
public SeekableInputStream newStream() throws IOException {
  FSDataInputStream stream;
  try {
    // this method is async so that implementations may do async HEAD
    // requests, such as S3A/ABFS when a file status is passed down.
    final CompletableFuture<FSDataInputStream> future = fs.openFile(stat.getPath())
        .withFileStatus(stat)
        .opt(OPENFILE_READ_POLICY_KEY, PARQUET_READ_POLICY)
        .build();
    stream = awaitFuture(future);
  } catch (RuntimeException e) {
    // S3A < 3.3.5 would raise illegal path exception if the openFile path didn't
    // equal the path in the FileStatus; Hive virtual FS could create this condition.
    // As the path to open is derived from stat.getPath(), this condition seems
    // near-impossible to create -- but is handled here for due diligence.
    try {
      // fall back to the classic open() call.
      stream = fs.open(stat.getPath());
    } catch (IOException | RuntimeException ex) {
      // failure on this attempt attaches the failure of the openFile() call
      // so the stack trace is preserved.
      ex.addSuppressed(e);
      throw ex;
    }
  }

  return HadoopStreams.wrap(stream);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,29 @@ public static <T> T awaitFuture(final Future<T> future, final long timeout, fina
}
}

/**
 * Given a future, evaluate it.
 * <p>
 * Any exception generated in the future is
 * extracted and rethrown.
 * </p>
 * @param future future to evaluate
 * @param <T> type of the result.
 * @return the result, if all went well.
 * @throws InterruptedIOException future was interrupted
 * @throws IOException if something went wrong
 * @throws RuntimeException any nested RTE thrown
 */
public static <T> T awaitFuture(final Future<T> future)
    throws InterruptedIOException, IOException, RuntimeException {
  try {
    return future.get();
  } catch (InterruptedException interruption) {
    // convert to an InterruptedIOException, preserving the original cause.
    final InterruptedIOException iioe = new InterruptedIOException(interruption.toString());
    iioe.initCause(interruption);
    throw iioe;
  } catch (ExecutionException failure) {
    // rethrow whatever the future's task actually raised.
    throw unwrapInnerException(failure);
  }
}
/**
* From the inner cause of an execution exception, extract the inner cause
* to an IOException, raising Errors immediately.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

/**
* Based on {@code org.apache.hadoop.fs.FileSystemTestHelper},
* This class exports the package private {@code FileSystem}
* methods which can be used to push FS instances into the
* map of URI -> fs instance.
* <p>
* This makes it easy to add instances of Mocked filesystems
* to the map, which will then be picked up by any
* code retrieving an FS instance for that URI
* <p>
* The API is stable and used elsewhere. What is important
* is to remove FS instances after each test case.
* {@link #cleanFilesystemCache()} cleans the entire cache
* and should be used in teardown methods.
*/
public final class FileSystemTestBinder {

  /**
   * Empty configuration.
   * Part of the FileSystem method signatures, but not used behind them.
   */
  public static final Configuration CONF = new Configuration(false);

  /**
   * Utility class: all members are static, so no instances.
   */
  private FileSystemTestBinder() {
  }

  /**
   * Inject a filesystem into the cache.
   * @param uri filesystem URI
   * @param fs filesystem to inject
   * @throws UncheckedIOException Hadoop UGI problems.
   */
  public static void addFileSystemForTesting(URI uri, FileSystem fs) {
    try {
      FileSystem.addFileSystemForTesting(uri, CONF, fs);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /**
   * Clean up the filesystem cache.
   * This swallows any IOE nominally raised in the process, to ensure
   * this can safely be invoked in teardowns.
   */
  public static void cleanFilesystemCache() {
    try {
      FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
    } catch (IOException ignored) {
      // Ignore the exception as if getCurrentUser() fails then it'll
      // have been impossible to add mock instances to a per-user cache.
    }
  }
}
Loading

0 comments on commit f4a3e8b

Please sign in to comment.