From fd81ffc69abe172e6bf4f4a1d267cba0a9df53da Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Mon, 25 Mar 2024 09:29:30 +0100 Subject: [PATCH] chore: fix NPE when dataplane is null on data plane signaling client (#4039) * chore: fix NPE when dataplane is null on data plane signaling client * chore: test refactor --- .../client/DataPlaneSignalingClient.java | 11 +- .../client/DataPlaneSignalingClientTest.java | 327 ++++++++++-------- 2 files changed, 186 insertions(+), 152 deletions(-) diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClient.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClient.java index aa0ca5a377d..49e2510f323 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClient.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClient.java @@ -67,7 +67,9 @@ public DataPlaneSignalingClient(EdcHttpClient httpClient, TypeTransformerRegistr @WithSpan @Override public StatusResult start(DataFlowStartMessage message) { - return send(message, dataPlane.getUrl().toString(), message.getProcessId(), this::handleStartResponse); + return Optional.ofNullable(dataPlane) + .map(instance -> send(message, instance.getUrl().toString(), message.getProcessId(), this::handleStartResponse)) + .orElseGet(() -> StatusResult.failure(FATAL_ERROR, noDataPlaneInstanceFound(message))); } @Override @@ -84,6 +86,13 @@ public StatusResult terminate(String transferProcessId) { return send(message, url, transferProcessId, r -> StatusResult.success()); } + private String noDataPlaneInstanceFound(DataFlowStartMessage message) { + var source = message.getSourceDataAddress().getType(); + var destination = message.getDestinationDataAddress().getType(); + var processId = message.getProcessId(); + return "Unable to process transfer %s: No data plane found for source: %s and destination: %s".formatted(processId, source, destination); + } + private StatusResult send(Object message, String url, String processId, Function> handleStartResponse) { var requestBuilder = transformerRegistry.transform(message, JsonObject.class) .compose(jsonLd::compact) diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTest.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTest.java index 6c3903ca1b2..93c8f8b668b 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTest.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/test/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientTest.java @@ -115,152 +115,191 @@ public void resetMockServer() { dataPlane.reset(); } - @Test - void start_verifyReturnFatalErrorIfReceiveResponseWithNullBody() throws JsonProcessingException { - var flowRequest = createDataFlowRequest(); - var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class) - .compose(JSON_LD::compact) - .orElseThrow((e) -> new EdcException(e.getFailureDetail())); + private HttpResponse withResponse(String errorMsg) throws JsonProcessingException { + return response().withStatusCode(HttpStatusCode.BAD_REQUEST_400.code()) + .withBody(MAPPER.writeValueAsString(new TransferErrorResponse(List.of(errorMsg))), MediaType.APPLICATION_JSON); + } - var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(expected)); - dataPlane.when(httpRequest, once()).respond(response().withStatusCode(HttpStatusCode.BAD_REQUEST_400.code())); + private DataFlowStartMessage createDataFlowRequest() { + return DataFlowStartMessage.Builder.newInstance() + .id("123") + .processId("456") + .flowType(FlowType.PULL) + .assetId("assetId") + .agreementId("agreementId") + .participantId("participantId") + .callbackAddress(URI.create("http://void")) + .sourceDataAddress(DataAddress.Builder.newInstance().type("test").build()) + .destinationDataAddress(DataAddress.Builder.newInstance().type("test").build()) + .build(); + } - var result = dataPlaneClient.start(flowRequest); + @Nested + class Start { - dataPlane.verify(httpRequest, VerificationTimes.once()); + @Test + void verifyReturnFatalErrorIfReceiveResponseWithNullBody() throws JsonProcessingException { + var flowRequest = createDataFlowRequest(); - assertThat(result.failed()).isTrue(); - assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); - assertThat(result.getFailureMessages()) - .anySatisfy(s -> assertThat(s) - .isEqualTo("Transfer request failed with status code 400 for request %s", flowRequest.getProcessId()) - ); - } + var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class) + .compose(JSON_LD::compact) + .orElseThrow((e) -> new EdcException(e.getFailureDetail())); - @Test - void start_verifyReturnFatalErrorIfReceiveErrorInResponse() throws JsonProcessingException { - var flowRequest = createDataFlowRequest(); + var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(expected)); + dataPlane.when(httpRequest, once()).respond(response().withStatusCode(HttpStatusCode.BAD_REQUEST_400.code())); - var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class) - .compose(JSON_LD::compact) - .orElseThrow((e) -> new EdcException(e.getFailureDetail())); + var result = dataPlaneClient.start(flowRequest); - var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(expected)); - var errorMsg = UUID.randomUUID().toString(); - dataPlane.when(httpRequest, once()).respond(withResponse(errorMsg)); + dataPlane.verify(httpRequest, VerificationTimes.once()); - var result = dataPlaneClient.start(flowRequest); + assertThat(result.failed()).isTrue(); + assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); + assertThat(result.getFailureMessages()) + .anySatisfy(s -> assertThat(s) + .isEqualTo("Transfer request failed with status code 400 for request %s", flowRequest.getProcessId()) + ); + } - dataPlane.verify(httpRequest, VerificationTimes.once()); + @Test + void verifyReturnFatalErrorIfReceiveErrorInResponse() throws JsonProcessingException { + var flowRequest = createDataFlowRequest(); - assertThat(result.failed()).isTrue(); - assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); - assertThat(result.getFailureMessages()) - .anySatisfy(s -> assertThat(s) - .isEqualTo(format("Transfer request failed with status code 400 for request %s", flowRequest.getProcessId())) - ); - } + var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class) + .compose(JSON_LD::compact) + .orElseThrow((e) -> new EdcException(e.getFailureDetail())); - @Test - void start_verifyReturnFatalErrorIfTransformFails() { - var flowRequest = createDataFlowRequest(); - TypeTransformerRegistry registry = mock(); - var dataPlaneClient = new DataPlaneSignalingClient(testHttpClient(), registry, JSON_LD, MAPPER, instance); + var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(expected)); + var errorMsg = UUID.randomUUID().toString(); + dataPlane.when(httpRequest, once()).respond(withResponse(errorMsg)); - when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure")); + var result = dataPlaneClient.start(flowRequest); - var result = dataPlaneClient.start(flowRequest); + dataPlane.verify(httpRequest, VerificationTimes.once()); - assertThat(result.failed()).isTrue(); - assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); - assertThat(result.getFailureMessages()) - .anySatisfy(s -> assertThat(s) - .isEqualTo("Transform Failure") - ); - } + assertThat(result.failed()).isTrue(); + assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); + assertThat(result.getFailureMessages()) + .anySatisfy(s -> assertThat(s) + .isEqualTo(format("Transfer request failed with status code 400 for request %s", flowRequest.getProcessId())) + ); + } - @Test - void start_verifyReturnFatalError_whenBadResponse() throws JsonProcessingException { - var flowRequest = createDataFlowRequest(); - var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class) - .compose(JSON_LD::compact) - .orElseThrow((e) -> new EdcException(e.getFailureDetail())); + @Test + void verifyReturnFatalErrorIfTransformFails() { + var flowRequest = createDataFlowRequest(); + TypeTransformerRegistry registry = mock(); + var dataPlaneClient = new DataPlaneSignalingClient(testHttpClient(), registry, JSON_LD, MAPPER, instance); + when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure")); - var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(expected)); - dataPlane.when(httpRequest, once()).respond(response().withBody("{}").withStatusCode(HttpStatusCode.OK_200.code())); + var result = dataPlaneClient.start(flowRequest); - var result = dataPlaneClient.start(flowRequest); + assertThat(result.failed()).isTrue(); + assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); + assertThat(result.getFailureMessages()) + .anySatisfy(s -> assertThat(s) + .isEqualTo("Transform Failure") + ); + } - dataPlane.verify(httpRequest, VerificationTimes.once()); - assertThat(result.failed()).isTrue(); - assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); - assertThat(result.getFailureMessages()) - .anySatisfy(s -> assertThat(s) - .contains("Error expanding JSON-LD structure") - ); - } + @Test + void verifyReturnFatalError_whenBadResponse() throws JsonProcessingException { + var flowRequest = createDataFlowRequest(); + var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class) + .compose(JSON_LD::compact) + .orElseThrow((e) -> new EdcException(e.getFailureDetail())); - @Test - void start_verifyTransferSuccess() throws JsonProcessingException { - var flowRequest = createDataFlowRequest(); - var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class) - .compose(JSON_LD::compact) - .orElseThrow((e) -> new EdcException(e.getFailureDetail())); - var flowResponse = DataFlowResponseMessage.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("type").build()).build(); - var response = TRANSFORMER_REGISTRY.transform(flowResponse, JsonObject.class) - .compose(JSON_LD::compact) - .orElseThrow((e) -> new EdcException(e.getFailureDetail())); + var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(expected)); + dataPlane.when(httpRequest, once()).respond(response().withBody("{}").withStatusCode(HttpStatusCode.OK_200.code())); + var result = dataPlaneClient.start(flowRequest); - var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(expected)); - dataPlane.when(httpRequest, once()).respond(response().withBody(MAPPER.writeValueAsString(response)).withStatusCode(HttpStatusCode.OK_200.code())); + dataPlane.verify(httpRequest, VerificationTimes.once()); + + assertThat(result.failed()).isTrue(); + assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); + assertThat(result.getFailureMessages()) + .anySatisfy(s -> assertThat(s) + .contains("Error expanding JSON-LD structure") + ); + } - var result = dataPlaneClient.start(flowRequest); + @Test + void verifyReturnFatalErrorWhenDataPlaneInstanceIsNull() { + var flowRequest = createDataFlowRequest(); + TypeTransformerRegistry registry = mock(); + var dataPlaneClient = new DataPlaneSignalingClient(testHttpClient(), registry, JSON_LD, MAPPER, null); - dataPlane.verify(httpRequest, VerificationTimes.once()); + var result = dataPlaneClient.start(flowRequest); - assertThat(result.succeeded()).isTrue(); + assertThat(result.failed()).isTrue(); + assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); + assertThat(result.getFailureDetail()).contains("No data plane found for"); + } - assertThat(result.getContent().getDataAddress()).isNotNull(); - } + @Test + void verifyTransferSuccess() throws JsonProcessingException { + var flowRequest = createDataFlowRequest(); + var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class) + .compose(JSON_LD::compact) + .orElseThrow((e) -> new EdcException(e.getFailureDetail())); - @Test - void start_verifyTransferSuccess_withoutDataAddress() throws JsonProcessingException { - var flowRequest = createDataFlowRequest(); - var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class) - .compose(JSON_LD::compact) - .orElseThrow((e) -> new EdcException(e.getFailureDetail())); + var flowResponse = DataFlowResponseMessage.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("type").build()).build(); + var response = TRANSFORMER_REGISTRY.transform(flowResponse, JsonObject.class) + .compose(JSON_LD::compact) + .orElseThrow((e) -> new EdcException(e.getFailureDetail())); - var flowResponse = DataFlowResponseMessage.Builder.newInstance().build(); - var response = TRANSFORMER_REGISTRY.transform(flowResponse, JsonObject.class) - .compose(JSON_LD::compact) - .orElseThrow((e) -> new EdcException(e.getFailureDetail())); + var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(expected)); + dataPlane.when(httpRequest, once()).respond(response().withBody(MAPPER.writeValueAsString(response)).withStatusCode(HttpStatusCode.OK_200.code())); - var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(expected)); - dataPlane.when(httpRequest, once()).respond(response().withBody(MAPPER.writeValueAsString(response)).withStatusCode(HttpStatusCode.OK_200.code())); + var result = dataPlaneClient.start(flowRequest); - var result = dataPlaneClient.start(flowRequest); + dataPlane.verify(httpRequest, VerificationTimes.once()); - dataPlane.verify(httpRequest, VerificationTimes.once()); + assertThat(result.succeeded()).isTrue(); + + assertThat(result.getContent().getDataAddress()).isNotNull(); + } + + @Test + void verifyTransferSuccess_withoutDataAddress() throws JsonProcessingException { + var flowRequest = createDataFlowRequest(); + var expected = TRANSFORMER_REGISTRY.transform(flowRequest, JsonObject.class) + .compose(JSON_LD::compact) + .orElseThrow((e) -> new EdcException(e.getFailureDetail())); + + var flowResponse = DataFlowResponseMessage.Builder.newInstance().build(); + var response = TRANSFORMER_REGISTRY.transform(flowResponse, JsonObject.class) + .compose(JSON_LD::compact) + .orElseThrow((e) -> new EdcException(e.getFailureDetail())); + + + var httpRequest = new HttpRequest().withPath(DATA_PLANE_PATH).withBody(MAPPER.writeValueAsString(expected)); + dataPlane.when(httpRequest, once()).respond(response().withBody(MAPPER.writeValueAsString(response)).withStatusCode(HttpStatusCode.OK_200.code())); + + var result = dataPlaneClient.start(flowRequest); + + dataPlane.verify(httpRequest, VerificationTimes.once()); + + assertThat(result.succeeded()).isTrue(); + assertThat(result.getContent().getDataAddress()).isNull(); + } - assertThat(result.succeeded()).isTrue(); - assertThat(result.getContent().getDataAddress()).isNull(); } @Nested - class Suspend { + class Terminate { @Test - void shouldCallSuspendOnAllTheAvailableDataPlanes() { - var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/suspend()"); + void shouldCallTerminateOnAllTheAvailableDataPlanes() { + var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/terminate"); dataPlane.when(httpRequest, once()).respond(response().withStatusCode(NO_CONTENT_204.code())); - var result = dataPlaneClient.suspend("processId"); + var result = dataPlaneClient.terminate("processId"); assertThat(result).isSucceeded(); dataPlane.verify(httpRequest, VerificationTimes.once()); @@ -268,10 +307,10 @@ void shouldCallSuspendOnAllTheAvailableDataPlanes() { @Test void shouldFail_whenConflictResponse() { - var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/suspend()"); + var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/terminate"); dataPlane.when(httpRequest, once()).respond(response().withStatusCode(CONFLICT_409.code())); - var result = dataPlaneClient.suspend("processId"); + var result = dataPlaneClient.terminate("processId"); assertThat(result).isFailed(); } @@ -283,7 +322,7 @@ void verifyReturnFatalErrorIfTransformFails() { when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure")); - var result = dataPlaneClient.suspend("processId"); + var result = dataPlaneClient.terminate("processId"); assertThat(result.failed()).isTrue(); assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); @@ -292,62 +331,48 @@ void verifyReturnFatalErrorIfTransformFails() { .isEqualTo("Transform Failure") ); } - } - @Test - void terminate_shouldCallTerminateOnAllTheAvailableDataPlanes() { - var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/terminate"); - dataPlane.when(httpRequest, once()).respond(response().withStatusCode(NO_CONTENT_204.code())); + } - var result = dataPlaneClient.terminate("processId"); + @Nested + class Suspend { - assertThat(result).isSucceeded(); - dataPlane.verify(httpRequest, VerificationTimes.once()); - } + @Test + void shouldCallSuspendOnAllTheAvailableDataPlanes() { + var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/suspend"); + dataPlane.when(httpRequest, once()).respond(response().withStatusCode(NO_CONTENT_204.code())); - @Test - void terminate_shouldFail_whenConflictResponse() { - var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/terminate"); - dataPlane.when(httpRequest, once()).respond(response().withStatusCode(CONFLICT_409.code())); + var result = dataPlaneClient.suspend("processId"); - var result = dataPlaneClient.terminate("processId"); + assertThat(result).isSucceeded(); + dataPlane.verify(httpRequest, VerificationTimes.once()); + } - assertThat(result).isFailed(); - } + @Test + void shouldFail_whenConflictResponse() { + var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/suspend"); + dataPlane.when(httpRequest, once()).respond(response().withStatusCode(CONFLICT_409.code())); - @Test - void terminate_verifyReturnFatalErrorIfTransformFails() { - TypeTransformerRegistry registry = mock(); - var dataPlaneClient = new DataPlaneSignalingClient(testHttpClient(), registry, JSON_LD, MAPPER, instance); + var result = dataPlaneClient.suspend("processId"); - when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure")); + assertThat(result).isFailed(); + } - var result = dataPlaneClient.terminate("processId"); + @Test + void verifyReturnFatalErrorIfTransformFails() { + TypeTransformerRegistry registry = mock(); + var dataPlaneClient = new DataPlaneSignalingClient(testHttpClient(), registry, JSON_LD, MAPPER, instance); - assertThat(result.failed()).isTrue(); - assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); - assertThat(result.getFailureMessages()) - .anySatisfy(s -> assertThat(s) - .isEqualTo("Transform Failure") - ); - } + when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure")); - private HttpResponse withResponse(String errorMsg) throws JsonProcessingException { - return response().withStatusCode(HttpStatusCode.BAD_REQUEST_400.code()) - .withBody(MAPPER.writeValueAsString(new TransferErrorResponse(List.of(errorMsg))), MediaType.APPLICATION_JSON); - } + var result = dataPlaneClient.suspend("processId"); - private DataFlowStartMessage createDataFlowRequest() { - return DataFlowStartMessage.Builder.newInstance() - .id("123") - .processId("456") - .flowType(FlowType.PULL) - .assetId("assetId") - .agreementId("agreementId") - .participantId("participantId") - .callbackAddress(URI.create("http://void")) - .sourceDataAddress(DataAddress.Builder.newInstance().type("test").build()) - .destinationDataAddress(DataAddress.Builder.newInstance().type("test").build()) - .build(); + assertThat(result.failed()).isTrue(); + assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR); + assertThat(result.getFailureMessages()) + .anySatisfy(s -> assertThat(s) + .isEqualTo("Transform Failure") + ); + } } }