Skip to content

Commit

Permalink
fix: allow endpoint overrides in all clients (#436)
Browse files Browse the repository at this point in the history
* fix: tunnel properties through provisioner

* allow setting endpoint overrides in all clients

* DEPENDENCIES

* checkstyle

* simplification
  • Loading branch information
paullatzelsperger authored Sep 9, 2024
1 parent c0dd9a5 commit f9e2d9c
Show file tree
Hide file tree
Showing 14 changed files with 114 additions and 53 deletions.
6 changes: 3 additions & 3 deletions DEPENDENCIES
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ maven/mavencentral/com.google.errorprone/error_prone_annotations/2.28.0, Apache-
maven/mavencentral/com.google.guava/failureaccess/1.0.2, Apache-2.0, approved, CQ22654
maven/mavencentral/com.google.guava/guava/33.3.0-jre, Apache-2.0 AND CC0-1.0 AND (Apache-2.0 AND CC-PDDC) AND (Apache-2.0 AND CC0-1.0), approved, #15952
maven/mavencentral/com.google.guava/listenablefuture/9999.0-empty-to-avoid-conflict-with-guava, Apache-2.0, approved, CQ22657
maven/mavencentral/com.puppycrawl.tools/checkstyle/10.18.0, LGPL-2.1-or-later, restricted, clearlydefined
maven/mavencentral/com.puppycrawl.tools/checkstyle/10.18.1, LGPL-2.1-or-later AND (Apache-2.0 AND LGPL-2.1-or-later) AND Apache-2.0, approved, #16060
maven/mavencentral/com.squareup.okhttp3/okhttp-dnsoverhttps/4.12.0, Apache-2.0, approved, #11159
maven/mavencentral/com.squareup.okhttp3/okhttp/4.12.0, Apache-2.0, approved, #15227
maven/mavencentral/com.squareup.okhttp3/okhttp/4.9.3, Apache-2.0 AND MPL-2.0, approved, #3225
Expand Down Expand Up @@ -51,7 +51,7 @@ maven/mavencentral/net.bytebuddy/byte-buddy/1.14.16, Apache-2.0 AND BSD-3-Clause
maven/mavencentral/net.bytebuddy/byte-buddy/1.14.18, Apache-2.0 AND BSD-3-Clause, approved, #7163
maven/mavencentral/net.bytebuddy/byte-buddy/1.15.0, Apache-2.0 AND BSD-3-Clause, approved, #16008
maven/mavencentral/net.java.dev.jna/jna/5.13.0, Apache-2.0 AND LGPL-2.1-or-later, approved, #15196
maven/mavencentral/net.sf.saxon/Saxon-HE/12.5, NOASSERTION, restricted, clearlydefined
maven/mavencentral/net.sf.saxon/Saxon-HE/12.5, MPL-2.0-no-copyleft-exception AND (LicenseRef-scancode-proprietary-license AND MPL-2.0-no-copyleft-exception) AND (MPL-2.0-no-copyleft-exception AND X11) AND (MIT AND MPL-2.0-no-copyleft-exception) AND (MPL-1.0 AND MPL-2.0-no-copyleft-exception) AND (Apache-2.0 AND MPL-2.0-no-copyleft-exception) AND MPL-1.0, restricted, #16061
maven/mavencentral/org.antlr/antlr4-runtime/4.13.2, BSD-3-Clause, approved, #10767
maven/mavencentral/org.apache.commons/commons-compress/1.24.0, Apache-2.0 AND BSD-3-Clause AND bzip2-1.0.6 AND LicenseRef-Public-Domain, approved, #10368
maven/mavencentral/org.apache.commons/commons-lang3/3.7, Apache-2.0, approved, clearlydefined
Expand Down Expand Up @@ -127,7 +127,7 @@ maven/mavencentral/org.junit.jupiter/junit-jupiter-engine/5.11.0, EPL-2.0, appro
maven/mavencentral/org.junit.jupiter/junit-jupiter-params/5.11.0, EPL-2.0, approved, #15940
maven/mavencentral/org.junit.platform/junit-platform-commons/1.11.0, EPL-2.0, approved, #15936
maven/mavencentral/org.junit.platform/junit-platform-engine/1.11.0, EPL-2.0, approved, #15932
maven/mavencentral/org.junit/junit-bom/5.11.0, , restricted, clearlydefined
maven/mavencentral/org.junit/junit-bom/5.11.0, EPL-2.0, approved, #16062
maven/mavencentral/org.mockito/mockito-core/5.13.0, MIT, approved, clearlydefined
maven/mavencentral/org.mockito/mockito-core/5.2.0, MIT AND (Apache-2.0 AND MIT) AND Apache-2.0, approved, #7401
maven/mavencentral/org.objenesis/objenesis/3.3, Apache-2.0, approved, clearlydefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.eclipse.edc.aws.s3;

import org.eclipse.edc.runtime.metamodel.annotation.ExtensionPoint;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.iam.IamAsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
Expand Down Expand Up @@ -44,18 +45,48 @@ public interface AwsClientProvider {

/**
* Returns the s3 async client for the specified region
*
* @deprecated Use {@link AwsClientProvider#s3AsyncClient(S3ClientRequest)} instead!
*/
S3AsyncClient s3AsyncClient(String region);
@Deprecated
default S3AsyncClient s3AsyncClient(String region) {
return s3AsyncClient(S3ClientRequest.from(region, null));
}

/**
* Returns the s3 async client for the specified region
*/
S3AsyncClient s3AsyncClient(S3ClientRequest s3ClientRequest);

/**
* Returns the iam async client for the global region
*
* @deprecated Use {@link AwsClientProvider#iamAsyncClient(S3ClientRequest)} instead!
*/
@Deprecated
default IamAsyncClient iamAsyncClient() {
return iamAsyncClient(S3ClientRequest.from(Region.AWS_GLOBAL.id(), null));
}

/**
* Returns the iam async client for the global region
*/
IamAsyncClient iamAsyncClient();
IamAsyncClient iamAsyncClient(S3ClientRequest s3ClientRequest);

/**
* Returns the sts async client for the specified region
*
* @deprecated Use {@link AwsClientProvider#stsAsyncClient(S3ClientRequest)} instead!
*/
@Deprecated
default StsAsyncClient stsAsyncClient(String region) {
return stsAsyncClient(S3ClientRequest.from(region, null));
}

/**
* Returns the sts async client for the specified region
*/
StsAsyncClient stsAsyncClient(String region);
StsAsyncClient stsAsyncClient(S3ClientRequest s3ClientRequest);

/**
* Releases resources used by the provider.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
Expand All @@ -48,13 +49,12 @@ public class AwsClientProviderImpl implements AwsClientProvider {
private final Map<String, S3Client> s3Clients = new ConcurrentHashMap<>();
private final Map<String, S3AsyncClient> s3AsyncClients = new ConcurrentHashMap<>();
private final Map<String, StsAsyncClient> stsAsyncClients = new ConcurrentHashMap<>();
private final IamAsyncClient iamAsyncClient;
private final Map<String, IamAsyncClient> iamAsyncClients = new ConcurrentHashMap<>();

public AwsClientProviderImpl(AwsClientProviderConfiguration configuration) {
this.credentialsProvider = configuration.getCredentialsProvider();
this.configuration = configuration;
this.executor = Executors.newFixedThreadPool(configuration.getThreadPoolSize(), new ThreadFactoryBuilder().threadNamePrefix("aws-client").build());
this.iamAsyncClient = createIamAsyncClient();
}

@Override
Expand All @@ -63,23 +63,26 @@ public S3Client s3Client(S3ClientRequest s3ClientRequest) {
}

@Override
public S3AsyncClient s3AsyncClient(String region) {
return s3AsyncClients.computeIfAbsent(region, this::createS3AsyncClient);
public S3AsyncClient s3AsyncClient(S3ClientRequest clientRequest) {
var key = clientRequest.region() + "/" + clientRequest.endpointOverride();
return s3AsyncClients.computeIfAbsent(key, s -> createS3AsyncClient(clientRequest.region(), clientRequest.endpointOverride()));
}

@Override
public IamAsyncClient iamAsyncClient() {
return iamAsyncClient;
public IamAsyncClient iamAsyncClient(S3ClientRequest clientRequest) {
var key = clientRequest.endpointOverride();
return iamAsyncClients.computeIfAbsent(key, s -> createIamAsyncClient(clientRequest.endpointOverride()));
}

@Override
public StsAsyncClient stsAsyncClient(String region) {
return stsAsyncClients.computeIfAbsent(region, this::createStsClient);
public StsAsyncClient stsAsyncClient(S3ClientRequest clientRequest) {
var key = clientRequest.region() + "/" + clientRequest.endpointOverride();
return stsAsyncClients.computeIfAbsent(key, s -> createStsClient(clientRequest.region(), clientRequest.endpointOverride()));
}

@Override
public void shutdown() {
iamAsyncClient.close();
iamAsyncClients.values().forEach(SdkAutoCloseable::close);
s3AsyncClients.values().forEach(SdkAutoCloseable::close);
stsAsyncClients.values().forEach(SdkAutoCloseable::close);
}
Expand Down Expand Up @@ -116,35 +119,35 @@ private S3Client createS3Client(AwsCredentialsProvider credentialsProvider, Stri
return builder.build();
}

private S3AsyncClient createS3AsyncClient(String region) {
private S3AsyncClient createS3AsyncClient(String region, String endpointOverride) {
var builder = S3AsyncClient.builder()
.asyncConfiguration(b -> b.advancedOption(FUTURE_COMPLETION_EXECUTOR, executor))
.credentialsProvider(credentialsProvider)
.region(Region.of(region));

handleBaseEndpointOverride(builder, null);
handleBaseEndpointOverride(builder, endpointOverride);

return builder.build();
}

private StsAsyncClient createStsClient(String region) {
private StsAsyncClient createStsClient(String region, String endpointOverride) {
var builder = StsAsyncClient.builder()
.asyncConfiguration(b -> b.advancedOption(FUTURE_COMPLETION_EXECUTOR, executor))
.credentialsProvider(credentialsProvider)
.region(Region.of(region));

handleEndpointOverride(builder);
handleEndpointOverride(builder, endpointOverride);

return builder.build();
}

private IamAsyncClient createIamAsyncClient() {
private IamAsyncClient createIamAsyncClient(String endpointOverride) {
var builder = IamAsyncClient.builder()
.asyncConfiguration(b -> b.advancedOption(FUTURE_COMPLETION_EXECUTOR, executor))
.credentialsProvider(credentialsProvider)
.region(Region.AWS_GLOBAL);

handleEndpointOverride(builder);
handleEndpointOverride(builder, endpointOverride);

return builder.build();
}
Expand All @@ -164,10 +167,13 @@ private void handleBaseEndpointOverride(S3BaseClientBuilder<?, ?> builder, Strin
}
}

private void handleEndpointOverride(SdkClientBuilder<?, ?> builder) {
var endpointOverride = configuration.getEndpointOverride();
if (endpointOverride != null) {
builder.endpointOverride(endpointOverride);
}
private void handleEndpointOverride(SdkClientBuilder<?, ?> builder, String endpointOverride) {

// either take override from parameter, or from config, or null
var uri = Optional.ofNullable(endpointOverride)
.map(URI::create)
.orElseGet(configuration::getEndpointOverride);

builder.endpointOverride(uri);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.edc.aws.s3.AwsClientProvider;
import org.eclipse.edc.aws.s3.AwsClientProviderConfiguration;
import org.eclipse.edc.aws.s3.AwsClientProviderImpl;
import org.eclipse.edc.aws.s3.S3ClientRequest;
import org.jetbrains.annotations.NotNull;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
Expand Down Expand Up @@ -67,7 +68,8 @@ private S3TestClient(String url, String region) {
.endpointOverride(this.s3Endpoint)
.build();
this.clientProvider = new AwsClientProviderImpl(configuration);
this.s3AsyncClient = clientProvider.s3AsyncClient(region);
var rq = S3ClientRequest.from(region, s3Endpoint.toString());
this.s3AsyncClient = clientProvider.s3AsyncClient(rq);
}

public static S3TestClient create(String url, String region) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import java.util.stream.Stream;

import static org.eclipse.edc.aws.s3.spi.S3BucketSchema.BUCKET_NAME;
import static org.eclipse.edc.aws.s3.spi.S3BucketSchema.KEY_NAME;
import static org.eclipse.edc.aws.s3.spi.S3BucketSchema.KEY_PREFIX;
import static org.eclipse.edc.aws.s3.spi.S3BucketSchema.OBJECT_NAME;
import static org.eclipse.edc.aws.s3.spi.S3BucketSchema.OBJECT_PREFIX;
import static org.eclipse.edc.aws.s3.spi.S3BucketSchema.REGION;
Expand All @@ -46,7 +44,7 @@ public ValidationResult validate(DataAddress dataAddress) {
}
});

if (Stream.of(OBJECT_NAME, KEY_NAME, OBJECT_PREFIX, KEY_PREFIX)
if (Stream.of(OBJECT_NAME, OBJECT_PREFIX)
.map(dataAddress::getStringProperty)
.allMatch(it -> (it == null || it.isBlank()))) {
violations.add(violation("Either the '%s' or '%s' attribute must be provided."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.edc.connector.controlplane.transfer.spi.types.ProvisionedDataDestinationResource;

import static org.eclipse.edc.aws.s3.spi.S3BucketSchema.BUCKET_NAME;
import static org.eclipse.edc.aws.s3.spi.S3BucketSchema.ENDPOINT_OVERRIDE;
import static org.eclipse.edc.aws.s3.spi.S3BucketSchema.REGION;


Expand All @@ -41,6 +42,10 @@ public String getBucketName() {
return getDataAddress().getStringProperty(BUCKET_NAME);
}

public String getEndpointOverride() {
return getDataAddress().getStringProperty(ENDPOINT_OVERRIDE);
}

@Override
public String getResourceName() {
return dataAddress.getStringProperty(BUCKET_NAME);
Expand Down Expand Up @@ -82,6 +87,11 @@ public Builder role(String arn) {
provisionedResource.role = arn;
return this;
}

public Builder endpointOverride(String endpointOverride) {
dataAddressBuilder.property(ENDPOINT_OVERRIDE, endpointOverride);
return this;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ private StatusResult<ProvisionResponse> provisionSuccedeed(S3BucketResourceDefin
.role(role.roleName())
.transferProcessId(resourceDefinition.getTransferProcessId())
.resourceName(resourceDefinition.getBucketName())
.endpointOverride(resourceDefinition.getEndpointOverride())
.build();

var secretToken = new AwsTemporarySecretToken(credentials.accessKeyId(), credentials.secretAccessKey(), credentials.sessionToken(), credentials.expiration().toEpochMilli());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.eclipse.edc.connector.controlplane.transfer.spi.types.ResourceDefinition;

import java.util.Objects;
import java.util.function.Supplier;

/**
* An S3 bucket and access credentials to be provisioned.
Expand All @@ -30,7 +29,7 @@
public class S3BucketResourceDefinition extends ResourceDefinition {
private String regionId;
private String bucketName;
private Supplier<Boolean> checker;
private String endpointOverride;

private S3BucketResourceDefinition() {
}
Expand All @@ -50,6 +49,10 @@ public Builder toBuilder() {
.bucketName(bucketName);
}

public String getEndpointOverride() {
return endpointOverride;
}

@JsonPOJOBuilder(withPrefix = "")
public static class Builder extends ResourceDefinition.Builder<S3BucketResourceDefinition, Builder> {

Expand All @@ -71,6 +74,11 @@ public Builder bucketName(String bucketName) {
return this;
}

public Builder endpointOverride(String endpointOverride) {
resourceDefinition.endpointOverride = endpointOverride;
return this;
}

@Override
protected void verify() {
super.verify();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,20 @@ public class S3ConsumerResourceDefinitionGenerator implements ConsumerResourceDe

@Override
public ResourceDefinition generate(TransferProcess transferProcess, Policy policy) {
if (transferProcess.getDataDestination().getStringProperty(S3BucketSchema.REGION) == null) {

var dataDestination = transferProcess.getDataDestination();
var endpointOverride = dataDestination.getStringProperty(S3BucketSchema.ENDPOINT_OVERRIDE);
if (dataDestination.getStringProperty(S3BucketSchema.REGION) == null) {
// FIXME generate region from policy engine
return S3BucketResourceDefinition.Builder.newInstance().id(randomUUID().toString()).bucketName(transferProcess.getDataDestination().getStringProperty(S3BucketSchema.BUCKET_NAME)).regionId(Region.US_EAST_1.id()).build();
return S3BucketResourceDefinition.Builder.newInstance().id(randomUUID().toString()).bucketName(dataDestination.getStringProperty(S3BucketSchema.BUCKET_NAME)).regionId(Region.US_EAST_1.id()).build();
}
var destination = transferProcess.getDataDestination();
var id = randomUUID().toString();

return S3BucketResourceDefinition.Builder.newInstance().id(id).bucketName(destination.getStringProperty(S3BucketSchema.BUCKET_NAME)).regionId(destination.getStringProperty(S3BucketSchema.REGION)).build();
return S3BucketResourceDefinition.Builder.newInstance().id(id)
.bucketName(dataDestination.getStringProperty(S3BucketSchema.BUCKET_NAME))
.regionId(dataDestination.getStringProperty(S3BucketSchema.REGION))
.endpointOverride(endpointOverride)
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import org.eclipse.edc.aws.s3.AwsClientProvider;
import org.eclipse.edc.aws.s3.S3ClientRequest;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.DeprovisionedResource;
import org.eclipse.edc.spi.monitor.Monitor;
import software.amazon.awssdk.services.iam.IamAsyncClient;
Expand Down Expand Up @@ -56,11 +57,12 @@ public S3DeprovisionPipeline(RetryPolicy<Object> retryPolicy, AwsClientProvider
* Performs a non-blocking deprovisioning operation.
*/
public CompletableFuture<?> deprovision(S3BucketProvisionedResource resource) {
var s3Client = clientProvider.s3AsyncClient(resource.getRegion());
var iamClient = clientProvider.iamAsyncClient();
var rq = S3ClientRequest.from(resource.getRegion(), resource.getEndpointOverride());
var s3Client = clientProvider.s3AsyncClient(rq);
var iamClient = clientProvider.iamAsyncClient(rq);

String bucketName = resource.getBucketName();
String role = resource.getRole();
var bucketName = resource.getBucketName();
var role = resource.getRole();

var listObjectsRequest = ListObjectsV2Request.builder().bucket(bucketName).build();
monitor.debug("S3DeprovisionPipeline: list objects");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import org.eclipse.edc.aws.s3.AwsClientProvider;
import org.eclipse.edc.aws.s3.S3ClientRequest;
import org.eclipse.edc.spi.monitor.Monitor;
import software.amazon.awssdk.services.iam.IamAsyncClient;
import software.amazon.awssdk.services.iam.model.CreateRoleRequest;
Expand Down Expand Up @@ -80,9 +81,10 @@ private S3ProvisionPipeline(RetryPolicy<Object> retryPolicy, AwsClientProvider c
* Performs a non-blocking provisioning operation.
*/
public CompletableFuture<S3ProvisionResponse> provision(S3BucketResourceDefinition resourceDefinition) {
var s3AsyncClient = clientProvider.s3AsyncClient(resourceDefinition.getRegionId());
var iamClient = clientProvider.iamAsyncClient();
var stsClient = clientProvider.stsAsyncClient(resourceDefinition.getRegionId());
var rq = S3ClientRequest.from(resourceDefinition.getRegionId(), resourceDefinition.getEndpointOverride());
var s3AsyncClient = clientProvider.s3AsyncClient(rq);
var iamClient = clientProvider.iamAsyncClient(rq);
var stsClient = clientProvider.stsAsyncClient(rq);

var request = CreateBucketRequest.builder()
.bucket(resourceDefinition.getBucketName())
Expand Down
Loading

0 comments on commit f9e2d9c

Please sign in to comment.