Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduces properties provider for DataFlowStartMessage #4044

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
import org.eclipse.edc.connector.transfer.dataplane.flow.DataPlaneSignalingFlowController;
import org.eclipse.edc.connector.transfer.spi.callback.ControlApiUrl;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowPropertiesProvider;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;

import java.util.Map;

import static org.eclipse.edc.connector.transfer.dataplane.TransferDataPlaneSignalingExtension.NAME;

@Extension(NAME)
Expand All @@ -47,10 +51,17 @@ public class TransferDataPlaneSignalingExtension implements ServiceExtension {
@Inject
private DataPlaneClientFactory clientFactory;

@Inject(required = false)
private DataFlowPropertiesProvider propertiesProvider;

@Override
public void initialize(ServiceExtensionContext context) {
var selectionStrategy = context.getSetting(DPF_SELECTOR_STRATEGY, DEFAULT_DATAPLANE_SELECTOR_STRATEGY);
dataFlowManager.register(new DataPlaneSignalingFlowController(callbackUrl, selectorService, clientFactory, selectionStrategy));
dataFlowManager.register(new DataPlaneSignalingFlowController(callbackUrl, selectorService, getPropertiesProvider(), clientFactory, selectionStrategy));
}

private DataFlowPropertiesProvider getPropertiesProvider() {
return propertiesProvider == null ? (tp, p) -> StatusResult.success(Map.of()) : propertiesProvider;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.transfer.spi.callback.ControlApiUrl;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowController;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowPropertiesProvider;
import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.policy.model.Policy;
Expand Down Expand Up @@ -52,11 +53,13 @@ public class DataPlaneSignalingFlowController implements DataFlowController {
private final DataPlaneSelectorService selectorClient;
private final DataPlaneClientFactory clientFactory;

private final DataFlowPropertiesProvider propertiesProvider;
private final String selectionStrategy;

public DataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient, DataPlaneClientFactory clientFactory, String selectionStrategy) {
public DataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient, DataFlowPropertiesProvider propertiesProvider, DataPlaneClientFactory clientFactory, String selectionStrategy) {
this.callbackUrl = callbackUrl;
this.selectorClient = selectorClient;
this.propertiesProvider = propertiesProvider;
this.clientFactory = clientFactory;
this.selectionStrategy = selectionStrategy;
}
Expand All @@ -73,6 +76,11 @@ public boolean canHandle(TransferProcess transferProcess) {
return StatusResult.failure(ResponseStatus.FATAL_ERROR, flowType.getFailureDetail());
}

var propertiesResult = propertiesProvider.propertiesFor(transferProcess, policy);
if (propertiesResult.failed()) {
return StatusResult.failure(ResponseStatus.FATAL_ERROR, propertiesResult.getFailureDetail());
}

var dataPlaneInstance = selectorClient.select(transferProcess.getContentDataAddress(), transferProcess.getDataDestination(), selectionStrategy, transferProcess.getTransferType());
var dataFlowRequest = DataFlowStartMessage.Builder.newInstance()
.id(UUID.randomUUID().toString())
Expand All @@ -84,6 +92,7 @@ public boolean canHandle(TransferProcess transferProcess) {
.assetId(transferProcess.getAssetId())
.flowType(flowType.getContent())
.callbackAddress(callbackUrl != null ? callbackUrl.get() : null)
.properties(propertiesResult.getContent())
.build();

var dataPlaneInstanceId = dataPlaneInstance != null ? dataPlaneInstance.getId() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClient;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowPropertiesProvider;
import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.policy.model.Policy;
Expand All @@ -36,6 +37,7 @@

import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -53,8 +55,11 @@ public class DataPlaneSignalingFlowControllerTest {
private final DataPlaneClient dataPlaneClient = mock();
private final DataPlaneClientFactory dataPlaneClientFactory = mock();
private final DataPlaneSelectorService selectorService = mock();

private final DataFlowPropertiesProvider propertiesProvider = mock();
private final DataPlaneSignalingFlowController flowController =
new DataPlaneSignalingFlowController(() -> URI.create("http://localhost"), selectorService, dataPlaneClientFactory, "random");
new DataPlaneSignalingFlowController(() -> URI.create("http://localhost"), selectorService, propertiesProvider, dataPlaneClientFactory, "random");


@Test
void canHandle() {
Expand All @@ -80,6 +85,8 @@ void initiateFlow_transferSuccess(String transferType) {
.contentDataAddress(testDataAddress())
.build();

var customProperties = Map.of("foo", "bar");
when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(customProperties));
when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(mock(DataFlowResponseMessage.class)));
var dataPlaneInstance = createDataPlaneInstance();
when(selectorService.select(any(), any(), any(), eq(transferType))).thenReturn(dataPlaneInstance);
Expand All @@ -98,7 +105,7 @@ void initiateFlow_transferSuccess(String transferType) {
assertThat(captured.getAgreementId()).isEqualTo(transferProcess.getContractId());
assertThat(captured.getAssetId()).isEqualTo(transferProcess.getAssetId());
assertThat(transferType).contains(captured.getFlowType().toString());
assertThat(captured.getProperties()).isEmpty();
assertThat(captured.getProperties()).containsAllEntriesOf(customProperties);
assertThat(captured.getCallbackAddress()).isNotNull();
}

Expand All @@ -112,6 +119,7 @@ void initiateFlow_transferSuccess_withReturnedDataAddress() {

var response = mock(DataFlowResponseMessage.class);
when(response.getDataAddress()).thenReturn(DataAddress.Builder.newInstance().type("type").build());
when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(Map.of()));
when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(response));
var dataPlaneInstance = createDataPlaneInstance();
when(selectorService.select(any(), any(), any(), eq(HTTP_DATA_PULL))).thenReturn(dataPlaneInstance);
Expand All @@ -134,6 +142,7 @@ void initiateFlow_transferSuccess_withoutDataPlane() {
.transferType(HTTP_DATA_PULL)
.build();

when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(Map.of()));
when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(mock(DataFlowResponseMessage.class)));
when(selectorService.select(any(), any())).thenReturn(null);
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
Expand Down Expand Up @@ -171,6 +180,21 @@ void initiateFlow_invalidTransferType(String transferType) {
assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains("Failed to extract flow type from transferType %s".formatted(transferType)));
}

@Test
void initiateFlow_returnFailedResult_whenPropertiesResolveFails() {
var errorMsg = "error";
var transferProcess = transferProcessBuilder()
.contentDataAddress(testDataAddress())
.transferType(HTTP_DATA_PULL)
.build();

when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, errorMsg));
var result = flowController.start(transferProcess, Policy.Builder.newInstance().build());

