From d1be36844972318cb533c62f42ea61196c3300ad Mon Sep 17 00:00:00 2001 From: Ravi Dutt Singh Date: Thu, 29 Aug 2024 20:12:26 +0530 Subject: [PATCH 1/5] Add fadvise:AUTO_RANDOM mode --- gcs/CHANGES.md | 1 + gcs/CONFIGURATION.md | 16 ++ .../GoogleHadoopFileSystemConfiguration.java | 9 + ...ogleHadoopFileSystemConfigurationTest.java | 1 + .../gcsio/AdaptiveFileAccessPattern.java | 181 ++++++++++++++++++ .../GoogleCloudStorageClientReadChannel.java | 71 +++---- .../gcsio/GoogleCloudStorageReadChannel.java | 82 ++++---- .../gcsio/GoogleCloudStorageReadOptions.java | 17 +- .../gcsio/AdaptiveFileAccessPatternTest.java | 152 +++++++++++++++ ...ogleCloudStorageClientReadChannelTest.java | 44 ++++- .../GoogleCloudStorageReadChannelTest.java | 74 ++++++- 11 files changed, 554 insertions(+), 94 deletions(-) create mode 100644 gcsio/src/main/java/com/google/cloud/hadoop/gcsio/AdaptiveFileAccessPattern.java create mode 100644 gcsio/src/test/java/com/google/cloud/hadoop/gcsio/AdaptiveFileAccessPatternTest.java diff --git a/gcs/CHANGES.md b/gcs/CHANGES.md index 6c91cc54f2..31911ecde8 100644 --- a/gcs/CHANGES.md +++ b/gcs/CHANGES.md @@ -1,6 +1,7 @@ # Release Notes ## Next +1. Add AUTO_RANDOM as new fadvise mode. ## 2.2.25 - 2024-08-01 1. PR #1227 - Avoid registering subscriber class multiple times diff --git a/gcs/CONFIGURATION.md b/gcs/CONFIGURATION.md index 4829c30d2d..d8b2a2b814 100644 --- a/gcs/CONFIGURATION.md +++ b/gcs/CONFIGURATION.md @@ -503,6 +503,22 @@ permissions (not authorized) to execute these requests. streaming requests as soon as first backward read or forward read for more than `fs.gs.inputstream.inplace.seek.limit` bytes was detected. + * `AUTO_RANDOM` - It is complementing `AUTO` mode which uses sequential + mode to start with and adapts to bounded range requests. `AUTO_RANDOM` + mode uses bounded channel initially and adapts to sequential requests if + consecutive requests are within `fs.gs.inputstream.min.range.request.size`. + gzip-encode object will bypass this adoption, it will always be a + streaming(unbounded) channel. This helps in cases where egress limits is + getting breached for customer because `AUTO` mode will always lead to + one unbounded channel for a file. `AUTO_RANDOM` will avoid such unwanted + unbounded channels. + +* `fs.gs.fadvise.request.track.count` (default: `3`) + + Self adaptive fadvise mode uses distance between the served requests to + decide the access pattern. This property controls how many such requests + need to be tracked. + * `fs.gs.inputstream.inplace.seek.limit` (default: `8388608`) If forward seeks are within this many bytes of the current position, seeks diff --git a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java index 8d43be143d..07b8862275 100644 --- a/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java +++ b/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfiguration.java @@ -213,6 +213,13 @@ public class GoogleHadoopFileSystemConfiguration { public static final HadoopConfigurationProperty GCS_BATCH_THREADS = new HadoopConfigurationProperty<>("fs.gs.batch.threads", 15); + /** + * Configuration key for number of request to track for adapting the access pattern i.e. fadvise: + * AUTO & AUTO_RANDOM. + */ + public static final HadoopConfigurationProperty GCS_FADVISE_REQUEST_TRACK_COUNT = + new HadoopConfigurationProperty<>("fs.gs.fadvise.request.track.count", 3); + /** * Configuration key for enabling the use of Rewrite requests for copy operations. Rewrite request * has the same effect as Copy request, but it can handle moving large objects that may @@ -673,6 +680,8 @@ private static GoogleCloudStorageReadOptions getReadChannelOptions(Configuration .setTraceLogTimeThreshold(GCS_TRACE_LOG_TIME_THRESHOLD_MS.get(config, config::getLong)) .setTraceLogExcludeProperties( ImmutableSet.copyOf(GCS_TRACE_LOG_EXCLUDE_PROPERTIES.getStringCollection(config))) + .setBlockSize(BLOCK_SIZE.get(config, config::getLong)) + .setFadviseRequestTrackCount(GCS_FADVISE_REQUEST_TRACK_COUNT.get(config, config::getInt)) .build(); } diff --git a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java index 0cc82530ec..350bb4024e 100644 --- a/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java +++ b/gcs/src/test/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopFileSystemConfigurationTest.java @@ -154,6 +154,7 @@ public class GoogleHadoopFileSystemConfigurationTest { "fs.gs.write.parallel.composite.upload.part.file.cleanup.type", PartFileCleanupType.ALWAYS); put("fs.gs.write.parallel.composite.upload.part.file.name.prefix", ""); + put("fs.gs.fadvise.request.track.count", 3); } }; diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/AdaptiveFileAccessPattern.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/AdaptiveFileAccessPattern.java new file mode 100644 index 0000000000..75cff237ef --- /dev/null +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/AdaptiveFileAccessPattern.java @@ -0,0 +1,181 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.hadoop.gcsio; + +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise; +import com.google.common.flogger.GoogleLogger; +import java.io.Closeable; +import java.io.IOException; +import java.util.LinkedList; +import java.util.ListIterator; + +class AdaptiveFileAccessPattern implements Closeable { + + private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); + private final StorageResourceId resourceId; + private final GoogleCloudStorageReadOptions readOptions; + private boolean isPatternOverriden = false; + private boolean randomAccess; + private long lastServedIndex = -1; + // Keeps track of distance between consecutive requests + private BoundedList consecutiveRequestsDistances; + + @Override + public void close() throws IOException { + if (consecutiveRequestsDistances != null) { + consecutiveRequestsDistances = null; + } + } + + class BoundedList extends LinkedList { + private int limit; + + public BoundedList(int limit) { + this.limit = limit; + } + + @Override + public boolean add(E o) { + super.add(o); + while (size() > limit) { + super.removeFirst(); + } + return true; + } + } + + public AdaptiveFileAccessPattern( + StorageResourceId resourceId, GoogleCloudStorageReadOptions readOptions) { + this.resourceId = resourceId; + this.readOptions = readOptions; + this.randomAccess = + readOptions.getFadvise() == Fadvise.AUTO_RANDOM + || readOptions.getFadvise() == Fadvise.RANDOM; + if (readOptions.getFadvise() == Fadvise.AUTO_RANDOM) { + consecutiveRequestsDistances = new BoundedList<>(readOptions.getFadviseRequestTrackCount()); + } + } + + public void updateLastServedIndex(long position) { + this.lastServedIndex = position; + } + + public boolean isRandomAccessPattern() { + return randomAccess; + } + + public void updateAccessPattern(long currentPosition) { + if (isPatternOverriden) { + logger.atFiner().log( + "Will bypass computing access pattern as it's overriden for resource :%s", resourceId); + return; + } + if (readOptions.getFadvise() == Fadvise.AUTO_RANDOM) { + if (isSequentialAccessPattern(currentPosition)) { + unsetRandomAccess(); + } + } else if (readOptions.getFadvise() == Fadvise.AUTO) { + if (isRandomAccessPattern(currentPosition)) { + setRandomAccess(); + } + } + } + + /** + * This provides a way to override the access pattern, once overridden it will not be recomputed + * for adaptive fadvise types. + * + * @param pattern, true, to override with random access else false + */ + public void overrideAccessPattern(boolean pattern) { + this.isPatternOverriden = true; + this.randomAccess = pattern; + logger.atInfo().log( + "Overriding the random access pattern to %s for fadvise:%s for resource: %s ", + pattern, readOptions.getFadvise(), resourceId); + } + + private boolean isSequentialAccessPattern(long currentPosition) { + if (lastServedIndex != -1 && consecutiveRequestsDistances != null) { + consecutiveRequestsDistances.add(currentPosition - lastServedIndex); + } + + if (!shouldDetectSequentialAccess()) { + return false; + } + + if (consecutiveRequestsDistances.size() < readOptions.getFadviseRequestTrackCount()) { + return false; + } + + ListIterator iterator = consecutiveRequestsDistances.listIterator(); + while (iterator.hasNext()) { + Long distance = iterator.next(); + if (distance < 0 || distance > readOptions.DEFAULT_INPLACE_SEEK_LIMIT) { + return false; + } + } + logger.atInfo().log( + "Detected %d consecutive read request within distance threshold %d with fadvise: %s switching to sequential IO for '%s'", + consecutiveRequestsDistances.size(), + readOptions.getInplaceSeekLimit(), + readOptions.getFadvise(), + resourceId); + return true; + } + + private boolean isRandomAccessPattern(long currentPosition) { + if (!shouldDetectRandomAccess()) { + return false; + } + if (lastServedIndex == -1) { + return false; + } + + if (currentPosition < lastServedIndex) { + logger.atFine().log( + "Detected backward read from %s to %s position, switching to random IO for '%s'", + lastServedIndex, currentPosition, resourceId); + return true; + } + if (lastServedIndex >= 0 + && lastServedIndex + readOptions.getInplaceSeekLimit() < currentPosition) { + logger.atFine().log( + "Detected forward read from %s to %s position over %s threshold," + + " switching to random IO for '%s'", + lastServedIndex, currentPosition, readOptions.getInplaceSeekLimit(), resourceId); + return true; + } + return false; + } + + private boolean shouldDetectSequentialAccess() { + return randomAccess && readOptions.getFadvise() == Fadvise.AUTO_RANDOM; + } + + private boolean shouldDetectRandomAccess() { + return !randomAccess && readOptions.getFadvise() == Fadvise.AUTO; + } + + private void setRandomAccess() { + randomAccess = true; + } + + private void unsetRandomAccess() { + randomAccess = false; + } +} diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java index 8928e9c301..2739b8b848 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java @@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.flogger.GoogleLogger; import java.io.ByteArrayInputStream; +import java.io.Closeable; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; @@ -174,7 +175,7 @@ public void close() throws IOException { if (open) { try { logger.atFiner().log("Closing channel for '%s'", resourceId); - contentReadChannel.closeContentChannel(); + contentReadChannel.close(); } catch (Exception e) { GoogleCloudStorageEventBus.postOnException(); throw new IOException( @@ -194,7 +195,7 @@ public void close() throws IOException { * which helps in deciding the boundaries of content channel being opened and also caching the * footer of an object. */ - private class ContentReadChannel { + private class ContentReadChannel implements Closeable { // Size of buffer to allocate for skipping bytes in-place when performing in-place seeks. private static final int SKIP_BUFFER_SIZE = 8192; @@ -210,14 +211,17 @@ private class ContentReadChannel { // in-place seeks. private byte[] skipBuffer = null; private ReadableByteChannel byteChannel = null; - private boolean randomAccess; + private AdaptiveFileAccessPattern fileAccessPattern; public ContentReadChannel( GoogleCloudStorageReadOptions readOptions, StorageResourceId resourceId) { this.blobId = BlobId.of( resourceId.getBucketName(), resourceId.getObjectName(), resourceId.getGenerationId()); - this.randomAccess = readOptions.getFadvise() == Fadvise.RANDOM; + this.fileAccessPattern = new AdaptiveFileAccessPattern(resourceId, readOptions); + if (gzipEncoded) { + fileAccessPattern.overrideAccessPattern(false); + } } public int readContent(ByteBuffer dst) throws IOException { @@ -304,6 +308,7 @@ public int readContent(ByteBuffer dst) throws IOException { int partialBytes = partiallyReadBytes(remainingBeforeRead, dst); totalBytesRead += partialBytes; currentPosition += partialBytes; + contentChannelCurrentPosition += partialBytes; logger.atFine().log( "Closing contentChannel after %s exception for '%s'.", e.getMessage(), resourceId); closeContentChannel(); @@ -321,14 +326,6 @@ private int partiallyReadBytes(int remainingBeforeRead, ByteBuffer dst) { return partialReadBytes; } - private boolean shouldDetectRandomAccess() { - return !gzipEncoded && !randomAccess && readOptions.getFadvise() == Fadvise.AUTO; - } - - private void setRandomAccess() { - randomAccess = true; - } - private ReadableByteChannel openByteChannel(long bytesToRead) throws IOException { checkArgument( bytesToRead > 0, "bytesToRead should be greater than 0, but was %s", bytesToRead); @@ -341,6 +338,9 @@ private ReadableByteChannel openByteChannel(long bytesToRead) throws IOException return serveFooterContent(); } + // Should be updated only if content is not served from cached footer + fileAccessPattern.updateAccessPattern(currentPosition); + setChannelBoundaries(bytesToRead); ReadableByteChannel readableByteChannel = @@ -426,12 +426,15 @@ private long getRangeRequestEnd(long startPosition, long bytesToRead) { if (gzipEncoded) { return objectSize; } - long endPosition = objectSize; - if (randomAccess) { + if (fileAccessPattern.isRandomAccessPattern()) { // opening a channel for whole object doesn't make sense as anyhow it will not be utilized // for further reads. endPosition = startPosition + max(bytesToRead, readOptions.getMinRangeRequestSize()); + } else { + if (readOptions.getFadvise() == Fadvise.AUTO_RANDOM) { + endPosition = min(startPosition + readOptions.getBlockSize(), objectSize); + } } if (footerContent != null) { // If footer is cached open just till footerStart. @@ -451,6 +454,7 @@ public void closeContentChannel() { "Got an exception on contentChannel.close() for '%s'; ignoring it.", resourceId); } finally { byteChannel = null; + fileAccessPattern.updateLastServedIndex(contentChannelCurrentPosition); reset(); } } @@ -521,39 +525,12 @@ private void performPendingSeeks() { if (isInRangeSeek()) { skipInPlace(); } else { - if (isRandomAccessPattern()) { - setRandomAccess(); - } // close existing contentChannel as requested bytes can't be served from current // contentChannel; closeContentChannel(); } } - private boolean isRandomAccessPattern() { - if (!shouldDetectRandomAccess()) { - return false; - } - if (currentPosition < contentChannelCurrentPosition) { - logger.atFine().log( - "Detected backward read from %s to %s position, switching to random IO for '%s'", - contentChannelCurrentPosition, currentPosition, resourceId); - return true; - } - if (contentChannelCurrentPosition >= 0 - && contentChannelCurrentPosition + readOptions.getInplaceSeekLimit() < currentPosition) { - logger.atFine().log( - "Detected forward read from %s to %s position over %s threshold," - + " switching to random IO for '%s'", - contentChannelCurrentPosition, - currentPosition, - readOptions.getInplaceSeekLimit(), - resourceId); - return true; - } - return false; - } - private ReadableByteChannel getStorageReadChannel(long seek, long limit) throws IOException { ReadChannel readChannel = storage.reader(blobId, generateReadOptions(blobId)); try { @@ -591,11 +568,21 @@ private BlobSourceOption[] generateReadOptions(BlobId blobId) { private boolean isFooterRead() { return objectSize - currentPosition <= readOptions.getMinRangeRequestSize(); } + + @Override + public void close() throws IOException { + try { + fileAccessPattern.close(); + closeContentChannel(); + } finally { + fileAccessPattern = null; + } + } } @VisibleForTesting boolean randomAccessStatus() { - return contentReadChannel.randomAccess; + return contentReadChannel.fileAccessPattern.isRandomAccessPattern(); } private static void validate(GoogleCloudStorageItemInfo itemInfo) throws IOException { diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java index 0d397d0238..c3b3fd6cd2 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java @@ -103,9 +103,6 @@ public class GoogleCloudStorageReadChannel implements SeekableByteChannel { // Size of the contentChannel. private long contentChannelEnd = -1; - // Whether to use bounded range requests or streaming requests. - @VisibleForTesting boolean randomAccess; - // Maximum number of automatic retries when reading from the underlying channel without making // progress; each time at least one byte is successfully read, the counter of attempted retries // is reset. @@ -122,6 +119,7 @@ public class GoogleCloudStorageReadChannel implements SeekableByteChannel { // Fine-grained options. private final GoogleCloudStorageReadOptions readOptions; + private final AdaptiveFileAccessPattern fileAccessPattern; // Sleeper used for waiting between retries. private Sleeper sleeper = Sleeper.DEFAULT; @@ -172,6 +170,7 @@ public GoogleCloudStorageReadChannel( this.errorExtractor = errorExtractor; this.readOptions = readOptions; this.resourceId = resourceId; + this.fileAccessPattern = new AdaptiveFileAccessPattern(resourceId, readOptions); // Initialize metadata if available. GoogleCloudStorageItemInfo info = getInitialMetadata(); @@ -499,6 +498,7 @@ protected void closeContentChannel() { "Got an exception on contentChannel.close() for '%s'; ignoring it.", resourceId); } finally { contentChannel = null; + fileAccessPattern.updateLastServedIndex(contentChannelPosition); resetContentChannel(); } } @@ -560,35 +560,6 @@ public SeekableByteChannel position(long newPosition) throws IOException { return this; } - private boolean isRandomAccessPattern(long oldPosition) { - if (!shouldDetectRandomAccess()) { - return false; - } - if (currentPosition < oldPosition) { - logger.atFine().log( - "Detected backward read from %s to %s position, switching to random IO for '%s'", - oldPosition, currentPosition, resourceId); - return true; - } - if (oldPosition >= 0 && oldPosition + readOptions.getInplaceSeekLimit() < currentPosition) { - logger.atFine().log( - "Detected forward read from %s to %s position over %s threshold," - + " switching to random IO for '%s'", - oldPosition, currentPosition, readOptions.getInplaceSeekLimit(), resourceId); - return true; - } - return false; - } - - private boolean shouldDetectRandomAccess() { - return !gzipEncoded && !randomAccess && readOptions.getFadvise() == Fadvise.AUTO; - } - - private void setRandomAccess() { - randomAccess = true; - checkEncodingAndAccess(); - } - private void skipInPlace(long seekDistance) { if (skipBuffer == null) { skipBuffer = new byte[SKIP_BUFFER_SIZE]; @@ -657,7 +628,7 @@ protected void setSize(long size) { private void checkEncodingAndAccess() { checkState( - !(gzipEncoded && randomAccess), + !(gzipEncoded && fileAccessPattern.isRandomAccessPattern()), "gzipEncoded and randomAccess should not be true at the same time for '%s'", resourceId); } @@ -722,19 +693,23 @@ void performLazySeek(long bytesToRead) throws IOException { } if (contentChannel == null) { - if (isRandomAccessPattern(oldPosition)) { - setRandomAccess(); - } openContentChannel(bytesToRead); } } private void openContentChannel(long bytesToRead) throws IOException { checkState(contentChannel == null, "contentChannel should be null, before opening new"); - InputStream objectContentStream = - footerContent != null && currentPosition >= size - footerContent.length - ? openFooterStream() - : openStream(bytesToRead); + InputStream objectContentStream; + if (footerContent != null && currentPosition >= size - footerContent.length) { + objectContentStream = openFooterStream(); + } else { + // only update access pattern if not getting served from cached footer + if (!gzipEncoded) { + fileAccessPattern.updateAccessPattern(currentPosition); + } + objectContentStream = openStream(bytesToRead); + } + contentChannel = Channels.newChannel(objectContentStream); checkState( contentChannelPosition == currentPosition, @@ -789,13 +764,15 @@ protected void initMetadata(@Nullable String encoding, long sizeFromMetadata, lo generation, resourceId); gzipEncoded = nullToEmpty(encoding).contains(GZIP_ENCODING); + if (gzipEncoded) { + fileAccessPattern.overrideAccessPattern(false); + } if (gzipEncoded && !readOptions.getSupportGzipEncoding()) { GoogleCloudStorageEventBus.postOnException(); throw new IOException( "Cannot read GZIP encoded files - content encoding support is disabled."); } size = gzipEncoded ? Long.MAX_VALUE : sizeFromMetadata; - randomAccess = !gzipEncoded && readOptions.getFadvise() == Fadvise.RANDOM; checkEncodingAndAccess(); if (resourceId.hasGenerationId()) { @@ -813,8 +790,8 @@ protected void initMetadata(@Nullable String encoding, long sizeFromMetadata, lo metadataInitialized = true; logger.atFiner().log( - "Initialized metadata (gzipEncoded=%s, size=%s, randomAccess=%s, generation=%s) for '%s'", - gzipEncoded, size, randomAccess, resourceId.getGenerationId(), resourceId); + "Initialized metadata (gzipEncoded=%s, size=%s, generation=%s) for '%s'", + gzipEncoded, size, resourceId.getGenerationId(), resourceId); } private void cacheFooter(HttpResponse response) throws IOException { @@ -887,7 +864,8 @@ protected InputStream openStream(long bytesToRead) throws IOException { if (!metadataInitialized) { contentChannelPosition = getContentChannelPositionForFirstRead(bytesToRead); rangeHeader = "bytes=" + contentChannelPosition + "-"; - if (readOptions.getFadvise() == Fadvise.RANDOM) { + if (readOptions.getFadvise() == Fadvise.RANDOM + || readOptions.getFadvise() == Fadvise.AUTO_RANDOM) { long maxBytesToRead = Math.max(readOptions.getMinRangeRequestSize(), bytesToRead); rangeHeader += (contentChannelPosition + maxBytesToRead - 1); } @@ -907,10 +885,14 @@ protected InputStream openStream(long bytesToRead) throws IOException { // Set rangeSize to the size of the file reminder from currentPosition. long rangeSize = size - contentChannelPosition; - if (randomAccess) { + if (fileAccessPattern.isRandomAccessPattern()) { long randomRangeSize = Math.max(bytesToRead, readOptions.getMinRangeRequestSize()); // Limit rangeSize to the randomRangeSize. rangeSize = Math.min(randomRangeSize, rangeSize); + } else { + if (readOptions.getFadvise() == Fadvise.AUTO_RANDOM) { + rangeSize = Math.min(rangeSize, readOptions.getBlockSize()); + } } contentChannelEnd = contentChannelPosition + rangeSize; @@ -933,7 +915,10 @@ protected InputStream openStream(long bytesToRead) throws IOException { resourceId); rangeHeader = "bytes=" + contentChannelPosition + "-"; - if (randomAccess || contentChannelEnd != size) { + if (fileAccessPattern.isRandomAccessPattern() + || (!fileAccessPattern.isRandomAccessPattern() + && readOptions.getFadvise() == Fadvise.AUTO_RANDOM) + || contentChannelEnd != size) { rangeHeader += (contentChannelEnd - 1); } } @@ -1187,4 +1172,9 @@ private void checkIOPrecondition(boolean precondition, String errorMessage) thro throw new IOException(errorMessage); } } + + @VisibleForTesting + boolean randomAccessStatus() { + return fileAccessPattern.isRandomAccessPattern(); + } } diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadOptions.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadOptions.java index 11e35c5889..07e63ad79d 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadOptions.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadOptions.java @@ -32,7 +32,8 @@ public abstract class GoogleCloudStorageReadOptions { public enum Fadvise { AUTO, RANDOM, - SEQUENTIAL + SEQUENTIAL, + AUTO_RANDOM } public static final int DEFAULT_BACKOFF_INITIAL_INTERVAL_MILLIS = 200; @@ -43,6 +44,7 @@ public enum Fadvise { public static final boolean DEFAULT_FAST_FAIL_ON_NOT_FOUND = true; public static final boolean DEFAULT_SUPPORT_GZIP_ENCODING = true; public static final long DEFAULT_INPLACE_SEEK_LIMIT = 8 * 1024 * 1024; + public static final long BLOCK_SIZE = 64 * 1024 * 1024; public static final Fadvise DEFAULT_FADVISE = Fadvise.SEQUENTIAL; public static final int DEFAULT_MIN_RANGE_REQUEST_SIZE = 2 * 1024 * 1024; public static final boolean GRPC_CHECKSUMS_ENABLED_DEFAULT = false; @@ -75,6 +77,8 @@ public static Builder builder() { .setGrpcReadMessageTimeoutMillis(DEFAULT_GRPC_READ_MESSAGE_TIMEOUT_MILLIS) .setTraceLogEnabled(TRACE_LOGGING_ENABLED_DEFAULT) .setTraceLogTimeThreshold(0L) + .setBlockSize(BLOCK_SIZE) + .setFadviseRequestTrackCount(3) .setTraceLogExcludeProperties(ImmutableSet.of()); } @@ -133,6 +137,10 @@ public static Builder builder() { /** See {@link Builder#setTraceLogTimeThreshold(long)} . */ public abstract long getTraceLogTimeThreshold(); + public abstract long getBlockSize(); + + public abstract int getFadviseRequestTrackCount(); + /** Mutable builder for GoogleCloudStorageReadOptions. */ @AutoValue.Builder public abstract static class Builder { @@ -200,6 +208,9 @@ public abstract static class Builder { *
    *
  • {@code AUTO} - automatically switches to {@code RANDOM} mode if backward read or * forward read for more than {@link #setInplaceSeekLimit} bytes is detected. + *
  • {@code AUTO_RANDOM} - Uses {@code RANDOM} to start with and automatically switches to + * {@code SEQUENTIAL} mode if more than 2 requests fall within {@link + * #setInplaceSeekLimit} limits. *
  • {@code RANDOM} - sends HTTP requests with {@code Range} header set to greater of * provided reade buffer by user. *
  • {@code SEQUENTIAL} - sends HTTP requests with unbounded {@code Range} header. @@ -242,6 +253,10 @@ public abstract static class Builder { /** Sets the property for gRPC read message timeout in milliseconds. */ public abstract Builder setGrpcReadMessageTimeoutMillis(long grpcMessageTimeout); + public abstract Builder setBlockSize(long blockSize); + + public abstract Builder setFadviseRequestTrackCount(int requestTrackCount); + abstract GoogleCloudStorageReadOptions autoBuild(); public GoogleCloudStorageReadOptions build() { diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/AdaptiveFileAccessPatternTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/AdaptiveFileAccessPatternTest.java new file mode 100644 index 0000000000..a81bad3e1e --- /dev/null +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/AdaptiveFileAccessPatternTest.java @@ -0,0 +1,152 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.hadoop.gcsio; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class AdaptiveFileAccessPatternTest { + + private static final String BUCKET_NAME = "bucket-name"; + private static final String OBJECT_NAME = "object-name"; + private static final StorageResourceId RESOURCE_ID = + new StorageResourceId(BUCKET_NAME, OBJECT_NAME); + + @Test + public void defaultAccessPatterns() { + GoogleCloudStorageReadOptions readOptions = + GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.SEQUENTIAL).build(); + + AdaptiveFileAccessPattern fileAccessPattern = + new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + + assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); + + readOptions = + GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.RANDOM).build(); + fileAccessPattern = new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + + readOptions = + GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.AUTO).build(); + fileAccessPattern = new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); + + readOptions = + GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.AUTO_RANDOM).build(); + fileAccessPattern = new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + } + + @Test + public void testOverridenPattern() { + GoogleCloudStorageReadOptions readOptions = + GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.AUTO).build(); + long lastServedIndex = 10; + long currentPosition = 0; + // AUTO Adaptive access pattern type + AdaptiveFileAccessPattern fileAccessPattern = + new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); + // backward seek would result into adapting random pattern + fileAccessPattern.updateLastServedIndex(lastServedIndex); + fileAccessPattern.updateAccessPattern(currentPosition); + assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + + // overriding access pattern + fileAccessPattern = new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + // override to use sequential pattern + fileAccessPattern.overrideAccessPattern(false); + // even with backward seek, pattern remains to be sequential + fileAccessPattern.updateLastServedIndex(lastServedIndex); + fileAccessPattern.updateAccessPattern(currentPosition); + assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); + + // AUTO_RANDOM Adaptive access pattern type + // just 2 request in sequential pattern will result in adaptation + lastServedIndex = 10; + currentPosition = 11; + readOptions = + GoogleCloudStorageReadOptions.DEFAULT + .toBuilder() + .setFadvise(Fadvise.AUTO_RANDOM) + .setFadviseRequestTrackCount(1) + .build(); + fileAccessPattern = new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + // sequential read request will result in flipping to use sequential read pattern + fileAccessPattern.updateLastServedIndex(lastServedIndex); + fileAccessPattern.updateAccessPattern(currentPosition); + assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); + + // overriding access pattern + fileAccessPattern = new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + // override to use random pattern + fileAccessPattern.overrideAccessPattern(true); + assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + // even with sequential read request, patten remains to be random + fileAccessPattern.updateLastServedIndex(lastServedIndex); + fileAccessPattern.updateAccessPattern(currentPosition); + assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + } + + @Test + public void testAutoMode() { + GoogleCloudStorageReadOptions readOptions = + GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.AUTO).build(); + long lastServedIndex = 10; + long currentPosition = 0; + // AUTO Adaptive access pattern type + AdaptiveFileAccessPattern fileAccessPattern = + new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); + // backward seek would result into adapting random pattern + fileAccessPattern.updateLastServedIndex(lastServedIndex); + fileAccessPattern.updateAccessPattern(currentPosition); + assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + } + + @Test + public void testAutoRandomMode() { + + GoogleCloudStorageReadOptions readOptions = + GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.AUTO_RANDOM).build(); + int readLength = 10; + long lastServedIndex = 1; + AdaptiveFileAccessPattern fileAccessPattern = + new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + fileAccessPattern.updateLastServedIndex(lastServedIndex); + for (int i = 0; i < readOptions.getFadviseRequestTrackCount(); i++) { + // sequential read + long currentPosition = lastServedIndex + readOptions.getInplaceSeekLimit(); + lastServedIndex = currentPosition + readLength; + fileAccessPattern.updateAccessPattern(currentPosition); + if (i == readOptions.getFadviseRequestTrackCount() - 1) { + assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); + } else { + assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + } + fileAccessPattern.updateLastServedIndex(lastServedIndex); + } + } +} diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannelTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannelTest.java index bb7969c92f..baec24438e 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannelTest.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannelTest.java @@ -53,7 +53,7 @@ public class GoogleCloudStorageClientReadChannelTest { private static final String V1_BUCKET_NAME = "bucket-name"; private static final String OBJECT_NAME = "object-name"; private static final int CHUNK_SIZE = FakeReadChannel.CHUNK_SIZE; - private static final int OBJECT_SIZE = 1024 * 1024; + private static final int OBJECT_SIZE = 5 * 1024 * 1024; private static final int IN_PLACE_SEEK_LIMIT = 5; private static final StorageResourceId RESOURCE_ID = new StorageResourceId(V1_BUCKET_NAME, OBJECT_NAME); @@ -267,6 +267,48 @@ public void fadviseAuto_onBackwardRead_switchesToRandom() throws IOException { verifyNoMoreInteractions(fakeReadChannel); } + @Test + public void fadviseAutoRandom_onSequentialRead_switchToSequential() throws IOException { + long blockSize = CHUNK_SIZE; + GoogleCloudStorageReadOptions readOptions = + GoogleCloudStorageReadOptions.builder() + .setFadvise(Fadvise.AUTO_RANDOM) + .setGrpcChecksumsEnabled(true) + .setInplaceSeekLimit(5) + .setMinRangeRequestSize(10) + .setBlockSize(blockSize) + .build(); + GoogleCloudStorageClientReadChannel readChannel = + getJavaStorageChannel(DEFAULT_ITEM_INFO, readOptions); + + int seekPosition = 0; + int readLength = readOptions.getMinRangeRequestSize(); + + for (int i = 0; i < readOptions.getFadviseRequestTrackCount() + 1; i++) { + ByteBuffer buffer = ByteBuffer.allocate(readLength); + fakeReadChannel = spy(new FakeReadChannel(CONTENT)); + when(mockedStorage.reader(any(), any())).thenReturn(fakeReadChannel); + + readChannel.position(seekPosition); + readChannel.read(buffer); + verifyContent(buffer, seekPosition, readLength); + if (i < readOptions.getFadviseRequestTrackCount()) { + assertThat(readChannel.randomAccessStatus()).isTrue(); + verify(fakeReadChannel, times(1)).seek(seekPosition); + verify(fakeReadChannel, times(1)).limit(seekPosition + readLength); + verify(fakeReadChannel, times(1)).setChunkSize(0); + } else { + assertThat(readChannel.randomAccessStatus()).isFalse(); + verify(fakeReadChannel, times(1)).seek(seekPosition); + verify(fakeReadChannel, times(1)).limit(seekPosition + blockSize); + verify(fakeReadChannel, times(1)).setChunkSize(0); + } + + seekPosition += readLength; + buffer.clear(); + } + } + @Test public void footerPrefetch_reused() throws IOException { int chunkSize = FakeReadChannel.CHUNK_SIZE; diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannelTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannelTest.java index 5b39319cce..98e3eb5804 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannelTest.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannelTest.java @@ -35,6 +35,7 @@ import com.google.api.client.http.HttpRequest; import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpResponse; import com.google.api.client.util.DateTime; import com.google.api.services.storage.Storage; import com.google.api.services.storage.model.StorageObject; @@ -134,11 +135,11 @@ public void fadviseAuto_onForwardRead_switchesToRandom() throws IOException { assertThat(readBytes).isEqualTo(new byte[] {testData[1]}); readChannel.position(seekPosition); - assertThat(readChannel.randomAccess).isFalse(); + assertThat(readChannel.randomAccessStatus()).isFalse(); assertThat(readChannel.read(ByteBuffer.wrap(readBytes))).isEqualTo(1); assertThat(readBytes).isEqualTo(new byte[] {testData[seekPosition]}); - assertThat(readChannel.randomAccess).isTrue(); + assertThat(readChannel.randomAccessStatus()).isTrue(); List rangeHeaders = requests.stream().map(r -> r.getHeaders().getRange()).collect(toList()); @@ -176,11 +177,11 @@ public void fadviseAuto_onBackwardRead_switchesToRandom() throws IOException { assertThat(readBytes).isEqualTo(new byte[] {testData[seekPosition]}); readChannel.position(0); - assertThat(readChannel.randomAccess).isFalse(); + assertThat(readChannel.randomAccessStatus()).isFalse(); assertThat(readChannel.read(ByteBuffer.wrap(readBytes))).isEqualTo(1); assertThat(readBytes).isEqualTo(new byte[] {testData[0]}); - assertThat(readChannel.randomAccess).isTrue(); + assertThat(readChannel.randomAccessStatus()).isTrue(); List rangeHeaders = requests.stream().map(r -> r.getHeaders().getRange()).collect(toList()); @@ -188,6 +189,71 @@ public void fadviseAuto_onBackwardRead_switchesToRandom() throws IOException { assertThat(rangeHeaders).containsExactly("bytes=5-", "bytes=0-0").inOrder(); } + @Test + public void fadviseAutoRandom_onSequentialRead_switchToSequential() throws IOException { + int blockSize = 3; + byte[] testData = {0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09}; + List lowLevelHttpResponses = new ArrayList<>(); + + GoogleCloudStorageReadOptions options = + newLazyReadOptionsBuilder() + .setFadvise(Fadvise.AUTO_RANDOM) + .setMinRangeRequestSize(1) + .setBlockSize(blockSize) + .build(); + + int requestLength = options.getMinRangeRequestSize(); + int rangeStartIndex = 0; + for (int i = 0; i <= options.getFadviseRequestTrackCount(); i++) { + int rangeEndIndex = rangeStartIndex + requestLength; + if (i == options.getFadviseRequestTrackCount()) { + rangeEndIndex = rangeStartIndex + blockSize; + } + MockLowLevelHttpResponse request = + dataRangeResponse( + Arrays.copyOfRange(testData, rangeStartIndex, rangeEndIndex), + rangeStartIndex, + testData.length); + lowLevelHttpResponses.add(request); + rangeStartIndex = rangeEndIndex; + } + + MockHttpTransport transport = + mockTransport( + lowLevelHttpResponses.toArray( + new MockLowLevelHttpResponse[lowLevelHttpResponses.size()])); + + List requests = new ArrayList<>(); + + Storage storage = new Storage(transport, JSON_FACTORY, requests::add); + + GoogleCloudStorageReadChannel readChannel = createReadChannel(storage, options); + + byte[] readBytes = new byte[requestLength]; + rangeStartIndex = 0; + for (int i = 0; i <= options.getFadviseRequestTrackCount(); i++) { + readChannel.position(rangeStartIndex); + + assertThat(readChannel.read(ByteBuffer.wrap(readBytes))).isEqualTo(requestLength); + assertThat(readBytes).isEqualTo(new byte[] {testData[rangeStartIndex]}); + if (i == options.getFadviseRequestTrackCount()) { + assertThat(readChannel.randomAccessStatus()).isFalse(); + } else { + assertThat(readChannel.randomAccessStatus()).isTrue(); + } + rangeStartIndex += requestLength; + } + + List rangeHeaders = + requests.stream().map(r -> r.getHeaders().getRange()).collect(toList()); + + // bytes=3-5 instead of opening channel till end as in AUTO_RANDOM adapting to sequential read + // will result in opening channel till blockSize. + assertThat(rangeHeaders) + .containsExactly("bytes=0-0", "bytes=1-1", "bytes=2-2", "bytes=3-5") + .inOrder(); + } + @Test public void footerPrefetch_reused() throws IOException { int footeSize = 2; From 7b7a891abf02c5ae3907d96a84b31591bac24ead Mon Sep 17 00:00:00 2001 From: Ravi Dutt Singh Date: Mon, 2 Sep 2024 16:13:15 +0530 Subject: [PATCH 2/5] address review comments --- gcs/CONFIGURATION.md | 2 +- ...ern.java => FileAccessPatternManager.java} | 20 ++++++++----- .../GoogleCloudStorageClientReadChannel.java | 4 +-- .../gcsio/GoogleCloudStorageReadChannel.java | 12 +++++--- ...java => FileAccessPatternManagerTest.java} | 30 +++++++++---------- 5 files changed, 38 insertions(+), 30 deletions(-) rename gcsio/src/main/java/com/google/cloud/hadoop/gcsio/{AdaptiveFileAccessPattern.java => FileAccessPatternManager.java} (90%) rename gcsio/src/test/java/com/google/cloud/hadoop/gcsio/{AdaptiveFileAccessPatternTest.java => FileAccessPatternManagerTest.java} (85%) diff --git a/gcs/CONFIGURATION.md b/gcs/CONFIGURATION.md index d8b2a2b814..6ab5047524 100644 --- a/gcs/CONFIGURATION.md +++ b/gcs/CONFIGURATION.md @@ -517,7 +517,7 @@ permissions (not authorized) to execute these requests. Self adaptive fadvise mode uses distance between the served requests to decide the access pattern. This property controls how many such requests - need to be tracked. + need to be tracked. It is used when `AUTO_RANDOM` is selected. * `fs.gs.inputstream.inplace.seek.limit` (default: `8388608`) diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/AdaptiveFileAccessPattern.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java similarity index 90% rename from gcsio/src/main/java/com/google/cloud/hadoop/gcsio/AdaptiveFileAccessPattern.java rename to gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java index 75cff237ef..3a1f1fdf4b 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/AdaptiveFileAccessPattern.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java @@ -17,13 +17,19 @@ package com.google.cloud.hadoop.gcsio; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise; +import com.google.common.annotations.VisibleForTesting; import com.google.common.flogger.GoogleLogger; import java.io.Closeable; import java.io.IOException; +import java.util.Iterator; import java.util.LinkedList; -import java.util.ListIterator; -class AdaptiveFileAccessPattern implements Closeable { +/** + * Manages the access pattern of object being read from cloud storage. For adaptive fadvise + * configurations it computes the access pattern based on previous requests. + */ +@VisibleForTesting +class FileAccessPatternManager implements Closeable { private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); private final StorageResourceId resourceId; @@ -36,9 +42,7 @@ class AdaptiveFileAccessPattern implements Closeable { @Override public void close() throws IOException { - if (consecutiveRequestsDistances != null) { - consecutiveRequestsDistances = null; - } + consecutiveRequestsDistances = null; } class BoundedList extends LinkedList { @@ -58,7 +62,7 @@ public boolean add(E o) { } } - public AdaptiveFileAccessPattern( + public FileAccessPatternManager( StorageResourceId resourceId, GoogleCloudStorageReadOptions readOptions) { this.resourceId = resourceId; this.readOptions = readOptions; @@ -66,7 +70,7 @@ public AdaptiveFileAccessPattern( readOptions.getFadvise() == Fadvise.AUTO_RANDOM || readOptions.getFadvise() == Fadvise.RANDOM; if (readOptions.getFadvise() == Fadvise.AUTO_RANDOM) { - consecutiveRequestsDistances = new BoundedList<>(readOptions.getFadviseRequestTrackCount()); + consecutiveRequestsDistances = new BoundedList(readOptions.getFadviseRequestTrackCount()); } } @@ -122,7 +126,7 @@ private boolean isSequentialAccessPattern(long currentPosition) { return false; } - ListIterator iterator = consecutiveRequestsDistances.listIterator(); + Iterator iterator = consecutiveRequestsDistances.iterator(); while (iterator.hasNext()) { Long distance = iterator.next(); if (distance < 0 || distance > readOptions.DEFAULT_INPLACE_SEEK_LIMIT) { diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java index 2739b8b848..e4b5749d13 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java @@ -211,14 +211,14 @@ private class ContentReadChannel implements Closeable { // in-place seeks. private byte[] skipBuffer = null; private ReadableByteChannel byteChannel = null; - private AdaptiveFileAccessPattern fileAccessPattern; + private FileAccessPatternManager fileAccessPattern; public ContentReadChannel( GoogleCloudStorageReadOptions readOptions, StorageResourceId resourceId) { this.blobId = BlobId.of( resourceId.getBucketName(), resourceId.getObjectName(), resourceId.getGenerationId()); - this.fileAccessPattern = new AdaptiveFileAccessPattern(resourceId, readOptions); + this.fileAccessPattern = new FileAccessPatternManager(resourceId, readOptions); if (gzipEncoded) { fileAccessPattern.overrideAccessPattern(false); } diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java index c3b3fd6cd2..0fc6d4d183 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java @@ -119,7 +119,7 @@ public class GoogleCloudStorageReadChannel implements SeekableByteChannel { // Fine-grained options. private final GoogleCloudStorageReadOptions readOptions; - private final AdaptiveFileAccessPattern fileAccessPattern; + private final FileAccessPatternManager fileAccessPattern; // Sleeper used for waiting between retries. private Sleeper sleeper = Sleeper.DEFAULT; @@ -170,7 +170,7 @@ public GoogleCloudStorageReadChannel( this.errorExtractor = errorExtractor; this.readOptions = readOptions; this.resourceId = resourceId; - this.fileAccessPattern = new AdaptiveFileAccessPattern(resourceId, readOptions); + this.fileAccessPattern = new FileAccessPatternManager(resourceId, readOptions); // Initialize metadata if available. GoogleCloudStorageItemInfo info = getInitialMetadata(); @@ -790,8 +790,12 @@ protected void initMetadata(@Nullable String encoding, long sizeFromMetadata, lo metadataInitialized = true; logger.atFiner().log( - "Initialized metadata (gzipEncoded=%s, size=%s, generation=%s) for '%s'", - gzipEncoded, size, resourceId.getGenerationId(), resourceId); + "Initialized metadata (gzipEncoded=%s, size=%s, randomAccess=%s, generation=%s) for '%s'", + gzipEncoded, + size, + fileAccessPattern.isRandomAccessPattern(), + resourceId.getGenerationId(), + resourceId); } private void cacheFooter(HttpResponse response) throws IOException { diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/AdaptiveFileAccessPatternTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManagerTest.java similarity index 85% rename from gcsio/src/test/java/com/google/cloud/hadoop/gcsio/AdaptiveFileAccessPatternTest.java rename to gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManagerTest.java index a81bad3e1e..d3986ff1c5 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/AdaptiveFileAccessPatternTest.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManagerTest.java @@ -24,7 +24,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class AdaptiveFileAccessPatternTest { +public class FileAccessPatternManagerTest { private static final String BUCKET_NAME = "bucket-name"; private static final String OBJECT_NAME = "object-name"; @@ -36,24 +36,24 @@ public void defaultAccessPatterns() { GoogleCloudStorageReadOptions readOptions = GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.SEQUENTIAL).build(); - AdaptiveFileAccessPattern fileAccessPattern = - new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + FileAccessPatternManager fileAccessPattern = + new FileAccessPatternManager(RESOURCE_ID, readOptions); assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); readOptions = GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.RANDOM).build(); - fileAccessPattern = new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); readOptions = GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.AUTO).build(); - fileAccessPattern = new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); readOptions = GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.AUTO_RANDOM).build(); - fileAccessPattern = new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); } @@ -64,8 +64,8 @@ public void testOverridenPattern() { long lastServedIndex = 10; long currentPosition = 0; // AUTO Adaptive access pattern type - AdaptiveFileAccessPattern fileAccessPattern = - new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + FileAccessPatternManager fileAccessPattern = + new FileAccessPatternManager(RESOURCE_ID, readOptions); assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); // backward seek would result into adapting random pattern fileAccessPattern.updateLastServedIndex(lastServedIndex); @@ -73,7 +73,7 @@ public void testOverridenPattern() { assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); // overriding access pattern - fileAccessPattern = new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); // override to use sequential pattern fileAccessPattern.overrideAccessPattern(false); // even with backward seek, pattern remains to be sequential @@ -91,7 +91,7 @@ public void testOverridenPattern() { .setFadvise(Fadvise.AUTO_RANDOM) .setFadviseRequestTrackCount(1) .build(); - fileAccessPattern = new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); // sequential read request will result in flipping to use sequential read pattern fileAccessPattern.updateLastServedIndex(lastServedIndex); @@ -99,7 +99,7 @@ public void testOverridenPattern() { assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); // overriding access pattern - fileAccessPattern = new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); // override to use random pattern fileAccessPattern.overrideAccessPattern(true); assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); @@ -116,8 +116,8 @@ public void testAutoMode() { long lastServedIndex = 10; long currentPosition = 0; // AUTO Adaptive access pattern type - AdaptiveFileAccessPattern fileAccessPattern = - new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + FileAccessPatternManager fileAccessPattern = + new FileAccessPatternManager(RESOURCE_ID, readOptions); assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); // backward seek would result into adapting random pattern fileAccessPattern.updateLastServedIndex(lastServedIndex); @@ -132,8 +132,8 @@ public void testAutoRandomMode() { GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.AUTO_RANDOM).build(); int readLength = 10; long lastServedIndex = 1; - AdaptiveFileAccessPattern fileAccessPattern = - new AdaptiveFileAccessPattern(RESOURCE_ID, readOptions); + FileAccessPatternManager fileAccessPattern = + new FileAccessPatternManager(RESOURCE_ID, readOptions); assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); fileAccessPattern.updateLastServedIndex(lastServedIndex); for (int i = 0; i < readOptions.getFadviseRequestTrackCount(); i++) { From fa69dd7a0a4e91874d4c9d0c1a9859c7558ac6ca Mon Sep 17 00:00:00 2001 From: Ravi Dutt Singh Date: Mon, 2 Sep 2024 22:44:15 +0530 Subject: [PATCH 3/5] address review comments --- .../gcsio/FileAccessPatternManager.java | 69 +++++-------------- .../GoogleCloudStorageClientReadChannel.java | 29 +++----- .../gcsio/GoogleCloudStorageReadChannel.java | 22 +++--- 3 files changed, 39 insertions(+), 81 deletions(-) diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java index 3a1f1fdf4b..6b3b97fc6c 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java @@ -19,59 +19,31 @@ import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions.Fadvise; import com.google.common.annotations.VisibleForTesting; import com.google.common.flogger.GoogleLogger; -import java.io.Closeable; -import java.io.IOException; -import java.util.Iterator; -import java.util.LinkedList; /** * Manages the access pattern of object being read from cloud storage. For adaptive fadvise * configurations it computes the access pattern based on previous requests. */ @VisibleForTesting -class FileAccessPatternManager implements Closeable { +class FileAccessPatternManager { private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); private final StorageResourceId resourceId; private final GoogleCloudStorageReadOptions readOptions; - private boolean isPatternOverriden = false; + private boolean isPatternOverriden; private boolean randomAccess; private long lastServedIndex = -1; // Keeps track of distance between consecutive requests - private BoundedList consecutiveRequestsDistances; - - @Override - public void close() throws IOException { - consecutiveRequestsDistances = null; - } - - class BoundedList extends LinkedList { - private int limit; - - public BoundedList(int limit) { - this.limit = limit; - } - - @Override - public boolean add(E o) { - super.add(o); - while (size() > limit) { - super.removeFirst(); - } - return true; - } - } + private int consecutiveSequentialCount = 0; public FileAccessPatternManager( StorageResourceId resourceId, GoogleCloudStorageReadOptions readOptions) { + this.isPatternOverriden = false; this.resourceId = resourceId; this.readOptions = readOptions; this.randomAccess = readOptions.getFadvise() == Fadvise.AUTO_RANDOM || readOptions.getFadvise() == Fadvise.RANDOM; - if (readOptions.getFadvise() == Fadvise.AUTO_RANDOM) { - consecutiveRequestsDistances = new BoundedList(readOptions.getFadviseRequestTrackCount()); - } } public void updateLastServedIndex(long position) { @@ -100,42 +72,39 @@ public void updateAccessPattern(long currentPosition) { } /** - * This provides a way to override the access pattern, once overridden it will not be recomputed - * for adaptive fadvise types. + * This provides a way to override the access isRandomPattern, once overridden it will not be + * recomputed for adaptive fadvise types. * - * @param pattern, true, to override with random access else false + * @param isRandomPattern, true, to override with random access else false */ - public void overrideAccessPattern(boolean pattern) { + public void overrideAccessPattern(boolean isRandomPattern) { this.isPatternOverriden = true; - this.randomAccess = pattern; + this.randomAccess = isRandomPattern; logger.atInfo().log( "Overriding the random access pattern to %s for fadvise:%s for resource: %s ", - pattern, readOptions.getFadvise(), resourceId); + isRandomPattern, readOptions.getFadvise(), resourceId); } private boolean isSequentialAccessPattern(long currentPosition) { - if (lastServedIndex != -1 && consecutiveRequestsDistances != null) { - consecutiveRequestsDistances.add(currentPosition - lastServedIndex); + if (lastServedIndex != -1) { + long distance = currentPosition - lastServedIndex; + if (distance < 0 || distance > readOptions.getInplaceSeekLimit()) { + consecutiveSequentialCount = 0; + } else { + consecutiveSequentialCount++; + } } if (!shouldDetectSequentialAccess()) { return false; } - if (consecutiveRequestsDistances.size() < readOptions.getFadviseRequestTrackCount()) { + if (consecutiveSequentialCount < readOptions.getFadviseRequestTrackCount()) { return false; } - - Iterator iterator = consecutiveRequestsDistances.iterator(); - while (iterator.hasNext()) { - Long distance = iterator.next(); - if (distance < 0 || distance > readOptions.DEFAULT_INPLACE_SEEK_LIMIT) { - return false; - } - } logger.atInfo().log( "Detected %d consecutive read request within distance threshold %d with fadvise: %s switching to sequential IO for '%s'", - consecutiveRequestsDistances.size(), + consecutiveSequentialCount, readOptions.getInplaceSeekLimit(), readOptions.getFadvise(), resourceId); diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java index e4b5749d13..c7c572b47e 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java @@ -35,7 +35,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.flogger.GoogleLogger; import java.io.ByteArrayInputStream; -import java.io.Closeable; import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; @@ -175,7 +174,7 @@ public void close() throws IOException { if (open) { try { logger.atFiner().log("Closing channel for '%s'", resourceId); - contentReadChannel.close(); + contentReadChannel.closeContentChannel(); } catch (Exception e) { GoogleCloudStorageEventBus.postOnException(); throw new IOException( @@ -195,7 +194,7 @@ public void close() throws IOException { * which helps in deciding the boundaries of content channel being opened and also caching the * footer of an object. */ - private class ContentReadChannel implements Closeable { + private class ContentReadChannel { // Size of buffer to allocate for skipping bytes in-place when performing in-place seeks. private static final int SKIP_BUFFER_SIZE = 8192; @@ -211,16 +210,16 @@ private class ContentReadChannel implements Closeable { // in-place seeks. private byte[] skipBuffer = null; private ReadableByteChannel byteChannel = null; - private FileAccessPatternManager fileAccessPattern; + private final FileAccessPatternManager fileAccessManager; public ContentReadChannel( GoogleCloudStorageReadOptions readOptions, StorageResourceId resourceId) { this.blobId = BlobId.of( resourceId.getBucketName(), resourceId.getObjectName(), resourceId.getGenerationId()); - this.fileAccessPattern = new FileAccessPatternManager(resourceId, readOptions); + this.fileAccessManager = new FileAccessPatternManager(resourceId, readOptions); if (gzipEncoded) { - fileAccessPattern.overrideAccessPattern(false); + fileAccessManager.overrideAccessPattern(false); } } @@ -339,7 +338,7 @@ private ReadableByteChannel openByteChannel(long bytesToRead) throws IOException } // Should be updated only if content is not served from cached footer - fileAccessPattern.updateAccessPattern(currentPosition); + fileAccessManager.updateAccessPattern(currentPosition); setChannelBoundaries(bytesToRead); @@ -427,7 +426,7 @@ private long getRangeRequestEnd(long startPosition, long bytesToRead) { return objectSize; } long endPosition = objectSize; - if (fileAccessPattern.isRandomAccessPattern()) { + if (fileAccessManager.isRandomAccessPattern()) { // opening a channel for whole object doesn't make sense as anyhow it will not be utilized // for further reads. endPosition = startPosition + max(bytesToRead, readOptions.getMinRangeRequestSize()); @@ -454,7 +453,7 @@ public void closeContentChannel() { "Got an exception on contentChannel.close() for '%s'; ignoring it.", resourceId); } finally { byteChannel = null; - fileAccessPattern.updateLastServedIndex(contentChannelCurrentPosition); + fileAccessManager.updateLastServedIndex(contentChannelCurrentPosition); reset(); } } @@ -568,21 +567,11 @@ private BlobSourceOption[] generateReadOptions(BlobId blobId) { private boolean isFooterRead() { return objectSize - currentPosition <= readOptions.getMinRangeRequestSize(); } - - @Override - public void close() throws IOException { - try { - fileAccessPattern.close(); - closeContentChannel(); - } finally { - fileAccessPattern = null; - } - } } @VisibleForTesting boolean randomAccessStatus() { - return contentReadChannel.fileAccessPattern.isRandomAccessPattern(); + return contentReadChannel.fileAccessManager.isRandomAccessPattern(); } private static void validate(GoogleCloudStorageItemInfo itemInfo) throws IOException { diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java index 0fc6d4d183..3d7e9ee5ee 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java @@ -119,7 +119,7 @@ public class GoogleCloudStorageReadChannel implements SeekableByteChannel { // Fine-grained options. private final GoogleCloudStorageReadOptions readOptions; - private final FileAccessPatternManager fileAccessPattern; + private final FileAccessPatternManager fileAccessManager; // Sleeper used for waiting between retries. private Sleeper sleeper = Sleeper.DEFAULT; @@ -170,7 +170,7 @@ public GoogleCloudStorageReadChannel( this.errorExtractor = errorExtractor; this.readOptions = readOptions; this.resourceId = resourceId; - this.fileAccessPattern = new FileAccessPatternManager(resourceId, readOptions); + this.fileAccessManager = new FileAccessPatternManager(resourceId, readOptions); // Initialize metadata if available. GoogleCloudStorageItemInfo info = getInitialMetadata(); @@ -498,7 +498,7 @@ protected void closeContentChannel() { "Got an exception on contentChannel.close() for '%s'; ignoring it.", resourceId); } finally { contentChannel = null; - fileAccessPattern.updateLastServedIndex(contentChannelPosition); + fileAccessManager.updateLastServedIndex(contentChannelPosition); resetContentChannel(); } } @@ -628,7 +628,7 @@ protected void setSize(long size) { private void checkEncodingAndAccess() { checkState( - !(gzipEncoded && fileAccessPattern.isRandomAccessPattern()), + !(gzipEncoded && fileAccessManager.isRandomAccessPattern()), "gzipEncoded and randomAccess should not be true at the same time for '%s'", resourceId); } @@ -705,7 +705,7 @@ private void openContentChannel(long bytesToRead) throws IOException { } else { // only update access pattern if not getting served from cached footer if (!gzipEncoded) { - fileAccessPattern.updateAccessPattern(currentPosition); + fileAccessManager.updateAccessPattern(currentPosition); } objectContentStream = openStream(bytesToRead); } @@ -765,7 +765,7 @@ protected void initMetadata(@Nullable String encoding, long sizeFromMetadata, lo resourceId); gzipEncoded = nullToEmpty(encoding).contains(GZIP_ENCODING); if (gzipEncoded) { - fileAccessPattern.overrideAccessPattern(false); + fileAccessManager.overrideAccessPattern(false); } if (gzipEncoded && !readOptions.getSupportGzipEncoding()) { GoogleCloudStorageEventBus.postOnException(); @@ -793,7 +793,7 @@ protected void initMetadata(@Nullable String encoding, long sizeFromMetadata, lo "Initialized metadata (gzipEncoded=%s, size=%s, randomAccess=%s, generation=%s) for '%s'", gzipEncoded, size, - fileAccessPattern.isRandomAccessPattern(), + fileAccessManager.isRandomAccessPattern(), resourceId.getGenerationId(), resourceId); } @@ -889,7 +889,7 @@ protected InputStream openStream(long bytesToRead) throws IOException { // Set rangeSize to the size of the file reminder from currentPosition. long rangeSize = size - contentChannelPosition; - if (fileAccessPattern.isRandomAccessPattern()) { + if (fileAccessManager.isRandomAccessPattern()) { long randomRangeSize = Math.max(bytesToRead, readOptions.getMinRangeRequestSize()); // Limit rangeSize to the randomRangeSize. rangeSize = Math.min(randomRangeSize, rangeSize); @@ -919,8 +919,8 @@ protected InputStream openStream(long bytesToRead) throws IOException { resourceId); rangeHeader = "bytes=" + contentChannelPosition + "-"; - if (fileAccessPattern.isRandomAccessPattern() - || (!fileAccessPattern.isRandomAccessPattern() + if (fileAccessManager.isRandomAccessPattern() + || (!fileAccessManager.isRandomAccessPattern() && readOptions.getFadvise() == Fadvise.AUTO_RANDOM) || contentChannelEnd != size) { rangeHeader += (contentChannelEnd - 1); @@ -1179,6 +1179,6 @@ private void checkIOPrecondition(boolean precondition, String errorMessage) thro @VisibleForTesting boolean randomAccessStatus() { - return fileAccessPattern.isRandomAccessPattern(); + return fileAccessManager.isRandomAccessPattern(); } } From 132710375a61d7bc00fa14c3e212e8eb85cbe6c4 Mon Sep 17 00:00:00 2001 From: Ravi Dutt Singh Date: Fri, 6 Sep 2024 12:50:16 +0530 Subject: [PATCH 4/5] back and forth switch to RANDOM --- .../gcsio/FileAccessPatternManager.java | 29 +++++++++-- .../gcsio/FileAccessPatternManagerTest.java | 51 +++++++++++++------ 2 files changed, 61 insertions(+), 19 deletions(-) diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java index 6b3b97fc6c..04771e2018 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java @@ -32,6 +32,10 @@ class FileAccessPatternManager { private final GoogleCloudStorageReadOptions readOptions; private boolean isPatternOverriden; private boolean randomAccess; + // keeps track of any backward seek requested in lifecycle of InputStream + private boolean isBackwardSeekRequested = false; + // keeps track of any backward seek requested in lifecycle of InputStream + private boolean isForwardSeekRequested = false; private long lastServedIndex = -1; // Keeps track of distance between consecutive requests private int consecutiveSequentialCount = 0; @@ -61,8 +65,14 @@ public void updateAccessPattern(long currentPosition) { return; } if (readOptions.getFadvise() == Fadvise.AUTO_RANDOM) { - if (isSequentialAccessPattern(currentPosition)) { - unsetRandomAccess(); + if (randomAccess) { + if (isSequentialAccessPattern(currentPosition)) { + unsetRandomAccess(); + } + } else { + if (isRandomAccessPattern(currentPosition)) { + setRandomAccess(); + } } } else if (readOptions.getFadvise() == Fadvise.AUTO) { if (isRandomAccessPattern(currentPosition)) { @@ -123,6 +133,9 @@ private boolean isRandomAccessPattern(long currentPosition) { logger.atFine().log( "Detected backward read from %s to %s position, switching to random IO for '%s'", lastServedIndex, currentPosition, resourceId); + + isBackwardSeekRequested = true; + consecutiveSequentialCount = 0; return true; } if (lastServedIndex >= 0 @@ -131,17 +144,25 @@ private boolean isRandomAccessPattern(long currentPosition) { "Detected forward read from %s to %s position over %s threshold," + " switching to random IO for '%s'", lastServedIndex, currentPosition, readOptions.getInplaceSeekLimit(), resourceId); + isForwardSeekRequested = true; + consecutiveSequentialCount = 0; return true; } return false; } private boolean shouldDetectSequentialAccess() { - return randomAccess && readOptions.getFadvise() == Fadvise.AUTO_RANDOM; + return randomAccess + && !isBackwardSeekRequested + && !isForwardSeekRequested + && consecutiveSequentialCount >= readOptions.getFadviseRequestTrackCount() + && readOptions.getFadvise() == Fadvise.AUTO_RANDOM; } private boolean shouldDetectRandomAccess() { - return !randomAccess && readOptions.getFadvise() == Fadvise.AUTO; + return !randomAccess + && (readOptions.getFadvise() == Fadvise.AUTO + || readOptions.getFadvise() == Fadvise.AUTO_RANDOM); } private void setRandomAccess() { diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManagerTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManagerTest.java index d3986ff1c5..030a3f4f87 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManagerTest.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManagerTest.java @@ -129,24 +129,45 @@ public void testAutoMode() { public void testAutoRandomMode() { GoogleCloudStorageReadOptions readOptions = - GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.AUTO_RANDOM).build(); - int readLength = 10; - long lastServedIndex = 1; + GoogleCloudStorageReadOptions.DEFAULT + .toBuilder() + .setFadvise(Fadvise.AUTO_RANDOM) + .setFadviseRequestTrackCount(3) + .setInplaceSeekLimit(10) + .build(); FileAccessPatternManager fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); - assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); - fileAccessPattern.updateLastServedIndex(lastServedIndex); - for (int i = 0; i < readOptions.getFadviseRequestTrackCount(); i++) { - // sequential read - long currentPosition = lastServedIndex + readOptions.getInplaceSeekLimit(); - lastServedIndex = currentPosition + readLength; + // R-->S-->R backward seek + // sequential read resulted in flipping of pattern + // 4th request will result in sequential pattern + // 5th request is a backward seek, resulting in random read + // even any further sequential read will not result in sequential pattern + long readIndexes[] = new long[] {0, 1, 2, 3, 4, 0, 1, 2, 3, 4}; + boolean expectedRandomAccess[] = + new boolean[] {true, true, true, false, false, true, true, true, true, true}; + + for (int i = 0; i < readIndexes.length; i++) { + long currentPosition = readIndexes[i]; + fileAccessPattern.updateAccessPattern(currentPosition); + assertThat(fileAccessPattern.isRandomAccessPattern()).isEqualTo(expectedRandomAccess[i]); + fileAccessPattern.updateLastServedIndex(currentPosition); + } + + // R-->S-->R forward seek + // sequential read resulted in flipping of pattern + // 4th request will result in sequential pattern + // 5th request is a forward seek, resulting in random read + // even any further sequential read will not result in sequential pattern + readIndexes = new long[] {0, 1, 2, 3, 4, 15, 16, 17, 18, 19}; + expectedRandomAccess = + new boolean[] {true, true, true, false, false, true, true, true, true, true}; + + fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); + for (int i = 0; i < readIndexes.length; i++) { + long currentPosition = readIndexes[i]; fileAccessPattern.updateAccessPattern(currentPosition); - if (i == readOptions.getFadviseRequestTrackCount() - 1) { - assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); - } else { - assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); - } - fileAccessPattern.updateLastServedIndex(lastServedIndex); + assertThat(fileAccessPattern.isRandomAccessPattern()).isEqualTo(expectedRandomAccess[i]); + fileAccessPattern.updateLastServedIndex(currentPosition); } } } From 7e8b2283b6e04a59d197d653a1eac106e20b448b Mon Sep 17 00:00:00 2001 From: Ravi Dutt Singh Date: Tue, 17 Sep 2024 20:30:24 +0530 Subject: [PATCH 5/5] fix backward and forward seek updates --- .../gcsio/FileAccessPatternManager.java | 59 +++++++++++-------- .../GoogleCloudStorageClientReadChannel.java | 4 +- .../gcsio/GoogleCloudStorageReadChannel.java | 12 ++-- .../gcsio/FileAccessPatternManagerTest.java | 57 +++++++++++------- 4 files changed, 80 insertions(+), 52 deletions(-) diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java index 04771e2018..e251fcd406 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManager.java @@ -54,7 +54,7 @@ public void updateLastServedIndex(long position) { this.lastServedIndex = position; } - public boolean isRandomAccessPattern() { + public boolean shouldAdaptToRandomAccess() { return randomAccess; } @@ -64,18 +64,19 @@ public void updateAccessPattern(long currentPosition) { "Will bypass computing access pattern as it's overriden for resource :%s", resourceId); return; } + updateSeekFlags(currentPosition); if (readOptions.getFadvise() == Fadvise.AUTO_RANDOM) { if (randomAccess) { - if (isSequentialAccessPattern(currentPosition)) { + if (shouldAdaptToSequential(currentPosition)) { unsetRandomAccess(); } } else { - if (isRandomAccessPattern(currentPosition)) { + if (shouldAdaptToRandomAccess(currentPosition)) { setRandomAccess(); } } } else if (readOptions.getFadvise() == Fadvise.AUTO) { - if (isRandomAccessPattern(currentPosition)) { + if (shouldAdaptToRandomAccess(currentPosition)) { setRandomAccess(); } } @@ -95,7 +96,7 @@ public void overrideAccessPattern(boolean isRandomPattern) { isRandomPattern, readOptions.getFadvise(), resourceId); } - private boolean isSequentialAccessPattern(long currentPosition) { + private boolean shouldAdaptToSequential(long currentPosition) { if (lastServedIndex != -1) { long distance = currentPosition - lastServedIndex; if (distance < 0 || distance > readOptions.getInplaceSeekLimit()) { @@ -121,7 +122,7 @@ private boolean isSequentialAccessPattern(long currentPosition) { return true; } - private boolean isRandomAccessPattern(long currentPosition) { + private boolean shouldAdaptToRandomAccess(long currentPosition) { if (!shouldDetectRandomAccess()) { return false; } @@ -129,23 +130,10 @@ private boolean isRandomAccessPattern(long currentPosition) { return false; } - if (currentPosition < lastServedIndex) { - logger.atFine().log( - "Detected backward read from %s to %s position, switching to random IO for '%s'", - lastServedIndex, currentPosition, resourceId); - - isBackwardSeekRequested = true; - consecutiveSequentialCount = 0; - return true; - } - if (lastServedIndex >= 0 - && lastServedIndex + readOptions.getInplaceSeekLimit() < currentPosition) { + if (isBackwardOrForwardSeekRequested()) { logger.atFine().log( - "Detected forward read from %s to %s position over %s threshold," - + " switching to random IO for '%s'", - lastServedIndex, currentPosition, readOptions.getInplaceSeekLimit(), resourceId); - isForwardSeekRequested = true; - consecutiveSequentialCount = 0; + "Backward or forward seek requested, isBackwardSeek: %s, isForwardSeek:%s for '%s'", + isBackwardSeekRequested, isForwardSeekRequested, resourceId); return true; } return false; @@ -153,8 +141,7 @@ private boolean isRandomAccessPattern(long currentPosition) { private boolean shouldDetectSequentialAccess() { return randomAccess - && !isBackwardSeekRequested - && !isForwardSeekRequested + && !isBackwardOrForwardSeekRequested() && consecutiveSequentialCount >= readOptions.getFadviseRequestTrackCount() && readOptions.getFadvise() == Fadvise.AUTO_RANDOM; } @@ -172,4 +159,28 @@ private void setRandomAccess() { private void unsetRandomAccess() { randomAccess = false; } + + private boolean isBackwardOrForwardSeekRequested() { + return isBackwardSeekRequested || isForwardSeekRequested; + } + + private void updateSeekFlags(long currentPosition) { + if (lastServedIndex == -1) { + return; + } + + if (currentPosition < lastServedIndex) { + isBackwardSeekRequested = true; + logger.atFine().log( + "Detected backward read from %s to %s position, updating to backwardSeek for '%s'", + lastServedIndex, currentPosition, resourceId); + + } else if (lastServedIndex + readOptions.getInplaceSeekLimit() < currentPosition) { + isForwardSeekRequested = true; + logger.atFine().log( + "Detected forward read from %s to %s position over %s threshold," + + " updated to forwardSeek for '%s'", + lastServedIndex, currentPosition, readOptions.getInplaceSeekLimit(), resourceId); + } + } } diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java index c7c572b47e..77211b6dbd 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageClientReadChannel.java @@ -426,7 +426,7 @@ private long getRangeRequestEnd(long startPosition, long bytesToRead) { return objectSize; } long endPosition = objectSize; - if (fileAccessManager.isRandomAccessPattern()) { + if (fileAccessManager.shouldAdaptToRandomAccess()) { // opening a channel for whole object doesn't make sense as anyhow it will not be utilized // for further reads. endPosition = startPosition + max(bytesToRead, readOptions.getMinRangeRequestSize()); @@ -571,7 +571,7 @@ private boolean isFooterRead() { @VisibleForTesting boolean randomAccessStatus() { - return contentReadChannel.fileAccessManager.isRandomAccessPattern(); + return contentReadChannel.fileAccessManager.shouldAdaptToRandomAccess(); } private static void validate(GoogleCloudStorageItemInfo itemInfo) throws IOException { diff --git a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java index 3d7e9ee5ee..b18272e2e9 100644 --- a/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java +++ b/gcsio/src/main/java/com/google/cloud/hadoop/gcsio/GoogleCloudStorageReadChannel.java @@ -628,7 +628,7 @@ protected void setSize(long size) { private void checkEncodingAndAccess() { checkState( - !(gzipEncoded && fileAccessManager.isRandomAccessPattern()), + !(gzipEncoded && fileAccessManager.shouldAdaptToRandomAccess()), "gzipEncoded and randomAccess should not be true at the same time for '%s'", resourceId); } @@ -793,7 +793,7 @@ protected void initMetadata(@Nullable String encoding, long sizeFromMetadata, lo "Initialized metadata (gzipEncoded=%s, size=%s, randomAccess=%s, generation=%s) for '%s'", gzipEncoded, size, - fileAccessManager.isRandomAccessPattern(), + fileAccessManager.shouldAdaptToRandomAccess(), resourceId.getGenerationId(), resourceId); } @@ -889,7 +889,7 @@ protected InputStream openStream(long bytesToRead) throws IOException { // Set rangeSize to the size of the file reminder from currentPosition. long rangeSize = size - contentChannelPosition; - if (fileAccessManager.isRandomAccessPattern()) { + if (fileAccessManager.shouldAdaptToRandomAccess()) { long randomRangeSize = Math.max(bytesToRead, readOptions.getMinRangeRequestSize()); // Limit rangeSize to the randomRangeSize. rangeSize = Math.min(randomRangeSize, rangeSize); @@ -919,8 +919,8 @@ protected InputStream openStream(long bytesToRead) throws IOException { resourceId); rangeHeader = "bytes=" + contentChannelPosition + "-"; - if (fileAccessManager.isRandomAccessPattern() - || (!fileAccessManager.isRandomAccessPattern() + if (fileAccessManager.shouldAdaptToRandomAccess() + || (!fileAccessManager.shouldAdaptToRandomAccess() && readOptions.getFadvise() == Fadvise.AUTO_RANDOM) || contentChannelEnd != size) { rangeHeader += (contentChannelEnd - 1); @@ -1179,6 +1179,6 @@ private void checkIOPrecondition(boolean precondition, String errorMessage) thro @VisibleForTesting boolean randomAccessStatus() { - return fileAccessManager.isRandomAccessPattern(); + return fileAccessManager.shouldAdaptToRandomAccess(); } } diff --git a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManagerTest.java b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManagerTest.java index 030a3f4f87..d83cd17f5f 100644 --- a/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManagerTest.java +++ b/gcsio/src/test/java/com/google/cloud/hadoop/gcsio/FileAccessPatternManagerTest.java @@ -39,22 +39,22 @@ public void defaultAccessPatterns() { FileAccessPatternManager fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); - assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); + assertThat(fileAccessPattern.shouldAdaptToRandomAccess()).isFalse(); readOptions = GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.RANDOM).build(); fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); - assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + assertThat(fileAccessPattern.shouldAdaptToRandomAccess()).isTrue(); readOptions = GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.AUTO).build(); fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); - assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); + assertThat(fileAccessPattern.shouldAdaptToRandomAccess()).isFalse(); readOptions = GoogleCloudStorageReadOptions.DEFAULT.toBuilder().setFadvise(Fadvise.AUTO_RANDOM).build(); fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); - assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + assertThat(fileAccessPattern.shouldAdaptToRandomAccess()).isTrue(); } @Test @@ -66,11 +66,11 @@ public void testOverridenPattern() { // AUTO Adaptive access pattern type FileAccessPatternManager fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); - assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); + assertThat(fileAccessPattern.shouldAdaptToRandomAccess()).isFalse(); // backward seek would result into adapting random pattern fileAccessPattern.updateLastServedIndex(lastServedIndex); fileAccessPattern.updateAccessPattern(currentPosition); - assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + assertThat(fileAccessPattern.shouldAdaptToRandomAccess()).isTrue(); // overriding access pattern fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); @@ -79,7 +79,7 @@ public void testOverridenPattern() { // even with backward seek, pattern remains to be sequential fileAccessPattern.updateLastServedIndex(lastServedIndex); fileAccessPattern.updateAccessPattern(currentPosition); - assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); + assertThat(fileAccessPattern.shouldAdaptToRandomAccess()).isFalse(); // AUTO_RANDOM Adaptive access pattern type // just 2 request in sequential pattern will result in adaptation @@ -92,21 +92,21 @@ public void testOverridenPattern() { .setFadviseRequestTrackCount(1) .build(); fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); - assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + assertThat(fileAccessPattern.shouldAdaptToRandomAccess()).isTrue(); // sequential read request will result in flipping to use sequential read pattern fileAccessPattern.updateLastServedIndex(lastServedIndex); fileAccessPattern.updateAccessPattern(currentPosition); - assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); + assertThat(fileAccessPattern.shouldAdaptToRandomAccess()).isFalse(); // overriding access pattern fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); // override to use random pattern fileAccessPattern.overrideAccessPattern(true); - assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + assertThat(fileAccessPattern.shouldAdaptToRandomAccess()).isTrue(); // even with sequential read request, patten remains to be random fileAccessPattern.updateLastServedIndex(lastServedIndex); fileAccessPattern.updateAccessPattern(currentPosition); - assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + assertThat(fileAccessPattern.shouldAdaptToRandomAccess()).isTrue(); } @Test @@ -118,11 +118,11 @@ public void testAutoMode() { // AUTO Adaptive access pattern type FileAccessPatternManager fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); - assertThat(fileAccessPattern.isRandomAccessPattern()).isFalse(); + assertThat(fileAccessPattern.shouldAdaptToRandomAccess()).isFalse(); // backward seek would result into adapting random pattern fileAccessPattern.updateLastServedIndex(lastServedIndex); fileAccessPattern.updateAccessPattern(currentPosition); - assertThat(fileAccessPattern.isRandomAccessPattern()).isTrue(); + assertThat(fileAccessPattern.shouldAdaptToRandomAccess()).isTrue(); } @Test @@ -146,12 +146,7 @@ public void testAutoRandomMode() { boolean expectedRandomAccess[] = new boolean[] {true, true, true, false, false, true, true, true, true, true}; - for (int i = 0; i < readIndexes.length; i++) { - long currentPosition = readIndexes[i]; - fileAccessPattern.updateAccessPattern(currentPosition); - assertThat(fileAccessPattern.isRandomAccessPattern()).isEqualTo(expectedRandomAccess[i]); - fileAccessPattern.updateLastServedIndex(currentPosition); - } + verifyAccessPattern(fileAccessPattern, readIndexes, expectedRandomAccess); // R-->S-->R forward seek // sequential read resulted in flipping of pattern @@ -163,10 +158,32 @@ public void testAutoRandomMode() { new boolean[] {true, true, true, false, false, true, true, true, true, true}; fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); + verifyAccessPattern(fileAccessPattern, readIndexes, expectedRandomAccess); + + // wouldn't flip to sequential if backward seek was requester earlier + readIndexes = new long[] {1, 0, 1, 2, 3, 4}; + expectedRandomAccess = new boolean[] {true, true, true, true, true, true}; + + fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); + verifyAccessPattern(fileAccessPattern, readIndexes, expectedRandomAccess); + + // wouldn't flip to sequential if forward seek was requester earlier + readIndexes = new long[] {0, 15, 1, 2, 3, 4}; + expectedRandomAccess = new boolean[] {true, true, true, true, true, true}; + + fileAccessPattern = new FileAccessPatternManager(RESOURCE_ID, readOptions); + verifyAccessPattern(fileAccessPattern, readIndexes, expectedRandomAccess); + } + + private void verifyAccessPattern( + FileAccessPatternManager fileAccessPattern, + long readIndexes[], + boolean expectedRandomAccessPattern[]) { for (int i = 0; i < readIndexes.length; i++) { long currentPosition = readIndexes[i]; fileAccessPattern.updateAccessPattern(currentPosition); - assertThat(fileAccessPattern.isRandomAccessPattern()).isEqualTo(expectedRandomAccess[i]); + assertThat(fileAccessPattern.shouldAdaptToRandomAccess()) + .isEqualTo(expectedRandomAccessPattern[i]); fileAccessPattern.updateLastServedIndex(currentPosition); } }