Skip to content

Commit

Permalink
Core: Add RangeReadable interface for limiting FileIO reads (apache#4608
Browse files Browse the repository at this point in the history
)
  • Loading branch information
danielcweeks authored Apr 25, 2022
1 parent c1b553d commit a655c80
Show file tree
Hide file tree
Showing 7 changed files with 417 additions and 1 deletion.
1 change: 1 addition & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ This product includes code from Apache Parquet.
* DynMethods.java
* DynConstructors.java
* AssertHelpers.java
* IOUtil.java readFully and tests

Copyright: 2014-2017 The Apache Software Foundation.
Home page: https://parquet.apache.org/
Expand Down
85 changes: 85 additions & 0 deletions api/src/main/java/org/apache/iceberg/io/RangeReadable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.io.Closeable;
import java.io.IOException;

/**
* {@code RangeReadable} is an interface that allows for implementations
* of {@link InputFile} streams to perform positional, range-based reads, which
* are more efficient than unbounded reads in many cloud provider object stores.
*
* Thread safety is not a requirement of the interface and is left to the
* implementation.
*
* If the implementation is also a {@link SeekableInputStream}, the position
* of the stream is not required to be updated based on the positional reads
* performed by this interface. Usage of {@link SeekableInputStream} should
* always seek to the appropriate position for {@link java.io.InputStream}
* based reads.
*
*/
public interface RangeReadable extends Closeable {

/**
* Fill the provided buffer with the contents of the input source starting
* at {@code position} for the given {@code offset} and {@code length}.
*
* @param position start position of the read
* @param buffer target buffer to copy data
* @param offset offset in the buffer to copy the data
* @param length size of the read
*/
void readFully(long position, byte[] buffer, int offset, int length) throws IOException;

/**
* Fill the entire buffer with the contents of the input source starting
* at {@code position}.
*
* @param position start position of the read
* @param buffer target buffer to copy data
*/
default void readFully(long position, byte[] buffer) throws IOException {
readFully(position, buffer, 0, buffer.length);
}

/**
* Read the last {@code length} bytes from the file.
*
* @param buffer the buffer to write data into
* @param offset the offset in the buffer to start writing
* @param length the number of bytes from the end of the object to read
* @return the actual number of bytes read
* @throws IOException if an error occurs while reading
*/
int readTail(byte [] buffer, int offset, int length) throws IOException;

/**
* Read the full size of the buffer from the end of the file.
*
* @param buffer the buffer to write data into
* @return the actual number of bytes read
* @throws IOException if an error occurs while reading
*/
default int readTail(byte [] buffer) throws IOException {
return readTail(buffer, 0, buffer.length);
}
}
33 changes: 32 additions & 1 deletion aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Arrays;
import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.io.FileIOMetricsContext;
import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.RangeReadable;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.metrics.MetricsContext;
import org.apache.iceberg.metrics.MetricsContext.Counter;
Expand All @@ -37,7 +39,7 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

class S3InputStream extends SeekableInputStream {
class S3InputStream extends SeekableInputStream implements RangeReadable {
private static final Logger LOG = LoggerFactory.getLogger(S3InputStream.class);

private final StackTraceElement[] createStack;
Expand Down Expand Up @@ -111,6 +113,35 @@ public int read(byte[] b, int off, int len) throws IOException {
return bytesRead;
}

/**
 * Reads exactly {@code length} bytes of the object, starting at {@code position}, into
 * {@code buffer} beginning at {@code offset}.
 *
 * <p>The data is fetched with a dedicated ranged request ({@code bytes=position-(position+length-1)})
 * issued through {@code readRange}; the response stream is closed before returning.
 *
 * @param position start position of the read within the object
 * @param buffer target buffer to copy data into
 * @param offset offset in the buffer at which to start writing
 * @param length number of bytes to read
 * @throws IOException if the read fails or the stream ends before {@code length} bytes are read
 */
@Override
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
  Preconditions.checkPositionIndexes(offset, offset + length, buffer.length);

  // HTTP byte ranges are inclusive on both ends, hence the -1 on the last byte position
  String range = String.format("bytes=%s-%s", position, position + length - 1);

