Skip to content

Commit

Permalink
Transfer destination path feature (Digital-Ecosystems#72)
Browse files Browse the repository at this point in the history
* fix: datasink with a non ionos provider datasource

* fix: datasink with a non ionos provider datasource

* fix: filter in transfer of multiple folders

* fix: filter in transfer of multiple folders

* feat: transfer destination path

* feat: transfer destination path
  • Loading branch information
jannotti-glaucio authored Mar 12, 2024
1 parent 483b8e9 commit 912664a
Show file tree
Hide file tree
Showing 8 changed files with 44 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public interface IonosBucketSchema {
String STORAGE_NAME = EDC_NAMESPACE + "storage";
String BUCKET_NAME = EDC_NAMESPACE + "bucketName";
String BLOB_NAME = EDC_NAMESPACE + "blobName";
String PATH = EDC_NAMESPACE + "path";
String FILTER_INCLUDES = EDC_NAMESPACE + "filter.includes";
String FILTER_EXCLUDES = EDC_NAMESPACE + "filter.excludes";
String ACCESS_KEY_ID = EDC_NAMESPACE + "accessKey";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class IonosDataSink extends ParallelSink {

private S3ConnectorApi s3Api;
private String bucketName;
private String blobName;
private String path;

private IonosDataSink() {}

Expand All @@ -41,8 +41,8 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {

for (DataSource.Part part : parts) {
String blobName;
if (this.blobName != null) {
blobName = this.blobName;
if (this.path != null) {
blobName = this.path + part.name();
} else {
blobName = part.name();
}
Expand All @@ -53,6 +53,8 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {
streamsOutput = new ByteArrayOutputStream();
stream = part.openStream();


// TODO Make this more configurable
if (part instanceof IonosDataSource.S3Part) {
// Multiple fetches
while (stream != null) {
Expand Down Expand Up @@ -130,8 +132,8 @@ public Builder bucketName(String bucketName) {
return this;
}

public Builder blobName(String blobName) {
sink.blobName = blobName;
public Builder path(String path) {
sink.path = path;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public DataSink createSink(DataFlowRequest request) {
maxFiles);
return IonosDataSink.Builder.newInstance()
.bucketName(destination.getStringProperty(IonosBucketSchema.BUCKET_NAME))
.blobName(destination.getStringProperty(IonosBucketSchema.BLOB_NAME))
.path(destination.getStringProperty(IonosBucketSchema.PATH))
.requestId(request.getId())
.executorService(executorService)
.monitor(monitor)
Expand All @@ -104,7 +104,7 @@ public DataSink createSink(DataFlowRequest request) {
maxFiles);
return IonosDataSink.Builder.newInstance()
.bucketName(destination.getStringProperty(IonosBucketSchema.BUCKET_NAME))
.blobName(destination.getStringProperty(IonosBucketSchema.BLOB_NAME))
.path(destination.getStringProperty(IonosBucketSchema.PATH))
.requestId(request.getId())
.executorService(executorService)
.monitor(monitor)
Expand All @@ -115,7 +115,7 @@ public DataSink createSink(DataFlowRequest request) {

return IonosDataSink.Builder.newInstance()
.bucketName(destination.getStringProperty(IonosBucketSchema.BUCKET_NAME))
.blobName(destination.getStringProperty(IonosBucketSchema.BLOB_NAME))
.path(destination.getStringProperty(IonosBucketSchema.PATH))
.requestId(request.getId()).executorService(executorService)
.monitor(monitor)
.s3Api(s3Api)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void initialize(ServiceExtensionContext context) {
monitor.debug("IonosProvisionExtension" + "retryPolicy");
var retryPolicy = (RetryPolicy<Object>) context.getService(RetryPolicy.class);
monitor.debug("IonosProvisionExtension" + "s3BucketProvisioner");
var s3BucketProvisioner = new IonosS3Provisioner(retryPolicy, monitor, clientApi);
var s3BucketProvisioner = new IonosS3Provisioner(retryPolicy, clientApi);
provisionManager.register(s3BucketProvisioner);

// register the generator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.edc.policy.model.Policy;

import com.ionos.edc.extension.s3.schema.IonosBucketSchema;
import org.eclipse.edc.spi.EdcException;

public class IonosS3ConsumerResourceDefinitionGenerator implements ConsumerResourceDefinitionGenerator {

Expand All @@ -33,11 +34,16 @@ public ResourceDefinition generate(DataRequest dataRequest, Policy policy) {
Objects.requireNonNull(policy, "policy must always be provided");

var destination = dataRequest.getDataDestination();

var path = destination.getStringProperty(IonosBucketSchema.PATH);
if ((path != null) && !path.endsWith("/")) {
throw new EdcException("path must be a directory");
}

var id = randomUUID().toString();
var keyName = destination.getKeyName();
var storage = destination.getStringProperty(IonosBucketSchema.STORAGE_NAME);
var bucketName = destination.getStringProperty(IonosBucketSchema.BUCKET_NAME);
var blobName = destination.getStringProperty(IonosBucketSchema.BLOB_NAME);
var accessKey = destination.getStringProperty(IonosBucketSchema.ACCESS_KEY_ID);
var secretKey = destination.getStringProperty(IonosBucketSchema.SECRET_ACCESS_KEY);

Expand All @@ -46,8 +52,9 @@ public ResourceDefinition generate(DataRequest dataRequest, Policy policy) {
.keyName(keyName)
.storage(storage)
.bucketName(bucketName)
.blobName(blobName)
.path(path)
.accessKey(accessKey)
.secretKey(secretKey)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,6 @@ public class IonosS3ProvisionedResource extends ProvisionedDataDestinationResour

private String accessKey;

public String getStorage() {
return getDataAddress().getStringProperty(STORAGE_NAME);
}
public String getBucketName() {
return getDataAddress().getStringProperty(BUCKET_NAME);
}
public String getBlobName() {
return getDataAddress().getStringProperty(BLOB_NAME);
}
public String getAccessKey() {
return accessKey;
}
Expand Down Expand Up @@ -69,8 +60,8 @@ public Builder bucketName(String bucketName) {
return this;
}

public Builder blobName(String blobName) {
dataAddressBuilder.property(BLOB_NAME, blobName);
public Builder path(String path) {
dataAddressBuilder.property(PATH, path);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,26 @@

import com.ionos.edc.extension.s3.api.S3ConnectorApi;
import com.ionos.edc.extension.s3.configuration.IonosToken;
import com.ionos.edc.extension.s3.connector.ionosapi.TemporaryKey;

import dev.failsafe.RetryPolicy;
import org.eclipse.edc.connector.transfer.spi.provision.Provisioner;
import org.eclipse.edc.connector.transfer.spi.types.DeprovisionedResource;
import org.eclipse.edc.connector.transfer.spi.types.ProvisionResponse;
import org.eclipse.edc.connector.transfer.spi.types.ProvisionedResource;
import org.eclipse.edc.connector.transfer.spi.types.ResourceDefinition;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.response.StatusResult;
import org.jetbrains.annotations.NotNull;

import java.time.OffsetDateTime;
import java.util.concurrent.CompletableFuture;
import static dev.failsafe.Failsafe.with;
import static java.lang.String.format;

public class IonosS3Provisioner implements Provisioner<IonosS3ResourceDefinition, IonosS3ProvisionedResource> {
private final RetryPolicy<Object> retryPolicy;
private final Monitor monitor;
private final S3ConnectorApi s3Api;

public IonosS3Provisioner(RetryPolicy<Object> retryPolicy, Monitor monitor, S3ConnectorApi s3Api) {
public IonosS3Provisioner(RetryPolicy<Object> retryPolicy, S3ConnectorApi s3Api) {

this.retryPolicy = retryPolicy;
this.monitor = monitor;
this.s3Api = s3Api;
}

Expand Down Expand Up @@ -79,8 +73,8 @@ public CompletableFuture<StatusResult<ProvisionResponse>> provision(IonosS3Resou
if (resourceDefinition.getStorage() != null) {
resourceBuilder = resourceBuilder.storage(resourceDefinition.getStorage());
}
if (resourceDefinition.getBlobName() != null) {
resourceBuilder = resourceBuilder.blobName(resourceDefinition.getBlobName());
if (resourceDefinition.getPath() != null) {
resourceBuilder = resourceBuilder.path(resourceDefinition.getPath());
}
var resource = resourceBuilder.build();

Expand All @@ -99,12 +93,9 @@ public CompletableFuture<StatusResult<DeprovisionedResource>> deprovision(
StatusResult.success(DeprovisionedResource.Builder.newInstance().provisionedResourceId(provisionedResource.getId()).build())
);
}

@NotNull
private CompletableFuture<Void> createBucket(String bucketName) {
return with(retryPolicy).runAsync(() -> {
s3Api.createBucket(bucketName);
});

private void createBucket(String bucketName) {
s3Api.createBucket(bucketName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,17 @@
import java.util.Objects;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.eclipse.edc.connector.transfer.spi.types.ResourceDefinition;

@JsonDeserialize(as=IonosS3ResourceDefinition.class)
public class IonosS3ResourceDefinition extends ResourceDefinition {
private String keyName;
private String storage;

private String bucketName;
private String blobName;
private String accessKey = "DEFAULT";

private String path;
private String accessKey;
private String secretKey;

public IonosS3ResourceDefinition() {

Expand All @@ -40,30 +38,27 @@ public String getKeyName() {
public String getStorage() {
return storage;
}

public String getBucketName() {
return bucketName;
}
public String getBlobName() {
return blobName;
}
public String getAccessKey() {
return accessKey;
public String getPath() {
return path;
}

@Override
public Builder toBuilder() {
return initializeBuilder(new Builder())
.keyName(keyName)
.storage(storage)
.accessKey(accessKey)
.bucketName(bucketName)
.blobName(blobName);
.path(path)
.accessKey(accessKey)
.secretKey(secretKey);
}

public static class Builder extends ResourceDefinition.Builder<IonosS3ResourceDefinition, Builder> {

private Builder() {

super(new IonosS3ResourceDefinition());
}
@JsonCreator
Expand All @@ -75,26 +70,26 @@ public Builder keyName(String keyName) {
resourceDefinition.keyName = keyName;
return this;
}

public Builder storage(String storage) {
resourceDefinition.storage = storage;
return this;
}


public Builder bucketName(String bucketName) {
resourceDefinition.bucketName = bucketName;
return this;
}

public Builder blobName(String blobName) {
resourceDefinition.blobName = blobName;
public Builder path(String path) {
resourceDefinition.path = path;
return this;
}
public Builder accessKey(String accessKey) {
resourceDefinition.accessKey = accessKey;
return this;
}
public Builder secretKey(String secretKey) {
resourceDefinition.secretKey = secretKey;
return this;
}

@Override
protected void verify() {
Expand Down

0 comments on commit 912664a

Please sign in to comment.