PARQUET-2171. Vector IO: review changes (comments, logging, spotless)
- improve doc comments of new methods and parameterized tests.
- fix build to work after rebasing, which includes spotless:apply
steveloughran committed Jan 15, 2024
1 parent a064890 commit a8ef548
Showing 14 changed files with 207 additions and 278 deletions.
@@ -66,6 +66,6 @@ public void setDataReadFuture(CompletableFuture<ByteBuffer> dataReadFuture) {

@Override
public String toString() {
return "range[" + this.offset + "," + (this.offset + (long)this.length) + ")";
return "range[" + this.offset + "," + (this.offset + (long) this.length) + ")";
}
}
@@ -111,8 +111,7 @@ public abstract class SeekableInputStream extends InputStream {
* Read a set of file ranges in a vectored manner.
* @throws UnsupportedOperationException if not available in this class/runtime.
*/
-public void readVectored(List<ParquetFileRange> ranges,
-IntFunction<ByteBuffer> allocate) throws IOException {
+public void readVectored(List<ParquetFileRange> ranges, IntFunction<ByteBuffer> allocate) throws IOException {

throw new UnsupportedOperationException("Vectored IO is not supported for " + this);
}
@@ -123,5 +122,4 @@ public void readVectored(List<ParquetFileRange> ranges,
public boolean readVectoredAvailable() {
return false;
}
-
}
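
For context, a minimal sketch of how a caller could drive this API once a stream reports support. The stream wiring, offsets, and lengths below are illustrative assumptions, not part of this commit:

  import java.io.IOException;
  import java.nio.ByteBuffer;
  import java.util.Arrays;
  import java.util.List;
  import org.apache.parquet.io.ParquetFileRange;
  import org.apache.parquet.io.SeekableInputStream;

  static void readTwoRanges(SeekableInputStream in) throws IOException {
    // Hypothetical ranges; a real caller derives these from column chunks.
    List<ParquetFileRange> ranges = Arrays.asList(
        new ParquetFileRange(0L, 1024),
        new ParquetFileRange(1_048_576L, 2048));
    if (in.readVectoredAvailable()) {
      // One call issues all reads; each range's future completes
      // once its ByteBuffer has been filled.
      in.readVectored(ranges, ByteBuffer::allocate);
      for (ParquetFileRange range : ranges) {
        ByteBuffer data = range.getDataReadFuture().join();
        // ... consume data ...
      }
    } else {
      // Fall back to the classic positioned-read path.
    }
  }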
@@ -31,8 +31,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import static org.apache.parquet.Exceptions.throwIfInstance;
-
public class DynMethods {

private static final Logger LOG = LoggerFactory.getLogger(DynMethods.class);
@@ -315,8 +313,7 @@ public Builder ctorImpl(Class<?> targetClass, Class<?>... argClasses) {
.buildChecked();
} catch (NoSuchMethodException e) {
// not the right implementation
LOG.debug("failed to load constructor arity {} from class {}",
argClasses.length, targetClass, e);
LOG.debug("failed to load constructor arity {} from class {}", argClasses.length, targetClass, e);
}
return this;
}
@@ -333,9 +330,7 @@ public Builder ctorImpl(String className, Class<?>... argClasses) {
.buildChecked();
} catch (NoSuchMethodException e) {
// not the right implementation
LOG.debug("failed to load constructor arity {} from class {}",
argClasses.length, className, e);

LOG.debug("failed to load constructor arity {} from class {}", argClasses.length, className, e);
}
return this;
}
@@ -31,8 +31,6 @@
import org.apache.parquet.format.converter.ParquetMetadataConverter.MetadataFilter;
import org.apache.parquet.hadoop.ParquetMetricsCallback;

-import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED;
-
public class HadoopReadOptions extends ParquetReadOptions {
private final Configuration conf;

@@ -127,6 +125,7 @@ public ParquetReadOptions build() {
usePageChecksumVerification,
useBloomFilter,
useOffHeapDecryptBuffer,
+useHadoopVectoredIo,
recordFilter,
metadataFilter,
codecFactory,
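
A hedged sketch of how this option could be switched on from a Hadoop configuration, using the HADOOP_VECTORED_IO_ENABLED key whose static import appears above; the surrounding wiring is an assumption, not shown in this commit:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.parquet.HadoopReadOptions;
  import org.apache.parquet.ParquetReadOptions;
  import static org.apache.parquet.hadoop.ParquetInputFormat.HADOOP_VECTORED_IO_ENABLED;

  Configuration conf = new Configuration();
  // Opt in to the Hadoop vectored IO read path.
  conf.setBoolean(HADOOP_VECTORED_IO_ENABLED, true);
  ParquetReadOptions options = HadoopReadOptions.builder(conf).build();
  // The file reader later consults options.useHadoopVectoredIO()
  // before choosing the vectored read path.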
@@ -1145,11 +1145,9 @@ public ColumnChunkPageReadStore readFilteredRowGroup(int blockIndex, RowRanges r
* @throws IOException any IOE.
*/
private void readAllPartsVectoredOrNormal(List<ConsecutivePartList> allParts, ChunkListBuilder builder)
-throws IOException {
-boolean isVectoredIO = options.useHadoopVectoredIO()
-&& f.readVectoredAvailable()
-&& partsLengthValidForVectoredIO(allParts);
-if (isVectoredIO) {
+throws IOException {
+
+if (shouldUseVectoredIO(allParts)) {
readVectored(allParts, builder);
} else {
for (ConsecutivePartList consecutiveChunks : allParts) {
Expand All @@ -1158,16 +1156,34 @@ private void readAllPartsVectoredOrNormal(List<ConsecutivePartList> allParts, Ch
}
}

+/**
+ * Should the read use vectored IO?
+ * <p>
+ * This returns true if all necessary conditions are met:
+ * <ol>
+ *   <li> The option is enabled</li>
+ *   <li> The Hadoop version supports vectored IO</li>
+ *   <li> The part lengths are all valid for vectored IO</li>
+ * </ol>
+ * @param allParts all parts to read.
+ * @return true or false.
+ */
+private boolean shouldUseVectoredIO(final List<ConsecutivePartList> allParts) {
+return options.useHadoopVectoredIO() && f.readVectoredAvailable() && arePartLengthsValidForVectoredIO(allParts);
+}

/**
* Vectored IO doesn't support reading ranges of size greater than
* Integer.MAX_VALUE.
* @param allParts all parts to read.
* @return true or false.
*/
-private boolean partsLengthValidForVectoredIO(List<ConsecutivePartList> allParts) {
-for (ConsecutivePartList consecutivePart : allParts) {
+private boolean arePartLengthsValidForVectoredIO(List<ConsecutivePartList> allParts) {
+for (ConsecutivePartList consecutivePart : allParts) {
if (consecutivePart.length >= Integer.MAX_VALUE) {
LOG.debug("Part length {} greater than Integer.MAX_VALUE thus disabling vectored IO", consecutivePart.length);
LOG.debug(
"Part length {} greater than Integer.MAX_VALUE thus disabling vectored IO",
consecutivePart.length);
return false;
}
}
@@ -1176,26 +1192,36 @@ private boolean partsLengthValidForVectoredIO(List<ConsecutivePartList> allParts

/**
* Read all parts through vectored IO.
+ * <p>
+ * The API is available in recent hadoop builds for all implementations of PositionedReadable;
+ * the default implementation simply does a sequence of reads at different offsets.
+ * <p>
+ * If directly implemented by a Filesystem then it is likely to be a more efficient
+ * operation such as a scatter-gather read (native IO) or set of parallel
+ * GET requests against an object store.
* @param allParts all parts to be read.
* @param builder used to build chunk list to read the pages for the different columns.
* @throws IOException any IOE.
*/
-private void readVectored(List<ConsecutivePartList> allParts,
-ChunkListBuilder builder) throws IOException {
+private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder builder) throws IOException {
+
List<ParquetFileRange> ranges = new ArrayList<>(allParts.size());
+long totalSize = 0;
for (ConsecutivePartList consecutiveChunks : allParts) {
-Preconditions.checkArgument(consecutiveChunks.length < Integer.MAX_VALUE,
-"Invalid length %s for vectored read operation. It must be less than max integer value.",
-consecutiveChunks.length);
-ranges.add(new ParquetFileRange(consecutiveChunks.offset, (int) consecutiveChunks.length));
-}
-LOG.debug("Doing vectored IO for ranges {}", ranges);
+final long len = consecutiveChunks.length;
+Preconditions.checkArgument(
+len < Integer.MAX_VALUE,
+"Invalid length %s for vectored read operation. It must be less than max integer value.",
+len);
+ranges.add(new ParquetFileRange(consecutiveChunks.offset, (int) len));
+totalSize += len;
+}
+LOG.info("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size());
ByteBufferAllocator allocator = options.getAllocator();
-//blocking or asynchronous vectored read.
+// Request a vectored read;
f.readVectored(ranges, allocator::allocate);
int k = 0;
-for (ConsecutivePartList consecutivePart : allParts) {
+for (ConsecutivePartList consecutivePart : allParts) {
ParquetFileRange currRange = ranges.get(k++);
consecutivePart.readFromVectoredRange(currRange, builder);
}
@@ -2093,22 +2119,26 @@ private void setReadMetrics(long startNs) {
}

/**
-* Populate data in a parquet file range from vectored range.
+* Populate data in a parquet file range from a vectored range; will block for up
+* to {@link #HADOOP_VECTORED_READ_TIMEOUT_SECONDS} seconds.
* @param currRange range to populate.
* @param builder used to build chunk list to read the pages for the different columns.
-* @throws IOException if there is an error while reading from the stream.
+* @throws IOException if there is an error while reading from the stream, including a timeout.
*/
-public void readFromVectoredRange(ParquetFileRange currRange,
-ChunkListBuilder builder) throws IOException {
+public void readFromVectoredRange(ParquetFileRange currRange, ChunkListBuilder builder) throws IOException {
ByteBuffer buffer;
+final long timeoutSeconds = HADOOP_VECTORED_READ_TIMEOUT_SECONDS;
try {
LOG.debug("Waiting for vectored read to finish for range {} ", currRange);
buffer = BindingUtils.awaitFuture(currRange.getDataReadFuture(),
HADOOP_VECTORED_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS);
LOG.debug(
"Waiting for vectored read to finish for range {} with timeout {} seconds",
currRange,
timeoutSeconds);
buffer = BindingUtils.awaitFuture(currRange.getDataReadFuture(), timeoutSeconds, TimeUnit.SECONDS);
// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(currRange.getLength());
} catch (TimeoutException e) {
String error = String.format("Timeout while fetching result for %s", currRange);
String error = String.format(
"Timeout while fetching result for %s with time limit %d seconds", currRange, timeoutSeconds);
LOG.error(error, e);
throw new IOException(error, e);
}
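
The timeout handling above reduces to a standard future-await pattern. A self-contained sketch, assuming a CompletableFuture&lt;ByteBuffer&gt; like the one ParquetFileRange carries; names are illustrative:

  import java.io.IOException;
  import java.io.InterruptedIOException;
  import java.nio.ByteBuffer;
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.TimeoutException;

  static ByteBuffer awaitRange(CompletableFuture<ByteBuffer> future, long timeoutSeconds) throws IOException {
    try {
      // Block for at most timeoutSeconds, as readFromVectoredRange does.
      return future.get(timeoutSeconds, TimeUnit.SECONDS);
    } catch (TimeoutException e) {
      // Surface the timeout as an IOException, matching the code above.
      throw new IOException("Timeout while fetching range after " + timeoutSeconds + " seconds", e);
    } catch (ExecutionException e) {
      // Unwrap the read failure recorded in the future.
      throw new IOException(e.getCause());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw (InterruptedIOException) new InterruptedIOException(e.toString()).initCause(e);
    }
  }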
@@ -24,9 +24,10 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.function.IntFunction;
-import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.parquet.hadoop.util.vectorio.VectorIOBridge;
import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.ParquetFileRange;

/**
* SeekableInputStream implementation for FSDataInputStream that implements
@@ -26,21 +26,17 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-
+import org.apache.parquet.util.DynMethods;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import org.apache.parquet.util.DynMethods;
-
/**
* Binding utils.
*/
public final class BindingUtils {
private static final Logger LOG = LoggerFactory.getLogger(BindingUtils.class);

-
-private BindingUtils() {
-}
+private BindingUtils() {}

/**
* Given a future, evaluate it.
@@ -61,17 +57,13 @@ private BindingUtils() {
* @throws RuntimeException any nested RTE thrown
* @throws TimeoutException the future timed out.
*/
-public static <T> T awaitFuture(final Future<T> future,
-final long timeout,
-final TimeUnit unit)
-throws InterruptedIOException, IOException, RuntimeException,
-TimeoutException {
+public static <T> T awaitFuture(final Future<T> future, final long timeout, final TimeUnit unit)
+throws InterruptedIOException, IOException, RuntimeException, TimeoutException {
try {
LOG.debug("Awaiting future");
return future.get(timeout, unit);
} catch (InterruptedException e) {
-throw (InterruptedIOException) new InterruptedIOException(e.toString())
-.initCause(e);
+throw (InterruptedIOException) new InterruptedIOException(e.toString()).initCause(e);
} catch (ExecutionException e) {
return raiseInnerCause(e);
}
@@ -93,8 +85,7 @@ public static <T> T awaitFuture(final Future<T> future,
* any non-Runtime-Exception
* @throws RuntimeException if that is the inner cause.
*/
-public static <T> T raiseInnerCause(final ExecutionException e)
-throws IOException {
+public static <T> T raiseInnerCause(final ExecutionException e) throws IOException {
throw unwrapInnerException(e);
}

@@ -155,16 +146,13 @@ public static IOException unwrapInnerException(final Throwable e) {
* @return the method or "unavailable"
*/
static <T> DynMethods.UnboundMethod loadInvocation(
-Class<?> source,
-Class<? extends T> returnType, String name,
-Class<?>... parameterTypes) {
+Class<?> source, Class<? extends T> returnType, String name, Class<?>... parameterTypes) {

if (source != null) {
-final DynMethods.UnboundMethod m = new DynMethods
-.Builder(name)
-.impl(source, name, parameterTypes)
-.orNoop()
-.build();
+final DynMethods.UnboundMethod m = new DynMethods.Builder(name)
+.impl(source, name, parameterTypes)
+.orNoop()
+.build();
if (m.isNoop()) {
// this is a sign of a mismatch between this class's expected
// signatures and actual ones.
@@ -187,8 +175,7 @@ static <T> DynMethods.UnboundMethod loadInvocation(
* @return a no-op method.
*/
static DynMethods.UnboundMethod noop(final String name) {
-return new DynMethods.Builder(name)
-.orNoop().build();
+return new DynMethods.Builder(name).orNoop().build();
}

/**
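
As an illustration of the loadInvocation/noop pattern above, a small sketch that probes for an optional Hadoop method reflectively; the probed method, hasCapability, is only an example of an API that may be absent at runtime:

  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.parquet.util.DynMethods;

  DynMethods.UnboundMethod method = new DynMethods.Builder("hasCapability")
      .impl(FSDataInputStream.class, "hasCapability", String.class)
      .orNoop()
      .build();
  if (method.isNoop()) {
    // Older Hadoop: the probe is unavailable, so the binding
    // degrades to a no-op instead of throwing at class load.
  }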