From e3f9825b9e6a0235416f1301277facaf5d2be52e Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Tue, 5 Mar 2024 16:39:29 +0100 Subject: [PATCH 1/7] add new module v2 --- .../data-plane-public-api-v2/build.gradle.kts | 46 ++++ .../api/DataPlanePublicApiExtension.java | 102 +++++++++ .../ContainerRequestContextApi.java | 68 ++++++ .../ContainerRequestContextApiImpl.java | 108 ++++++++++ .../controller/DataFlowRequestSupplier.java | 71 +++++++ .../api/controller/DataPlanePublicApi.java | 82 +++++++ .../DataPlanePublicApiController.java | 181 ++++++++++++++++ ...nsumerPullTransferDataAddressResolver.java | 67 ++++++ ...rg.eclipse.edc.spi.system.ServiceExtension | 1 + .../DataFlowStartMessageSupplierTest.java | 94 ++++++++ .../DataPlanePublicApiControllerTest.java | 201 ++++++++++++++++++ ...erPullTransferDataAddressResolverTest.java | 106 +++++++++ settings.gradle.kts | 1 + 13 files changed, 1128 insertions(+) create mode 100644 extensions/data-plane/data-plane-public-api-v2/build.gradle.kts create mode 100644 extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java create mode 100644 extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApi.java create mode 100644 extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApiImpl.java create mode 100644 extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java create mode 100644 extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java create mode 100644 extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java create mode 100644 extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolver.java create mode 100644 extensions/data-plane/data-plane-public-api-v2/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension create mode 100644 extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowStartMessageSupplierTest.java create mode 100644 extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java create mode 100644 extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolverTest.java diff --git a/extensions/data-plane/data-plane-public-api-v2/build.gradle.kts b/extensions/data-plane/data-plane-public-api-v2/build.gradle.kts new file mode 100644 index 00000000000..f8d8ba71851 --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/build.gradle.kts @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2022 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - initial API and implementation + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - improvements + * Mercedes-Benz Tech Innovation GmbH - publish public api context into dedicated swagger hub page + * + */ + + +plugins { + `java-library` + id("io.swagger.core.v3.swagger-gradle-plugin") +} + +dependencies { + api(project(":spi:common:http-spi")) + api(project(":spi:common:web-spi")) + api(project(":spi:data-plane:data-plane-spi")) + implementation(project(":core:common:util")) + + implementation(project(":core:data-plane:data-plane-util")) + implementation(libs.jakarta.rsApi) + + testImplementation(project(":extensions:common:http")) + testImplementation(project(":core:common:junit")) + testImplementation(libs.jersey.multipart) + testImplementation(libs.restAssured) + testImplementation(libs.mockserver.netty) + testImplementation(libs.mockserver.client) + testImplementation(testFixtures(project(":extensions:common:http:jersey-core"))) +} +edcBuild { + swagger { + apiGroup.set("public-api") + } +} + + diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java new file mode 100644 index 00000000000..4c3d4b729ba --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2022 Microsoft Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Microsoft Corporation - initial API and implementation + * Mercedes-Benz Tech Innovation GmbH - publish public api context into dedicated swagger hub page + * + */ + +package org.eclipse.edc.connector.dataplane.api; + +import org.eclipse.edc.connector.dataplane.api.controller.DataPlanePublicApiController; +import org.eclipse.edc.connector.dataplane.api.validation.ConsumerPullTransferDataAddressResolver; +import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.spi.http.EdcHttpClient; +import org.eclipse.edc.spi.system.ExecutorInstrumentation; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.web.spi.WebServer; +import org.eclipse.edc.web.spi.WebService; +import org.eclipse.edc.web.spi.configuration.WebServiceConfigurer; +import org.eclipse.edc.web.spi.configuration.WebServiceSettings; + +import java.util.concurrent.Executors; + +/** + * This extension provides generic endpoints which are open to public participants of the Dataspace to execute + * requests on the actual data source. + */ +@Extension(value = DataPlanePublicApiExtension.NAME) +public class DataPlanePublicApiExtension implements ServiceExtension { + public static final String NAME = "Data Plane Public API"; + private static final int DEFAULT_PUBLIC_PORT = 8185; + private static final String PUBLIC_API_CONFIG = "web.http.public"; + private static final String PUBLIC_CONTEXT_ALIAS = "public"; + private static final String PUBLIC_CONTEXT_PATH = "/api/v1/public"; + + @Setting + private static final String CONTROL_PLANE_VALIDATION_ENDPOINT = "edc.dataplane.token.validation.endpoint"; + + private static final int DEFAULT_THREAD_POOL = 10; + + private static final WebServiceSettings PUBLIC_SETTINGS = WebServiceSettings.Builder.newInstance() + .apiConfigKey(PUBLIC_API_CONFIG) + .contextAlias(PUBLIC_CONTEXT_ALIAS) + .defaultPath(PUBLIC_CONTEXT_PATH) + .defaultPort(DEFAULT_PUBLIC_PORT) + .name(NAME) + .build(); + + @Inject + private WebServer webServer; + + @Inject + private WebServiceConfigurer webServiceConfigurer; + + @Inject + private PipelineService pipelineService; + + @Inject + private WebService webService; + + @Inject + private EdcHttpClient httpClient; + + @Inject + private TypeManager typeManager; + + @Inject + private ExecutorInstrumentation executorInstrumentation; + @Inject + private DataPlaneAuthorizationService authorizationService; + + @Override + public String name() { + return NAME; + } + + @Override + public void initialize(ServiceExtensionContext context) { + var validationEndpoint = context.getConfig().getString(CONTROL_PLANE_VALIDATION_ENDPOINT); + var dataAddressResolver = new ConsumerPullTransferDataAddressResolver(httpClient, validationEndpoint, typeManager.getMapper()); + var configuration = webServiceConfigurer.configure(context, webServer, PUBLIC_SETTINGS); + var executorService = executorInstrumentation.instrument( + Executors.newFixedThreadPool(DEFAULT_THREAD_POOL), + "Data plane proxy transfers" + ); + var publicApiController = new DataPlanePublicApiController(pipelineService, dataAddressResolver, executorService, authorizationService); + webService.registerResource(configuration.getContextAlias(), publicApiController); + } +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApi.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApi.java new file mode 100644 index 00000000000..99c48a2e4df --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApi.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import jakarta.ws.rs.container.ContainerRequestContext; + +import java.util.Map; + +/** + * Wrapper around {@link ContainerRequestContext} enabling mocking. + */ +public interface ContainerRequestContextApi { + + /** + * Get the request headers. Note that if more than one value is associated to a specific header, + * only the first one is retained. + * + * @return Headers map. + */ + Map headers(); + + /** + * Format query of the request as string, e.g. "hello=world\&foo=bar". + * + * @return Query param string. + */ + String queryParams(); + + /** + * Format the request body into a string. + * + * @return Request body. + */ + String body(); + + /** + * Get the media type from incoming request. + * + * @return Media type. + */ + String mediaType(); + + /** + * Return request path, e.g. "hello/world/foo/bar". + * + * @return Path string. + */ + String path(); + + /** + * Get http method from the incoming request, e.g. "GET", "POST"... + * + * @return Http method. + */ + String method(); +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApiImpl.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApiImpl.java new file mode 100644 index 00000000000..c81245a62d6 --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/ContainerRequestContextApiImpl.java @@ -0,0 +1,108 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.MediaType; +import org.eclipse.edc.spi.EdcException; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * This class provides a set of API wrapping a {@link ContainerRequestContext}. + */ +public class ContainerRequestContextApiImpl implements ContainerRequestContextApi { + + private static final String QUERY_PARAM_SEPARATOR = "&"; + + private final ContainerRequestContext context; + + public ContainerRequestContextApiImpl(ContainerRequestContext context) { + this.context = context; + } + + @Override + public Map headers() { + return context.getHeaders().entrySet() + .stream() + .filter(entry -> !entry.getValue().isEmpty()) + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get(0))); + } + + @Override + public String queryParams() { + return context.getUriInfo().getQueryParameters().entrySet() + .stream() + .map(entry -> new QueryParam(entry.getKey(), entry.getValue())) + .filter(QueryParam::isValid) + .map(QueryParam::toString) + .collect(Collectors.joining(QUERY_PARAM_SEPARATOR)); + } + + @Override + public String body() { + try (BufferedReader br = new BufferedReader(new InputStreamReader(context.getEntityStream()))) { + return br.lines().collect(Collectors.joining("\n")); + } catch (IOException e) { + throw new EdcException("Failed to read request body: " + e.getMessage()); + } + } + + @Override + public String path() { + var pathInfo = context.getUriInfo().getPath(); + return pathInfo.startsWith("/") ? pathInfo.substring(1) : pathInfo; + } + + @Override + public String mediaType() { + return Optional.ofNullable(context.getMediaType()) + .map(MediaType::toString) + .orElse(null); + } + + @Override + public String method() { + return context.getMethod(); + } + + private static final class QueryParam { + + private final String key; + private final List values; + private final boolean valid; + + private QueryParam(String key, List values) { + this.key = key; + this.values = values; + this.valid = key != null && values != null && !values.isEmpty(); + } + + public boolean isValid() { + return valid; + } + + @Override + public String toString() { + return valid ? key + "=" + values.get(0) : ""; + } + } +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java new file mode 100644 index 00000000000..b1af7e8159c --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowRequestSupplier.java @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.function.BiFunction; + +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.BODY; +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.MEDIA_TYPE; +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.METHOD; +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.PATH; +import static org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema.QUERY_PARAMS; + +public class DataFlowRequestSupplier implements BiFunction { + + /** + * Put all properties of the incoming request (method, request body, query params...) into a map. + */ + private static Map createProps(ContainerRequestContextApi contextApi) { + var props = new HashMap(); + props.put(METHOD, contextApi.method()); + props.put(QUERY_PARAMS, contextApi.queryParams()); + props.put(PATH, contextApi.path()); + Optional.ofNullable(contextApi.mediaType()) + .ifPresent(mediaType -> { + props.put(MEDIA_TYPE, mediaType); + props.put(BODY, contextApi.body()); + }); + return props; + } + + /** + * Create a {@link DataFlowStartMessage} based on incoming request and claims decoded from the access token. + * + * @param contextApi Api for accessing request properties. + * @param dataAddress Source data address. + * @return DataFlowRequest + */ + @Override + public DataFlowStartMessage apply(ContainerRequestContextApi contextApi, DataAddress dataAddress) { + var props = createProps(contextApi); + return DataFlowStartMessage.Builder.newInstance() + .processId(UUID.randomUUID().toString()) + .sourceDataAddress(dataAddress) + .destinationDataAddress(DataAddress.Builder.newInstance() + .type(AsyncStreamingDataSink.TYPE) + .build()) + .id(UUID.randomUUID().toString()) + .properties(props) + .build(); + } +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java new file mode 100644 index 00000000000..0f40b8a733f --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - Initial implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import io.swagger.v3.oas.annotations.OpenAPIDefinition; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.responses.ApiResponse; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.ws.rs.container.AsyncResponse; +import jakarta.ws.rs.container.ContainerRequestContext; + +@OpenAPIDefinition +@Tag(name = "Data Plane public API", + description = "The public API of the Data Plane is a data proxy enabling a data consumer to actively query" + + "data from the provider data source (e.g. backend Rest API, internal database...) through its Data Plane" + + "instance. Thus the Data Plane is the only entry/output door for the data, which avoids the provider to expose" + + "directly its data externally." + + "" + + "The Data Plane public API being a proxy, it supports all verbs (i.e. GET, POST, PUT, PATCH, DELETE), which" + + "can then conveyed until the data source is required. This is especially useful when the actual data source" + + "is a Rest API itself." + + "In the same manner, any set of arbitrary query parameters, path parameters and request body are supported " + + "(in the limits fixed by the HTTP server) and can also conveyed to the actual data source.") +public interface DataPlanePublicApi { + + @Operation(description = "Send `GET` data query to the Data Plane.", + responses = { + @ApiResponse(responseCode = "400", description = "Missing access token"), + @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), + @ApiResponse(responseCode = "500", description = "Failed to transfer data") + } + ) + void get(ContainerRequestContext context, AsyncResponse response); + + @Operation(description = "Send `POST` data query to the Data Plane.", + responses = { + @ApiResponse(responseCode = "400", description = "Missing access token"), + @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), + @ApiResponse(responseCode = "500", description = "Failed to transfer data") + } + ) + void post(ContainerRequestContext context, AsyncResponse response); + + @Operation(description = "Send `PUT` data query to the Data Plane.", + responses = { + @ApiResponse(responseCode = "400", description = "Missing access token"), + @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), + @ApiResponse(responseCode = "500", description = "Failed to transfer data") + } + ) + void put(ContainerRequestContext context, AsyncResponse response); + + @Operation(description = "Send `DELETE` data query to the Data Plane.", + responses = { + @ApiResponse(responseCode = "400", description = "Missing access token"), + @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), + @ApiResponse(responseCode = "500", description = "Failed to transfer data") + } + ) + void delete(ContainerRequestContext context, AsyncResponse response); + + @Operation(description = "Send `PATCH` data query to the Data Plane.", + responses = { + @ApiResponse(responseCode = "400", description = "Missing access token"), + @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), + @ApiResponse(responseCode = "500", description = "Failed to transfer data") + } + ) + void patch(ContainerRequestContext context, AsyncResponse response); +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java new file mode 100644 index 00000000000..79a5a02f801 --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - initial API and implementation + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - improvements + * Mercedes-Benz Tech Innovation GmbH - publish public api context into dedicated swagger hub page + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.PATCH; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.container.AsyncResponse; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.container.Suspended; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.StreamingOutput; +import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver; +import org.eclipse.edc.connector.dataplane.spi.response.TransferErrorResponse; +import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; + +import static jakarta.ws.rs.core.MediaType.APPLICATION_JSON; +import static jakarta.ws.rs.core.MediaType.WILDCARD; +import static jakarta.ws.rs.core.Response.Status.FORBIDDEN; +import static jakarta.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; +import static jakarta.ws.rs.core.Response.Status.UNAUTHORIZED; +import static jakarta.ws.rs.core.Response.status; + +@Path("{any:.*}") +@Produces(WILDCARD) +public class DataPlanePublicApiController implements DataPlanePublicApi { + + private final PipelineService pipelineService; + private final DataAddressResolver dataAddressResolver; + private final DataFlowRequestSupplier requestSupplier; + private final ExecutorService executorService; + private final DataPlaneAuthorizationService authorizationService; + + public DataPlanePublicApiController(PipelineService pipelineService, DataAddressResolver dataAddressResolver, + ExecutorService executorService, DataPlaneAuthorizationService authorizationService) { + this.pipelineService = pipelineService; + this.dataAddressResolver = dataAddressResolver; + this.authorizationService = authorizationService; + this.requestSupplier = new DataFlowRequestSupplier(); + this.executorService = executorService; + } + + private static Response error(Response.Status status, String error) { + return status(status).type(APPLICATION_JSON).entity(new TransferErrorResponse(List.of(error))).build(); + } + + @GET + @Override + public void get(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + /** + * Sends a {@link POST} request to the data source and returns data. + * + * @param requestContext Request context. + * @param response Data fetched from the data source. + */ + @POST + @Override + public void post(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + /** + * Sends a {@link PUT} request to the data source and returns data. + * + * @param requestContext Request context. + * @param response Data fetched from the data source. + */ + @PUT + @Override + public void put(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + /** + * Sends a {@link DELETE} request to the data source and returns data. + * + * @param requestContext Request context. + * @param response Data fetched from the data source. + */ + @DELETE + @Override + public void delete(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + /** + * Sends a {@link PATCH} request to the data source and returns data. + * + * @param requestContext Request context. + * @param response Data fetched from the data source. + */ + @PATCH + @Override + public void patch(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + handle(requestContext, response); + } + + private void handle(ContainerRequestContext requestContext, AsyncResponse response) { + var contextApi = new ContainerRequestContextApiImpl(requestContext); + + var token = contextApi.headers().get(HttpHeaders.AUTHORIZATION); + if (token == null) { + response.resume(error(UNAUTHORIZED, "Missing Authorization Header")); + return; + } + + var sourceDataAddress = authorizationService.authorize(token, buildRequestData(requestContext)); + if (sourceDataAddress.failed()) { + response.resume(error(FORBIDDEN, sourceDataAddress.getFailureDetail())); + return; + } + + var startMessage = requestSupplier.apply(contextApi, sourceDataAddress.getContent()); + + processRequest(startMessage, response); + } + + private Map buildRequestData(ContainerRequestContext requestContext) { + var requestData = new HashMap(); + requestData.put("headers", requestContext.getHeaders()); + requestData.put("path", requestContext.getUriInfo()); + requestData.put("method", requestContext.getMethod()); + requestData.put("content-type", requestContext.getMediaType()); + return requestData; + } + + private void processRequest(DataFlowStartMessage dataFlowStartMessage, AsyncResponse response) { + + AsyncStreamingDataSink.AsyncResponseContext asyncResponseContext = callback -> { + StreamingOutput output = t -> callback.outputStreamConsumer().accept(t); + var resp = Response.ok(output).type(callback.mediaType()).build(); + return response.resume(resp); + }; + + var sink = new AsyncStreamingDataSink(asyncResponseContext, executorService); + + pipelineService.transfer(dataFlowStartMessage, sink) + .whenComplete((result, throwable) -> { + if (throwable == null) { + if (result.failed()) { + response.resume(error(INTERNAL_SERVER_ERROR, result.getFailureDetail())); + } + } else { + var error = "Unhandled exception occurred during data transfer: " + throwable.getMessage(); + response.resume(error(INTERNAL_SERVER_ERROR, error)); + } + }); + } + +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolver.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolver.java new file mode 100644 index 00000000000..c3f66301b82 --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolver.java @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.validation; + +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.ws.rs.core.HttpHeaders; +import okhttp3.Request; +import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver; +import org.eclipse.edc.spi.http.EdcHttpClient; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.DataAddress; + +import java.io.IOException; + +import static java.lang.String.format; + +public class ConsumerPullTransferDataAddressResolver implements DataAddressResolver { + + private final EdcHttpClient httpClient; + private final String endpoint; + private final ObjectMapper mapper; + + public ConsumerPullTransferDataAddressResolver(EdcHttpClient httpClient, String endpoint, ObjectMapper mapper) { + this.httpClient = httpClient; + this.endpoint = endpoint; + this.mapper = mapper; + } + + /** + * Resolves access token received in input of Data Plane public API (consumer pull) into the {@link DataAddress} + * of the requested data. + * + * @param token Access token received in input of the Data Plane public API + * @return Data address + */ + @Override + public Result resolve(String token) { + var request = new Request.Builder().url(endpoint).header(HttpHeaders.AUTHORIZATION, token).get().build(); + try (var response = httpClient.execute(request)) { + var body = response.body(); + var stringBody = body != null ? body.string() : null; + if (stringBody == null) { + return Result.failure("Token validation server returned null body"); + } + + if (response.isSuccessful()) { + return Result.success(mapper.readValue(stringBody, DataAddress.class)); + } else { + return Result.failure(format("Call to token validation sever failed: %s - %s. %s", response.code(), response.message(), stringBody)); + } + } catch (IOException e) { + return Result.failure("Unhandled exception occurred during call to token validation server: " + e.getMessage()); + } + } +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/extensions/data-plane/data-plane-public-api-v2/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 00000000000..d0da2de712a --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1 @@ +org.eclipse.edc.connector.dataplane.api.DataPlanePublicApiExtension \ No newline at end of file diff --git a/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowStartMessageSupplierTest.java b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowStartMessageSupplierTest.java new file mode 100644 index 00000000000..8961e2e42f4 --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataFlowStartMessageSupplierTest.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import jakarta.ws.rs.HttpMethod; +import jakarta.ws.rs.core.MediaType; +import org.eclipse.edc.connector.dataplane.spi.schema.DataFlowRequestSchema; +import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class DataFlowStartMessageSupplierTest { + + + private final DataFlowRequestSupplier supplier = new DataFlowRequestSupplier(); + + private static DataAddress createDataAddress() { + return DataAddress.Builder.newInstance().type("test-type").build(); + } + + @Test + void verifyMapping_noInputBody() { + var contextApi = mock(ContainerRequestContextApi.class); + var address = createDataAddress(); + + var method = HttpMethod.GET; + var queryParams = "test-query-param"; + var path = "test-path"; + + when(contextApi.method()).thenReturn(method); + when(contextApi.queryParams()).thenReturn(queryParams); + when(contextApi.path()).thenReturn(path); + + var request = supplier.apply(contextApi, address); + + assertThat(request.getId()).isNotBlank(); + assertThat(request.getDestinationDataAddress().getType()).isEqualTo(AsyncStreamingDataSink.TYPE); + assertThat(request.getSourceDataAddress().getType()).isEqualTo(address.getType()); + assertThat(request.getProperties()).containsExactlyInAnyOrderEntriesOf(Map.of( + DataFlowRequestSchema.PATH, path, + DataFlowRequestSchema.METHOD, method, + DataFlowRequestSchema.QUERY_PARAMS, queryParams + + )); + } + + @Test + void verifyMapping_withInputBody() { + var contextApi = mock(ContainerRequestContextApi.class); + var address = createDataAddress(); + + var method = HttpMethod.GET; + var queryParams = "test-query-param"; + var path = "test-path"; + var body = "Test request body"; + + when(contextApi.method()).thenReturn(method); + when(contextApi.queryParams()).thenReturn(queryParams); + when(contextApi.path()).thenReturn(path); + when(contextApi.mediaType()).thenReturn(MediaType.TEXT_PLAIN); + when(contextApi.body()).thenReturn(body); + + var request = supplier.apply(contextApi, address); + + assertThat(request.getId()).isNotBlank(); + assertThat(request.getDestinationDataAddress().getType()).isEqualTo(AsyncStreamingDataSink.TYPE); + assertThat(request.getSourceDataAddress().getType()).isEqualTo(address.getType()); + assertThat(request.getProperties()).containsExactlyInAnyOrderEntriesOf(Map.of( + DataFlowRequestSchema.PATH, path, + DataFlowRequestSchema.METHOD, method, + DataFlowRequestSchema.QUERY_PARAMS, queryParams, + DataFlowRequestSchema.BODY, body, + DataFlowRequestSchema.MEDIA_TYPE, MediaType.TEXT_PLAIN + )); + } +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java new file mode 100644 index 00000000000..0af70943382 --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java @@ -0,0 +1,201 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - initial API and implementation + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - improvements + * + */ + +package org.eclipse.edc.connector.dataplane.api.controller; + +import io.restassured.specification.RequestSpecification; +import jakarta.ws.rs.core.Response; +import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; +import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource; +import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult; +import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver; +import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink; +import org.eclipse.edc.junit.annotations.ApiTest; +import org.eclipse.edc.spi.result.Result; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.web.jersey.testfixtures.RestControllerTestBase; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.stream.Stream; + +import static io.restassured.RestAssured.given; +import static io.restassured.http.ContentType.JSON; +import static jakarta.ws.rs.core.HttpHeaders.AUTHORIZATION; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.assertj.core.api.Assertions.assertThat; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ApiTest +class DataPlanePublicApiControllerTest extends RestControllerTestBase { + + private final PipelineService pipelineService = mock(); + private final DataAddressResolver dataAddressResolver = mock(); + private final DataPlaneAuthorizationService authorizationService = mock(); + + @BeforeEach + void setup() { + when(authorizationService.authorize(anyString(), anyMap())) + .thenReturn(Result.success(testDestAddress())); + } + + @Test + void should_returnBadRequest_if_missingAuthorizationHeader() { + baseRequest() + .post("/any") + .then() + .statusCode(Response.Status.UNAUTHORIZED.getStatusCode()) + .body("errors[0]", is("Missing Authorization Header")); + } + + @Test + void shouldNotReturn302_whenUrlWithoutTrailingSlash() { + baseRequest() + .post("") + .then() + .statusCode(not(302)); + } + + @Test + void should_returnForbidden_if_tokenValidationFails() { + var token = UUID.randomUUID().toString(); + when(authorizationService.authorize(anyString(), anyMap())).thenReturn(Result.failure("token is not valid")); + + baseRequest() + .header(AUTHORIZATION, token) + .post("/any") + .then() + .statusCode(Response.Status.FORBIDDEN.getStatusCode()) + .contentType(JSON) + .body("errors.size()", is(1)); + + verify(authorizationService).authorize(eq(token), anyMap()); + } + + @Test + void should_returnInternalServerError_if_transferFails() { + var token = UUID.randomUUID().toString(); + var errorMsg = UUID.randomUUID().toString(); + when(dataAddressResolver.resolve(any())).thenReturn(Result.success(testDestAddress())); + when(pipelineService.transfer(any(), any())) + .thenReturn(completedFuture(StreamResult.error(errorMsg))); + + baseRequest() + .header(AUTHORIZATION, token) + .when() + .post("/any") + .then() + .statusCode(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) + .contentType(JSON) + .body("errors[0]", is(errorMsg)); + } + + @Test + void should_returnInternalServerError_if_transferThrows() { + var token = UUID.randomUUID().toString(); + var errorMsg = UUID.randomUUID().toString(); + when(dataAddressResolver.resolve(any())).thenReturn(Result.success(testDestAddress())); + when(pipelineService.transfer(any(DataFlowStartMessage.class), any())) + .thenReturn(failedFuture(new RuntimeException(errorMsg))); + + baseRequest() + .header(AUTHORIZATION, token) + .when() + .post("/any") + .then() + .statusCode(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) + .contentType(JSON) + .body("errors[0]", is("Unhandled exception occurred during data transfer: " + errorMsg)); + } + + @Test + void shouldStreamSourceToResponse() { + when(dataAddressResolver.resolve(any())).thenReturn(Result.success(testDestAddress())); + when(pipelineService.transfer(any(), any())).thenAnswer(i -> { + ((AsyncStreamingDataSink) i.getArgument(1)).transfer(new TestDataSource("application/something", "data")); + return CompletableFuture.completedFuture(StreamResult.success()); + }); + + var responseBody = baseRequest() + .header(AUTHORIZATION, UUID.randomUUID().toString()) + .when() + .post("/any?foo=bar") + .then() + .log().ifError() + .statusCode(Response.Status.OK.getStatusCode()) + .contentType("application/something") + .extract().body().asString(); + + assertThat(responseBody).isEqualTo("data"); + var requestCaptor = ArgumentCaptor.forClass(DataFlowStartMessage.class); + verify(pipelineService).transfer(requestCaptor.capture(), any()); + var request = requestCaptor.getValue(); + assertThat(request.getDestinationDataAddress().getType()).isEqualTo(AsyncStreamingDataSink.TYPE); + assertThat(request.getSourceDataAddress().getType()).isEqualTo("test"); + assertThat(request.getProperties()).containsEntry("method", "POST").containsEntry("pathSegments", "any").containsEntry("queryParams", "foo=bar"); + } + + @Override + protected Object controller() { + return new DataPlanePublicApiController(pipelineService, dataAddressResolver, Executors.newSingleThreadExecutor(), authorizationService); + } + + private RequestSpecification baseRequest() { + return given() + .baseUri("http://localhost:" + port) + .when(); + } + + private DataAddress testDestAddress() { + return DataAddress.Builder.newInstance().type("test").build(); + } + + private record TestDataSource(String mediaType, String data) implements DataSource, DataSource.Part { + + @Override + public StreamResult> openPartStream() { + return StreamResult.success(Stream.of(this)); + } + + @Override + public String name() { + return "test"; + } + + @Override + public InputStream openStream() { + return new ByteArrayInputStream(data.getBytes()); + } + + } + +} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolverTest.java b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolverTest.java new file mode 100644 index 00000000000..a6c38561e78 --- /dev/null +++ b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolverTest.java @@ -0,0 +1,106 @@ +/* + * Copyright (c) 2022 Amadeus + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Amadeus - initial API and implementation + * + */ + +package org.eclipse.edc.connector.dataplane.api.validation; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.ws.rs.core.HttpHeaders; +import org.eclipse.edc.spi.types.TypeManager; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockserver.integration.ClientAndServer; +import org.mockserver.model.HttpRequest; +import org.mockserver.model.HttpResponse; +import org.mockserver.model.MediaType; + +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort; +import static org.eclipse.edc.junit.testfixtures.TestUtils.testHttpClient; +import static org.mockserver.integration.ClientAndServer.startClientAndServer; +import static org.mockserver.matchers.Times.once; +import static org.mockserver.stop.Stop.stopQuietly; + +class ConsumerPullTransferDataAddressResolverTest { + + private static final ObjectMapper MAPPER = new TypeManager().getMapper(); + private static final int PORT = getFreePort(); + private static final String TOKEN_VALIDATION_SERVER_URL = "http://localhost:" + PORT; + private static ClientAndServer validationClientAndServer; + + private ConsumerPullTransferDataAddressResolver resolver; + + @BeforeAll + public static void startServer() { + validationClientAndServer = startClientAndServer(PORT); + } + + @AfterAll + public static void stopServer() { + stopQuietly(validationClientAndServer); + } + + @BeforeEach + public void setUp() { + resolver = new ConsumerPullTransferDataAddressResolver(testHttpClient(), TOKEN_VALIDATION_SERVER_URL, MAPPER); + } + + @AfterEach + public void tearDown() { + validationClientAndServer.reset(); + } + + @Test + void verifySuccessTokenValidation() throws JsonProcessingException { + var token = UUID.randomUUID().toString(); + var address = DataAddress.Builder.newInstance() + .type("test-type") + .build(); + + validationClientAndServer.when(new HttpRequest().withHeader(HttpHeaders.AUTHORIZATION, token), once()) + .respond(HttpResponse.response() + .withStatusCode(200) + .withBody(MAPPER.writeValueAsString(address)) + .withContentType(MediaType.APPLICATION_JSON)); + + var result = resolver.resolve(token); + + assertThat(result.succeeded()).isTrue(); + assertThat(result.getContent().getType()).isEqualTo(address.getType()); + } + + @Test + void verifyFailedResultReturnedIfServerResponseIsUnsuccessful() throws JsonProcessingException { + var token = UUID.randomUUID().toString(); + var address = DataAddress.Builder.newInstance() + .type("test-type") + .build(); + + validationClientAndServer.when(new HttpRequest().withHeader(HttpHeaders.AUTHORIZATION, token), once()) + .respond(HttpResponse.response() + .withStatusCode(400) + .withBody(MAPPER.writeValueAsString(address)) + .withContentType(MediaType.APPLICATION_JSON)); + + var result = resolver.resolve(token); + + assertThat(result.failed()).isTrue(); + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index d041677c193..b12dea2c7df 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -175,6 +175,7 @@ include(":extensions:data-plane:data-plane-signaling:data-plane-signaling-api-co include(":extensions:data-plane:data-plane-signaling:data-plane-signaling-client") include(":extensions:data-plane:data-plane-signaling:data-plane-signaling-transform") include(":extensions:data-plane:data-plane-public-api") +include(":extensions:data-plane:data-plane-public-api-v2") include(":extensions:data-plane:data-plane-http") include(":extensions:data-plane:data-plane-http-oauth2") From 7b245ea5b2a68c1f8415d1f735a01d1d2719f7ae Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Tue, 5 Mar 2024 16:43:10 +0100 Subject: [PATCH 2/7] removed address resolver --- .../api/DataPlanePublicApiExtension.java | 9 +- .../DataPlanePublicApiController.java | 10 +- ...nsumerPullTransferDataAddressResolver.java | 67 ----------- .../DataPlanePublicApiControllerTest.java | 2 +- ...erPullTransferDataAddressResolverTest.java | 106 ------------------ 5 files changed, 6 insertions(+), 188 deletions(-) delete mode 100644 extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolver.java delete mode 100644 extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolverTest.java diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java index 4c3d4b729ba..25931aa1c2b 100644 --- a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java @@ -16,12 +16,10 @@ package org.eclipse.edc.connector.dataplane.api; import org.eclipse.edc.connector.dataplane.api.controller.DataPlanePublicApiController; -import org.eclipse.edc.connector.dataplane.api.validation.ConsumerPullTransferDataAddressResolver; import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; -import org.eclipse.edc.runtime.metamodel.annotation.Setting; import org.eclipse.edc.spi.http.EdcHttpClient; import org.eclipse.edc.spi.system.ExecutorInstrumentation; import org.eclipse.edc.spi.system.ServiceExtension; @@ -46,9 +44,6 @@ public class DataPlanePublicApiExtension implements ServiceExtension { private static final String PUBLIC_CONTEXT_ALIAS = "public"; private static final String PUBLIC_CONTEXT_PATH = "/api/v1/public"; - @Setting - private static final String CONTROL_PLANE_VALIDATION_ENDPOINT = "edc.dataplane.token.validation.endpoint"; - private static final int DEFAULT_THREAD_POOL = 10; private static final WebServiceSettings PUBLIC_SETTINGS = WebServiceSettings.Builder.newInstance() @@ -89,14 +84,12 @@ public String name() { @Override public void initialize(ServiceExtensionContext context) { - var validationEndpoint = context.getConfig().getString(CONTROL_PLANE_VALIDATION_ENDPOINT); - var dataAddressResolver = new ConsumerPullTransferDataAddressResolver(httpClient, validationEndpoint, typeManager.getMapper()); var configuration = webServiceConfigurer.configure(context, webServer, PUBLIC_SETTINGS); var executorService = executorInstrumentation.instrument( Executors.newFixedThreadPool(DEFAULT_THREAD_POOL), "Data plane proxy transfers" ); - var publicApiController = new DataPlanePublicApiController(pipelineService, dataAddressResolver, executorService, authorizationService); + var publicApiController = new DataPlanePublicApiController(pipelineService, executorService, authorizationService); webService.registerResource(configuration.getContextAlias(), publicApiController); } } diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java index 79a5a02f801..c716cbf41d2 100644 --- a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java @@ -32,7 +32,6 @@ import jakarta.ws.rs.core.StreamingOutput; import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; -import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver; import org.eclipse.edc.connector.dataplane.spi.response.TransferErrorResponse; import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; @@ -54,15 +53,14 @@ public class DataPlanePublicApiController implements DataPlanePublicApi { private final PipelineService pipelineService; - private final DataAddressResolver dataAddressResolver; private final DataFlowRequestSupplier requestSupplier; private final ExecutorService executorService; private final DataPlaneAuthorizationService authorizationService; - public DataPlanePublicApiController(PipelineService pipelineService, DataAddressResolver dataAddressResolver, - ExecutorService executorService, DataPlaneAuthorizationService authorizationService) { + public DataPlanePublicApiController(PipelineService pipelineService, + ExecutorService executorService, + DataPlaneAuthorizationService authorizationService) { this.pipelineService = pipelineService; - this.dataAddressResolver = dataAddressResolver; this.authorizationService = authorizationService; this.requestSupplier = new DataFlowRequestSupplier(); this.executorService = executorService; @@ -145,7 +143,7 @@ private void handle(ContainerRequestContext requestContext, AsyncResponse respon processRequest(startMessage, response); } - + private Map buildRequestData(ContainerRequestContext requestContext) { var requestData = new HashMap(); requestData.put("headers", requestContext.getHeaders()); diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolver.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolver.java deleted file mode 100644 index c3f66301b82..00000000000 --- a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolver.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2022 Amadeus - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Amadeus - initial API and implementation - * - */ - -package org.eclipse.edc.connector.dataplane.api.validation; - -import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.ws.rs.core.HttpHeaders; -import okhttp3.Request; -import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver; -import org.eclipse.edc.spi.http.EdcHttpClient; -import org.eclipse.edc.spi.result.Result; -import org.eclipse.edc.spi.types.domain.DataAddress; - -import java.io.IOException; - -import static java.lang.String.format; - -public class ConsumerPullTransferDataAddressResolver implements DataAddressResolver { - - private final EdcHttpClient httpClient; - private final String endpoint; - private final ObjectMapper mapper; - - public ConsumerPullTransferDataAddressResolver(EdcHttpClient httpClient, String endpoint, ObjectMapper mapper) { - this.httpClient = httpClient; - this.endpoint = endpoint; - this.mapper = mapper; - } - - /** - * Resolves access token received in input of Data Plane public API (consumer pull) into the {@link DataAddress} - * of the requested data. - * - * @param token Access token received in input of the Data Plane public API - * @return Data address - */ - @Override - public Result resolve(String token) { - var request = new Request.Builder().url(endpoint).header(HttpHeaders.AUTHORIZATION, token).get().build(); - try (var response = httpClient.execute(request)) { - var body = response.body(); - var stringBody = body != null ? body.string() : null; - if (stringBody == null) { - return Result.failure("Token validation server returned null body"); - } - - if (response.isSuccessful()) { - return Result.success(mapper.readValue(stringBody, DataAddress.class)); - } else { - return Result.failure(format("Call to token validation sever failed: %s - %s. %s", response.code(), response.message(), stringBody)); - } - } catch (IOException e) { - return Result.failure("Unhandled exception occurred during call to token validation server: " + e.getMessage()); - } - } -} diff --git a/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java index 0af70943382..a1f7423ba27 100644 --- a/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java +++ b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java @@ -166,7 +166,7 @@ void shouldStreamSourceToResponse() { @Override protected Object controller() { - return new DataPlanePublicApiController(pipelineService, dataAddressResolver, Executors.newSingleThreadExecutor(), authorizationService); + return new DataPlanePublicApiController(pipelineService, Executors.newSingleThreadExecutor(), authorizationService); } private RequestSpecification baseRequest() { diff --git a/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolverTest.java b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolverTest.java deleted file mode 100644 index a6c38561e78..00000000000 --- a/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/validation/ConsumerPullTransferDataAddressResolverTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Copyright (c) 2022 Amadeus - * - * This program and the accompanying materials are made available under the - * terms of the Apache License, Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - * - * Contributors: - * Amadeus - initial API and implementation - * - */ - -package org.eclipse.edc.connector.dataplane.api.validation; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.ws.rs.core.HttpHeaders; -import org.eclipse.edc.spi.types.TypeManager; -import org.eclipse.edc.spi.types.domain.DataAddress; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockserver.integration.ClientAndServer; -import org.mockserver.model.HttpRequest; -import org.mockserver.model.HttpResponse; -import org.mockserver.model.MediaType; - -import java.util.UUID; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort; -import static org.eclipse.edc.junit.testfixtures.TestUtils.testHttpClient; -import static org.mockserver.integration.ClientAndServer.startClientAndServer; -import static org.mockserver.matchers.Times.once; -import static org.mockserver.stop.Stop.stopQuietly; - -class ConsumerPullTransferDataAddressResolverTest { - - private static final ObjectMapper MAPPER = new TypeManager().getMapper(); - private static final int PORT = getFreePort(); - private static final String TOKEN_VALIDATION_SERVER_URL = "http://localhost:" + PORT; - private static ClientAndServer validationClientAndServer; - - private ConsumerPullTransferDataAddressResolver resolver; - - @BeforeAll - public static void startServer() { - validationClientAndServer = startClientAndServer(PORT); - } - - @AfterAll - public static void stopServer() { - stopQuietly(validationClientAndServer); - } - - @BeforeEach - public void setUp() { - resolver = new ConsumerPullTransferDataAddressResolver(testHttpClient(), TOKEN_VALIDATION_SERVER_URL, MAPPER); - } - - @AfterEach - public void tearDown() { - validationClientAndServer.reset(); - } - - @Test - void verifySuccessTokenValidation() throws JsonProcessingException { - var token = UUID.randomUUID().toString(); - var address = DataAddress.Builder.newInstance() - .type("test-type") - .build(); - - validationClientAndServer.when(new HttpRequest().withHeader(HttpHeaders.AUTHORIZATION, token), once()) - .respond(HttpResponse.response() - .withStatusCode(200) - .withBody(MAPPER.writeValueAsString(address)) - .withContentType(MediaType.APPLICATION_JSON)); - - var result = resolver.resolve(token); - - assertThat(result.succeeded()).isTrue(); - assertThat(result.getContent().getType()).isEqualTo(address.getType()); - } - - @Test - void verifyFailedResultReturnedIfServerResponseIsUnsuccessful() throws JsonProcessingException { - var token = UUID.randomUUID().toString(); - var address = DataAddress.Builder.newInstance() - .type("test-type") - .build(); - - validationClientAndServer.when(new HttpRequest().withHeader(HttpHeaders.AUTHORIZATION, token), once()) - .respond(HttpResponse.response() - .withStatusCode(400) - .withBody(MAPPER.writeValueAsString(address)) - .withContentType(MediaType.APPLICATION_JSON)); - - var result = resolver.resolve(token); - - assertThat(result.failed()).isTrue(); - } -} From b2296805f3d03199526e54caf427514aa02b48c5 Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Tue, 5 Mar 2024 17:30:56 +0100 Subject: [PATCH 3/7] wip --- .../api/DataPlanePublicApiExtension.java | 6 ++-- ...blicApi.java => DataPlanePublicApiV2.java} | 3 +- ...va => DataPlanePublicApiV2Controller.java} | 10 +++--- ...> DataPlanePublicApiV2ControllerTest.java} | 4 +-- .../runtimes/data-plane/build.gradle.kts | 2 +- .../edc/test/e2e/AbstractDataPlaneTest.java | 33 +++++++++++++++++++ .../e2e/DataPlanePublicApiEndToEndTest.java | 24 ++++++++++++++ .../DataPlaneSignalingApiEndToEndTest.java | 20 ++--------- 8 files changed, 72 insertions(+), 30 deletions(-) rename extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/{DataPlanePublicApi.java => DataPlanePublicApiV2.java} (98%) rename extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/{DataPlanePublicApiController.java => DataPlanePublicApiV2Controller.java} (95%) rename extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/{DataPlanePublicApiControllerTest.java => DataPlanePublicApiV2ControllerTest.java} (97%) create mode 100644 system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java create mode 100644 system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java index 25931aa1c2b..3051bc3bb3f 100644 --- a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java @@ -15,7 +15,7 @@ package org.eclipse.edc.connector.dataplane.api; -import org.eclipse.edc.connector.dataplane.api.controller.DataPlanePublicApiController; +import org.eclipse.edc.connector.dataplane.api.controller.DataPlanePublicApiV2Controller; import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService; import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; import org.eclipse.edc.runtime.metamodel.annotation.Extension; @@ -42,7 +42,7 @@ public class DataPlanePublicApiExtension implements ServiceExtension { private static final int DEFAULT_PUBLIC_PORT = 8185; private static final String PUBLIC_API_CONFIG = "web.http.public"; private static final String PUBLIC_CONTEXT_ALIAS = "public"; - private static final String PUBLIC_CONTEXT_PATH = "/api/v1/public"; + private static final String PUBLIC_CONTEXT_PATH = "/api/v2/public"; private static final int DEFAULT_THREAD_POOL = 10; @@ -89,7 +89,7 @@ public void initialize(ServiceExtensionContext context) { Executors.newFixedThreadPool(DEFAULT_THREAD_POOL), "Data plane proxy transfers" ); - var publicApiController = new DataPlanePublicApiController(pipelineService, executorService, authorizationService); + var publicApiController = new DataPlanePublicApiV2Controller(pipelineService, executorService, authorizationService); webService.registerResource(configuration.getContextAlias(), publicApiController); } } diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2.java similarity index 98% rename from extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java rename to extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2.java index 0f40b8a733f..426d96ccfca 100644 --- a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2.java @@ -27,13 +27,12 @@ "data from the provider data source (e.g. backend Rest API, internal database...) through its Data Plane" + "instance. Thus the Data Plane is the only entry/output door for the data, which avoids the provider to expose" + "directly its data externally." + - "" + "The Data Plane public API being a proxy, it supports all verbs (i.e. GET, POST, PUT, PATCH, DELETE), which" + "can then conveyed until the data source is required. This is especially useful when the actual data source" + "is a Rest API itself." + "In the same manner, any set of arbitrary query parameters, path parameters and request body are supported " + "(in the limits fixed by the HTTP server) and can also conveyed to the actual data source.") -public interface DataPlanePublicApi { +public interface DataPlanePublicApiV2 { @Operation(description = "Send `GET` data query to the Data Plane.", responses = { diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2Controller.java similarity index 95% rename from extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java rename to extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2Controller.java index c716cbf41d2..a8a4aaf1d20 100644 --- a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2Controller.java @@ -48,18 +48,18 @@ import static jakarta.ws.rs.core.Response.Status.UNAUTHORIZED; import static jakarta.ws.rs.core.Response.status; -@Path("{any:.*}") +@Path("/v2/{any:.*}") @Produces(WILDCARD) -public class DataPlanePublicApiController implements DataPlanePublicApi { +public class DataPlanePublicApiV2Controller implements DataPlanePublicApiV2 { private final PipelineService pipelineService; private final DataFlowRequestSupplier requestSupplier; private final ExecutorService executorService; private final DataPlaneAuthorizationService authorizationService; - public DataPlanePublicApiController(PipelineService pipelineService, - ExecutorService executorService, - DataPlaneAuthorizationService authorizationService) { + public DataPlanePublicApiV2Controller(PipelineService pipelineService, + ExecutorService executorService, + DataPlaneAuthorizationService authorizationService) { this.pipelineService = pipelineService; this.authorizationService = authorizationService; this.requestSupplier = new DataFlowRequestSupplier(); diff --git a/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2ControllerTest.java similarity index 97% rename from extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java rename to extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2ControllerTest.java index a1f7423ba27..5c9ebe63ea9 100644 --- a/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java +++ b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2ControllerTest.java @@ -56,7 +56,7 @@ import static org.mockito.Mockito.when; @ApiTest -class DataPlanePublicApiControllerTest extends RestControllerTestBase { +class DataPlanePublicApiV2ControllerTest extends RestControllerTestBase { private final PipelineService pipelineService = mock(); private final DataAddressResolver dataAddressResolver = mock(); @@ -166,7 +166,7 @@ void shouldStreamSourceToResponse() { @Override protected Object controller() { - return new DataPlanePublicApiController(pipelineService, Executors.newSingleThreadExecutor(), authorizationService); + return new DataPlanePublicApiV2Controller(pipelineService, Executors.newSingleThreadExecutor(), authorizationService); } private RequestSpecification baseRequest() { diff --git a/system-tests/e2e-dataplane-tests/runtimes/data-plane/build.gradle.kts b/system-tests/e2e-dataplane-tests/runtimes/data-plane/build.gradle.kts index cdb72787362..25cceb3888c 100644 --- a/system-tests/e2e-dataplane-tests/runtimes/data-plane/build.gradle.kts +++ b/system-tests/e2e-dataplane-tests/runtimes/data-plane/build.gradle.kts @@ -24,7 +24,7 @@ dependencies { implementation(project(":extensions:control-plane:api:control-plane-api-client")) implementation(project(":extensions:data-plane:data-plane-http")) implementation(project(":extensions:data-plane:data-plane-control-api")) - implementation(project(":extensions:data-plane:data-plane-public-api")) + implementation(project(":extensions:data-plane:data-plane-public-api-v2")) implementation(project(":extensions:common:vault:vault-filesystem")) } diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java new file mode 100644 index 00000000000..464ee6ab269 --- /dev/null +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e; + +import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; +import org.eclipse.edc.test.e2e.participant.DataPlaneParticipant; +import org.junit.jupiter.api.extension.RegisterExtension; + +public abstract class AbstractDataPlaneTest { + protected static final DataPlaneParticipant DATAPLANE = DataPlaneParticipant.Builder.newInstance() + .name("provider") + .id("urn:connector:provider") + .build(); + @RegisterExtension + static EdcRuntimeExtension runtime = + new EdcRuntimeExtension( + ":system-tests:e2e-dataplane-tests:runtimes:data-plane", + "data-plane", + DATAPLANE.dataPlaneConfiguration() + ); +} diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java new file mode 100644 index 00000000000..8875a80710e --- /dev/null +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e; + +import org.junit.jupiter.api.Test; + +public class DataPlanePublicApiEndToEndTest extends AbstractDataPlaneTest { + @Test + void foo() { + + } +} diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java index 16339d97afc..b58dc0588ef 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java @@ -32,18 +32,15 @@ import org.eclipse.edc.core.transform.TypeTransformerRegistryImpl; import org.eclipse.edc.jsonld.util.JacksonJsonLd; import org.eclipse.edc.junit.annotations.EndToEndTest; -import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; import org.eclipse.edc.spi.result.Failure; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowTerminateMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; -import org.eclipse.edc.test.e2e.participant.DataPlaneParticipant; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import java.net.URI; import java.time.Duration; @@ -59,20 +56,9 @@ import static org.hamcrest.Matchers.notNullValue; @EndToEndTest -public class DataPlaneSignalingApiEndToEndTest { - - public static final String DATAPLANE_PUBLIC_ENDPOINT_URL = "http://fizz.buzz/bar"; - protected static final DataPlaneParticipant DATAPLANE = DataPlaneParticipant.Builder.newInstance() - .name("provider") - .id("urn:connector:provider") - .build(); - @RegisterExtension - static EdcRuntimeExtension runtime = - new EdcRuntimeExtension( - ":system-tests:e2e-dataplane-tests:runtimes:data-plane", - "data-plane", - DATAPLANE.dataPlaneConfiguration() - ); +public class DataPlaneSignalingApiEndToEndTest extends AbstractDataPlaneTest { + + private static final String DATAPLANE_PUBLIC_ENDPOINT_URL = "http://fizz.buzz/bar"; protected final Duration timeout = Duration.ofSeconds(60); private ObjectMapper mapper; From 2c137c72f8837d960d1e116a92c788cc025fc4f9 Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Wed, 6 Mar 2024 12:21:42 +0100 Subject: [PATCH 4/7] add tests for public api V2 --- .../api/DataPlanePublicApiExtension.java | 8 - .../accesstokendata-store-sql/docs/schema.sql | 6 +- .../store/sql/SqlAccessTokenDataStore.java | 10 +- .../sql/schema/AccessTokenDataStatements.java | 4 + .../schema/BaseSqlAccessTokenStatements.java | 1 + .../postgres/AccessTokenDataMapping.java | 1 + .../dataplane/spi/AccessTokenData.java | 14 +- .../spi/store/AccessTokenDataTestBase.java | 3 +- .../tests/build.gradle.kts | 1 + .../edc/test/e2e/AbstractDataPlaneTest.java | 2 +- .../e2e/DataPlanePublicApiEndToEndTest.java | 143 +++++++++++++++++- .../DataPlaneSignalingApiEndToEndTest.java | 17 ++- .../e2e/participant/DataPlaneParticipant.java | 26 +--- 13 files changed, 195 insertions(+), 41 deletions(-) diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java index 3051bc3bb3f..d4c1909c187 100644 --- a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java @@ -20,11 +20,9 @@ import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; -import org.eclipse.edc.spi.http.EdcHttpClient; import org.eclipse.edc.spi.system.ExecutorInstrumentation; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; -import org.eclipse.edc.spi.types.TypeManager; import org.eclipse.edc.web.spi.WebServer; import org.eclipse.edc.web.spi.WebService; import org.eclipse.edc.web.spi.configuration.WebServiceConfigurer; @@ -66,12 +64,6 @@ public class DataPlanePublicApiExtension implements ServiceExtension { @Inject private WebService webService; - @Inject - private EdcHttpClient httpClient; - - @Inject - private TypeManager typeManager; - @Inject private ExecutorInstrumentation executorInstrumentation; @Inject diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/docs/schema.sql b/extensions/data-plane/store/sql/accesstokendata-store-sql/docs/schema.sql index 707a0f42082..de43bc74ce1 100644 --- a/extensions/data-plane/store/sql/accesstokendata-store-sql/docs/schema.sql +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/docs/schema.sql @@ -17,9 +17,11 @@ CREATE TABLE IF NOT EXISTS edc_accesstokendata ( id VARCHAR NOT NULL PRIMARY KEY, - claim_token json NOT NULL, - data_address JSON NOT NULL + claim_token JSON NOT NULL, + data_address JSON NOT NULL, + additional_properties JSON DEFAULT '{}' ); COMMENT ON COLUMN edc_accesstokendata.claim_token IS 'ClaimToken serialized as JSON map'; COMMENT ON COLUMN edc_accesstokendata.data_address IS 'DataAddress serialized as JSON map'; +COMMENT ON COLUMN edc_accesstokendata.additional_properties IS 'Optional Additional properties serialized as JSON map'; diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStore.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStore.java index 030d348b340..25cefcd0a8e 100644 --- a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStore.java +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/SqlAccessTokenDataStore.java @@ -14,6 +14,7 @@ package org.eclipse.edc.connector.dataplane.store.sql; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.eclipse.edc.connector.dataplane.spi.AccessTokenData; import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore; @@ -34,6 +35,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collection; +import java.util.Map; import static org.eclipse.edc.spi.query.Criterion.criterion; @@ -42,6 +44,8 @@ */ public class SqlAccessTokenDataStore extends AbstractSqlStore implements AccessTokenDataStore { + private static final TypeReference> MAP_TYPE_REF = new TypeReference<>() { + }; private final AccessTokenDataStatements statements; public SqlAccessTokenDataStore(DataSourceRegistry dataSourceRegistry, @@ -113,7 +117,8 @@ private void insert(Connection connection, AccessTokenData dataFlow) { queryExecutor.execute(connection, sql, dataFlow.id(), toJson(dataFlow.claimToken()), - toJson(dataFlow.dataAddress()) + toJson(dataFlow.dataAddress()), + toJson(dataFlow.additionalProperties()) ); } @@ -121,9 +126,10 @@ private void insert(Connection connection, AccessTokenData dataFlow) { private AccessTokenData mapAccessTokenData(ResultSet resultSet) throws SQLException { var claimToken = fromJson(resultSet.getString(statements.getClaimTokenColumn()), ClaimToken.class); var dataAddress = fromJson(resultSet.getString(statements.getDataAddressColumn()), DataAddress.class); + var additionalProperties = fromJson(resultSet.getString(statements.getAdditionalPropertiesColumn()), MAP_TYPE_REF); var id = resultSet.getString(statements.getIdColumn()); - return new AccessTokenData(id, claimToken, dataAddress); + return new AccessTokenData(id, claimToken, dataAddress, additionalProperties); } private @Nullable AccessTokenData findByIdInternal(Connection conn, String id) { diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/AccessTokenDataStatements.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/AccessTokenDataStatements.java index 2f7bfc6f2e6..ed7a724e71f 100644 --- a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/AccessTokenDataStatements.java +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/AccessTokenDataStatements.java @@ -39,6 +39,10 @@ default String getDataAddressColumn() { return "data_address"; } + default String getAdditionalPropertiesColumn() { + return "additional_properties"; + } + String getInsertTemplate(); String getSelectTemplate(); diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlAccessTokenStatements.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlAccessTokenStatements.java index 88139d33799..46a647c24e8 100644 --- a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlAccessTokenStatements.java +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/BaseSqlAccessTokenStatements.java @@ -33,6 +33,7 @@ public String getInsertTemplate() { .column(getIdColumn()) .jsonColumn(getClaimTokenColumn()) .jsonColumn(getDataAddressColumn()) + .jsonColumn(getAdditionalPropertiesColumn()) .insertInto(getTableName()); } diff --git a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/AccessTokenDataMapping.java b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/AccessTokenDataMapping.java index 3741953dce7..ef6ddef26c0 100644 --- a/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/AccessTokenDataMapping.java +++ b/extensions/data-plane/store/sql/accesstokendata-store-sql/src/main/java/org/eclipse/edc/connector/dataplane/store/sql/schema/postgres/AccessTokenDataMapping.java @@ -29,5 +29,6 @@ public AccessTokenDataMapping(AccessTokenDataStatements statements) { add("id", statements.getIdColumn()); add("claimToken", new JsonFieldTranslator(statements.getClaimTokenColumn())); add("dataAddress", new JsonFieldTranslator(statements.getDataAddressColumn())); + add("additionalProperties", new JsonFieldTranslator(statements.getAdditionalPropertiesColumn())); } } diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/AccessTokenData.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/AccessTokenData.java index c1bc0f8b354..a392308545b 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/AccessTokenData.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/AccessTokenData.java @@ -17,10 +17,22 @@ import org.eclipse.edc.spi.iam.ClaimToken; import org.eclipse.edc.spi.types.domain.DataAddress; +import java.util.Map; + /** * Container object for a {@link ClaimToken} and a {@link DataAddress} that the data plane uses to keep track of currently * all access tokens that are currently valid. + * + * @param id The correlation ID of the EDR, that is authorized for this data resources. The token, that is stored inside the + * EDR must carry this information. For JWTs this would be the "jti" claim. + * @param claimToken The representation of the EDR + * @param dataAddress The data resource (= source address) for which the token is authorized + * @param additionalProperties (optional) a list of additional properties that should be persisted with the AccessTokenData, for example refresh tokens, etc. */ -public record AccessTokenData(String id, ClaimToken claimToken, DataAddress dataAddress) { +public record AccessTokenData(String id, ClaimToken claimToken, DataAddress dataAddress, + Map additionalProperties) { + public AccessTokenData(String id, ClaimToken claimToken, DataAddress dataAddress) { + this(id, claimToken, dataAddress, Map.of()); + } } diff --git a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java index a5b328486c8..f4514186907 100644 --- a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java +++ b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java @@ -25,6 +25,7 @@ import java.util.Comparator; import java.util.List; +import java.util.Map; import java.util.stream.IntStream; import static org.assertj.core.api.Assertions.assertThat; @@ -161,6 +162,6 @@ protected DataAddress dataAddress() { } protected AccessTokenData accessTokenData(String id) { - return new AccessTokenData(id, ClaimToken.Builder.newInstance().build(), dataAddress()); + return new AccessTokenData(id, ClaimToken.Builder.newInstance().build(), dataAddress(), Map.of("foo", List.of("bar", "baz"))); } } diff --git a/system-tests/e2e-dataplane-tests/tests/build.gradle.kts b/system-tests/e2e-dataplane-tests/tests/build.gradle.kts index 1d06fdfe57d..6f6d290c63e 100644 --- a/system-tests/e2e-dataplane-tests/tests/build.gradle.kts +++ b/system-tests/e2e-dataplane-tests/tests/build.gradle.kts @@ -35,6 +35,7 @@ dependencies { testImplementation(libs.mockserver.netty) testImplementation(libs.mockserver.client) testImplementation(project(":core:common:transform-core")) // for the transformer registry impl + testImplementation(project(":extensions:common:crypto:crypto-common")) testCompileOnly(project(":system-tests:e2e-dataplane-tests:runtimes:data-plane")) } diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java index 464ee6ab269..5b72fcd1a17 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/AbstractDataPlaneTest.java @@ -24,7 +24,7 @@ public abstract class AbstractDataPlaneTest { .id("urn:connector:provider") .build(); @RegisterExtension - static EdcRuntimeExtension runtime = + protected static EdcRuntimeExtension runtime = new EdcRuntimeExtension( ":system-tests:e2e-dataplane-tests:runtimes:data-plane", "data-plane", diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java index 8875a80710e..b0c1d8ac968 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java @@ -14,11 +14,150 @@ package org.eclipse.edc.test.e2e; +import com.nimbusds.jose.JOSEException; +import com.nimbusds.jose.JWSAlgorithm; +import com.nimbusds.jose.JWSHeader; +import com.nimbusds.jwt.JWTClaimsSet; +import com.nimbusds.jwt.SignedJWT; +import io.restassured.http.ContentType; +import jakarta.ws.rs.core.HttpHeaders; +import org.eclipse.edc.connector.core.security.keyparsers.PemParser; +import org.eclipse.edc.connector.dataplane.spi.AccessTokenData; +import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore; +import org.eclipse.edc.security.token.jwt.CryptoConverter; +import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.security.Vault; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.security.Key; +import java.security.PrivateKey; +import java.util.Date; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.mockito.Mockito.mock; public class DataPlanePublicApiEndToEndTest extends AbstractDataPlaneTest { + + public static final String PUBLIC_KEY_ALIAS = "public-key"; + public static final String PRIVATE_KEY_ALIAS = "1"; + // this is a data address representing the private backend for an HTTP pull transfer + public static final DataAddress BACKEND_API_DATAADDRESS = DataAddress.Builder.newInstance() + .type("HttpData") + .property(EDC_NAMESPACE + "baseUrl", "https://jsonplaceholder.typicode.com/todos") + .build(); + + @Test + void httpPull_missingToken_expect401() { + DATAPLANE.getDataPlanePublicEndpoint() + .baseRequest() + .contentType(ContentType.JSON) + /*.header(HttpHeaders.AUTHORIZATION, token) missing */ + .body(""" + { + "bar": "baz" + } + """) + .post("/v2/foo") + .then() + .statusCode(401) + .body(Matchers.containsString("Missing Authorization Header")); + } + + @Test + void httpPull_invalidToken_expect403() { + var token = "some-invalid-token"; + DATAPLANE.getDataPlanePublicEndpoint() + .baseRequest() + .contentType(ContentType.JSON) + .header(HttpHeaders.AUTHORIZATION, token) + .body(""" + { + "bar": "baz" + } + """) + .post("/v2/foo") + .then() + .statusCode(403); + } + + @ParameterizedTest(name = "Method = {0}") + @ValueSource(strings = { "GET", "POST", "PUT", "PATCH", "DELETE", "HEAD" }) + void request_expect200(String method) { + var token = createEdr(); + var body = DATAPLANE.getDataPlanePublicEndpoint() + .baseRequest() + .contentType(ContentType.JSON) + .header(HttpHeaders.AUTHORIZATION, token) + .request(method, "/v2/bar/baz") + .then() + .log().ifError() + .statusCode(200) + .extract().body().asString(); + assertThat(body).isNotNull(); + } + @Test - void foo() { - + void get_expect200() { + var token = createEdr(); + var body = DATAPLANE.getDataPlanePublicEndpoint() + .baseRequest() + .contentType(ContentType.JSON) + .header(HttpHeaders.AUTHORIZATION, token) + .get("/v2/foo") + .then() + .log().ifError() + .statusCode(200) + .extract().body().asString(); + assertThat(body).isNotNull(); + } + + private Key resolvePrivateKey() { + var privateKeyPem = runtime.getService(Vault.class).resolveSecret(PRIVATE_KEY_ALIAS); + return new PemParser(mock()).parse(privateKeyPem).orElseThrow(f -> new RuntimeException(f.getFailureDetail())); + } + + /** + * Creates and stores an EDR in the data plane. The serialized EDR (as serialized JWT) is returned. Token and the {@link AccessTokenData} + * stored in the data plane are correlated via the "jti" claim in the token. + * + * @return The EDR in the form of a serialized JWT. + */ + private String createEdr() { + var tokenId = UUID.randomUUID().toString(); + // create JWT representing the EDR + var jwt = createJwt(tokenId); + + // store the EDR + var accessTokenStore = runtime.getService(AccessTokenDataStore.class); + accessTokenStore.store(new AccessTokenData(tokenId, ClaimToken.Builder.newInstance().build(), BACKEND_API_DATAADDRESS)); + return jwt; } + + private String createJwt(String tokenId) { + + try { + var jwk = resolvePrivateKey(); + var header = new JWSHeader.Builder(JWSAlgorithm.RS256) + .keyID(PUBLIC_KEY_ALIAS).build(); + var claims = new JWTClaimsSet.Builder() + .issuer("me") + .subject("me") + .issueTime(new Date()) + .jwtID(tokenId) + .build(); + + var jwt = new SignedJWT(header, claims); + jwt.sign(CryptoConverter.createSignerFor((PrivateKey) jwk)); + return jwt.serialize(); + } catch (JOSEException e) { + throw new RuntimeException(e); + } + } + } diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java index b58dc0588ef..5d8584e2706 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java @@ -37,6 +37,7 @@ import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowTerminateMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.hamcrest.Matchers; import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; @@ -83,9 +84,17 @@ void startTransfer() throws JsonProcessingException { var processId = "test-processId"; var flowMessage = createStartMessage(processId); - var jo = registry.transform(flowMessage, JsonObject.class).orElseThrow(failTest()); + var startMessage = registry.transform(flowMessage, JsonObject.class).orElseThrow(failTest()); - var resultJson = DATAPLANE.initiateTransfer(jo); + var resultJson = DATAPLANE.getDataPlaneSignalingEndpoint() + .baseRequest() + .contentType(ContentType.JSON) + .body(startMessage) + .post("/v1/dataflows") + .then() + .body(Matchers.notNullValue()) + .statusCode(200) + .extract().body().asString(); var dataAddress = registry.transform(mapper.readValue(resultJson, JsonObject.class), DataAddress.class) .orElseThrow(failTest()); @@ -113,7 +122,7 @@ void getState() { .build(); runtime.getService(DataPlaneStore.class).save(flow); - var resultJson = DATAPLANE.getDataPlaneSignalingApi() + var resultJson = DATAPLANE.getDataPlaneSignalingEndpoint() .baseRequest() .contentType(ContentType.JSON) .get("/v1/dataflows/%s/state".formatted(dataFlowId)) @@ -145,7 +154,7 @@ void terminate() { .add(DATA_FLOW_TERMINATE_MESSAGE_REASON, "test-reason") .build(); - DATAPLANE.getDataPlaneSignalingApi() + DATAPLANE.getDataPlaneSignalingEndpoint() .baseRequest() .body(terminateMessage) .contentType(ContentType.JSON) diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java index 714ce0cda07..749b13c89f2 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/participant/DataPlaneParticipant.java @@ -15,10 +15,7 @@ package org.eclipse.edc.test.e2e.participant; import com.fasterxml.jackson.annotation.JsonCreator; -import io.restassured.http.ContentType; -import jakarta.json.JsonObject; import org.eclipse.edc.test.system.utils.Participant; -import org.hamcrest.Matchers; import org.jetbrains.annotations.NotNull; import java.net.URI; @@ -35,14 +32,17 @@ public class DataPlaneParticipant extends Participant { private final URI dataPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control"); private final URI dataPlanePublic = URI.create("http://localhost:" + getFreePort() + "/public"); private final URI dataPlaneSignaling = URI.create("http://localhost:" + getFreePort() + "/api/signaling"); - private final Endpoint dataPlaneSignalingApi = new Endpoint(dataPlaneSignaling); private DataPlaneParticipant() { super(); } - public Endpoint getDataPlaneSignalingApi() { - return dataPlaneSignalingApi; + public Endpoint getDataPlaneSignalingEndpoint() { + return new Endpoint(dataPlaneSignaling); + } + + public Endpoint getDataPlanePublicEndpoint() { + return new Endpoint(dataPlanePublic); } public Map dataPlaneConfiguration() { @@ -66,20 +66,6 @@ public Map dataPlaneConfiguration() { }; } - /** - * Uses the data plane's control API to initiate a transfer - */ - public String initiateTransfer(JsonObject startMessage) { - return dataPlaneSignalingApi.baseRequest() - .contentType(ContentType.JSON) - .body(startMessage) - .post("/v1/dataflows") - .then() - .body(Matchers.notNullValue()) - .statusCode(200) - .extract().body().asString(); - } - @NotNull private String resourceAbsolutePath(String filename) { return System.getProperty("user.dir") + separator + "build" + separator + "resources" + separator + "test" + separator + filename; From 685cff6661fc6c5696a1ba2337c479b500e2b407 Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Wed, 6 Mar 2024 12:31:14 +0100 Subject: [PATCH 5/7] deprecation warning --- .../api/DataPlanePublicApiExtension.java | 6 ++- .../api/controller/DataPlanePublicApi.java | 17 +++++--- .../DataPlanePublicApiController.java | 40 +++++++++++-------- .../DataPlanePublicApiControllerTest.java | 10 ++--- .../e2e/DataPlanePublicApiEndToEndTest.java | 15 ------- 5 files changed, 45 insertions(+), 43 deletions(-) diff --git a/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java b/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java index 75973c5a8df..8265c6645b6 100644 --- a/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java +++ b/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java @@ -93,7 +93,11 @@ public void initialize(ServiceExtensionContext context) { Executors.newFixedThreadPool(DEFAULT_THREAD_POOL), "Data plane proxy transfers" ); - var publicApiController = new DataPlanePublicApiController(pipelineService, dataAddressResolver, executorService); + var monitor = context.getMonitor().withPrefix("DataPlane Public API"); + var publicApiController = new DataPlanePublicApiController(pipelineService, dataAddressResolver, executorService, monitor); webService.registerResource(configuration.getContextAlias(), publicApiController); + + monitor.warning("This public API controller is scheduled for removal. Please consider upgrading your deployment " + + "to the data-plane-public-api-v2 module. The Data Plane Public API will then be available under at /v2/ prefix."); } } diff --git a/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java b/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java index 0f40b8a733f..a90f74be0f2 100644 --- a/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java +++ b/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApi.java @@ -27,7 +27,6 @@ "data from the provider data source (e.g. backend Rest API, internal database...) through its Data Plane" + "instance. Thus the Data Plane is the only entry/output door for the data, which avoids the provider to expose" + "directly its data externally." + - "" + "The Data Plane public API being a proxy, it supports all verbs (i.e. GET, POST, PUT, PATCH, DELETE), which" + "can then conveyed until the data source is required. This is especially useful when the actual data source" + "is a Rest API itself." + @@ -40,8 +39,10 @@ public interface DataPlanePublicApi { @ApiResponse(responseCode = "400", description = "Missing access token"), @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), @ApiResponse(responseCode = "500", description = "Failed to transfer data") - } + }, + deprecated = true ) + @Deprecated(since = "0.5.1") void get(ContainerRequestContext context, AsyncResponse response); @Operation(description = "Send `POST` data query to the Data Plane.", @@ -49,8 +50,10 @@ public interface DataPlanePublicApi { @ApiResponse(responseCode = "400", description = "Missing access token"), @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), @ApiResponse(responseCode = "500", description = "Failed to transfer data") - } + }, + deprecated = true ) + @Deprecated(since = "0.5.1") void post(ContainerRequestContext context, AsyncResponse response); @Operation(description = "Send `PUT` data query to the Data Plane.", @@ -58,8 +61,10 @@ public interface DataPlanePublicApi { @ApiResponse(responseCode = "400", description = "Missing access token"), @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), @ApiResponse(responseCode = "500", description = "Failed to transfer data") - } + }, + deprecated = true ) + @Deprecated(since = "0.5.1") void put(ContainerRequestContext context, AsyncResponse response); @Operation(description = "Send `DELETE` data query to the Data Plane.", @@ -76,7 +81,9 @@ public interface DataPlanePublicApi { @ApiResponse(responseCode = "400", description = "Missing access token"), @ApiResponse(responseCode = "403", description = "Access token is expired or invalid"), @ApiResponse(responseCode = "500", description = "Failed to transfer data") - } + }, + deprecated = true ) + @Deprecated(since = "0.5.1") void patch(ContainerRequestContext context, AsyncResponse response); } diff --git a/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java b/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java index be9f033e049..64fe5effe91 100644 --- a/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java +++ b/extensions/data-plane/data-plane-public-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiController.java @@ -34,6 +34,7 @@ import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver; import org.eclipse.edc.connector.dataplane.spi.response.TransferErrorResponse; import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink; +import org.eclipse.edc.spi.monitor.Monitor; import java.util.List; import java.util.concurrent.ExecutorService; @@ -53,15 +54,21 @@ public class DataPlanePublicApiController implements DataPlanePublicApi { private final DataAddressResolver dataAddressResolver; private final DataFlowRequestSupplier requestSupplier; private final ExecutorService executorService; + private final Monitor monitor; public DataPlanePublicApiController(PipelineService pipelineService, DataAddressResolver dataAddressResolver, - ExecutorService executorService) { + ExecutorService executorService, Monitor monitor) { this.pipelineService = pipelineService; this.dataAddressResolver = dataAddressResolver; + this.monitor = monitor; this.requestSupplier = new DataFlowRequestSupplier(); this.executorService = executorService; } + private static Response error(Response.Status status, String error) { + return status(status).type(APPLICATION_JSON).entity(new TransferErrorResponse(List.of(error))).build(); + } + @GET @Override public void get(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { @@ -69,54 +76,57 @@ public void get(@Context ContainerRequestContext requestContext, @Suspended Asyn } /** - * Sends a {@link DELETE} request to the data source and returns data. + * Sends a {@link POST} request to the data source and returns data. * * @param requestContext Request context. * @param response Data fetched from the data source. */ - @DELETE + @POST @Override - public void delete(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + public void post(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { handle(requestContext, response); } /** - * Sends a {@link PATCH} request to the data source and returns data. + * Sends a {@link PUT} request to the data source and returns data. * * @param requestContext Request context. * @param response Data fetched from the data source. */ - @PATCH + @PUT @Override - public void patch(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + public void put(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { handle(requestContext, response); } /** - * Sends a {@link PUT} request to the data source and returns data. + * Sends a {@link DELETE} request to the data source and returns data. * * @param requestContext Request context. * @param response Data fetched from the data source. */ - @PUT + @DELETE @Override - public void put(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + public void delete(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { handle(requestContext, response); } /** - * Sends a {@link POST} request to the data source and returns data. + * Sends a {@link PATCH} request to the data source and returns data. * * @param requestContext Request context. * @param response Data fetched from the data source. */ - @POST + @PATCH @Override - public void post(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { + public void patch(@Context ContainerRequestContext requestContext, @Suspended AsyncResponse response) { handle(requestContext, response); } private void handle(ContainerRequestContext context, AsyncResponse response) { + + monitor.warning("The DataPlane Public API is deprecated. Please consider upgrading to the /v2/ path. Your request will then be: %s" + .formatted(context.getUriInfo().getBaseUri() + "v2/" + context.getUriInfo().getPath())); var contextApi = new ContainerRequestContextApiImpl(context); var token = contextApi.headers().get(HttpHeaders.AUTHORIZATION); if (token == null) { @@ -154,8 +164,4 @@ private void handle(ContainerRequestContext context, AsyncResponse response) { }); } - private static Response error(Response.Status status, String error) { - return status(status).type(APPLICATION_JSON).entity(new TransferErrorResponse(List.of(error))).build(); - } - } diff --git a/extensions/data-plane/data-plane-public-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java b/extensions/data-plane/data-plane-public-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java index 279b81b6c0e..dd33ec267da 100644 --- a/extensions/data-plane/data-plane-public-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java +++ b/extensions/data-plane/data-plane-public-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiControllerTest.java @@ -152,6 +152,11 @@ void shouldStreamSourceToResponse() { assertThat(request.getProperties()).containsEntry("method", "POST").containsEntry("pathSegments", "any").containsEntry("queryParams", "foo=bar"); } + @Override + protected Object controller() { + return new DataPlanePublicApiController(pipelineService, dataAddressResolver, Executors.newSingleThreadExecutor(), mock()); + } + private RequestSpecification baseRequest() { return given() .baseUri("http://localhost:" + port) @@ -162,11 +167,6 @@ private DataAddress testDestAddress() { return DataAddress.Builder.newInstance().type("test").build(); } - @Override - protected Object controller() { - return new DataPlanePublicApiController(pipelineService, dataAddressResolver, Executors.newSingleThreadExecutor()); - } - private record TestDataSource(String mediaType, String data) implements DataSource, DataSource.Part { @Override diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java index b0c1d8ac968..3d61049c356 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlanePublicApiEndToEndTest.java @@ -102,21 +102,6 @@ void request_expect200(String method) { assertThat(body).isNotNull(); } - @Test - void get_expect200() { - var token = createEdr(); - var body = DATAPLANE.getDataPlanePublicEndpoint() - .baseRequest() - .contentType(ContentType.JSON) - .header(HttpHeaders.AUTHORIZATION, token) - .get("/v2/foo") - .then() - .log().ifError() - .statusCode(200) - .extract().body().asString(); - assertThat(body).isNotNull(); - } - private Key resolvePrivateKey() { var privateKeyPem = runtime.getService(Vault.class).resolveSecret(PRIVATE_KEY_ALIAS); return new PemParser(mock()).parse(privateKeyPem).orElseThrow(f -> new RuntimeException(f.getFailureDetail())); From 3e8e9cc413bac54d4f627db22a0806d677afd215 Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Wed, 6 Mar 2024 13:02:52 +0100 Subject: [PATCH 6/7] fix test --- .../api/controller/DataPlanePublicApiV2ControllerTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2ControllerTest.java b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2ControllerTest.java index 5c9ebe63ea9..c8d2bfdd92a 100644 --- a/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2ControllerTest.java +++ b/extensions/data-plane/data-plane-public-api-v2/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2ControllerTest.java @@ -161,7 +161,7 @@ void shouldStreamSourceToResponse() { var request = requestCaptor.getValue(); assertThat(request.getDestinationDataAddress().getType()).isEqualTo(AsyncStreamingDataSink.TYPE); assertThat(request.getSourceDataAddress().getType()).isEqualTo("test"); - assertThat(request.getProperties()).containsEntry("method", "POST").containsEntry("pathSegments", "any").containsEntry("queryParams", "foo=bar"); + assertThat(request.getProperties()).containsEntry("method", "POST").containsEntry("pathSegments", "v2/any").containsEntry("queryParams", "foo=bar"); } @Override @@ -171,7 +171,7 @@ protected Object controller() { private RequestSpecification baseRequest() { return given() - .baseUri("http://localhost:" + port) + .baseUri("http://localhost:" + port + "/v2") .when(); } From d6f1ad93d1548a0990bd76c7535d5ef07041aac8 Mon Sep 17 00:00:00 2001 From: Paul Latzelsperger Date: Wed, 6 Mar 2024 13:33:14 +0100 Subject: [PATCH 7/7] fix copyright --- .../dataplane/api/DataPlanePublicApiExtension.java | 5 ++--- .../api/controller/DataPlanePublicApiV2Controller.java | 6 ++---- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java index d4c1909c187..e8521ac71c8 100644 --- a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/DataPlanePublicApiExtension.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Microsoft Corporation + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -8,8 +8,7 @@ * SPDX-License-Identifier: Apache-2.0 * * Contributors: - * Microsoft Corporation - initial API and implementation - * Mercedes-Benz Tech Innovation GmbH - publish public api context into dedicated swagger hub page + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation * */ diff --git a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2Controller.java b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2Controller.java index a8a4aaf1d20..b8f1770c6ae 100644 --- a/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2Controller.java +++ b/extensions/data-plane/data-plane-public-api-v2/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/DataPlanePublicApiV2Controller.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 Amadeus + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) * * This program and the accompanying materials are made available under the * terms of the Apache License, Version 2.0 which is available at @@ -8,9 +8,7 @@ * SPDX-License-Identifier: Apache-2.0 * * Contributors: - * Amadeus - initial API and implementation - * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - improvements - * Mercedes-Benz Tech Innovation GmbH - publish public api context into dedicated swagger hub page + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation * */