Skip to content

Commit

Permalink
Use async client for delete blob or path in S3 Blob Container (#16788) (
Browse files Browse the repository at this point in the history
#16984)

* Use async client for delete blob or path in S3 Blob Container



* Fix UTs



* Fix failures in S3BlobStoreRepositoryTests



* Fix S3BlobStoreRepositoryTests



* Fix failures in S3RepositoryThirdPartyTests



* Fix failures in S3RepositoryPluginTests



---------


(cherry picked from commit 1d4b85f)

Signed-off-by: Ashish Singh <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent e5c51e6 commit 36faab0
Show file tree
Hide file tree
Showing 9 changed files with 322 additions and 299 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.opensearch.repositories.RepositoryStats;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.repositories.blobstore.OpenSearchMockAPIBasedRepositoryIntegTestCase;
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.repositories.s3.utils.AwsRequestSigner;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotsService;
Expand Down Expand Up @@ -166,7 +167,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
// Disable request throttling because some random values in tests might generate too many failures for the S3 client
.put(S3ClientSettings.USE_THROTTLE_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), false)
.put(S3ClientSettings.PROXY_TYPE_SETTING.getConcreteSettingForNamespace("test").getKey(), ProxySettings.ProxyType.DIRECT)
.put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false)
.put(super.nodeSettings(nodeOrdinal))
.setSecureSettings(secureSettings);

Expand Down Expand Up @@ -316,22 +316,27 @@ protected S3Repository createRepository(
ClusterService clusterService,
RecoverySettings recoverySettings
) {
GenericStatsMetricPublisher genericStatsMetricPublisher = new GenericStatsMetricPublisher(10000L, 10, 10000L, 10);

AsyncTransferManager asyncUploadUtils = new AsyncTransferManager(
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.get(clusterService.getSettings()).getBytes(),
normalExecutorBuilder.getStreamReader(),
priorityExecutorBuilder.getStreamReader(),
urgentExecutorBuilder.getStreamReader(),
transferSemaphoresHolder
);
return new S3Repository(
metadata,
registry,
service,
clusterService,
recoverySettings,
null,
null,
null,
null,
null,
false,
null,
null,
asyncUploadUtils,
urgentExecutorBuilder,
priorityExecutorBuilder,
normalExecutorBuilder,
s3AsyncService,
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()),
normalPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ,
genericStatsMetricPublisher
) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,6 @@

public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTestCase {

@Override
protected Settings nodeSettings() {
return Settings.builder()
.put(super.nodeSettings())
.put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false)
.build();
}

