diff --git a/extensions/asset-validator/src/main/java/org/upm/inesdata/validator/AssetValidatorExtension.java b/extensions/asset-validator/src/main/java/org/upm/inesdata/validator/AssetValidatorExtension.java index 4d34600..e5c130f 100644 --- a/extensions/asset-validator/src/main/java/org/upm/inesdata/validator/AssetValidatorExtension.java +++ b/extensions/asset-validator/src/main/java/org/upm/inesdata/validator/AssetValidatorExtension.java @@ -4,7 +4,6 @@ import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.runtime.metamodel.annotation.Provides; import org.eclipse.edc.spi.system.ServiceExtension; -import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.validator.spi.JsonObjectValidatorRegistry; import static org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset.EDC_ASSET_TYPE; diff --git a/extensions/asset-validator/src/main/java/org/upm/inesdata/validator/InesdataAssetValidator.java b/extensions/asset-validator/src/main/java/org/upm/inesdata/validator/InesdataAssetValidator.java index bb51d6d..f101571 100644 --- a/extensions/asset-validator/src/main/java/org/upm/inesdata/validator/InesdataAssetValidator.java +++ b/extensions/asset-validator/src/main/java/org/upm/inesdata/validator/InesdataAssetValidator.java @@ -1,17 +1,3 @@ -/* - * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation - * - */ - package org.upm.inesdata.validator; import jakarta.json.JsonObject; diff --git a/extensions/store-asset-api/src/main/java/org/upm/inesdata/storageasset/StorageAssetApiExtension.java 
b/extensions/store-asset-api/src/main/java/org/upm/inesdata/storageasset/StorageAssetApiExtension.java index 3b248f8..fa1d20c 100644 --- a/extensions/store-asset-api/src/main/java/org/upm/inesdata/storageasset/StorageAssetApiExtension.java +++ b/extensions/store-asset-api/src/main/java/org/upm/inesdata/storageasset/StorageAssetApiExtension.java @@ -95,8 +95,7 @@ public void initialize(ServiceExtensionContext context) { Region region = Region.of(regionName); - // Crear una instancia de S3Service - S3Service s3Service = new S3Service(accessKey, secretKey, endpointOverride, region, bucketName); + S3Service s3Service = new S3Service(accessKey, secretKey, endpointOverride, region, bucketName, monitor); var storageAssetApiController = new StorageAssetApiController(assetService, managementApiTransformerRegistry, validator,s3Service, diff --git a/extensions/store-asset-api/src/main/java/org/upm/inesdata/storageasset/controller/StorageAssetApi.java b/extensions/store-asset-api/src/main/java/org/upm/inesdata/storageasset/controller/StorageAssetApi.java index 9392365..6f24d2e 100644 --- a/extensions/store-asset-api/src/main/java/org/upm/inesdata/storageasset/controller/StorageAssetApi.java +++ b/extensions/store-asset-api/src/main/java/org/upm/inesdata/storageasset/controller/StorageAssetApi.java @@ -2,6 +2,7 @@ import io.swagger.v3.oas.annotations.OpenAPIDefinition; import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.info.Info; import io.swagger.v3.oas.annotations.media.ArraySchema; import io.swagger.v3.oas.annotations.media.Content; @@ -10,41 +11,82 @@ import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; import jakarta.json.JsonObject; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.HeaderParam; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; import 
package org.upm.inesdata.storageasset.controller;

import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.info.Info;
import io.swagger.v3.oas.annotations.media.ArraySchema;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.parameters.RequestBody;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.json.JsonObject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.HeaderParam;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.edc.api.model.ApiCoreSchema;
import org.glassfish.jersey.media.multipart.FormDataParam;

import java.io.InputStream;

/**
 * Management API for creating S3-backed assets via chunked uploads.
 *
 * <p>Clients first stream the file in one or more chunks via {@code /upload-chunk}
 * and then create the asset metadata via {@code /finalize-upload}.
 */
@OpenAPIDefinition(
        info = @Info(description = "Manages the connector S3 assets.",
                title = "S3 Asset API", version = "1"))
@Tag(name = "S3Asset")
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Produces(MediaType.APPLICATION_JSON)
public interface StorageAssetApi {

    /**
     * Uploads a chunk of a file for creating a new S3 asset.
     *
     * @param contentDisposition the {@code Content-Disposition} header, which carries the file name
     * @param chunkIndex         zero-based index of the current chunk in the upload sequence
     * @param totalChunks        total number of chunks for this file
     * @param assetJson          the asset metadata in JSON-LD format
     * @param fileInputStream    the input stream of the file chunk to be uploaded
     * @return JsonObject with the status of the chunk upload
     */
    @POST
    @Path("/upload-chunk")
    @Operation(
            description = "Uploads a chunk of a file to create a new S3 asset.",
            requestBody = @RequestBody(content = @Content(mediaType = MediaType.MULTIPART_FORM_DATA, schema = @Schema(
                    type = "object", requiredProperties = {"file", "json"}
            ))),
            responses = {
                    @ApiResponse(responseCode = "200", description = "Chunk uploaded successfully",
                            content = @Content(schema = @Schema(implementation = ApiCoreSchema.IdResponseSchema.class))),
                    @ApiResponse(responseCode = "400", description = "Request body was malformed",
                            content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class)))),
                    @ApiResponse(responseCode = "409", description = "Could not upload chunk, because of conflicts",
                            content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class))))
            }
    )
    JsonObject uploadChunk(
            @Parameter(description = "Content-Disposition header, which contains the file name") @HeaderParam("Content-Disposition") String contentDisposition,
            @Parameter(description = "Index of the current chunk in the upload sequence") @HeaderParam("Chunk-Index") int chunkIndex,
            @Parameter(description = "Total number of chunks for this file") @HeaderParam("Total-Chunks") int totalChunks,
            @FormDataParam("json") JsonObject assetJson,
            @FormDataParam("file") InputStream fileInputStream);

    /**
     * Finalizes the chunked upload and creates the asset using the provided metadata.
     *
     * @param fileName  the name of the uploaded file
     * @param assetJson the metadata for the asset in JSON-LD format
     * @return JsonObject with the created asset id
     */
    @POST
    @Path("/finalize-upload")
    @Operation(
            description = "Finalizes the chunked upload and creates the S3 asset using the provided metadata.",
            requestBody = @RequestBody(content = @Content(mediaType = MediaType.MULTIPART_FORM_DATA, schema = @Schema(
                    type = "object", requiredProperties = {"json", "fileName"}
            ))),
            responses = {
                    @ApiResponse(responseCode = "200", description = "Asset created successfully",
                            content = @Content(schema = @Schema(implementation = ApiCoreSchema.IdResponseSchema.class))),
                    @ApiResponse(responseCode = "400", description = "Request body was malformed",
                            content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class)))),
                    @ApiResponse(responseCode = "409", description = "Asset could not be created due to conflict",
                            content = @Content(array = @ArraySchema(schema = @Schema(implementation = ApiCoreSchema.ApiErrorDetailSchema.class))))
            }
    )
    JsonObject finalizeUpload(
            @FormDataParam("fileName") String fileName,
            @FormDataParam("json") JsonObject assetJson);
}
package org.upm.inesdata.storageasset.controller;

