GH-3080: HadoopStreams to support ByteBufferPositionedReadable input streams
steveloughran committed Dec 3, 2024
1 parent 0ddffb2 commit 87e2464
Showing 6 changed files with 545 additions and 51 deletions.
org/apache/parquet/hadoop/util/H2SeekableInputStream.java
@@ -100,6 +100,10 @@ public void readVectored(List<ParquetFileRange> ranges, ByteBufferAllocator allo
VectorIoBridge.instance().readVectoredRanges(stream, ranges, allocator);
}

+  protected Reader getReader() {
+    return reader;
+  }
+
public static void readFully(Reader reader, ByteBuffer buf) throws IOException {
// unfortunately the Hadoop 2 APIs do not have a 'readFully' equivalent for the byteBuffer read
// calls. The read(ByteBuffer) call might read fewer than byteBuffer.hasRemaining() bytes. Thus we
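The comment above is cut off by the fold, but it describes the Hadoop 2 era workaround this class relies on: loop on read(ByteBuffer) until the buffer is drained. A sketch of the pattern the folded body presumably implements, reconstructed from the comment rather than copied from the file:

    // Sketch only: read(ByteBuffer) may return before the buffer is full, so loop.
    public static void readFully(Reader reader, ByteBuffer buf) throws IOException {
      while (buf.hasRemaining()) {
        if (reader.read(buf) < 0) {
          throw new EOFException("Stream ended with " + buf.remaining() + " bytes left to read");
        }
      }
    }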
org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java (new file)
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.parquet.hadoop.util;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.FSDataInputStream;

/**
* Class which implements {@link #readFully(ByteBuffer)} through
* {@code ByteBufferPositionedReadable.readFully()}.
* <p>This is implemented by HDFS and possibly other clients.
*/
class H3ByteBufferInputStream extends H2SeekableInputStream {
public H3ByteBufferInputStream(final FSDataInputStream stream) {
super(stream);
}

@Override
public FSDataInputStream getStream() {
return (FSDataInputStream) super.getStream();
}

/**
* Read the buffer fully through use of {@code ByteBufferPositionedReadable.readFully()}
* at the current location.
* <p>That operation is designed not to use the stream's current reading position;
* rather, an absolute position is passed in.
* As used here, the original read position is saved, and
* after the read is finished a {@code seek()} call is made to move the
* cursor on.
*
* @param buf a byte buffer to fill with data from the stream
*
* @throws EOFException if the buffer length is greater than the file length
* @throws IOException other IO problems.
*/
@Override
public void readFully(final ByteBuffer buf) throws EOFException, IOException {
// use ByteBufferPositionedReadable.readFully()
final FSDataInputStream stream = getStream();
// remember the current position
final long pos = stream.getPos();
final int size = buf.remaining();
stream.readFully(pos, buf);
// then move read position on afterwards.
stream.seek(pos + size);
}
}
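The save/read/seek dance above is the whole point of the new class: ByteBufferPositionedReadable.readFully() reads at an absolute offset and leaves the stream cursor untouched, so the wrapper has to advance the cursor itself. A minimal sketch of the same pattern against the raw FSDataInputStream API (a sketch, assuming a Hadoop 3.3+ client whose inner stream supports the call; fs and path are placeholder variables):

    ByteBuffer buf = ByteBuffer.allocate(4096);
    try (FSDataInputStream in = fs.open(path)) {
      long pos = in.getPos();     // remember the current cursor
      int size = buf.remaining(); // bytes the read will consume
      in.readFully(pos, buf);     // absolute read; the cursor does not move
      in.seek(pos + size);        // move the cursor on manually
    }

FSDataInputStream.readFully(long, ByteBuffer) throws UnsupportedOperationException when the wrapped stream does not implement ByteBufferPositionedReadable, which is why the capability probe in HadoopStreams below matters.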
org/apache/parquet/hadoop/util/HadoopStreams.java
@@ -19,15 +19,16 @@

package org.apache.parquet.hadoop.util;

+import static org.apache.hadoop.fs.StreamCapabilities.PREADBYTEBUFFER;
+import static org.apache.hadoop.fs.StreamCapabilities.READBYTEBUFFER;
+
import java.io.InputStream;
import java.util.Objects;
-import java.util.function.Function;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.parquet.io.PositionOutputStream;
import org.apache.parquet.io.SeekableInputStream;
-import org.apache.parquet.util.DynMethods;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -38,11 +39,6 @@ public class HadoopStreams {

private static final Logger LOG = LoggerFactory.getLogger(HadoopStreams.class);

-  private static final DynMethods.UnboundMethod hasCapabilitiesMethod = new DynMethods.Builder("hasCapabilities")
-      .impl(FSDataInputStream.class, "hasCapabilities", String.class)
-      .orNoop()
-      .build();
-
/**
* Wraps a {@link FSDataInputStream} in a {@link SeekableInputStream}
* implementation for Parquet readers.
@@ -53,42 +49,14 @@ public class HadoopStreams {
public static SeekableInputStream wrap(FSDataInputStream stream) {
Objects.requireNonNull(stream, "Cannot wrap a null input stream");

-    // Try to check using hasCapabilities(str)
-    Boolean hasCapabilitiesResult = isWrappedStreamByteBufferReadable(stream);
-
-    // If it is null, then fall back to the old method
-    if (hasCapabilitiesResult != null) {
-      if (hasCapabilitiesResult) {
-        return new H2SeekableInputStream(stream);
-      } else {
-        return new H1SeekableInputStream(stream);
-      }
-    }
-
-    return unwrapByteBufferReadableLegacy(stream).apply(stream);
-  }
-
-  /**
-   * Is the inner stream byte buffer readable?
-   * The test is 'the stream is not FSDataInputStream
-   * and implements ByteBufferReadable'
-   * <p>
-   * This logic is only used for Hadoop <2.9.x, and <3.x.x
-   *
-   * @param stream stream to probe
-   * @return A H2SeekableInputStream to access, or H1SeekableInputStream if the stream is not seekable
-   */
-  private static Function<FSDataInputStream, SeekableInputStream> unwrapByteBufferReadableLegacy(
-      FSDataInputStream stream) {
-    InputStream wrapped = stream.getWrappedStream();
-    if (wrapped instanceof FSDataInputStream) {
-      LOG.debug("Checking on wrapped stream {} of {} whether is ByteBufferReadable", wrapped, stream);
-      return unwrapByteBufferReadableLegacy(((FSDataInputStream) wrapped));
-    }
-    if (stream.getWrappedStream() instanceof ByteBufferReadable) {
-      return H2SeekableInputStream::new;
+    // Check using hasCapabilities(str)
+    if (stream.hasCapability(PREADBYTEBUFFER)) {
+      LOG.debug("Using ByteBufferPositionedReadable to read {}", stream);
+      return new H3ByteBufferInputStream(stream);
+    } else if (isWrappedStreamByteBufferReadable(stream)) {
+      return new H2SeekableInputStream(stream);
    } else {
-      return H1SeekableInputStream::new;
+      return new H1SeekableInputStream(stream);
}
}

@@ -111,14 +79,8 @@ private static Function<FSDataInputStream, SeekableInputStream> unwrapByteBuffer
* the data, null when it cannot be determined because of missing hasCapabilities
*/
private static Boolean isWrappedStreamByteBufferReadable(FSDataInputStream stream) {
-    if (hasCapabilitiesMethod.isNoop()) {
-      // When the method is not available, just return a null
-      return null;
-    }
-
-    boolean isByteBufferReadable = hasCapabilitiesMethod.invoke(stream, "in:readbytebuffer");
-
-    if (isByteBufferReadable) {
+    if (stream.hasCapability(READBYTEBUFFER)) {
// stream is issuing the guarantee that it implements the
// API. Holds for all implementations in hadoop-*
// since Hadoop 3.3.0 (HDFS-14111).
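With the rewrite above, stream selection collapses to a three-way capability probe. A hedged sketch of what a caller now gets back from wrap() (class names are from this patch; fs and path are placeholder variables):

    try (FSDataInputStream in = fs.open(path)) {
      SeekableInputStream stream = HadoopStreams.wrap(in);
      // "in:preadbytebuffer" (e.g. HDFS on Hadoop 3.3+) -> H3ByteBufferInputStream
      // "in:readbytebuffer" only                        -> H2SeekableInputStream
      // neither capability                              -> H1SeekableInputStream
    }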
org/apache/parquet/hadoop/util/MockHadoopInputStream.java (test)
@@ -20,6 +20,7 @@
package org.apache.parquet.hadoop.util;

import java.io.ByteArrayInputStream;
+import java.io.EOFException;
import java.io.IOException;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
@@ -71,6 +72,7 @@ public void readFully(long position, byte[] buffer) throws IOException {

@Override
public void seek(long pos) throws IOException {
+    rejectNegativePosition(pos);
this.pos = (int) pos;
}

@@ -84,4 +86,28 @@ public boolean seekToNewSource(long targetPos) throws IOException {
seek(targetPos);
return true;
}

+  /**
+   * How long is the actual test data?
+   * @return the length of the test data
+   */
+  int length() {
+    return TEST_ARRAY.length;
+  }
+
+  byte[] data() {
+    return TEST_ARRAY;
+  }
+
+  /**
+   * For consistency with real Hadoop streams: reject negative positions.
+   * @param pos position to read/seek to.
+   * @throws EOFException if pos is negative
+   */
+  static void rejectNegativePosition(final long pos) throws EOFException {
+    if (pos < 0) {
+      throw new EOFException("Seek before file start: " + pos);
+    }
+  }

}
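With the new guard in place, the mock rejects negative seeks the way real Hadoop streams do. A test exercising it might look like this (a sketch, not part of this commit; it assumes the mock's no-arg constructor used elsewhere in these tests and JUnit 4.13's Assert.assertThrows):

    @Test
    public void testSeekBeforeFileStart() throws Exception {
      MockHadoopInputStream in = new MockHadoopInputStream();
      // EOFException for consistency with HDFS semantics.
      Assert.assertThrows(EOFException.class, () -> in.seek(-1));
    }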
org/apache/parquet/hadoop/util/TestHadoopStreams.java (test)
@@ -27,6 +27,8 @@
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.util.StringUtils;
import org.apache.parquet.hadoop.TestUtils;
import org.apache.parquet.io.SeekableInputStream;
import org.junit.Assert;
@@ -407,13 +409,20 @@ public void testDoubleWrapByteBufferReadable() {
}

/**
-   * Input stream which claims to implement ByteBufferReadable.
+   * Input stream which claims to implement ByteBufferReadable in both interfaces and
+   * in {@code hasCapability()}.
   */
-  private static final class MockByteBufferInputStream extends MockHadoopInputStream implements ByteBufferReadable {
+  private static final class MockByteBufferInputStream extends MockHadoopInputStream
+      implements ByteBufferReadable, StreamCapabilities {

@Override
public int read(final ByteBuffer buf) {
return 0;
}

+  @Override
+  public boolean hasCapability(final String capability) {
+    return StringUtils.toLowerCase(capability).equals(READBYTEBUFFER);
+  }
}
}
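A natural companion mock, not included in this commit, would advertise the new pread capability so the H3ByteBufferInputStream path could be unit tested the same way. A sketch, assuming Hadoop 3.3+'s ByteBufferPositionedReadable interface and a static import of PREADBYTEBUFFER; the class name is hypothetical:

    private static final class MockByteBufferPositionedReadStream extends MockHadoopInputStream
        implements ByteBufferPositionedReadable, StreamCapabilities {

      @Override
      public int read(final long position, final ByteBuffer buf) throws IOException {
        rejectNegativePosition(position);
        return 0; // mock: no bytes delivered
      }

      @Override
      public void readFully(final long position, final ByteBuffer buf) throws IOException {
        rejectNegativePosition(position); // mirror HDFS rejection of negative offsets
      }

      @Override
      public boolean hasCapability(final String capability) {
        return StringUtils.toLowerCase(capability).equals(PREADBYTEBUFFER);
      }
    }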
[The diff of the sixth changed file did not load in this capture.]