From d40363b9ab9cd30f8c813e8a31a194ccf743ebf7 Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Thu, 29 Feb 2024 17:18:09 +0100 Subject: [PATCH] feat: add trasformers from/to for DataFlowStartMessage --- .../DataPlaneAuthorizationServiceImpl.java | 4 +- ...DataPlaneAuthorizationServiceImplTest.java | 3 +- ...onsumerPullTransferDataFlowController.java | 2 +- ...roviderPushTransferDataFlowController.java | 6 +- ...ctFromDataFlowStartMessageTransformer.java | 68 ++++++++++++++ ...jectToDataFlowStartMessageTransformer.java | 82 ++++++++++++++++ ...omDataFlowStartMessageTransformerTest.java | 86 +++++++++++++++++ ...ToDataFlowStartMessageTransformerTest.java | 94 +++++++++++++++++++ .../domain/transfer/DataFlowStartMessage.java | 24 +++-- .../spi/types/domain/transfer}/FlowType.java | 2 +- .../transfer/DataFlowStartMessageTest.java | 4 +- .../edc/test/e2e/DataplaneEndToEndTest.java | 4 +- 12 files changed, 358 insertions(+), 21 deletions(-) create mode 100644 extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformer.java create mode 100644 extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformer.java create mode 100644 extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformerTest.java create mode 100644 extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformerTest.java rename spi/{control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow => common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer}/FlowType.java (92%) diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java index 2b23c93cb13..c0bd5c56133 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java @@ -36,7 +36,7 @@ public class DataPlaneAuthorizationServiceImpl implements DataPlaneAuthorization public static final String CLAIM_AGREEMENT_ID = "agreement_id"; public static final String CLAIM_ASSET_ID = "asset_id"; public static final String CLAIM_PROCESS_ID = "process_id"; - public static final String CLAIM_TRANSFER_TYPE = "transfer_type"; + public static final String CLAIM_FLOW_TYPE = "flow_type"; private final DataPlaneAccessTokenService accessTokenService; private final PublicEndpointGeneratorService endpointGenerator; private final DataPlaneAccessControlService accessControlService; @@ -94,7 +94,7 @@ private TokenParameters createTokenParams(DataFlowStartMessage message) { .claims(CLAIM_AGREEMENT_ID, message.getAgreementId()) .claims(CLAIM_ASSET_ID, message.getAssetId()) .claims(CLAIM_PROCESS_ID, message.getProcessId()) - .claims(CLAIM_TRANSFER_TYPE, message.getTransferType()) + .claims(CLAIM_FLOW_TYPE, message.getFlowType().toString()) .build(); } diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java index 407fc0f014f..36c981a41f4 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java @@ -24,6 +24,7 @@ import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentMatchers; @@ -156,7 +157,7 @@ void authorize_accessNotGranted() { private DataFlowStartMessage.Builder createStartMessage() { return DataFlowStartMessage.Builder.newInstance() .processId("test-processid") - .transferType("test-transfer-type") + .flowType(FlowType.PULL) .agreementId("test-agreementid") .participantId("test-participantid") .assetId("test-assetid") diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java index 1b02d666db8..7d69fad84b8 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java @@ -31,9 +31,9 @@ import static java.lang.String.format; import static org.eclipse.edc.connector.transfer.dataplane.spi.TransferDataPlaneConstants.HTTP_PROXY; -import static org.eclipse.edc.connector.transfer.spi.flow.FlowType.PULL; import static org.eclipse.edc.spi.response.ResponseStatus.FATAL_ERROR; import static org.eclipse.edc.spi.response.StatusResult.failure; +import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PULL; public class ConsumerPullTransferDataFlowController implements DataFlowController { diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java index 731a9e7c902..ff69e6f1f17 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java @@ -36,8 +36,8 @@ import static java.util.stream.Collectors.toSet; import static org.eclipse.edc.connector.transfer.dataplane.spi.TransferDataPlaneConstants.HTTP_PROXY; -import static org.eclipse.edc.connector.transfer.spi.flow.FlowType.PULL; -import static org.eclipse.edc.connector.transfer.spi.flow.FlowType.PUSH; +import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PULL; +import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PUSH; public class ProviderPushTransferDataFlowController implements DataFlowController { @@ -68,7 +68,7 @@ public boolean canHandle(TransferProcess transferProcess) { .processId(transferProcess.getId()) .sourceDataAddress(transferProcess.getContentDataAddress()) .destinationDataAddress(transferProcess.getDataDestination()) - .transferType(transferProcess.getTransferType()) + .flowType(PUSH) .callbackAddress(callbackUrl != null ? callbackUrl.get() : null) .build(); diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformer.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformer.java new file mode 100644 index 00000000000..4c74f83e776 --- /dev/null +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformer.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.api.signaling.transform.from; + +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.json.JsonBuilderFactory; +import jakarta.json.JsonObject; +import org.eclipse.edc.jsonld.spi.transformer.AbstractJsonLdTransformer; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.transform.spi.TransformerContext; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.DC_DATA_FLOW_START_MESSAGE_PROCESS_ID; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_AGREEMENT_ID; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_DATASET_ID; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_DESTINATION_CALLBACK_ADDRESS; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_DESTINATION_DATA_ADDRESS; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_PARTICIPANT_ID; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_PROPERTIES; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_SOURCE_DATA_ADDRESS; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_TYPE; + +/** + * Converts from a {@link DataFlowStartMessage} to a {@link JsonObject} in JSON-LD expanded form . + */ +public class JsonObjectFromDataFlowStartMessageTransformer extends AbstractJsonLdTransformer { + private final JsonBuilderFactory jsonFactory; + private final ObjectMapper mapper; + + public JsonObjectFromDataFlowStartMessageTransformer(JsonBuilderFactory jsonFactory, ObjectMapper mapper) { + super(DataFlowStartMessage.class, JsonObject.class); + this.jsonFactory = jsonFactory; + this.mapper = mapper; + } + + @Override + public @Nullable JsonObject transform(@NotNull DataFlowStartMessage message, @NotNull TransformerContext context) { + var propertiesBuilder = jsonFactory.createObjectBuilder(); + transformProperties(message.getProperties(), propertiesBuilder, mapper, context); + return jsonFactory.createObjectBuilder() + .add(TYPE, EDC_DATA_FLOW_START_MESSAGE_TYPE) + .add(EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE, message.getFlowType().toString()) + .add(EDC_DATA_FLOW_START_MESSAGE_AGREEMENT_ID, message.getAgreementId()) + .add(DC_DATA_FLOW_START_MESSAGE_PROCESS_ID, message.getProcessId()) + .add(EDC_DATA_FLOW_START_MESSAGE_DATASET_ID, message.getAssetId()) + .add(EDC_DATA_FLOW_START_MESSAGE_PROPERTIES, propertiesBuilder) + .add(EDC_DATA_FLOW_START_MESSAGE_DESTINATION_CALLBACK_ADDRESS, message.getCallbackAddress().toString()) + .add(EDC_DATA_FLOW_START_MESSAGE_DESTINATION_DATA_ADDRESS, context.transform(message.getDestinationDataAddress(), JsonObject.class)) + .add(EDC_DATA_FLOW_START_MESSAGE_SOURCE_DATA_ADDRESS, context.transform(message.getSourceDataAddress(), JsonObject.class)) + .add(EDC_DATA_FLOW_START_MESSAGE_PARTICIPANT_ID, message.getParticipantId()) + .build(); + } +} diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformer.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformer.java new file mode 100644 index 00000000000..f982a8d2548 --- /dev/null +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/main/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformer.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.api.signaling.transform.to; + +import jakarta.json.JsonObject; +import jakarta.json.JsonValue; +import org.eclipse.edc.jsonld.spi.transformer.AbstractJsonLdTransformer; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.transform.spi.TransformerContext; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import java.net.URI; +import java.util.Optional; + +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.Builder; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.DC_DATA_FLOW_START_MESSAGE_PROCESS_ID; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_AGREEMENT_ID; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_DATASET_ID; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_DESTINATION_CALLBACK_ADDRESS; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_DESTINATION_DATA_ADDRESS; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_PARTICIPANT_ID; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_PROPERTIES; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_SOURCE_DATA_ADDRESS; + +/** + * Converts from a {@link JsonObject} in JSON-LD expanded form to a {@link DataFlowStartMessage}. + */ +public class JsonObjectToDataFlowStartMessageTransformer extends AbstractJsonLdTransformer { + + public JsonObjectToDataFlowStartMessageTransformer() { + super(JsonObject.class, DataFlowStartMessage.class); + } + + @Override + public @Nullable DataFlowStartMessage transform(@NotNull JsonObject object, @NotNull TransformerContext context) { + var builder = Builder.newInstance(); + visitProperties(object, (s, jsonValue) -> transformProperties(s, jsonValue, builder, context)); + return builder.build(); + } + + private void transformProperties(String key, JsonValue jsonValue, Builder builder, TransformerContext context) { + switch (key) { + case DC_DATA_FLOW_START_MESSAGE_PROCESS_ID -> builder.processId(transformString(jsonValue, context)); + case EDC_DATA_FLOW_START_MESSAGE_AGREEMENT_ID -> builder.agreementId(transformString(jsonValue, context)); + case EDC_DATA_FLOW_START_MESSAGE_DATASET_ID -> builder.assetId(transformString(jsonValue, context)); + case EDC_DATA_FLOW_START_MESSAGE_DESTINATION_CALLBACK_ADDRESS -> + Optional.ofNullable(transformString(jsonValue, context)).map(URI::create).ifPresent(builder::callbackAddress); + case EDC_DATA_FLOW_START_MESSAGE_PROPERTIES -> { + var props = jsonValue.asJsonArray().getJsonObject(0); + visitProperties(props, (k, val) -> transformProperties(k, val, builder, context)); + } + case EDC_DATA_FLOW_START_MESSAGE_PARTICIPANT_ID -> + builder.participantId(transformString(jsonValue, context)); + case EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE -> + builder.flowType(FlowType.valueOf(transformString(jsonValue, context))); + + case EDC_DATA_FLOW_START_MESSAGE_DESTINATION_DATA_ADDRESS -> + builder.destinationDataAddress(transformObject(jsonValue, DataAddress.class, context)); + + case EDC_DATA_FLOW_START_MESSAGE_SOURCE_DATA_ADDRESS -> + builder.sourceDataAddress(transformObject(jsonValue, DataAddress.class, context)); + + default -> builder.property(key, transformString(jsonValue, context)); + } + } +} diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformerTest.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformerTest.java new file mode 100644 index 00000000000..cb5ac002a2c --- /dev/null +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/from/JsonObjectFromDataFlowStartMessageTransformerTest.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.api.signaling.transform.from; + +import jakarta.json.Json; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.transform.spi.TransformerContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; +import static org.eclipse.edc.jsonld.util.JacksonJsonLd.createObjectMapper; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.DC_DATA_FLOW_START_MESSAGE_PROCESS_ID; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_AGREEMENT_ID; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_DATASET_ID; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_DESTINATION_CALLBACK_ADDRESS; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_DESTINATION_DATA_ADDRESS; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_PARTICIPANT_ID; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_SOURCE_DATA_ADDRESS; +import static org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_TYPE; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class JsonObjectFromDataFlowStartMessageTransformerTest { + + private final TransformerContext context = mock(TransformerContext.class); + private JsonObjectFromDataFlowStartMessageTransformer transformer; + + @BeforeEach + void setUp() { + transformer = new JsonObjectFromDataFlowStartMessageTransformer(Json.createBuilderFactory(Map.of()), createObjectMapper()); + when(context.transform(isA(DataAddress.class), any())).thenReturn(Json.createObjectBuilder().build()); + } + + @Test + void transform() { + + var message = DataFlowStartMessage.Builder.newInstance() + .processId("processId") + .assetId("assetId") + .agreementId("agreementId") + .participantId("participantId") + .flowType(FlowType.PUSH) + .callbackAddress(URI.create("http://localhost")) + .sourceDataAddress(DataAddress.Builder.newInstance().type("sourceType").build()) + .destinationDataAddress(DataAddress.Builder.newInstance().type("destType").build()) + .build(); + + var jsonObject = transformer.transform(message, context); + + assertThat(jsonObject).isNotNull(); + + assertThat(jsonObject.getJsonString(TYPE).getString()).isEqualTo(EDC_DATA_FLOW_START_MESSAGE_TYPE); + assertThat(jsonObject.getJsonString(DC_DATA_FLOW_START_MESSAGE_PROCESS_ID).getString()).isEqualTo(message.getProcessId()); + assertThat(jsonObject.getJsonString(EDC_DATA_FLOW_START_MESSAGE_DATASET_ID).getString()).isEqualTo(message.getAssetId()); + assertThat(jsonObject.getJsonString(EDC_DATA_FLOW_START_MESSAGE_AGREEMENT_ID).getString()).isEqualTo(message.getAgreementId()); + assertThat(jsonObject.getJsonString(EDC_DATA_FLOW_START_MESSAGE_PARTICIPANT_ID).getString()).isEqualTo(message.getParticipantId()); + assertThat(jsonObject.getJsonString(EDC_DATA_FLOW_START_MESSAGE_DESTINATION_CALLBACK_ADDRESS).getString()).isEqualTo(message.getCallbackAddress().toString()); + assertThat(jsonObject.getJsonString(EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE).getString()).isEqualTo(message.getFlowType().toString()); + assertThat(jsonObject.get(EDC_DATA_FLOW_START_MESSAGE_DESTINATION_DATA_ADDRESS)).isNotNull(); + assertThat(jsonObject.get(EDC_DATA_FLOW_START_MESSAGE_SOURCE_DATA_ADDRESS)).isNotNull(); + + } + +} diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformerTest.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformerTest.java new file mode 100644 index 00000000000..18729a23a90 --- /dev/null +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-transform/src/test/java/org/eclipse/edc/connector/api/signaling/transform/to/JsonObjectToDataFlowStartMessageTransformerTest.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.api.signaling.transform.to; + +import jakarta.json.Json; +import jakarta.json.JsonBuilderFactory; +import jakarta.json.JsonObjectBuilder; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.eclipse.edc.transform.spi.TransformerContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.connector.api.signaling.transform.TestFunctions.getExpanded; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.CONTEXT; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.VOCAB; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.spi.CoreConstants.EDC_PREFIX; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class JsonObjectToDataFlowStartMessageTransformerTest { + + private final JsonBuilderFactory jsonFactory = Json.createBuilderFactory(Map.of()); + private final TransformerContext context = mock(TransformerContext.class); + private JsonObjectToDataFlowStartMessageTransformer transformer; + + @BeforeEach + void setUp() { + transformer = new JsonObjectToDataFlowStartMessageTransformer(); + when(context.transform(any(), eq(DataAddress.class))).thenReturn(DataAddress.Builder.newInstance().type("address-type").build()); + } + + @Test + void transform() { + + var jsonObj = jsonFactory.createObjectBuilder() + .add(CONTEXT, createContextBuilder().build()) + .add(TYPE, DataFlowStartMessage.EDC_DATA_FLOW_START_MESSAGE_TYPE) + .add("processId", "processId") + .add("agreementId", "agreementId") + .add("datasetId", "datasetId") + .add("participantId", "participantId") + .add("flowType", "PULL") + .add("sourceDataAddress", jsonFactory.createObjectBuilder().add("type", "address-type")) + .add("destinationDataAddress", jsonFactory.createObjectBuilder().add("type", "address-type")) + .add("properties", jsonFactory.createObjectBuilder().add("foo", "bar")) + .add("callbackAddress", "http://localhost") + .build(); + + var message = transformer.transform(getExpanded(jsonObj), context); + + assertThat(message).isNotNull(); + + assertThat(message.getProcessId()).isEqualTo("processId"); + assertThat(message.getAssetId()).isEqualTo("datasetId"); + assertThat(message.getAgreementId()).isEqualTo("agreementId"); + assertThat(message.getParticipantId()).isEqualTo("participantId"); + assertThat(message.getFlowType()).isEqualTo(FlowType.PULL); + assertThat(message.getDestinationDataAddress()).extracting(DataAddress::getType).isEqualTo("address-type"); + assertThat(message.getSourceDataAddress()).extracting(DataAddress::getType).isEqualTo("address-type"); + assertThat(message.getProperties()).containsEntry(EDC_NAMESPACE + "foo", "bar"); + assertThat(message.getCallbackAddress()).isEqualTo(URI.create("http://localhost")); + + } + + + private JsonObjectBuilder createContextBuilder() { + return jsonFactory.createObjectBuilder() + .add(VOCAB, EDC_NAMESPACE) + .add(EDC_PREFIX, EDC_NAMESPACE); + } + +} diff --git a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java index 85f91bf7d2f..1cd3ee9500c 100644 --- a/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java +++ b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessage.java @@ -23,6 +23,7 @@ import org.eclipse.edc.spi.types.domain.Polymorphic; import java.net.URI; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.UUID; @@ -40,14 +41,14 @@ public class DataFlowStartMessage implements Polymorphic, TraceCarrier { public static final String DC_DATA_FLOW_START_MESSAGE_PROCESS_ID = EDC_NAMESPACE + "processId"; public static final String EDC_DATA_FLOW_START_MESSAGE_SIMPLE_TYPE = "DataFlowStartMessage"; public static final String EDC_DATA_FLOW_START_MESSAGE_TYPE = EDC_NAMESPACE + EDC_DATA_FLOW_START_MESSAGE_SIMPLE_TYPE; - public static final String EDC_DATA_FLOW_START_MESSAGE_TRANSFER_TYPE = EDC_NAMESPACE + "transferType"; + public static final String EDC_DATA_FLOW_START_MESSAGE_FLOW_TYPE = EDC_NAMESPACE + "flowType"; public static final String EDC_DATA_FLOW_START_MESSAGE_DATASET_ID = EDC_NAMESPACE + "datasetId"; public static final String EDC_DATA_FLOW_START_MESSAGE_PARTICIPANT_ID = EDC_NAMESPACE + "participantId"; public static final String EDC_DATA_FLOW_START_MESSAGE_AGREEMENT_ID = EDC_NAMESPACE + "agreementId"; public static final String EDC_DATA_FLOW_START_MESSAGE_SOURCE_DATA_ADDRESS = EDC_NAMESPACE + "sourceDataAddress"; public static final String EDC_DATA_FLOW_START_MESSAGE_DESTINATION_DATA_ADDRESS = EDC_NAMESPACE + "destinationDataAddress"; public static final String EDC_DATA_FLOW_START_MESSAGE_DESTINATION_CALLBACK_ADDRESS = EDC_NAMESPACE + "callbackAddress"; - public static final String EDC_DATA_FLOW_START_MESSAGE_DESTINATION_PROPERTIES = EDC_NAMESPACE + "properties"; + public static final String EDC_DATA_FLOW_START_MESSAGE_PROPERTIES = EDC_NAMESPACE + "properties"; private String id; private String processId; @@ -58,10 +59,10 @@ public class DataFlowStartMessage implements Polymorphic, TraceCarrier { private DataAddress sourceDataAddress; private DataAddress destinationDataAddress; - private String transferType; + private FlowType flowType; private URI callbackAddress; - private Map properties = Map.of(); + private Map properties = new HashMap<>(); private Map traceContext = Map.of(); // TODO: should this stay in the DataFlow class? private DataFlowStartMessage() { @@ -96,10 +97,10 @@ public DataAddress getDestinationDataAddress() { } /** - * The transfer type to use for the request + * The {@link FlowType} for the request */ - public String getTransferType() { - return transferType; + public FlowType getFlowType() { + return flowType; } @@ -200,8 +201,8 @@ public Builder destinationDataAddress(DataAddress destination) { return this; } - public Builder transferType(String transferType) { - request.transferType = transferType; + public Builder flowType(FlowType flowType) { + request.flowType = flowType; return this; } @@ -225,6 +226,11 @@ public Builder properties(Map value) { return this; } + public Builder property(String key, String value) { + request.properties.put(key, value); + return this; + } + public Builder traceContext(Map value) { request.traceContext = value; return this; diff --git a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/FlowType.java b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/FlowType.java similarity index 92% rename from spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/FlowType.java rename to spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/FlowType.java index b2b86c0dd9a..f10894f9c7a 100644 --- a/spi/control-plane/transfer-spi/src/main/java/org/eclipse/edc/connector/transfer/spi/flow/FlowType.java +++ b/spi/common/core-spi/src/main/java/org/eclipse/edc/spi/types/domain/transfer/FlowType.java @@ -12,7 +12,7 @@ * */ -package org.eclipse.edc.connector.transfer.spi.flow; +package org.eclipse.edc.spi.types.domain.transfer; /** * Data Flow types, generally they can be Push (provider pushing data to the consumer) and Pull (consumer pulling data diff --git a/spi/common/core-spi/src/test/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessageTest.java b/spi/common/core-spi/src/test/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessageTest.java index 38be6fcb567..41567fb8d8c 100644 --- a/spi/common/core-spi/src/test/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessageTest.java +++ b/spi/common/core-spi/src/test/java/org/eclipse/edc/spi/types/domain/transfer/DataFlowStartMessageTest.java @@ -42,7 +42,7 @@ void verifySerializeDeserialize() throws JsonProcessingException { .properties(Map.of("key", "value")) .traceContext(Map.of("key2", "value2")) .participantId("participantId") - .transferType("transferType") + .flowType(FlowType.PUSH) .agreementId("agreementId") .assetId("assetId") .build(); @@ -54,7 +54,7 @@ void verifySerializeDeserialize() throws JsonProcessingException { assertThat(deserialized.getTraceContext().get("key2")).isEqualTo("value2"); assertThat(deserialized.getCallbackAddress()).isEqualTo(uri); assertThat(deserialized.getAgreementId()).isEqualTo("agreementId"); - assertThat(deserialized.getTransferType()).isEqualTo("transferType"); + assertThat(deserialized.getFlowType()).isEqualTo(request.getFlowType()); assertThat(deserialized.getAssetId()).isEqualTo("assetId"); assertThat(deserialized.getParticipantId()).isEqualTo("participantId"); } diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataplaneEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataplaneEndToEndTest.java index 7532cc1a4b4..b8edad0a102 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataplaneEndToEndTest.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataplaneEndToEndTest.java @@ -16,11 +16,11 @@ import org.eclipse.edc.connector.dataplane.spi.Endpoint; import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService; -import org.eclipse.edc.connector.transfer.spi.flow.FlowType; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; import org.eclipse.edc.test.e2e.participant.DataPlaneParticipant; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -55,7 +55,7 @@ void startTransfer_httpPull() { .processId("test-processId") .sourceDataAddress(DataAddress.Builder.newInstance().type("HttpData").property(EDC_NAMESPACE + "baseUrl", "http://foo.bar/").build()) .destinationDataAddress(DataAddress.Builder.newInstance().type("HttpData").property(EDC_NAMESPACE + "baseUrl", "http://fizz.buzz").build()) - .transferType(FlowType.PULL.toString()) + .flowType(FlowType.PULL) .assetId("test-asset") .agreementId("test-agreement") .build();