import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.servlet.annotation.MultipartConfig;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.HeaderParam;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.edc.api.model.IdResponse;
import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
// NOTE(review): AssetService import reconstructed from usage — confirm the package path
import org.eclipse.edc.connector.controlplane.services.spi.asset.AssetService;
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.constants.CoreConstants;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.validator.spi.JsonObjectValidatorRegistry;
import org.eclipse.edc.web.spi.exception.InvalidRequestException;
import org.eclipse.edc.web.spi.exception.ValidationFailureException;
import org.glassfish.jersey.media.multipart.FormDataParam;
import org.upm.inesdata.storageasset.service.S3Service;

import java.io.IOException;
import java.io.InputStream;

import static org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset.EDC_ASSET_TYPE;
import static org.eclipse.edc.web.spi.exception.ServiceResultHandler.exceptionMapper;

/**
 * Controller implementing {@link StorageAssetApi}: receives file chunks, forwards them
 * to {@link S3Service}, and creates the EDC asset once the upload is finalized.
 */
@MultipartConfig
@Path("/s3assets")
public class StorageAssetApiController implements StorageAssetApi {

    private final TypeTransformerRegistry transformerRegistry;
    private final AssetService service;
    private final JsonObjectValidatorRegistry validator;
    private final S3Service s3Service;
    private final JsonLd jsonLd;
    private final String bucketName;
    private final String region;

