Skip to content

Commit

Permalink
Fix to transfer filters and multiple datasources bugs (Digital-Ecosys…
Browse files Browse the repository at this point in the history
…tems#71)

* fix: datasink with a non ionos provider datasource

* fix: datasink with a non ionos provider datasource

* fix: filter in transfer of multiple folders

* fix: filter in transfer of multiple folders
  • Loading branch information
jannotti-glaucio authored Mar 4, 2024
1 parent ef93b09 commit 483b8e9
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 164 deletions.
11 changes: 2 additions & 9 deletions extensions/core-ionos-s3/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
plugins {
`java-library`
`maven-publish`

}
val javaVersion: String by project
val faaastVersion: String by project

val edcGroup: String by project
val edcVersion: String by project
val okHttpVersion: String by project
val rsApi: String by project
val metaModelVersion: String by project
val minIOVersion: String by project

val extensionsGroup: String by project
val extensionsVersion: String by project

Expand All @@ -21,14 +16,12 @@ val gitHubUser: String? by project
val gitHubToken: String? by project

dependencies {

api("${edcGroup}:runtime-metamodel:${metaModelVersion}")

implementation("${edcGroup}:transfer-spi:${edcVersion}")
implementation("io.minio:minio:${minIOVersion}")

testImplementation ("${edcGroup}:junit:${edcVersion}")
}

java {
withJavadocJar()
withSourcesJar()
Expand Down
14 changes: 6 additions & 8 deletions extensions/data-plane-ionos-s3/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,28 @@ val edcGroup: String by project
val edcVersion: String by project
val extensionsGroup: String by project
val extensionsVersion: String by project
val junitVersion: String by project
val mockitoVersion: String by project

val gitHubPkgsName: String by project
val gitHubPkgsUrl: String by project
val gitHubUser: String? by project
val gitHubToken: String? by project

dependencies {

api("${edcGroup}:data-plane-spi:${edcVersion}")

implementation(project(":extensions:core-ionos-s3"))
implementation("${edcGroup}:util:${edcVersion}")
implementation("${edcGroup}:transfer-spi:${edcVersion}")
implementation("${edcGroup}:data-plane-util:${edcVersion}")
implementation("${edcGroup}:data-plane-core:${edcVersion}")
implementation("${edcGroup}:http:${edcVersion}")
implementation("${edcGroup}:validator-spi:${edcVersion}")

implementation(project(":extensions:core-ionos-s3"))

implementation("org.realityforge.org.jetbrains.annotations:org.jetbrains.annotations:1.7.0")

testImplementation("org.junit.jupiter:junit-jupiter-api:5.9.1")
testImplementation("org.junit.jupiter:junit-jupiter-engine:5.9.1")
testImplementation("org.assertj:assertj-core:3.22.0")
testImplementation("org.junit.jupiter:junit-jupiter-api:${junitVersion}")
testImplementation("org.junit.jupiter:junit-jupiter-engine:${junitVersion}")
testImplementation("org.mockito:mockito-core:${mockitoVersion}")
}

java {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.List;
import java.util.Objects;

Expand All @@ -46,27 +47,56 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {
blobName = part.name();
}

var streamsOutput = new ByteArrayOutputStream();
var stream = part.openStream();
while (stream != null) {
try {
streamsOutput.write(stream.readAllBytes());
stream.close();
ByteArrayOutputStream streamsOutput = null;
InputStream stream = null;
try {
streamsOutput = new ByteArrayOutputStream();
stream = part.openStream();

} catch (Exception e) {
return uploadFailure(e, blobName);
if (part instanceof IonosDataSource.S3Part) {
// Multiple fetches
while (stream != null) {
try {
streamsOutput.write(stream.readAllBytes());
stream.close();

} catch (Exception e) {
return uploadFailure(e, blobName);
}

stream = part.openStream();
}
} else {
// Single fetch
try {
streamsOutput.write(stream.readAllBytes());
stream.close();

} catch (Exception e) {
return uploadFailure(e, blobName);
}
}

stream = part.openStream();
}
var byteArray = streamsOutput.toByteArray();
try (var streamsInput = new ByteArrayInputStream(byteArray)) {
s3Api.uploadObject(bucketName, blobName, streamsInput);
streamsOutput.close();

var byteArray = streamsOutput.toByteArray();
try (var streamsInput = new ByteArrayInputStream(byteArray)) {
s3Api.uploadObject(bucketName, blobName, streamsInput);
streamsOutput.close();
} catch (Exception e) {
return uploadFailure(e, blobName);
}
} finally {
try {
if (streamsOutput != null) {
streamsOutput.close();
}
if (stream != null) {
stream.close();
}

} catch (Exception e) {
return uploadFailure(e, blobName);
} catch (Exception e) {
monitor.severe("Error closing streams", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR;
import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.NOT_FOUND;
import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.success;
import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult.failure;

Expand All @@ -53,7 +53,7 @@ public StreamResult<Stream<Part>> openPartStream() {

if (objects.isEmpty()) {
return failure(new StreamFailure(
List.of("No files found in bucket " + bucketName + " with blobName " + blobName), GENERAL_ERROR)
List.of("No files found in bucket " + bucketName + " with blobName " + blobName), NOT_FOUND)
);
}

Expand All @@ -71,7 +71,7 @@ public StreamResult<Stream<Part>> openPartStream() {

if (objects.isEmpty()) {
return failure(new StreamFailure(
List.of("No files found in bucket " + bucketName + " with blobName " + blobName), GENERAL_ERROR)
List.of("No files found in bucket " + bucketName + " with blobName " + blobName), NOT_FOUND)
);
}

Expand All @@ -85,25 +85,25 @@ boolean applyFilterIncludes(S3Object object) {
if (object.isRootObject(blobName))
return true;

return filterIncludes.matcher(object.shortObjectName(blobName)).find();
return filterIncludes.matcher(object.shortObjectName(blobName)).matches();
}
boolean applyFilterExcludes(S3Object object) {
if (object.isRootObject(blobName))
return true;

return !filterExcludes.matcher(object.shortObjectName(blobName)).find();
return !filterExcludes.matcher(object.shortObjectName(blobName)).matches();
}

@Override
public void close() {
}

private static class S3Part implements Part {
public static class S3Part implements Part {
private final S3ConnectorApi s3Api;
private final Monitor monitor;
private final String bucketName;
private final String blobName;
private boolean isDirectory;
private final boolean isDirectory;
private final long fileSize;

private boolean isOpened = true;
Expand Down
Loading

0 comments on commit 483b8e9

Please sign in to comment.