Skip to content

Commit

Permalink
Optimizations in s3 async upload flow and guards against S3 async SDK…
Browse files Browse the repository at this point in the history
… errors

Signed-off-by: vikasvb90 <[email protected]>
  • Loading branch information
vikasvb90 committed Nov 25, 2023
1 parent 5bb6cae commit 94cde48
Show file tree
Hide file tree
Showing 15 changed files with 390 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ private static IrsaCredentials buildFromEnvironment(IrsaCredentials defaults) {
return new IrsaCredentials(webIdentityTokenFile, roleArn, roleSessionName);
}

private synchronized void releaseCachedClients() {
public synchronized void releaseCachedClients() {
// the clients will shut down once they are no longer in use
for (final AmazonAsyncS3Reference clientReference : clientsCache.values()) {
clientReference.decRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.opensearch.repositories.s3.async.UploadRequest;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -188,10 +189,38 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
writeContext.getWritePriority(),
writeContext.getUploadFinalizer(),
writeContext.doRemoteDataIntegrityCheck(),
writeContext.getExpectedChecksum()
writeContext.getExpectedChecksum(),
blobStore.isUploadRetryEnabled()
);
try {
long partSize = blobStore.getAsyncTransferManager().calculateOptimalPartSize(writeContext.getFileSize());
if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
StreamContext streamContext = SocketAccess.doPrivileged(
() -> writeContext.getStreamProvider(uploadRequest.getContentLength())
);
InputStreamContainer inputStream = streamContext.provideStream(0);
try {
executeMultipartUpload(
blobStore,
uploadRequest.getKey(),
inputStream.getInputStream(),
uploadRequest.getContentLength()
);
completionListener.onResponse(null);
} catch (Exception ex) {
logger.error(
() -> new ParameterizedMessage(
"Failed to upload large file {} of size {} ",
uploadRequest.getKey(),
uploadRequest.getContentLength()
),
ex
);
completionListener.onFailure(ex);
}
return;
}
long partSize = blobStore.getAsyncTransferManager()
.calculateOptimalPartSize(writeContext.getFileSize(), writeContext.getWritePriority(), blobStore.isUploadRetryEnabled());
StreamContext streamContext = SocketAccess.doPrivileged(() -> writeContext.getStreamProvider(partSize));
try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {

Expand Down Expand Up @@ -537,8 +566,14 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin

PutObjectRequest putObjectRequest = putObjectRequestBuilder.build();
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
final InputStream requestInputStream;
if (blobStore.isUploadRetryEnabled()) {
requestInputStream = new BufferedInputStream(input, (int) (blobSize + 1));
} else {
requestInputStream = input;
}
SocketAccess.doPrivilegedVoid(
() -> clientReference.get().putObject(putObjectRequest, RequestBody.fromInputStream(input, blobSize))
() -> clientReference.get().putObject(putObjectRequest, RequestBody.fromInputStream(requestInputStream, blobSize))
);
} catch (final SdkException e) {
throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e);
Expand Down Expand Up @@ -578,6 +613,13 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}

final InputStream requestInputStream;
if (blobStore.isUploadRetryEnabled()) {
requestInputStream = new BufferedInputStream(input, (int) (partSize + 1));
} else {
requestInputStream = input;
}

CreateMultipartUploadRequest createMultipartUploadRequest = createMultipartUploadRequestBuilder.build();
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
uploadId.set(
Expand All @@ -601,10 +643,9 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
.build();

bytesCount += uploadPartRequest.contentLength();

final UploadPartResponse uploadResponse = SocketAccess.doPrivileged(
() -> clientReference.get()
.uploadPart(uploadPartRequest, RequestBody.fromInputStream(input, uploadPartRequest.contentLength()))
.uploadPart(uploadPartRequest, RequestBody.fromInputStream(requestInputStream, uploadPartRequest.contentLength()))
);
parts.add(CompletedPart.builder().partNumber(uploadPartRequest.partNumber()).eTag(uploadResponse.eTag()).build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@
import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
import static org.opensearch.repositories.s3.S3Repository.UPLOAD_RETRY_ENABLED;

class S3BlobStore implements BlobStore {

Expand All @@ -71,6 +73,10 @@ class S3BlobStore implements BlobStore {

private volatile ByteSizeValue bufferSize;

private volatile boolean redirectLargeUploads;

private volatile boolean uploadRetryEnabled;

private volatile boolean serverSideEncryption;

private volatile ObjectCannedACL cannedACL;
Expand Down Expand Up @@ -119,6 +125,9 @@ class S3BlobStore implements BlobStore {
this.normalExecutorBuilder = normalExecutorBuilder;
this.priorityExecutorBuilder = priorityExecutorBuilder;
this.urgentExecutorBuilder = urgentExecutorBuilder;
// Settings to initialize blobstore with.
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
}

@Override
Expand All @@ -130,6 +139,8 @@ public void reload(RepositoryMetadata repositoryMetadata) {
this.cannedACL = initCannedACL(CANNED_ACL_SETTING.get(repositoryMetadata.settings()));
this.storageClass = initStorageClass(STORAGE_CLASS_SETTING.get(repositoryMetadata.settings()));
this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
}

@Override
Expand All @@ -149,6 +160,14 @@ int getMaxRetries() {
return service.settings(repositoryMetadata).maxRetries;
}

/**
 * Reports whether large uploads should be redirected away from the async upload path.
 * The flag is read from the {@code REDIRECT_LARGE_S3_UPLOAD} repository setting when the
 * blob store is created and again on every {@code reload}.
 *
 * @return current value of the redirect-large-uploads flag
 */
public boolean isRedirectLargeUploads() {
    return this.redirectLargeUploads;
}

/**
 * Reports whether upload retry support is switched on.
 * The flag is read from the {@code UPLOAD_RETRY_ENABLED} repository setting when the
 * blob store is created and again on every {@code reload}.
 *
 * @return current value of the upload-retry flag
 */
public boolean isUploadRetryEnabled() {
    return this.uploadRetryEnabled;
}

/**
 * Returns the value of the {@code bucket} field (presumably the name of the S3 bucket
 * backing this blob store).
 *
 * @return the configured bucket
 */
public String bucket() {
    return this.bucket;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,20 @@ class S3Repository extends MeteredBlobStoreRepository {
*/
static final ByteSizeValue MAX_FILE_SIZE_USING_MULTIPART = new ByteSizeValue(5, ByteSizeUnit.TB);

/**
 * Whether large uploads are redirected to the slower synchronous S3 client instead of going
 * through the async upload path. Enabled by default.
 */
static final Setting<Boolean> REDIRECT_LARGE_S3_UPLOAD = Setting.boolSetting(
"redirect_large_s3_upload",
true,
Setting.Property.NodeScope
);

/**
 * Whether retries on uploads are enabled. When enabled, the upload input stream is wrapped in a
 * buffered stream so that already-read data can be replayed on a retry. Enabled by default.
 */
static final Setting<Boolean> UPLOAD_RETRY_ENABLED = Setting.boolSetting("s3_upload_retry_enabled", true, Setting.Property.NodeScope);

/**
* Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold,
* the S3 repository will use the AWS Multipart Upload API to split the chunk into several parts, each of buffer_size length, and
Expand Down Expand Up @@ -391,7 +405,9 @@ public void reload(RepositoryMetadata newRepositoryMetadata) {

// Reload configs for S3RepositoryPlugin
service.settings(metadata);
service.releaseCachedClients();
s3AsyncService.settings(metadata);
s3AsyncService.releaseCachedClients();

// Reload configs for S3BlobStore
BlobStore blobStore = getBlobStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ public List<Setting<?>> getSettings() {
S3ClientSettings.IDENTITY_TOKEN_FILE_SETTING,
S3ClientSettings.ROLE_SESSION_NAME_SETTING,
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING,
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING,
S3Repository.REDIRECT_LARGE_S3_UPLOAD,
S3Repository.UPLOAD_RETRY_ENABLED
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ private static IrsaCredentials buildFromEnviroment(IrsaCredentials defaults) {
return new IrsaCredentials(webIdentityTokenFile, roleArn, roleSessionName);
}

private synchronized void releaseCachedClients() {
public synchronized void releaseCachedClients() {
// the clients will shut down once they are no longer in use
for (final AmazonS3Reference clientReference : clientsCache.values()) {
clientReference.decRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.StatsMetricPublisher;
import org.opensearch.repositories.s3.io.CheckedContainer;

import java.io.BufferedInputStream;
Expand Down Expand Up @@ -54,6 +54,7 @@ public class AsyncPartsHandler {
* @param uploadId Upload Id against which multi-part is being performed
* @param completedParts Reference of completed parts
* @param inputStreamContainers Checksum containers
* @param statsMetricPublisher sdk metric publisher
* @return list of completable futures
* @throws IOException thrown in case of an IO error
*/
Expand All @@ -66,7 +67,9 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
StreamContext streamContext,
String uploadId,
AtomicReferenceArray<CompletedPart> completedParts,
AtomicReferenceArray<CheckedContainer> inputStreamContainers
AtomicReferenceArray<CheckedContainer> inputStreamContainers,
StatsMetricPublisher statsMetricPublisher,
boolean uploadRetryEnabled
) throws IOException {
List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
Expand All @@ -77,6 +80,7 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
.partNumber(partIdx + 1)
.key(uploadRequest.getKey())
.uploadId(uploadId)
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
.contentLength(inputStreamContainer.getContentLength());
if (uploadRequest.doRemoteDataIntegrityCheck()) {
uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
Expand All @@ -91,7 +95,8 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
futures,
uploadPartRequestBuilder.build(),
inputStreamContainer,
uploadRequest
uploadRequest,
uploadRetryEnabled
);
}

Expand Down Expand Up @@ -128,6 +133,18 @@ public static void cleanUpParts(S3AsyncClient s3AsyncClient, UploadRequest uploa
}));
}

/**
 * Optionally wraps an upload stream in a {@link BufferedInputStream} so that mark/reset is
 * available and buffered data can be replayed when an upload attempt is retried.
 * <p>
 * The stream is wrapped only when retries are enabled and the write priority is
 * {@code HIGH} or {@code URGENT}; in every other case the original stream is returned as is.
 * The buffer is sized to {@code contentLength + 1} bytes, capped at the largest array size a JVM
 * can reliably allocate. Without the cap, the narrowing cast to {@code int} wraps around for
 * content lengths at or above {@link Integer#MAX_VALUE}. That would either make the
 * {@link BufferedInputStream} constructor throw {@link IllegalArgumentException} (size &lt;= 0)
 * or silently produce a tiny buffer that cannot support a retry.
 *
 * @param inputStream        stream carrying the content to upload
 * @param writePriority      priority of the upload request
 * @param uploadRetryEnabled whether upload retry support is enabled
 * @param contentLength      number of bytes expected from {@code inputStream}; used to size the buffer
 * @return a buffered wrapper around {@code inputStream}, or {@code inputStream} itself when no
 *         wrapping is required
 */
public static InputStream maybeRetryInputStream(
    InputStream inputStream,
    WritePriority writePriority,
    boolean uploadRetryEnabled,
    long contentLength
) {
    if (uploadRetryEnabled == false) {
        return inputStream;
    }
    if (writePriority != WritePriority.HIGH && writePriority != WritePriority.URGENT) {
        return inputStream;
    }
    // Some JVMs reserve header words in arrays, so stay slightly below Integer.MAX_VALUE.
    final int maxBufferSize = Integer.MAX_VALUE - 8;
    // Compare before adding 1 so that neither the long addition nor the int cast can overflow.
    final int bufferSize = contentLength >= maxBufferSize ? maxBufferSize : (int) (contentLength + 1);
    return new BufferedInputStream(inputStream, bufferSize);
}

private static void uploadPart(
S3AsyncClient s3AsyncClient,
ExecutorService executorService,
Expand All @@ -138,7 +155,8 @@ private static void uploadPart(
List<CompletableFuture<CompletedPart>> futures,
UploadPartRequest uploadPartRequest,
InputStreamContainer inputStreamContainer,
UploadRequest uploadRequest
UploadRequest uploadRequest,
boolean uploadRetryEnabled
) {
Integer partNumber = uploadPartRequest.partNumber();

Expand All @@ -150,9 +168,13 @@ private static void uploadPart(
} else {
streamReadExecutor = executorService;
}
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));

InputStream inputStream = maybeRetryInputStream(
inputStreamContainer.getInputStream(),
uploadRequest.getWritePriority(),
uploadRetryEnabled,
uploadPartRequest.contentLength()
);
CompletableFuture<UploadPartResponse> uploadPartResponseFuture = SocketAccess.doPrivileged(
() -> s3AsyncClient.uploadPart(
uploadPartRequest,
Expand Down
Loading

0 comments on commit 94cde48

Please sign in to comment.