diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClient.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClient.java index aa0ca5a377d..49e2510f323 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClient.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClient.java @@ -67,7 +67,9 @@ public DataPlaneSignalingClient(EdcHttpClient httpClient, TypeTransformerRegistr @WithSpan @Override public StatusResult start(DataFlowStartMessage message) { - return send(message, dataPlane.getUrl().toString(), message.getProcessId(), this::handleStartResponse); + return Optional.ofNullable(dataPlane) + .map(instance -> send(message, instance.getUrl().toString(), message.getProcessId(), this::handleStartResponse)) + .orElseGet(() -> StatusResult.failure(FATAL_ERROR, noDataPlaneInstanceFound(message))); } @Override @@ -84,6 +86,13 @@ public StatusResult terminate(String transferProcessId) { return send(message, url, transferProcessId, r -> StatusResult.success()); } + private String noDataPlaneInstanceFound(DataFlowStartMessage message) { + var source = message.getSourceDataAddress().getType(); + var destination = message.getDestinationDataAddress().getType(); + var processId = message.getProcessId(); + return "Unable to process transfer %s: No data plane found for source: %s and destination: %s".formatted(processId, source, destination); + } + private StatusResult send(Object message, String url, String processId, Function> handleStartResponse) { var requestBuilder = transformerRegistry.transform(message, JsonObject.class) .compose(jsonLd::compact) diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTest.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTest.java index 6c3903ca1b2..11f2c0bf982 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTest.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTest.java @@ -180,6 +180,7 @@ void start_verifyReturnFatalErrorIfTransformFails() { ); } + @Test void start_verifyReturnFatalError_whenBadResponse() throws JsonProcessingException { var flowRequest = createDataFlowRequest(); @@ -203,6 +204,19 @@ void start_verifyReturnFatalError_whenBadResponse() throws JsonProcessingExcepti ); } + @Test + void start_verifyReturnFatalErrorWhenDataPlaneInstanceIsNull() { + var flowRequest = createDataFlowRequest(); + TypeTransformerRegistry registry = mock(); + var dataPlaneClient = new DataPlaneSignalingClient(testHttpClient(), registry, JSON_LD, MAPPER, null); + + var result = dataPlaneClient.start(flowRequest); + + assertThat(result.failed()).isTrue(); + assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); + assertThat(result.getFailureDetail()).contains("No data plane found for"); + } + @Test void start_verifyTransferSuccess() throws JsonProcessingException { var flowRequest = createDataFlowRequest(); @@ -252,48 +266,6 @@ void start_verifyTransferSuccess_withoutDataAddress() throws JsonProcessingExcep assertThat(result.getContent().getDataAddress()).isNull(); } - @Nested - class Suspend { - - @Test - void shouldCallSuspendOnAllTheAvailableDataPlanes() { - var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/suspend()"); - dataPlane.when(httpRequest, once()).respond(response().withStatusCode(NO_CONTENT_204.code())); - - var result = dataPlaneClient.suspend("processId"); - - assertThat(result).isSucceeded(); - dataPlane.verify(httpRequest, VerificationTimes.once()); - } - - @Test - void shouldFail_whenConflictResponse() { - var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/suspend()"); - dataPlane.when(httpRequest, once()).respond(response().withStatusCode(CONFLICT_409.code())); - - var result = dataPlaneClient.suspend("processId"); - - assertThat(result).isFailed(); - } - - @Test - void verifyReturnFatalErrorIfTransformFails() { - TypeTransformerRegistry registry = mock(); - var dataPlaneClient = new DataPlaneSignalingClient(testHttpClient(), registry, JSON_LD, MAPPER, instance); - - when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure")); - - var result = dataPlaneClient.suspend("processId"); - - assertThat(result.failed()).isTrue(); - assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); - assertThat(result.getFailureMessages()) - .anySatisfy(s -> assertThat(s) - .isEqualTo("Transform Failure") - ); - } - } - @Test void terminate_shouldCallTerminateOnAllTheAvailableDataPlanes() { var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/terminate"); @@ -350,4 +322,46 @@ private DataFlowStartMessage createDataFlowRequest() { .destinationDataAddress(DataAddress.Builder.newInstance().type("test").build()) .build(); } + + @Nested + class Suspend { + + @Test + void shouldCallSuspendOnAllTheAvailableDataPlanes() { + var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/suspend()"); + dataPlane.when(httpRequest, once()).respond(response().withStatusCode(NO_CONTENT_204.code())); + + var result = dataPlaneClient.suspend("processId"); + + assertThat(result).isSucceeded(); + dataPlane.verify(httpRequest, VerificationTimes.once()); + } + + @Test + void shouldFail_whenConflictResponse() { + var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/suspend()"); + dataPlane.when(httpRequest, once()).respond(response().withStatusCode(CONFLICT_409.code())); + + var result = dataPlaneClient.suspend("processId"); + + assertThat(result).isFailed(); + } + + @Test + void verifyReturnFatalErrorIfTransformFails() { + TypeTransformerRegistry registry = mock(); + var dataPlaneClient = new DataPlaneSignalingClient(testHttpClient(), registry, JSON_LD, MAPPER, instance); + + when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure")); + + var result = dataPlaneClient.suspend("processId"); + + assertThat(result.failed()).isTrue(); + assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); + assertThat(result.getFailureMessages()) + .anySatisfy(s -> assertThat(s) + .isEqualTo("Transform Failure") + ); + } + } }