    public StorageAssetApiController(AssetService service, TypeTransformerRegistry transformerRegistry,
                                     JsonObjectValidatorRegistry validator, S3Service s3Service, JsonLd jsonLd,
                                     String bucketName, String region) {
        this.transformerRegistry = transformerRegistry;
        this.service = service;
        this.validator = validator;
        this.s3Service = s3Service;
        this.jsonLd = jsonLd;
        this.bucketName = bucketName;
        this.region = region;
    }

    /**
     * Handles a single chunk upload: validates the asset metadata, derives the S3 object
     * key, and streams the chunk to the store.
     */
    @POST
    @Path("/upload-chunk")
    @Consumes(MediaType.MULTIPART_FORM_DATA)
    @Produces(MediaType.APPLICATION_JSON)
    @Override
    public JsonObject uploadChunk(@HeaderParam("Content-Disposition") String contentDisposition,
                                  @HeaderParam("Chunk-Index") int chunkIndex,
                                  @HeaderParam("Total-Chunks") int totalChunks,
                                  @FormDataParam("json") JsonObject assetJson,
                                  @FormDataParam("file") InputStream fileInputStream) {
        Asset asset = validateAndTransform(assetJson);

        String fileName = extractFileName(contentDisposition);
        String fullKey = buildObjectKey(asset, fileName);

        try {
            s3Service.uploadChunk(fullKey, fileInputStream, chunkIndex, totalChunks);

            return Json.createObjectBuilder()
                    .add("status", "Chunk " + chunkIndex + " uploaded successfully")
                    .build();
        } catch (IOException e) {
            // clean up the partially uploaded object so a retry starts fresh
            s3Service.deleteFile(fullKey);
            throw new EdcException("Failed to read or upload chunk", e);
        } catch (Exception e) {
            s3Service.deleteFile(fullKey);
            throw new EdcException("Failed to process chunked data", e);
        }
    }

    /**
     * Finalizes the upload: validates the metadata again and creates the asset pointing
     * at the uploaded S3 object.
     */
    @POST
    @Path("/finalize-upload")
    @Consumes(MediaType.MULTIPART_FORM_DATA)
    @Produces(MediaType.APPLICATION_JSON)
    @Override
    public JsonObject finalizeUpload(@FormDataParam("fileName") String fileName,
                                     @FormDataParam("json") JsonObject assetJson) {
        Asset asset = validateAndTransform(assetJson);

        // Use the same folder-prefixed key the chunks were uploaded under. The previous
        // version stored the bare file name, leaving the data address pointing at a key
        // that does not exist whenever a "folder" property is configured.
        // NOTE(review): assumes clients send the bare file name here, as in upload-chunk
        // — confirm the client contract.
        String fullKey = buildObjectKey(asset, fileName);
        setStorageProperties(asset, fullKey);

        IdResponse idResponse = service.create(asset)
                .map(a -> IdResponse.Builder.newInstance().id(a.getId()).createdAt(a.getCreatedAt()).build())
                .orElseThrow(exceptionMapper(Asset.class, asset.getId()));

        return transformerRegistry.transform(idResponse, JsonObject.class)
                .orElseThrow(f -> new EdcException(f.getFailureDetail()));
    }

