diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java index bef6a7c1e5..61b9f3aa85 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java @@ -59,19 +59,23 @@ public void readFully(final ByteBuffer buf) throws EOFException, IOException { /** * Read the buffer fully through use of {@code ByteBufferPositionedReadable.readFully()} - * at the current location. + * from the current location. + * That is, it reads from stream[pos] to stream[pos + buf.remaining() - 1]. * * @param buf a byte buffer to fill with data from the stream + * @return number of bytes read. + * * @throws EOFException the buffer length is greater than the file length * @throws IOException other IO problems. */ // Visible for testing - static void performRead(final FSDataInputStream stream, final ByteBuffer buf) throws IOException { + static int performRead(final FSDataInputStream stream, final ByteBuffer buf) throws IOException { // remember the current position final long pos = stream.getPos(); final int size = buf.remaining(); stream.readFully(pos, buf); // then move read position on afterwards. 
stream.seek(pos + size); + return size; } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java index c9e8a8249d..0481a1fdf7 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java @@ -19,23 +19,23 @@ package org.apache.parquet.hadoop.util; +import static org.apache.hadoop.fs.StreamCapabilities.READBYTEBUFFER; import static org.apache.parquet.hadoop.util.H3ByteBufferInputStream.performRead; import static org.apache.parquet.hadoop.util.HadoopStreams.wrap; import static org.apache.parquet.hadoop.util.MockHadoopInputStream.TEST_ARRAY; import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collection; import org.apache.hadoop.fs.ByteBufferPositionedReadable; +import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.util.StringUtils; import org.apache.parquet.hadoop.TestUtils; import org.apache.parquet.io.SeekableInputStream; -import org.jetbrains.annotations.NotNull; import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -48,11 +48,14 @@ @RunWith(Parameterized.class) public class TestHadoop3ByteBufferReadFully { - public static final int LEN = 10; + /** + * The size of the stream. 
+ */ + private static final int LEN = TEST_ARRAY.length; - @Parameterized.Parameters(name = "{0}") + @Parameterized.Parameters(name = "heap={0}") public static Collection data() { - Object[][] data = new Object[][] {{"heap", true}, {"direct", false}}; + Object[][] data = new Object[][] {{true}, {false}}; return Arrays.asList(data); } @@ -61,13 +64,20 @@ public static Collection data() { */ private final boolean useHeap; - public TestHadoop3ByteBufferReadFully(final String type, final boolean useHeap) { + /** + * Instantiate test suite. + * + * @param useHeap use a heap buffer? + */ + public TestHadoop3ByteBufferReadFully(final boolean useHeap) { this.useHeap = useHeap; } /** * Allocate a buffer; choice of on/off heap depends on test suite options. + * * @param capacity buffer capacity. + * * @return the buffer. */ private ByteBuffer allocate(int capacity) { @@ -87,11 +97,11 @@ public void testReadFullySmallBuffer() throws Exception { assertPositionAndLimit(readBuffer, 8, 8); // buffer is full so no more data is read. assertBufferRead(hadoopStream, readBuffer, 8, 8); - assertBufferMatches(readBuffer, 0); + verifyBufferMatches(readBuffer, 0); } /** - * Read more than the file size, require EOF exceptions to be raised. + * Read more than the file size, require an EOF exception to be raised. */ @Test public void testReadFullyLargeBuffer() throws Exception { @@ -107,6 +117,8 @@ public void testReadFullyLargeBuffer() throws Exception { /** * Seek to the file, try to read a buffer more than allowed. + * This fails because readFully() requires the whole buffer to be filled. + * When the buffer limit is reduced it will work. */ @Test public void testReadFullyFromOffset() throws Exception { @@ -127,7 +139,12 @@ public void testReadFullyFromOffset() throws Exception { assertBufferRead(hadoopStream, readBuffer, 4, 4); } - @NotNull private static FSDataInputStream stream() { + /** + * Create a data input stream wrapping an {@link MockByteBufferReadFullyInputStream}. 
+ * + * @return an input stream. + */ + private static FSDataInputStream stream() { return new FSDataInputStream(new MockByteBufferReadFullyInputStream()); } @@ -146,7 +163,8 @@ public void testReadFullyJustRight() throws Exception { // trying to read 0 more bytes doesn't result in EOFException hadoopStream.readFully(11, readBuffer); - assertBufferMatches(readBuffer, 0); + // buffer unchanged + verifyBufferMatches(readBuffer, 0); } /** @@ -168,8 +186,7 @@ public void testReadFullyPosition() throws Exception { } /** - * Limit the buffer size, read with it - * @throws Exception + * Limit the buffer size, read it. */ @Test public void testReadFullyLimit() throws Exception { @@ -177,20 +194,27 @@ public void testReadFullyLimit() throws Exception { final int smallLimit = 7; readBuffer.limit(smallLimit); + // read up to the limit, twice. FSDataInputStream hadoopStream = stream(); - assertBufferRead(hadoopStream, readBuffer, smallLimit, smallLimit); hadoopStream.seek(0); + // the buffer is now full, so no bytes are read. + // the position and the limit are unchanged. assertBufferRead(hadoopStream, readBuffer, smallLimit, smallLimit); + // and the stream is still at position zero. + assertStreamAt(hadoopStream, 0); - assertBufferMatches(readBuffer, 0); + verifyBufferMatches(readBuffer, 0); // recycle the buffer with a larger value and continue // reading from the end of the last read. 
readBuffer.position(smallLimit); readBuffer.limit(LEN); + hadoopStream.seek(smallLimit); + + assertStreamAt(hadoopStream, smallLimit); assertBufferRead(hadoopStream, readBuffer, LEN, LEN); - assertBufferMatches(readBuffer, 0); + verifyBufferMatches(readBuffer, 0); } @Test @@ -215,41 +239,78 @@ public void testReadFullyPositionAndLimit() throws Exception { assertBufferRead(hadoopStream, readBuffer, LEN, LEN); readBuffer.reset(); Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, smallLimit), readBuffer); - // assertBufferMatches(readBuffer, 0); } + /** + * Assert that a buffer read raises EOFException. + * + * @param hadoopStream stream to read + * @param readBuffer target buffer. + */ private static void assertThrowsEOFException(final FSDataInputStream hadoopStream, final ByteBuffer readBuffer) { - TestUtils.assertThrows("Should throw EOFException", EOFException.class, () -> { + TestUtils.assertThrows("Must throw EOFException", EOFException.class, () -> { performRead(hadoopStream, readBuffer); return null; }); } + /** + * Regression test: verify that creating a stream for {@link MockHadoopInputStream} + * still generates an {@link H1SeekableInputStream}. + */ @Test - public void testCreateStreamNoByteBufferPositionedReadable() { - final SeekableInputStream s = wrap(new FSDataInputStream(new MockHadoopInputStream())); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream); + public void testCreateH1Stream() { + assertStreamClass(H1SeekableInputStream.class, wrap(new FSDataInputStream(new MockHadoopInputStream()))); } + /** + * Regression test: verify that creating a stream which implements + * ByteBufferReadable but doesn't declare the capability generates {@link H2SeekableInputStream}. 
+ */ @Test - public void testDoubleWrapNoByteBufferPositionedReadable() { - final SeekableInputStream s = - wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream()))); + public void testDoubleWrapByteBufferStream() { + assertStreamClass( + H2SeekableInputStream.class, + wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferInputStream())))); + } - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); + /** + * Regression test: verify that creating a stream which implements + * ByteBufferReadable generates {@link H2SeekableInputStream}. + */ + @Test + public void testDoubleWrapByteBufferStreamWithCapability() { + assertStreamClass( + H2SeekableInputStream.class, + wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferInputStream(READBYTEBUFFER))))); + } + + /** + * Assert that an instantiated stream class matches the expected class. + * @param expected expected class + * @param stream stream to validate + */ + private static void assertStreamClass( + final Class expected, final SeekableInputStream stream) { + Assert.assertEquals("Wrong stream class: " + stream, expected, stream.getClass()); } + /** + * If a stream implements "in:preadbytebuffer" it gets bound to a H3ByteBufferInputStream. 
+ */ @Test public void testCreateStreamWithByteBufferPositionedReadable() { - final SeekableInputStream s = wrap(stream()); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); + assertStreamClass(H3ByteBufferInputStream.class, wrap(stream())); } + /** + * Verify that a doubly wrapped {@link MockByteBufferReadFullyInputStream} is bound to a {@link H3ByteBufferInputStream}. + */ @Test public void testDoubleWrapByteBufferPositionedReadable() { - final SeekableInputStream s = - wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream()))); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); + assertStreamClass( + H3ByteBufferInputStream.class, + wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream())))); } /** @@ -258,15 +319,9 @@ public void testDoubleWrapByteBufferPositionedReadable() { */ @Test public void testPositionedReadableNoCapability() { - class IncapableStream extends MockByteBufferReadFullyInputStream { - @Override - public boolean hasCapability(final String capability) { - return false; - } - } - final InputStream in = new IncapableStream(); - final SeekableInputStream s = wrap(new FSDataInputStream(in)); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream); + assertStreamClass( + H2SeekableInputStream.class, + wrap(new FSDataInputStream(new MockByteBufferReadFullyInputStream(READBYTEBUFFER)))); } /** @@ -278,9 +333,10 @@ public boolean hasCapability(final String capability) { public void testCapabilityWithoutInterface() { class InconsistentStream extends MockHadoopInputStream implements ByteBufferPositionedReadable, StreamCapabilities { + @Override public boolean hasCapability(final String capability) { - return StringUtils.toLowerCase(capability).equals(StreamCapabilities.PREADBYTEBUFFER); + return StringUtils.toLowerCase(capability).equals(PREADBYTEBUFFER); } @Override @@ -292,12 +348,19 @@ public int read(final long position, final ByteBuffer buf) throws IOException { public void readFully(final long position, final 
ByteBuffer buf) throws IOException {} } - final InputStream in = new InconsistentStream(); - final SeekableInputStream s = wrap(new FSDataInputStream(in)); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); + assertStreamClass(H3ByteBufferInputStream.class, wrap(new FSDataInputStream(new InconsistentStream()))); } - public static void assertBufferMatches(ByteBuffer readBuffer, int filePosition) { + /** + * Assert that the buffer contents match those of the input data from + * the offset filePosition. + * This operation reads the buffer data, so must be used after any other + * assertions about buffer, size, position etc. + * + * @param readBuffer buffer to examine + * @param filePosition file position. + */ + public static void verifyBufferMatches(ByteBuffer readBuffer, int filePosition) { readBuffer.flip(); final int remaining = readBuffer.remaining(); byte[] actual = getBytes(readBuffer); @@ -309,9 +372,11 @@ public static void assertBufferMatches(ByteBuffer readBuffer, int filePosition) } /** - * Gets the bytes of the buffer. This sets the buffer remaining + * Gets the bytes of the buffer. This sets the buffer.remaining() * value to 0. + * * @param buffer buffer. + * * @return buffer contents as bytes. */ public static byte[] getBytes(ByteBuffer buffer) { @@ -324,7 +389,9 @@ public static byte[] getBytes(ByteBuffer buffer) { * Map a byte array to hex values. * Of limited value once the byte value is greater than 15 * as the string is hard to read. + * * @param array source data + * * @return string list. */ private static String stringify(byte[] array) { @@ -338,7 +405,8 @@ private static String stringify(byte[] array) { } /** - * Assert the current buffer position and limit. + * Assert the current buffer position and limit are as expected + * * @param readBuffer buffer * @param bufferPosition buffer position. 
* @param limit buffer limit @@ -350,8 +418,10 @@ private static void assertPositionAndLimit(ByteBuffer readBuffer, int bufferPosi /** * Assert the stream position is at the expected value. + * * @param hadoopStream stream * @param pos expected position + * * @throws IOException exception raised on getPos() */ private static void assertStreamAt(final FSDataInputStream hadoopStream, long pos) throws IOException { @@ -360,34 +430,82 @@ private static void assertStreamAt(final FSDataInputStream hadoopStream, long po /** * Read a buffer at the current position through {@link H3ByteBufferInputStream#performRead(FSDataInputStream, ByteBuffer)}. - * Assert that the stream buffer position and limit are what is expected + * Assert that the stream buffer position and limit are as expected. + * That is: the stream position has been moved forwards by the + * size of the buffer. * * @param hadoopStream stream * @param readBuffer buffer to fill - * @param bufferPosition final buffer position - * @param limit final buffer limit + * @param expectedBufferPosition final buffer position + * @param expectedLimit final buffer limit * * @throws IOException read failure */ private static void assertBufferRead( final FSDataInputStream hadoopStream, final ByteBuffer readBuffer, - final int bufferPosition, - final int limit) + final int expectedBufferPosition, + final int expectedLimit) throws IOException { final long pos = hadoopStream.getPos(); final int remaining = readBuffer.remaining(); - performRead(hadoopStream, readBuffer); - assertPositionAndLimit(readBuffer, bufferPosition, limit); + final int read = performRead(hadoopStream, readBuffer); + // the bytes read MUST match the buffer size, as this is a full buffer read. + Assert.assertEquals("bytes read from stream", remaining, read); + // the buffer position and limit match what was expected. + assertPositionAndLimit(readBuffer, expectedBufferPosition, expectedLimit); + // the stream has moved forwards. 
assertStreamAt(hadoopStream, pos + remaining); } /** - * Input stream which claims to implement ByteBufferPositionedReadable + * Input stream which claims to implement ByteBufferReadable in both interfaces and, optionally, + * in {@code hasCapability()}. + */ + private static class MockByteBufferInputStream extends MockHadoopInputStream + implements ByteBufferReadable, StreamCapabilities { + + private final String[] capabilities; + + /** + * Constructor. + * @param capabilities an array of capabilities to declare support for. + */ + private MockByteBufferInputStream(String... capabilities) { + this.capabilities = capabilities; + } + + @Override + public int read(final ByteBuffer buf) { + return 0; + } + + /** + * Does the stream declare support for the given capability? + * @param capability string to query the stream support for. + * @return true if there is an entry in the capability list matching the argument. + */ + @Override + public boolean hasCapability(final String capability) { + return Arrays.stream(capabilities).anyMatch(c -> c.equals(capability)); + } + } + + /** + * Input stream which claims to implement ByteBufferPositionedReadable, + * unless constructed with a capability list that excludes it. */ - private static class MockByteBufferReadFullyInputStream extends MockHadoopInputStream + private static class MockByteBufferReadFullyInputStream extends MockByteBufferInputStream implements ByteBufferPositionedReadable, StreamCapabilities { + public MockByteBufferReadFullyInputStream() { + this(READBYTEBUFFER, PREADBYTEBUFFER); + } + + public MockByteBufferReadFullyInputStream(final String... 
capabilites) { + super(capabilites); + } + @Override public int read(final long position, final ByteBuffer buf) throws IOException { rejectNegativePosition(position); @@ -413,15 +531,5 @@ public void readFully(final long position, final ByteBuffer buf) throws IOExcept System.arraycopy(data(), (int) position, result, 0, toRead); buf.put(result); } - - /** - * Declare support for ByteBufferPositionedReadable. - * This is the only way that an implementation wil be picked up. - * @param capability string to query the stream support for. - * @return - */ - public boolean hasCapability(final String capability) { - return StringUtils.toLowerCase(capability).equals(StreamCapabilities.PREADBYTEBUFFER); - } } }