@Override
@Before
@SuppressForbidden(reason = "Need to set system property here for AWS SDK v2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.ProxyConfiguration;
import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
import software.amazon.awssdk.profiles.ProfileFileSystemSetting;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
Expand Down Expand Up @@ -120,6 +119,7 @@ public AmazonAsyncS3Reference client(
if (existing != null && existing.tryIncRef()) {
return existing;
}

final AmazonAsyncS3Reference clientReference = new AmazonAsyncS3Reference(
buildClient(clientSettings, urgentExecutorBuilder, priorityExecutorBuilder, normalExecutorBuilder)
);
Expand Down Expand Up @@ -235,17 +235,17 @@ synchronized AmazonAsyncS3WithCredentials buildClient(
}

static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSettings clientSettings) {
RetryPolicy retryPolicy = SocketAccess.doPrivileged(
() -> RetryPolicy.builder()
.numRetries(clientSettings.maxRetries)
.throttlingBackoffStrategy(
clientSettings.throttleRetries ? BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD) : BackoffStrategy.none()
)
.build()
);

return ClientOverrideConfiguration.builder()
.retryPolicy(
RetryPolicy.builder()
.numRetries(clientSettings.maxRetries)
.throttlingBackoffStrategy(
clientSettings.throttleRetries
? BackoffStrategy.defaultThrottlingStrategy(RetryMode.STANDARD)
: BackoffStrategy.none()
)
.build()
)
.retryPolicy(retryPolicy)
.apiCallAttemptTimeout(Duration.ofMillis(clientSettings.requestTimeoutMillis))
.build();
}
Expand Down Expand Up @@ -346,12 +346,7 @@ static AwsCredentialsProvider buildCredentials(Logger logger, S3ClientSettings c
// valid paths.
@SuppressForbidden(reason = "Need to provide this override to v2 SDK so that path does not default to home path")
private static void setDefaultAwsProfilePath() {
if (ProfileFileSystemSetting.AWS_SHARED_CREDENTIALS_FILE.getStringValue().isEmpty()) {
System.setProperty(ProfileFileSystemSetting.AWS_SHARED_CREDENTIALS_FILE.property(), System.getProperty("opensearch.path.conf"));
}
if (ProfileFileSystemSetting.AWS_CONFIG_FILE.getStringValue().isEmpty()) {
System.setProperty(ProfileFileSystemSetting.AWS_CONFIG_FILE.property(), System.getProperty("opensearch.path.conf"));
}
S3Service.setDefaultAwsProfilePath();
}

private static IrsaCredentials buildFromEnvironment(IrsaCredentials defaults) {
Expand Down Expand Up @@ -443,5 +438,6 @@ public AwsCredentials resolveCredentials() {
@Override
public void close() {
releaseCachedClients();

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
Expand All @@ -55,9 +52,7 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
Expand All @@ -68,7 +63,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
import org.opensearch.common.StreamContext;
Expand Down Expand Up @@ -101,11 +96,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
Expand Down Expand Up @@ -381,125 +373,17 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS
}

@Override
public DeleteResult delete() throws IOException {
final AtomicLong deletedBlobs = new AtomicLong();
final AtomicLong deletedBytes = new AtomicLong();
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
ListObjectsV2Iterable listObjectsIterable = SocketAccess.doPrivileged(
() -> clientReference.get()
.listObjectsV2Paginator(
ListObjectsV2Request.builder()
.bucket(blobStore.bucket())
.prefix(keyPath)
.overrideConfiguration(
o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().listObjectsMetricPublisher)
)
.build()
)
);

Iterator<ListObjectsV2Response> listObjectsResponseIterator = listObjectsIterable.iterator();
while (listObjectsResponseIterator.hasNext()) {
ListObjectsV2Response listObjectsResponse = SocketAccess.doPrivileged(listObjectsResponseIterator::next);
List<String> blobsToDelete = listObjectsResponse.contents().stream().map(s3Object -> {
deletedBlobs.incrementAndGet();
deletedBytes.addAndGet(s3Object.size());

return s3Object.key();
}).collect(Collectors.toList());

if (!listObjectsResponseIterator.hasNext()) {
blobsToDelete.add(keyPath);
}

doDeleteBlobs(blobsToDelete, false);
}
} catch (SdkException e) {
throw new IOException("Exception when deleting blob container [" + keyPath + "]", e);
}

return new DeleteResult(deletedBlobs.get(), deletedBytes.get());
public DeleteResult delete() {
PlainActionFuture<DeleteResult> future = new PlainActionFuture<>();
deleteAsync(future);
return future.actionGet();
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
doDeleteBlobs(blobNames, true);
}

private void doDeleteBlobs(List<String> blobNames, boolean relative) throws IOException {
if (blobNames.isEmpty()) {
return;
}
final Set<String> outstanding;
if (relative) {
outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet());
} else {
outstanding = new HashSet<>(blobNames);
}
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
// S3 API allows 1k blobs per delete so we split up the given blobs into requests of bulk size deletes
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
final List<String> partition = new ArrayList<>();
for (String key : outstanding) {
partition.add(key);
if (partition.size() == blobStore.getBulkDeletesSize()) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
partition.clear();
}
}
if (partition.isEmpty() == false) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
}
SocketAccess.doPrivilegedVoid(() -> {
SdkException aex = null;
for (DeleteObjectsRequest deleteRequest : deleteRequests) {
List<String> keysInRequest = deleteRequest.delete()
.objects()
.stream()
.map(ObjectIdentifier::key)
.collect(Collectors.toList());
try {
DeleteObjectsResponse deleteObjectsResponse = clientReference.get().deleteObjects(deleteRequest);
outstanding.removeAll(keysInRequest);
outstanding.addAll(deleteObjectsResponse.errors().stream().map(S3Error::key).collect(Collectors.toSet()));
if (!deleteObjectsResponse.errors().isEmpty()) {
logger.warn(
() -> new ParameterizedMessage(
"Failed to delete some blobs {}",
deleteObjectsResponse.errors()
.stream()
.map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]")
.collect(Collectors.toList())
)
);
}
} catch (SdkException e) {
// The AWS client threw any unexpected exception and did not execute the request at all so we do not
// remove any keys from the outstanding deletes set.
aex = ExceptionsHelper.useOrSuppress(aex, e);
}
}
if (aex != null) {
throw aex;
}
});
} catch (Exception e) {
throw new IOException("Failed to delete blobs [" + outstanding + "]", e);
}
assert outstanding.isEmpty();
}

private DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
return DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(
Delete.builder()
.objects(blobs.stream().map(blob -> ObjectIdentifier.builder().key(blob).build()).collect(Collectors.toList()))
.quiet(true)
.build()
)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher))
.build();
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) {
PlainActionFuture<Void> future = new PlainActionFuture<>();
deleteBlobsAsyncIgnoringIfNotExists(blobNames, future);
future.actionGet();
}

@Override
Expand Down Expand Up @@ -886,7 +770,11 @@ public void deleteAsync(ActionListener<DeleteResult> completionListener) {
try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
S3AsyncClient s3AsyncClient = asyncClientReference.get().client();

ListObjectsV2Request listRequest = ListObjectsV2Request.builder().bucket(blobStore.bucket()).prefix(keyPath).build();
ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
.bucket(blobStore.bucket())
.prefix(keyPath)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().listObjectsMetricPublisher))
.build();
ListObjectsV2Publisher listPublisher = s3AsyncClient.listObjectsV2Paginator(listRequest);

AtomicLong deletedBlobs = new AtomicLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,19 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
private static final String NORMAL_TRANSFER_QUEUE_CONSUMER = "normal_transfer_queue_consumer";

protected final S3Service service;
private final S3AsyncService s3AsyncService;
protected final S3AsyncService s3AsyncService;

private final Path configPath;

private AsyncExecutorContainer urgentExecutorBuilder;
private AsyncExecutorContainer priorityExecutorBuilder;
private AsyncExecutorContainer normalExecutorBuilder;
protected AsyncExecutorContainer urgentExecutorBuilder;
protected AsyncExecutorContainer priorityExecutorBuilder;
protected AsyncExecutorContainer normalExecutorBuilder;
private ExecutorService lowTransferQConsumerService;
private ExecutorService normalTransferQConsumerService;
private SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
private TransferSemaphoresHolder transferSemaphoresHolder;
private GenericStatsMetricPublisher genericStatsMetricPublisher;
protected SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ;
protected SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;
protected TransferSemaphoresHolder transferSemaphoresHolder;
protected GenericStatsMetricPublisher genericStatsMetricPublisher;

public S3RepositoryPlugin(final Settings settings, final Path configPath) {
this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath));
Expand Down Expand Up @@ -387,5 +387,8 @@ public void reload(Settings settings) {
public void close() throws IOException {
service.close();
s3AsyncService.close();
urgentExecutorBuilder.getAsyncTransferEventLoopGroup().close();
priorityExecutorBuilder.getAsyncTransferEventLoopGroup().close();
normalExecutorBuilder.getAsyncTransferEventLoopGroup().close();
}
}
Loading

0 comments on commit 36faab0

Please sign in to comment.