    /** Expands, validates, and transforms the JSON-LD payload into an {@link Asset}. */
    private Asset validateAndTransform(JsonObject assetJson) {
        JsonObject expanded = jsonLd.expand(assetJson)
                .orElseThrow(f -> new EdcException("Failed to expand request"));
        validator.validate(EDC_ASSET_TYPE, expanded).orElseThrow(ValidationFailureException::new);
        return transformerRegistry.transform(expanded, Asset.class).orElseThrow(InvalidRequestException::new);
    }

    /**
     * Extracts the file name from a {@code Content-Disposition} header.
     * The previous {@code split("filename=")[1]} crashed with an unchecked
     * ArrayIndexOutOfBoundsException/NPE on a missing or malformed header.
     */
    private static String extractFileName(String contentDisposition) {
        int idx = contentDisposition == null ? -1 : contentDisposition.indexOf("filename=");
        if (idx < 0) {
            throw new InvalidRequestException("Content-Disposition header with a filename is required");
        }
        String fileName = contentDisposition.substring(idx + "filename=".length());
        int paramSep = fileName.indexOf(';');
        if (paramSep >= 0) {
            fileName = fileName.substring(0, paramSep);
        }
        return fileName.replace("\"", "").trim();
    }

    /**
     * Builds the S3 object key from the asset's optional "folder" data-address property
     * and the file name, keeping the folder structure.
     */
    private String buildObjectKey(Asset asset, String fileName) {
        Object folderProp = asset.getDataAddress().getProperties().get(CoreConstants.EDC_NAMESPACE + "folder");
        String folder = String.valueOf(folderProp);
        if (folderProp == null || folder.trim().isEmpty() || "null".equals(folder)) {
            return fileName; // no folder configured, use the bare file name
        }
        return folder.endsWith("/") ? folder + fileName : folder + "/" + fileName;
    }

    /** Sets the S3 storage properties (key, bucket, region) on the asset's data address. */
    private void setStorageProperties(Asset asset, String fileName) {
        asset.getPrivateProperties().put("storageAssetFile", fileName);
        asset.getDataAddress().setKeyName(fileName);
        asset.getDataAddress().setType("AmazonS3");
        asset.getDataAddress().getProperties().put(CoreConstants.EDC_NAMESPACE + "bucketName", bucketName);
        asset.getDataAddress().getProperties().put(CoreConstants.EDC_NAMESPACE + "region", region);
    }
}
package org.upm.inesdata.storageasset.service;

