Skip to content

Commit

Permalink
chore: fix NPE when dataplane is null on data plane signaling client
Browse files Browse the repository at this point in the history
  • Loading branch information
wolf4ood committed Mar 22, 2024
1 parent 7493208 commit dbc7f8a
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ public DataPlaneSignalingClient(EdcHttpClient httpClient, TypeTransformerRegistr
@WithSpan
@Override
public StatusResult<DataFlowResponseMessage> start(DataFlowStartMessage message) {
return send(message, dataPlane.getUrl().toString(), message.getProcessId(), this::handleStartResponse);
return Optional.ofNullable(dataPlane)
.map(instance -> send(message, instance.getUrl().toString(), message.getProcessId(), this::handleStartResponse))
.orElseGet(() -> StatusResult.failure(FATAL_ERROR, noDataPlaneInstanceFound(message)));
}

@Override
Expand All @@ -84,6 +86,13 @@ public StatusResult<Void> terminate(String transferProcessId) {
return send(message, url, transferProcessId, r -> StatusResult.success());
}

private String noDataPlaneInstanceFound(DataFlowStartMessage message) {
var source = message.getSourceDataAddress().getType();
var destination = message.getDestinationDataAddress().getType();
var processId = message.getProcessId();
return "Unable to process transfer %s: No data plane found for source: %s and destination: %s".formatted(processId, source, destination);
}

private <T> StatusResult<T> send(Object message, String url, String processId, Function<Response, StatusResult<T>> handleStartResponse) {
var requestBuilder = transformerRegistry.transform(message, JsonObject.class)
.compose(jsonLd::compact)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ void start_verifyReturnFatalErrorIfTransformFails() {
);
}


@Test
void start_verifyReturnFatalError_whenBadResponse() throws JsonProcessingException {
var flowRequest = createDataFlowRequest();
Expand All @@ -203,6 +204,19 @@ void start_verifyReturnFatalError_whenBadResponse() throws JsonProcessingExcepti
);
}

@Test
void start_verifyReturnFatalErrorWhenDataPlaneInstanceIsNull() {
var flowRequest = createDataFlowRequest();
TypeTransformerRegistry registry = mock();
var dataPlaneClient = new DataPlaneSignalingClient(testHttpClient(), registry, JSON_LD, MAPPER, null);

var result = dataPlaneClient.start(flowRequest);

assertThat(result.failed()).isTrue();
assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR);
assertThat(result.getFailureDetail()).contains("No data plane found for");
}

@Test
void start_verifyTransferSuccess() throws JsonProcessingException {
var flowRequest = createDataFlowRequest();
Expand Down Expand Up @@ -252,48 +266,6 @@ void start_verifyTransferSuccess_withoutDataAddress() throws JsonProcessingExcep
assertThat(result.getContent().getDataAddress()).isNull();
}

@Nested
class Suspend {

@Test
void shouldCallSuspendOnAllTheAvailableDataPlanes() {
var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/suspend()");
dataPlane.when(httpRequest, once()).respond(response().withStatusCode(NO_CONTENT_204.code()));

var result = dataPlaneClient.suspend("processId");

assertThat(result).isSucceeded();
dataPlane.verify(httpRequest, VerificationTimes.once());
}

@Test
void shouldFail_whenConflictResponse() {
var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/suspend()");
dataPlane.when(httpRequest, once()).respond(response().withStatusCode(CONFLICT_409.code()));

var result = dataPlaneClient.suspend("processId");

assertThat(result).isFailed();
}

@Test
void verifyReturnFatalErrorIfTransformFails() {
TypeTransformerRegistry registry = mock();
var dataPlaneClient = new DataPlaneSignalingClient(testHttpClient(), registry, JSON_LD, MAPPER, instance);

when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure"));

var result = dataPlaneClient.suspend("processId");

assertThat(result.failed()).isTrue();
assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR);
assertThat(result.getFailureMessages())
.anySatisfy(s -> assertThat(s)
.isEqualTo("Transform Failure")
);
}
}

@Test
void terminate_shouldCallTerminateOnAllTheAvailableDataPlanes() {
var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/terminate");
Expand Down Expand Up @@ -350,4 +322,46 @@ private DataFlowStartMessage createDataFlowRequest() {
.destinationDataAddress(DataAddress.Builder.newInstance().type("test").build())
.build();
}

@Nested
class Suspend {

@Test
void shouldCallSuspendOnAllTheAvailableDataPlanes() {
var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/suspend()");
dataPlane.when(httpRequest, once()).respond(response().withStatusCode(NO_CONTENT_204.code()));

var result = dataPlaneClient.suspend("processId");

assertThat(result).isSucceeded();
dataPlane.verify(httpRequest, VerificationTimes.once());
}

@Test
void shouldFail_whenConflictResponse() {
var httpRequest = new HttpRequest().withMethod("POST").withPath(DATA_PLANE_PATH + "/processId/suspend()");
dataPlane.when(httpRequest, once()).respond(response().withStatusCode(CONFLICT_409.code()));

var result = dataPlaneClient.suspend("processId");

assertThat(result).isFailed();
}

@Test
void verifyReturnFatalErrorIfTransformFails() {
TypeTransformerRegistry registry = mock();
var dataPlaneClient = new DataPlaneSignalingClient(testHttpClient(), registry, JSON_LD, MAPPER, instance);

when(registry.transform(any(), any())).thenReturn(Result.failure("Transform Failure"));

var result = dataPlaneClient.suspend("processId");

assertThat(result.failed()).isTrue();
assertThat(result.getFailure().status()).isEqualTo(ResponseStatus.FATAL_ERROR);
assertThat(result.getFailureMessages())
.anySatisfy(s -> assertThat(s)
.isEqualTo("Transform Failure")
);
}
}
}

0 comments on commit dbc7f8a

Please sign in to comment.