Skip to content

Commit

Permalink
PARQUET-2276: Bring back support for Hadoop 2.7.3 (apache#1084) (apac…
Browse files Browse the repository at this point in the history
…he#1090)

* Bring back support for Hadoop 2.7.3

* Simplify the code

* Fix the naming

* Comments
  • Loading branch information
Fokko authored May 9, 2023
1 parent 728c1cb commit e2c2499
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.parquet.io.PositionOutputStream;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.util.DynMethods;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +38,13 @@ public class HadoopStreams {

private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class);

private static final DynMethods.UnboundMethod hasCapabilitiesMethod =
new DynMethods
.Builder("hasCapabilities")
.impl(FSDataInputStream.class, "hasCapabilities", String.class)
.orNoop()
.build();

/**
* Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream}
* implementation for Parquet readers.
Expand All @@ -46,7 +54,39 @@ public class HadoopStreams {
*/
public static SeekableInputStream wrap(FSDataInputStream stream) {
Objects.requireNonNull(stream, "Cannot wrap a null input stream");
if (isWrappedStreamByteBufferReadable(stream)) {

// Try to check using hasCapabilities(str)
Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadable(stream);

// If it is null, then fall back to the old method
if (hasCapabilitiesResult != null) {
if (hasCapabilitiesResult) {
return new H2SeekableInputStream(stream);
} else {
return new H1SeekableInputStream(stream);
}
}

return unwrapByteBufferReadableLegacy(stream);
}

/**
* Is the inner stream byte buffer readable?
* The test is 'the stream is not FSDataInputStream
* and implements ByteBufferReadable'
*
* This logic is only used for Hadoop <2.9.x, and <3.x.x
*
* @param stream stream to probe
* @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable
*/
private static SeekableInputStream unwrapByteBufferReadableLegacy(FSDataInputStream stream) {
InputStream wrapped = stream.getWrappedStream();
if (wrapped instanceof FSDataInputStream) {
LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream);
return unwrapByteBufferReadableLegacy(((FSDataInputStream) wrapped));
}
if (stream.getWrappedStream() instanceof ByteBufferReadable) {
return new H2SeekableInputStream(stream);
} else {
return new H1SeekableInputStream(stream);
Expand All @@ -55,23 +95,31 @@ public static SeekableInputStream wrap(FSDataInputStream stream) {

/**
* Is the inner stream byte buffer readable?
* The test is "the stream is not FSDataInputStream
* The test is 'the stream is not FSDataInputStream
* and implements ByteBufferReadable'
*
* That is: all streams which implement ByteBufferReadable
* other than FSDataInputStream successfuly support read(ByteBuffer).
* This is true for all filesytem clients the hadoop codebase.
* other than FSDataInputStream successfully support read(ByteBuffer).
* This is true for all filesystem clients the hadoop codebase.
*
* In hadoop 3.3.0+, the StreamCapabilities probe can be used to
* check this: only those streams which provide the read(ByteBuffer)
* semantics MAY return true for the probe "in:readbytebuffer";
* FSDataInputStream will pass the probe down to the underlying stream.
*
* @param stream stream to probe
* @return true if it is safe to a H2SeekableInputStream to access the data
* @return true if it is safe to a H2SeekableInputStream to access
* the data, null when it cannot be determined because of missing hasCapabilities
*/
private static boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
if (stream.hasCapability("in:readbytebuffer")) {
private static Boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
if (hasCapabilitiesMethod.isNoop()) {
// When the method is not available, just return a null
return null;
}

boolean isByteBufferReadable = hasCapabilitiesMethod.invoke(stream, "in:readbytebuffer");

if (isByteBufferReadable) {
// stream is issuing the guarantee that it implements the
// API. Holds for all implementations in hadoop-*
// since Hadoop 3.3.0 (HDFS-14111).
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@
<profile>
<id>hadoop2</id>
<properties>
<hadoop.version>2.9.2</hadoop.version>
<hadoop.version>2.7.3</hadoop.version>
</properties>
</profile>

Expand Down

0 comments on commit e2c2499

Please sign in to comment.