From 789e1853218b15a52555fc32dbbb845a9a0c2d12 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 27 Nov 2024 20:36:38 +0000 Subject: [PATCH] GH-3080: Tests of the new input stream. Based on the H2 stream test suite but * parameterized for on/off heap * expect no changes in buffer contents on out of range reads. Still one test failure. --- .../hadoop/util/H3ByteBufferInputStream.java | 15 +- .../hadoop/util/MockHadoopInputStream.java | 1 - .../util/TestHadoop3ByteBufferReadFully.java | 536 +++++++++--------- 3 files changed, 281 insertions(+), 271 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java index c9bed63a6f..bef6a7c1e5 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/H3ByteBufferInputStream.java @@ -54,8 +54,19 @@ public FSDataInputStream getStream() { */ @Override public void readFully(final ByteBuffer buf) throws EOFException, IOException { - // use ByteBufferPositionedReadable.readFully() - final FSDataInputStream stream = getStream(); + performRead(getStream(), buf); + } + + /** + * Read the buffer fully through use of {@code ByteBufferPositionedReadable.readFully()} + * at the current location. + * + * @param buf a byte buffer to fill with data from the stream + * @throws EOFException the buffer length is greater than the file length + * @throws IOException other IO problems. 
+ */ + // Visible for testing + static void performRead(final FSDataInputStream stream, final ByteBuffer buf) throws IOException { // remember the current position final long pos = stream.getPos(); final int size = buf.remaining(); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java index 6c342fd5d1..58d494a208 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/MockHadoopInputStream.java @@ -109,5 +109,4 @@ static void rejectNegativePosition(final long pos) throws EOFException { throw new EOFException("Seek before file start: " + pos); } } - } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java index c4b5c947b9..c9e8a8249d 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/util/TestHadoop3ByteBufferReadFully.java @@ -19,378 +19,373 @@ package org.apache.parquet.hadoop.util; +import static org.apache.parquet.hadoop.util.H3ByteBufferInputStream.performRead; import static org.apache.parquet.hadoop.util.HadoopStreams.wrap; import static org.apache.parquet.hadoop.util.MockHadoopInputStream.TEST_ARRAY; import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; import org.apache.hadoop.fs.ByteBufferPositionedReadable; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.util.StringUtils; import org.apache.parquet.hadoop.TestUtils; import org.apache.parquet.io.SeekableInputStream; 
+import org.jetbrains.annotations.NotNull; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; /** * Test {@code ByteBufferPositionedReadable.readFully()} reads. + * Parameterized on heap vs. direct buffers. */ +@RunWith(Parameterized.class) public class TestHadoop3ByteBufferReadFully { - @Test - public void testHeapReadFullySmallBuffer() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(8); + public static final int LEN = 10; - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + Object[][] data = new Object[][] {{"heap", true}, {"direct", false}}; + return Arrays.asList(data); + } - Assert.assertEquals(8, readBuffer.position()); - Assert.assertEquals(8, readBuffer.limit()); + /** + * Use a heap buffer? + */ + private final boolean useHeap; - Assert.assertEquals(8, readBuffer.position()); - Assert.assertEquals(8, readBuffer.limit()); + public TestHadoop3ByteBufferReadFully(final String type, final boolean useHeap) { + this.useHeap = useHeap; + } - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer); + /** + * Allocate a buffer; choice of on/off heap depends on test suite options. + * @param capacity buffer capacity. + * @return the buffer. + */ + private ByteBuffer allocate(int capacity) { + return useHeap ? ByteBuffer.allocate(capacity) : ByteBuffer.allocateDirect(capacity); } + /** + * Read a buffer smaller than the source file. 
+ */ @Test - public void testHeapReadFullyLargeBuffer() throws Exception { - final ByteBuffer readBuffer = ByteBuffer.allocate(20); + public void testReadFullySmallBuffer() throws Exception { + ByteBuffer readBuffer = allocate(8); - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + FSDataInputStream hadoopStream = stream(); - assertThrowsEOFException(hadoopStream, 0, readBuffer); - - // NOTE: This behavior differs from readFullyHeapBuffer because direct uses - // several read operations that will read up to the end of the input. This - // is a correct value because the bytes in the buffer are valid. This - // behavior can't be implemented for the heap buffer without using the read - // method instead of the readFully method on the underlying - // FSDataInputStream. - assertPositionAndLimit(readBuffer, 10, 20); + assertBufferRead(hadoopStream, readBuffer, 8, 8); + assertPositionAndLimit(readBuffer, 8, 8); + // buffer is full so no more data is read. + assertBufferRead(hadoopStream, readBuffer, 8, 8); + assertBufferMatches(readBuffer, 0); } - private static void assertPositionAndLimit(ByteBuffer readBuffer, int pos, int limit) { - assertPosition(readBuffer, pos); - assertLimit(readBuffer, limit); + /** + * Read more than the file size, require EOF exceptions to be raised. + */ + @Test + public void testReadFullyLargeBuffer() throws Exception { + final ByteBuffer readBuffer = allocate(20); + + FSDataInputStream hadoopStream = stream(); + + assertThrowsEOFException(hadoopStream, readBuffer); + + // EOF check happened before the read -at least with this test stream. + assertPositionAndLimit(readBuffer, 0, 20); } - private static void assertPosition(final ByteBuffer readBuffer, final int pos) { - Assert.assertEquals("Buffer Position", pos, readBuffer.position()); + /** + * Seek to the file, try to read a buffer more than allowed. 
+ */ + @Test + public void testReadFullyFromOffset() throws Exception { + final int size = 5; + final ByteBuffer readBuffer = allocate(size); + + FSDataInputStream hadoopStream = stream(); + hadoopStream.seek(6); + + // read past EOF is a failure + assertThrowsEOFException(hadoopStream, readBuffer); + // stream does not change position + assertStreamAt(hadoopStream, 6); + + // reduce buffer limit + readBuffer.limit(4); + // now the read works. + assertBufferRead(hadoopStream, readBuffer, 4, 4); } - private static void assertLimit(final ByteBuffer readBuffer, final int limit) { - Assert.assertEquals("Buffer Limit", limit, readBuffer.limit()); + @NotNull private static FSDataInputStream stream() { + return new FSDataInputStream(new MockByteBufferReadFullyInputStream()); } + /** + * Read exactly the size of the file. + */ @Test - public void testHeapReadFullyJustRight() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(10); + public void testReadFullyJustRight() throws Exception { + ByteBuffer readBuffer = allocate(LEN); - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + FSDataInputStream hadoopStream = stream(); // reads all of the bytes available without EOFException - hadoopStream.readFully(0, readBuffer); - assertPosition(readBuffer, 10); + assertBufferRead(hadoopStream, readBuffer, LEN, LEN); // trying to read 0 more bytes doesn't result in EOFException hadoopStream.readFully(11, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + assertBufferMatches(readBuffer, 0); } + /** + * Read with the buffer position set to a value within the buffer. 
+ */ @Test - public void testHeapReadFullySmallReads() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(10); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); - } - - @Test - public void testHeapReadFullyPosition() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(10); + public void testReadFullyPosition() throws Exception { + ByteBuffer readBuffer = allocate(LEN); readBuffer.position(3); readBuffer.mark(); - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); + FSDataInputStream hadoopStream = stream(); + assertBufferRead(hadoopStream, readBuffer, LEN, LEN); + assertBufferRead(hadoopStream, readBuffer, LEN, LEN); + // reset to where the mark is. 
readBuffer.reset(); Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); } + /** + * Limit the buffer size, read with it + * @throws Exception + */ @Test - public void testHeapReadFullyLimit() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(10); - readBuffer.limit(7); + public void testReadFullyLimit() throws Exception { + ByteBuffer readBuffer = allocate(LEN); + final int smallLimit = 7; + readBuffer.limit(smallLimit); - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + FSDataInputStream hadoopStream = stream(); - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); + assertBufferRead(hadoopStream, readBuffer, smallLimit, smallLimit); + hadoopStream.seek(0); + assertBufferRead(hadoopStream, readBuffer, smallLimit, smallLimit); - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); + assertBufferMatches(readBuffer, 0); - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); - - readBuffer.position(7); - readBuffer.limit(10); - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + // recycle the buffer with a larger value and continue + // reading from the end of the last read. 
+ readBuffer.position(smallLimit); + readBuffer.limit(LEN); + assertBufferRead(hadoopStream, readBuffer, LEN, LEN); + assertBufferMatches(readBuffer, 0); } @Test - public void testHeapReadFullyPositionAndLimit() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocate(10); + public void testReadFullyPositionAndLimit() throws Exception { + ByteBuffer readBuffer = allocate(LEN); readBuffer.position(3); - readBuffer.limit(7); + final int smallLimit = 7; + readBuffer.limit(smallLimit); readBuffer.mark(); - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + FSDataInputStream hadoopStream = stream(); - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); + assertBufferRead(hadoopStream, readBuffer, smallLimit, smallLimit); + assertBufferRead(hadoopStream, readBuffer, smallLimit, smallLimit); readBuffer.reset(); Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer); - readBuffer.position(7); - readBuffer.limit(10); - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); + readBuffer.position(smallLimit); + readBuffer.limit(LEN); + assertBufferRead(hadoopStream, readBuffer, LEN, LEN); readBuffer.reset(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, smallLimit), readBuffer); + // assertBufferMatches(readBuffer, 0); } - @Test - public void testDirectReadFullySmallBuffer() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(8); - - FSDataInputStream hadoopStream = new FSDataInputStream(new 
MockByteBufferReadFullyInputStream()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(8, readBuffer.position()); - Assert.assertEquals(8, readBuffer.limit()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(8, readBuffer.position()); - Assert.assertEquals(8, readBuffer.limit()); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer); - } - - @Test - public void testDirectReadFullyLargeBuffer() throws Exception { - final ByteBuffer readBuffer = ByteBuffer.allocateDirect(20); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - final int position = 0; - assertThrowsEOFException(hadoopStream, position, readBuffer); - - // NOTE: This behavior differs from readFullyHeapBuffer because direct uses - // several read operations that will read up to the end of the input. This - // is a correct value because the bytes in the buffer are valid. This - // behavior can't be implemented for the heap buffer without using the read - // method instead of the readFully method on the underlying - // FSDataInputStream. 
- Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(20, readBuffer.limit()); - } - - private static void assertThrowsEOFException( - final FSDataInputStream hadoopStream, final int position, final ByteBuffer readBuffer) { + private static void assertThrowsEOFException(final FSDataInputStream hadoopStream, final ByteBuffer readBuffer) { TestUtils.assertThrows("Should throw EOFException", EOFException.class, () -> { - hadoopStream.readFully(position, readBuffer); + performRead(hadoopStream, readBuffer); return null; }); } @Test - public void testDirectReadFullyJustRight() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - // reads all of the bytes available without EOFException - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - // trying to read 0 more bytes doesn't result in EOFException - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + public void testCreateStreamNoByteBufferPositionedReadable() { + final SeekableInputStream s = wrap(new FSDataInputStream(new MockHadoopInputStream())); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream); } @Test - public void testDirectReadFullySmallReads() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - 
Assert.assertEquals(10, readBuffer.limit()); + public void testDoubleWrapNoByteBufferPositionedReadable() { + final SeekableInputStream s = + wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream()))); - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); } @Test - public void testDirectReadFullyPosition() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); - readBuffer.position(3); - readBuffer.mark(); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - readBuffer.reset(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + public void testCreateStreamWithByteBufferPositionedReadable() { + final SeekableInputStream s = wrap(stream()); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); } @Test - public void testDirectReadFullyLimit() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); - readBuffer.limit(7); - - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); - - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); - - readBuffer.position(7); - 
readBuffer.limit(10); - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); - - readBuffer.flip(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY), readBuffer); + public void testDoubleWrapByteBufferPositionedReadable() { + final SeekableInputStream s = + wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream()))); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); } + /** + * The buffer reading stream is only selected if the stream declares support; + * implementing the interface is not enough. + */ @Test - public void testDirectReadFullyPositionAndLimit() throws Exception { - ByteBuffer readBuffer = ByteBuffer.allocateDirect(10); - readBuffer.position(3); - readBuffer.limit(7); - readBuffer.mark(); + public void testPositionedReadableNoCapability() { + class IncapableStream extends MockByteBufferReadFullyInputStream { + @Override + public boolean hasCapability(final String capability) { + return false; + } + } + final InputStream in = new IncapableStream(); + final SeekableInputStream s = wrap(new FSDataInputStream(in)); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream); + } - FSDataInputStream hadoopStream = new FSDataInputStream(new MockByteBufferReadFullyInputStream()); + /** + * What happens if a stream declares support for the interface, + * but doesn't actually do it? + * The check is based on trust: if the stream lied -it doesn't work. 
+ */ + @Test + public void testCapabilityWithoutInterface() { + class InconsistentStream extends MockHadoopInputStream + implements ByteBufferPositionedReadable, StreamCapabilities { + @Override + public boolean hasCapability(final String capability) { + return StringUtils.toLowerCase(capability).equals(StreamCapabilities.PREADBYTEBUFFER); + } - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); + @Override + public int read(final long position, final ByteBuffer buf) throws IOException { + return 0; + } - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(7, readBuffer.position()); - Assert.assertEquals(7, readBuffer.limit()); + @Override + public void readFully(final long position, final ByteBuffer buf) throws IOException {} + } - readBuffer.reset(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer); + final InputStream in = new InconsistentStream(); + final SeekableInputStream s = wrap(new FSDataInputStream(in)); + Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); + } - readBuffer.position(7); - readBuffer.limit(10); - hadoopStream.readFully(0, readBuffer); - Assert.assertEquals(10, readBuffer.position()); - Assert.assertEquals(10, readBuffer.limit()); + public static void assertBufferMatches(ByteBuffer readBuffer, int filePosition) { + readBuffer.flip(); + final int remaining = readBuffer.remaining(); + byte[] actual = getBytes(readBuffer); + byte[] expected = Arrays.copyOfRange(TEST_ARRAY, filePosition, remaining); + Assert.assertEquals( + "Buffer contents from data offset " + filePosition + " with length " + remaining, + stringify(expected), + stringify(actual)); + } - readBuffer.reset(); - Assert.assertEquals("Buffer contents should match", ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer); + /** + * Gets the bytes of the buffer. This sets the buffer remaining + * value to 0. 
+ * @param buffer buffer. + * @return buffer contents as bytes. + */ + public static byte[] getBytes(ByteBuffer buffer) { + byte[] byteArray = new byte[buffer.remaining()]; + buffer.get(byteArray); + return byteArray; } - @Test - public void testCreateStreamNoByteBufferPositionedReadable() { - final SeekableInputStream s = wrap(new FSDataInputStream(new MockHadoopInputStream())); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream); + /** + * Map a byte array to hex values. + * Of limited value once the byte value is greater than 15 + * as the string is hard to read. + * @param array source data + * @return string list. + */ + private static String stringify(byte[] array) { + // convert to offset of lower case A, to make those assertions meaningful + final int l = array.length; + StringBuilder chars = new StringBuilder(l); + for (byte b : array) { + chars.append(Integer.toHexString(b)); + } + return chars.toString(); } - @Test - public void testDoubleWrapNoByteBufferPositionedReadable() { - final SeekableInputStream s = - wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream()))); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H1SeekableInputStream); + /** + * Assert the current buffer position and limit. + * @param readBuffer buffer + * @param bufferPosition buffer position. + * @param limit buffer limit + */ + private static void assertPositionAndLimit(ByteBuffer readBuffer, int bufferPosition, int limit) { + Assert.assertEquals("Buffer Position", bufferPosition, readBuffer.position()); + Assert.assertEquals("Buffer Limit", limit, readBuffer.limit()); } - @Test - public void testCreateStreamWithByteBufferPositionedReadable() { - final SeekableInputStream s = wrap(new FSDataInputStream(new MockByteBufferReadFullyInputStream())); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); + /** + * Assert the stream position is at the expected value. 
+ * @param hadoopStream stream + * @param pos expected position + * @throws IOException exception raised on getPos() + */ + private static void assertStreamAt(final FSDataInputStream hadoopStream, long pos) throws IOException { + Assert.assertEquals("Read position of stream", pos, hadoopStream.getPos()); } - @Test - public void testDoubleWrapByteBufferPositionedReadable() { - final SeekableInputStream s = - wrap(new FSDataInputStream(new FSDataInputStream(new MockByteBufferReadFullyInputStream()))); - Assert.assertTrue("Wrong wrapper: " + s, s instanceof H3ByteBufferInputStream); + /** + * Read a buffer at the current position through {@link H3ByteBufferInputStream#performRead(FSDataInputStream, ByteBuffer)}. + * Assert that the stream buffer position and limit are what is expected + * + * @param hadoopStream stream + * @param readBuffer buffer to fill + * @param bufferPosition final buffer position + * @param limit final buffer limit + * + * @throws IOException read failure + */ + private static void assertBufferRead( + final FSDataInputStream hadoopStream, + final ByteBuffer readBuffer, + final int bufferPosition, + final int limit) + throws IOException { + final long pos = hadoopStream.getPos(); + final int remaining = readBuffer.remaining(); + performRead(hadoopStream, readBuffer); + assertPositionAndLimit(readBuffer, bufferPosition, limit); + assertStreamAt(hadoopStream, pos + remaining); } /** * Input stream which claims to implement ByteBufferPositionedReadable */ - private static final class MockByteBufferReadFullyInputStream extends MockHadoopInputStream + private static class MockByteBufferReadFullyInputStream extends MockHadoopInputStream implements ByteBufferPositionedReadable, StreamCapabilities { @Override @@ -405,23 +400,28 @@ public void readFully(final long position, final ByteBuffer buf) throws IOExcept // validation rejectNegativePosition(position); final int toRead = buf.remaining(); - if (getPos() + length() > toRead) { - throw new 
EOFException("Read past " + length()); + if (toRead == 0) { + return; + } + if (toRead + position > length()) { + throw new EOFException("ByteBuffer.readFully(" + position + + ") buffer size: " + toRead + + " reads past file length: " + length()); } // return the subset of the data byte[] result = new byte[toRead]; - System.arraycopy(data(), 0, result, 0, toRead); + System.arraycopy(data(), (int) position, result, 0, toRead); buf.put(result); } + /** + * Declare support for ByteBufferPositionedReadable. + * This is the only way that an implementation will be picked up. + * @param capability string to query the stream support for. + * @return true if the capability is PREADBYTEBUFFER. + */ public boolean hasCapability(final String capability) { - switch (StringUtils.toLowerCase(capability)) { - case StreamCapabilities.READBYTEBUFFER: - case StreamCapabilities.PREADBYTEBUFFER: - return true; - default: - return false; - } + return StringUtils.toLowerCase(capability).equals(StreamCapabilities.PREADBYTEBUFFER); } } }