import org.eclipse.edc.spi.monitor.Monitor;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Handles chunked (S3 multipart) uploads of asset files to an S3-compatible store.
 *
 * <p>Each in-flight upload is tracked in {@link #multipartUploadStates} under its object
 * key until the final chunk arrives, at which point the multipart upload is completed.
 * Chunks of the same file are assumed to arrive in order (chunkIndex 0..totalChunks-1)
 * — TODO confirm the client upload contract.
 */
public class S3Service {

    /** Minimum size of every chunk except the last one (50 MiB). */
    private static final long PART_SIZE = 50 * 1024 * 1024;

    private final S3AsyncClient s3AsyncClient;
    private final String bucketName;
    private final Monitor monitor;

    /** In-flight multipart uploads, keyed by object key. */
    private final ConcurrentMap<String, MultipartUploadState> multipartUploadStates = new ConcurrentHashMap<>();

    /**
     * @param accessKey        S3 access key id
     * @param secretKey        S3 secret access key
     * @param endpointOverride endpoint URL of the S3-compatible store
     * @param region           S3 region
     * @param bucketName       target bucket for all uploads
     * @param monitor          EDC monitor used for logging
     */
    public S3Service(String accessKey, String secretKey, String endpointOverride, Region region, String bucketName,
                     Monitor monitor) {
        this.s3AsyncClient = S3AsyncClient.builder()
                .region(region)
                .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)))
                .endpointOverride(URI.create(endpointOverride))
                // path-style addressing is required by MinIO-style endpoints
                .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
                .build();
        this.bucketName = bucketName;
        this.monitor = monitor;
    }

    /**
     * Uploads one chunk as an S3 multipart part and, on the last chunk, completes the
     * multipart upload.
     *
     * @param key         object key under which the file is stored
     * @param inputStream stream with the chunk's bytes (fully consumed here)
     * @param chunkIndex  zero-based index of this chunk
     * @param totalChunks total number of chunks for the file
     * @throws IOException              if the chunk cannot be read
     * @throws IllegalArgumentException if a non-final chunk is smaller than {@link #PART_SIZE}
     * @throws RuntimeException         if the part upload fails (the multipart upload is aborted)
     */
    public void uploadChunk(String key, InputStream inputStream, int chunkIndex, int totalChunks) throws IOException {
        MultipartUploadState uploadState = multipartUploadStates.computeIfAbsent(key, this::initMultipartUpload);

        byte[] buffer = inputStream.readAllBytes();
        long contentLength = buffer.length;

        // S3 rejects non-final parts below the minimum part size, so fail fast here.
        if (contentLength < PART_SIZE && chunkIndex < totalChunks - 1) {
            throw new IllegalArgumentException(
                    "Each chunk (except the last one) must be at least " + PART_SIZE + " bytes");
        }

        UploadPartRequest uploadPartRequest = UploadPartRequest.builder()
                .bucket(bucketName)
                .key(key)
                .uploadId(uploadState.uploadId)
                .partNumber(chunkIndex + 1) // S3 part numbers are 1-based
                .contentLength(contentLength)
                .build();

        try {
            // Wait for the part to be stored before acknowledging the chunk. The previous
            // fire-and-forget callback chain reported success before the upload finished,
            // could complete the multipart upload while earlier parts were still in
            // flight, and lost failures thrown inside the async callbacks.
            UploadPartResponse response =
                    s3AsyncClient.uploadPart(uploadPartRequest, AsyncRequestBody.fromBytes(buffer)).join();

            uploadState.addPart(CompletedPart.builder()
                    .partNumber(chunkIndex + 1)
                    .eTag(response.eTag())
                    .build());

            if (chunkIndex == totalChunks - 1) {
                completeMultipartUpload(key, uploadState);
                multipartUploadStates.remove(key);
            }
        } catch (Exception e) {
            abortMultipartUpload(key, uploadState);
            multipartUploadStates.remove(key);
            monitor.warning("Error uploading chunk " + chunkIndex + " for file " + key, e);
            throw new RuntimeException("Error uploading chunk " + chunkIndex + " for file " + key, e);
        }
    }

    /** Starts a new multipart upload for the given key and returns its tracking state. */
    private MultipartUploadState initMultipartUpload(String key) {
        CreateMultipartUploadRequest createRequest = CreateMultipartUploadRequest.builder()
                .bucket(bucketName)
                .key(key)
                .build();
        CreateMultipartUploadResponse createResponse = s3AsyncClient.createMultipartUpload(createRequest).join();
        monitor.info("Multipart upload started for " + key);
        return new MultipartUploadState(createResponse.uploadId());
    }

    /** Completes the multipart upload, assembling all registered parts in part-number order. */
    private void completeMultipartUpload(String key, MultipartUploadState uploadState) {
        CompleteMultipartUploadRequest completeRequest = CompleteMultipartUploadRequest.builder()
                .bucket(bucketName)
                .key(key)
                .uploadId(uploadState.uploadId)
                .multipartUpload(CompletedMultipartUpload.builder().parts(uploadState.sortedParts()).build())
                .build();
        s3AsyncClient.completeMultipartUpload(completeRequest).join();
        monitor.info("Multipart upload completed for " + key);
    }

    /** Aborts the multipart upload so the store can discard already-uploaded parts. */
    private void abortMultipartUpload(String key, MultipartUploadState uploadState) {
        try {
            AbortMultipartUploadRequest abortRequest = AbortMultipartUploadRequest.builder()
                    .bucket(bucketName)
                    .key(key)
                    .uploadId(uploadState.uploadId)
                    .build();
            s3AsyncClient.abortMultipartUpload(abortRequest).join();
            monitor.info("Multipart upload aborted for " + key);
        } catch (Exception e) {
            // best effort: a failed abort must not mask the original upload error
            monitor.severe("Error aborting multipart upload for " + key + ": " + e.getMessage());
        }
    }

    /**
     * Deletes an object from the bucket. Best effort: errors are logged, not propagated,
     * because deletion is used as cleanup after a failed upload.
     */
    public void deleteFile(String key) {
        try {
            DeleteObjectRequest deleteObjectRequest = DeleteObjectRequest.builder()
                    .bucket(bucketName)
                    .key(key)
                    .build();
            s3AsyncClient.deleteObject(deleteObjectRequest).join();
        } catch (Exception e) {
            monitor.severe("Error deleting file " + key + ": " + e.getMessage());
        }
    }

    /** Releases the underlying S3 client. */
    public void close() {
        s3AsyncClient.close();
    }

    /** Mutable tracking state of one in-flight multipart upload. */
    private static class MultipartUploadState {
        private final String uploadId;
        private final List<CompletedPart> completedParts = new ArrayList<>();

        MultipartUploadState(String uploadId) {
            this.uploadId = uploadId;
        }

        /** Registers a finished part; synchronized in case chunks ever arrive concurrently. */
        synchronized void addPart(CompletedPart part) {
            completedParts.add(part);
        }

        /** Returns the parts ordered by part number, as CompleteMultipartUpload requires. */
        synchronized List<CompletedPart> sortedParts() {
            List<CompletedPart> copy = new ArrayList<>(completedParts);
            copy.sort(Comparator.comparing(CompletedPart::partNumber));
            return copy;
        }
    }
}
HeadObjectRequest.builder() - .bucket(bucketName) - .key(key) - .build(); - - return s3AsyncClient.headObject(headObjectRequest) - .thenApply(response -> true) - .exceptionally(ex -> { - if (ex.getCause() instanceof NoSuchKeyException || (ex.getCause() instanceof S3Exception && ((S3Exception) ex.getCause()).statusCode() == 404)) { - return false; - } else { - throw new RuntimeException("Error checking if object exists", ex); - } - }); } } diff --git a/extensions/vocabulary-api/src/main/java/org/upm/inesdata/vocabulary/VocabularyApiExtension.java b/extensions/vocabulary-api/src/main/java/org/upm/inesdata/vocabulary/VocabularyApiExtension.java index 36742a0..21d0289 100644 --- a/extensions/vocabulary-api/src/main/java/org/upm/inesdata/vocabulary/VocabularyApiExtension.java +++ b/extensions/vocabulary-api/src/main/java/org/upm/inesdata/vocabulary/VocabularyApiExtension.java @@ -26,7 +26,6 @@ import java.util.Map; import static org.eclipse.edc.spi.constants.CoreConstants.JSON_LD; -import static org.eclipse.edc.spi.system.ServiceExtensionContext.ANONYMOUS_PARTICIPANT; import static org.upm.inesdata.spi.vocabulary.domain.Vocabulary.EDC_VOCABULARY_TYPE; /** diff --git a/resources/sql/060_vocabulary-schema.sql b/resources/sql/060_vocabulary-schema.sql index 7caf1df..d57eae7 100644 --- a/resources/sql/060_vocabulary-schema.sql +++ b/resources/sql/060_vocabulary-schema.sql @@ -6,7 +6,7 @@ CREATE TABLE IF NOT EXISTS edc_vocabulary json_schema JSON DEFAULT '{}', name VARCHAR NOT NULL, connector_id VARCHAR NOT NULL, - category VARCHAR NOT NULL + category VARCHAR NOT NULL, PRIMARY KEY (id, connector_id) ); diff --git a/resources/sql/080_access_token_data.sql b/resources/sql/080_access_token_data.sql new file mode 100644 index 0000000..61af874 --- /dev/null +++ b/resources/sql/080_access_token_data.sql @@ -0,0 +1,7 @@ +CREATE TABLE IF NOT EXISTS edc_accesstokendata +( + id VARCHAR NOT NULL PRIMARY KEY, + claim_token JSON NOT NULL, + data_address JSON NOT NULL, + 
additional_properties JSON DEFAULT '{}' +); \ No newline at end of file