  // the ranged response stream must be closed by the caller, otherwise every positional
  // read leaks the resources backing the response
  try (InputStream stream = readRange(range)) {
    IOUtil.readFully(stream, buffer, offset, length);
  }
}

/**
 * Reads up to the last {@code length} bytes of the object into {@code buffer} beginning at
 * {@code offset}.
 *
 * <p>The data is fetched with a dedicated suffix-range request ({@code bytes=-length}) issued
 * through {@code readRange}; the response stream is closed before returning.
 *
 * @param buffer the buffer to write data into
 * @param offset the offset in the buffer to start writing
 * @param length the number of bytes from the end of the object to read
 * @return the number of bytes actually read, which is less than {@code length} if the
 *     response stream ends early
 * @throws IOException if an error occurs while reading
 */
@Override
public int readTail(byte[] buffer, int offset, int length) throws IOException {
  Preconditions.checkPositionIndexes(offset, offset + length, buffer.length);

  String range = String.format("bytes=-%s", length);

  // the ranged response stream must be closed by the caller, otherwise every tail read
  // leaks the resources backing the response
  try (InputStream stream = readRange(range)) {
    return IOUtil.readRemaining(stream, buffer, offset, length);
  }
}

/**
 * Issues a GET for this stream's bucket and key restricted to the given range header value
 * and returns the response as a stream. Encryption settings from {@code awsProperties} are
 * applied to the request before it is built.
 *
 * @param range value for the request's range, e.g. {@code bytes=0-1023}
 * @return the response content as an input stream
 */
private InputStream readRange(String range) {
  GetObjectRequest.Builder rangedGet =
      GetObjectRequest.builder().bucket(location.bucket()).key(location.key()).range(range);

  S3RequestUtil.configureEncryption(awsProperties, rangedGet);

  GetObjectRequest request = rangedGet.build();
  return s3.getObject(request, ResponseTransformer.toInputStream());
}

@Override
public void close() throws IOException {
super.close();
Expand Down
44 changes: 44 additions & 0 deletions aws/src/test/java/org/apache/iceberg/aws/s3/TestS3InputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Arrays;
import java.util.Random;
import org.apache.commons.io.IOUtils;
import org.apache.iceberg.io.RangeReadable;
import org.apache.iceberg.io.SeekableInputStream;
import org.junit.Before;
import org.junit.ClassRule;
Expand Down Expand Up @@ -105,6 +106,49 @@ private void readAndCheck(SeekableInputStream in, long rangeStart, int size, byt
assertArrayEquals(Arrays.copyOfRange(original, (int) rangeStart, (int) rangeEnd), actual);
}

/**
 * Writes 10 MiB of random data and verifies positional reads of the first 1 KiB, the last
 * 1 KiB and a 2 KiB window starting 1 KiB before the midpoint. Each read lands in the
 * target buffer at the same index as its position in the object.
 */
@Test
public void testRangeRead() throws Exception {
  S3URI uri = new S3URI("s3://bucket/path/to/range-read.dat");
  int dataSize = 1024 * 1024 * 10;
  byte[] expected = randomData(dataSize);
  byte[] actual = new byte[dataSize];

  int oneKb = 1024;
  int lastKbStart = dataSize - oneKb;
  int middleStart = dataSize / 2 - oneKb;

  writeS3Data(uri, expected);

  try (RangeReadable in = new S3InputStream(s3, uri)) {
    // first 1k
    readAndCheckRanges(in, expected, 0, actual, 0, oneKb);

    // last 1k
    readAndCheckRanges(in, expected, lastKbStart, actual, lastKbStart, oneKb);

    // middle 2k
    readAndCheckRanges(in, expected, middleStart, actual, middleStart, oneKb * 2);
  }
}

/**
 * Performs a positional read and asserts that the bytes written into {@code buffer} match
 * the corresponding bytes of the source data.
 *
 * @param in the range-readable stream under test
 * @param original the full content that was written to the object
 * @param position position in the object at which the read starts
 * @param buffer target buffer for the read
 * @param offset index in {@code buffer} at which the read data is written
 * @param length number of bytes to read and compare
 * @throws IOException if the read fails
 */
private void readAndCheckRanges(
    RangeReadable in, byte[] original, long position, byte[] buffer, int offset,
    int length) throws IOException {
  in.readFully(position, buffer, offset, length);

  // expected bytes are addressed by the object position, actual bytes by the buffer offset;
  // slicing both by offset only works when position == offset
  int start = (int) position;
  assertArrayEquals(
      Arrays.copyOfRange(original, start, start + length),
      Arrays.copyOfRange(buffer, offset, offset + length));
}

@Test
public void testClose() throws Exception {
S3URI uri = new S3URI("s3://bucket/path/to/closed.dat");
Expand Down
75 changes: 75 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/IOUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class IOUtil {
  // utility holder; never instantiated
  private IOUtil() {
  }

  /**
   * Copies exactly {@code length} bytes from the stream into the buffer, issuing as many
   * read calls as needed.
   *
   * @param stream an InputStream to read from
   * @param bytes a buffer to write into
   * @param offset starting offset in the buffer for the data
   * @param length length of bytes to copy from the input stream to the buffer
   * @throws EOFException if the end of the stream is reached before reading length bytes
   * @throws IOException if there is an error while reading
   */
  public static void readFully(InputStream stream, byte[] bytes, int offset, int length) throws IOException {
    int missing = length - readRemaining(stream, bytes, offset, length);
    if (missing > 0) {
      throw new EOFException(
          "Reached the end of stream with " + missing + " bytes left to read");
    }
  }

  /**
   * Copies up to {@code length} bytes from the stream into the buffer, issuing as many read
   * calls as needed and stopping early only when the stream reports its end.
   *
   * @param stream an InputStream to read from
   * @param bytes a buffer to write into
   * @param offset starting offset in the buffer for the data
   * @param length length of bytes to copy from the input stream to the buffer
   * @return the number of bytes read
   * @throws IOException if there is an error while reading
   */
  public static int readRemaining(InputStream stream, byte[] bytes, int offset, int length) throws IOException {
    int total = 0;
    while (total < length) {
      int count = stream.read(bytes, offset + total, length - total);
      if (count < 0) {
        // end of stream: report what was collected so far
        return total;
      }

      total += count;
    }

    return total;
  }
}
58 changes: 58 additions & 0 deletions core/src/test/java/org/apache/iceberg/io/MockInputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.io;

import java.io.ByteArrayInputStream;

class MockInputStream extends ByteArrayInputStream {

static final byte[] TEST_ARRAY = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

private int[] lengths;
private int current = 0;

MockInputStream(int... actualReadLengths) {
super(TEST_ARRAY);
this.lengths = actualReadLengths;
}

@Override
public synchronized int read(byte[] b, int off, int len) {
if (current < lengths.length) {
if (len <= lengths[current]) {
// when len == lengths[current], the next read will by 0 bytes
int bytesRead = super.read(b, off, len);
lengths[current] -= bytesRead;
return bytesRead;
} else {
int bytesRead = super.read(b, off, lengths[current]);
current += 1;
return bytesRead;
}
} else {
return super.read(b, off, len);
}
}

public long getPos() {
return this.pos;
}
}

Loading

0 comments on commit a655c80

Please sign in to comment.