assertThat(result.failed()).isTrue();
assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains(errorMsg));
}

@Test
void initiateFlow_returnFailedResultIfTransferFails() {
var errorMsg = "error";
Expand All @@ -179,6 +203,7 @@ void initiateFlow_returnFailedResultIfTransferFails() {
.transferType(HTTP_DATA_PULL)
.build();

when(propertiesProvider.propertiesFor(any(), any())).thenReturn(StatusResult.success(Map.of()));
when(dataPlaneClient.start(any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, errorMsg));
var dataPlaneInstance = createDataPlaneInstance();
when(selectorService.select(any(), any())).thenReturn(dataPlaneInstance);
Expand All @@ -192,65 +217,6 @@ void initiateFlow_returnFailedResultIfTransferFails() {
assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains(errorMsg));
}

@Nested
class Suspend {

@Test
void shouldCallTerminate() {
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.build();
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
var dataPlaneInstance = createDataPlaneInstance();
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance));

var result = flowController.suspend(transferProcess);

assertThat(result).isSucceeded();
verify(dataPlaneClient).suspend("transferProcessId");
}

@Test
void shouldCallTerminateOnTheRightDataPlane() {
var dataPlaneInstance = createDataPlaneInstance();
var mockedDataPlane = mock(DataPlaneInstance.class);
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId(dataPlaneInstance.getId())
.build();
when(mockedDataPlane.getId()).thenReturn("notValidId");
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance, mockedDataPlane));

var result = flowController.suspend(transferProcess);

assertThat(result).isSucceeded();
verify(dataPlaneClient).suspend("transferProcessId");
verify(mockedDataPlane).getId();
}

@Test
void shouldFail_withInvalidDataPlaneId() {
var dataPlaneInstance = createDataPlaneInstance();
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId("invalid")
.build();
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance));

var result = flowController.suspend(transferProcess);

assertThat(result).isFailed().detail().contains("Failed to select the data plane for suspending the transfer process");
}
}

@Test
void terminate_shouldCallTerminate() {
var transferProcess = transferProcessBuilder()
Expand Down Expand Up @@ -325,7 +291,6 @@ private DataPlaneInstance.Builder dataPlaneInstanceBuilder() {
return DataPlaneInstance.Builder.newInstance().url("http://any");
}


private DataPlaneInstance createDataPlaneInstance() {
return dataPlaneInstanceBuilder().build();
}
Expand All @@ -350,4 +315,63 @@ private TransferProcess.Builder transferProcessBuilder() {
.counterPartyAddress("test.connector.address")
.dataDestination(DataAddress.Builder.newInstance().type("test").build());
}

@Nested
class Suspend {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was late for review, but why put tests under private methods? Doesn't look correct

Copy link
Contributor Author

@wolf4ood wolf4ood Mar 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which tests are private?


@Test
void shouldCallTerminate() {
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.build();
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
var dataPlaneInstance = createDataPlaneInstance();
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance));

var result = flowController.suspend(transferProcess);

assertThat(result).isSucceeded();
verify(dataPlaneClient).suspend("transferProcessId");
}

@Test
void shouldCallTerminateOnTheRightDataPlane() {
var dataPlaneInstance = createDataPlaneInstance();
var mockedDataPlane = mock(DataPlaneInstance.class);
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId(dataPlaneInstance.getId())
.build();
when(mockedDataPlane.getId()).thenReturn("notValidId");
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance, mockedDataPlane));

var result = flowController.suspend(transferProcess);

assertThat(result).isSucceeded();
verify(dataPlaneClient).suspend("transferProcessId");
verify(mockedDataPlane).getId();
}

@Test
void shouldFail_withInvalidDataPlaneId() {
var dataPlaneInstance = createDataPlaneInstance();
var transferProcess = TransferProcess.Builder.newInstance()
.id("transferProcessId")
.contentDataAddress(testDataAddress())
.dataPlaneId("invalid")
.build();
when(dataPlaneClient.suspend(any())).thenReturn(StatusResult.success());
when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient);
when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance));

var result = flowController.suspend(transferProcess);

assertThat(result).isFailed().detail().contains("Failed to select the data plane for suspending the transfer process");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.transfer.spi.flow;

import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.runtime.metamodel.annotation.ExtensionPoint;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;

import java.util.Map;

/**
* Extension point allows additional properties to be included in a {@link DataFlowStartMessage}
*/
@FunctionalInterface
@ExtensionPoint
public interface DataFlowPropertiesProvider {

StatusResult<Map<String, String>> propertiesFor(TransferProcess transferProcess, Policy policy);
Fixed Show fixed Hide fixed
Fixed Show fixed Hide fixed

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'transferProcess' is never used.

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'policy' is never used.

}
Loading