From 964e40b355c8507943bec978fa965482d0ac3650 Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Tue, 19 Mar 2024 16:13:37 +0100 Subject: [PATCH] feat: implements revocation in DataPlaneAuthorizationService (#4019) --- .../TransferProcessEventListener.java | 10 +- .../DataPlaneAuthorizationServiceImpl.java | 5 + ...efaultDataPlaneAccessTokenServiceImpl.java | 25 +++ .../manager/DataPlaneManagerImpl.java | 21 +- ...DataPlaneAuthorizationServiceImplTest.java | 21 ++ ...ltDataPlaneAccessTokenServiceImplTest.java | 58 +++++ .../manager/DataPlaneManagerImplTest.java | 211 ++++++++++-------- .../EndpointDataReferenceStoreReceiver.java | 6 + ...ndpointDataReferenceStoreReceiverTest.java | 4 +- .../edc/connector/dataplane/spi/DataFlow.java | 2 +- .../spi/iam/DataPlaneAccessTokenService.java | 10 + .../iam/DataPlaneAuthorizationService.java | 10 + .../spi/store/AccessTokenDataTestBase.java | 13 ++ .../SignalingEndToEndTransferTest.java | 39 +++- 14 files changed, 328 insertions(+), 107 deletions(-) diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/listener/TransferProcessEventListener.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/listener/TransferProcessEventListener.java index 750f2b53354..1745a04b941 100644 --- a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/listener/TransferProcessEventListener.java +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/listener/TransferProcessEventListener.java @@ -95,19 +95,17 @@ public void completed(TransferProcess process) { } @Override - public void suspended(TransferProcess process) { - var event = TransferProcessSuspended.Builder.newInstance() + public void terminated(TransferProcess process) { + var event = withBaseProperties(TransferProcessTerminated.Builder.newInstance(), process) .reason(process.getErrorDetail()) - .transferProcessId(process.getId()) - .callbackAddresses(process.getCallbackAddresses()) .build(); publish(event); } @Override - public void terminated(TransferProcess process) { - var event = withBaseProperties(TransferProcessTerminated.Builder.newInstance(), process) + public void suspended(TransferProcess process) { + var event = withBaseProperties(TransferProcessSuspended.Builder.newInstance(), process) .reason(process.getErrorDetail()) .build(); diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java index 47e6f14934e..b5f38c35c3b 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImpl.java @@ -83,6 +83,11 @@ public Result authorize(String token, Map requestDa .map(u -> accessTokenDataResult.getContent().dataAddress()); } + @Override + public Result revokeEndpointDataReference(String transferProcessId, String reason) { + return accessTokenService.revoke(transferProcessId, reason); + } + private Result createDataAddress(TokenRepresentation tokenRepresentation, Endpoint publicEndpoint) { var address = DataAddress.Builder.newInstance() .type(publicEndpoint.endpointType()) diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DefaultDataPlaneAccessTokenServiceImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DefaultDataPlaneAccessTokenServiceImpl.java index bd4a3a5900b..5ae332147f5 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DefaultDataPlaneAccessTokenServiceImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/iam/DefaultDataPlaneAccessTokenServiceImpl.java @@ -22,6 +22,8 @@ import org.eclipse.edc.spi.iam.TokenParameters; import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.query.Criterion; +import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.token.spi.KeyIdDecorator; @@ -133,4 +135,27 @@ public Result resolve(String token) { Result.failure("AccessTokenData with ID '%s' does not exist.".formatted(tokenId)) : Result.success(existingAccessToken); } + + @Override + public Result revoke(String transferProcessId, String reason) { + + var query = QuerySpec.Builder.newInstance() + .filter(new Criterion("additionalProperties.process_id", "=", transferProcessId)) + .build(); + + var tokens = accessTokenDataStore.query(query); + return tokens.stream().map(this::deleteTokenData) + .reduce(Result::merge) + .orElseGet(() -> Result.failure("AccessTokenData associated to the transfer with ID '%s' does not exist.".formatted(transferProcessId))); + + } + + private Result deleteTokenData(AccessTokenData tokenData) { + var result = accessTokenDataStore.deleteById(tokenData.id()); + if (result.failed()) { + return Result.failure(result.getFailureDetail()); + } else { + return Result.success(); + } + } } diff --git a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java index 76f5c6b3c51..afa933efb06 100644 --- a/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java +++ b/core/data-plane/data-plane-core/src/main/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImpl.java @@ -93,7 +93,7 @@ public Result start(DataFlowStartMessage startMessage) var response = DataFlowResponseMessage.Builder.newInstance() .dataAddress(result.getContent().orElse(null)) .build(); - + update(dataFlowBuilder.build()); return Result.success(response); @@ -106,20 +106,20 @@ public DataFlowStates getTransferState(String processId) { } @Override - public StatusResult terminate(String dataFlowId, @Nullable String reason) { + public StatusResult suspend(String dataFlowId) { return stop(dataFlowId) .map(dataFlow -> { - dataFlow.transitToTerminated(reason); + dataFlow.transitToSuspended(); store.save(dataFlow); return null; }); } @Override - public StatusResult suspend(String dataFlowId) { - return stop(dataFlowId) + public StatusResult terminate(String dataFlowId, @Nullable String reason) { + return stop(dataFlowId, reason) .map(dataFlow -> { - dataFlow.transitToSuspended(); + dataFlow.transitToTerminated(reason); store.save(dataFlow); return null; }); @@ -134,6 +134,10 @@ protected StateMachineManager.Builder configureStateMachineManager(StateMachineM } private StatusResult stop(String dataFlowId) { + return stop(dataFlowId, null); + } + + private StatusResult stop(String dataFlowId, String reason) { var result = store.findByIdAndLease(dataFlowId); if (result.failed()) { return StatusResult.from(result).map(it -> null); @@ -156,6 +160,11 @@ private StatusResult stop(String dataFlowId) { return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlowId, terminateResult.getFailureDetail())); } } + } else { + var revokeResult = authorizationService.revokeEndpointDataReference(dataFlowId, reason); + if (revokeResult.failed()) { + return StatusResult.failure(FATAL_ERROR, "DataFlow %s cannot be terminated: %s".formatted(dataFlowId, revokeResult.getFailureDetail())); + } } return StatusResult.success(dataFlow); diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java index d4c19f1b29b..64403ebf60b 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DataPlaneAuthorizationServiceImplTest.java @@ -161,6 +161,27 @@ void authorize_accessNotGranted() { verifyNoMoreInteractions(accessTokenService, accessControlService); } + @Test + void revoke() { + when(accessTokenService.revoke(eq("id"), eq("reason"))).thenReturn(Result.success()); + + assertThat(authorizationService.revokeEndpointDataReference("id", "reason")).isSucceeded(); + + verify(accessTokenService).revoke(eq("id"), eq("reason")); + verifyNoMoreInteractions(accessTokenService, accessControlService); + } + + @Test + void revoke_error() { + when(accessTokenService.revoke(eq("id"), eq("reason"))).thenReturn(Result.failure("failure")); + + assertThat(authorizationService.revokeEndpointDataReference("id", "reason")).isFailed() + .detail().contains("failure"); + + verify(accessTokenService).revoke(eq("id"), eq("reason")); + verifyNoMoreInteractions(accessTokenService, accessControlService); + } + private DataFlowStartMessage.Builder createStartMessage() { return DataFlowStartMessage.Builder.newInstance() .processId("test-processid") diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DefaultDataPlaneAccessTokenServiceImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DefaultDataPlaneAccessTokenServiceImplTest.java index 86d04434964..91393880029 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DefaultDataPlaneAccessTokenServiceImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/iam/DefaultDataPlaneAccessTokenServiceImplTest.java @@ -20,6 +20,8 @@ import org.eclipse.edc.spi.iam.ClaimToken; import org.eclipse.edc.spi.iam.TokenParameters; import org.eclipse.edc.spi.iam.TokenRepresentation; +import org.eclipse.edc.spi.query.Criterion; +import org.eclipse.edc.spi.query.QuerySpec; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.result.StoreResult; import org.eclipse.edc.spi.types.domain.DataAddress; @@ -28,6 +30,7 @@ import org.eclipse.edc.token.spi.TokenValidationService; import org.junit.jupiter.api.Test; +import java.util.List; import java.util.Map; import java.util.regex.Pattern; @@ -39,6 +42,7 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; @@ -180,4 +184,58 @@ void resolve_whenTokenIdNotFound() { verify(tokenValidationService).validate(eq("some-jwt"), any(), anyList()); verify(store).getById(eq(tokenId)); } + + + @Test + void revoke() { + var tokenId = "test-id"; + var processId = "tp-id"; + + var querySpec = QuerySpec.Builder.newInstance().filter(new Criterion("additionalProperties.process_id", "=", processId)).build(); + + var tokenData = new AccessTokenData(tokenId, ClaimToken.Builder.newInstance().build(), + DataAddress.Builder.newInstance().type("test-type").build()); + + when(store.query(querySpec)).thenReturn(List.of(tokenData)); + when(store.deleteById(tokenId)).thenReturn(StoreResult.success()); + + var result = accessTokenService.revoke("tp-id", "reason"); + assertThat(result).isSucceeded(); + + verify(store).deleteById(eq(tokenId)); + + } + + @Test + void revoke_storeError() { + var tokenId = "test-id"; + var processId = "tp-id"; + + var querySpec = QuerySpec.Builder.newInstance().filter(new Criterion("additionalProperties.process_id", "=", processId)).build(); + + var tokenData = new AccessTokenData(tokenId, ClaimToken.Builder.newInstance().build(), + DataAddress.Builder.newInstance().type("test-type").build()); + + when(store.query(querySpec)).thenReturn(List.of(tokenData)); + when(store.deleteById(tokenId)).thenReturn(StoreResult.generalError("storeError")); + + var result = accessTokenService.revoke("tp-id", "reason"); + assertThat(result).isFailed().detail().contains("storeError"); + + verify(store).deleteById(eq(tokenId)); + } + + @Test + void revoke_notTokensFound() { + var processId = "tp-id"; + var querySpec = QuerySpec.Builder.newInstance().filter(new Criterion("additionalProperties.process_id", "=", processId)).build(); + + when(store.query(querySpec)).thenReturn(List.of()); + + var result = accessTokenService.revoke("tp-id", "reason"); + assertThat(result).isFailed().detail().contains("AccessTokenData associated to the transfer with ID"); + + verify(store, never()).deleteById(any()); + + } } diff --git a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java index 77ae364db74..8d68e86d34f 100644 --- a/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java +++ b/core/data-plane/data-plane-core/src/test/java/org/eclipse/edc/connector/dataplane/framework/manager/DataPlaneManagerImplTest.java @@ -187,6 +187,33 @@ void terminate_shouldTerminateDataFlow() { verify(transferService).terminate(dataFlow); } + @Test + void terminate_shouldTerminatePullDataFlow() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).id("dataFlowId").flowType(FlowType.PULL).build(); + when(store.findByIdAndLease(dataFlow.getId())).thenReturn(StoreResult.success(dataFlow)); + when(authorizationService.revokeEndpointDataReference(dataFlow.getId(), null)).thenReturn(Result.success()); + + var result = manager.terminate(dataFlow.getId(), null); + + assertThat(result).isSucceeded(); + verify(store).save(argThat(d -> d.getState() == TERMINATED.code())); + verify(authorizationService).revokeEndpointDataReference(dataFlow.getId(), null); + } + + @Test + void terminate_shouldFailToTerminatePullDataFlow_whenRevocationFails() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).id("dataFlowId").flowType(FlowType.PULL).build(); + when(store.findByIdAndLease(dataFlow.getId())).thenReturn(StoreResult.success(dataFlow)); + when(authorizationService.revokeEndpointDataReference(dataFlow.getId(), null)).thenReturn(Result.failure("failure")); + + var result = manager.terminate(dataFlow.getId(), null); + + assertThat(result).isFailed(); + verify(store, never()).save(any()); + verify(authorizationService).revokeEndpointDataReference(dataFlow.getId(), null); + } + + @Test void terminate_shouldTerminateDataFlow_withReason() { var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); @@ -262,98 +289,6 @@ void terminate_shouldStillTerminate_whenDataFlowHasNoSource() { verify(store).save(argThat(f -> f.getProperties().containsKey(TERMINATION_REASON))); } - @Nested - class Suspend { - @Test - void shouldSuspendDataFlow() { - var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); - when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); - when(registry.resolveTransferService(any())).thenReturn(transferService); - when(transferService.terminate(any())).thenReturn(StreamResult.success()); - - var result = manager.suspend("dataFlowId"); - - assertThat(result).isSucceeded(); - verify(store).save(argThat(d -> d.getState() == SUSPENDED.code())); - verify(transferService).terminate(dataFlow); - } - - @Test - void shouldSuspendDataFlow_withReason() { - var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); - when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); - when(registry.resolveTransferService(any())).thenReturn(transferService); - when(transferService.terminate(any())).thenReturn(StreamResult.success()); - - var result = manager.suspend("dataFlowId"); - - assertThat(result).isSucceeded(); - verify(store).save(argThat(d -> d.getState() == SUSPENDED.code())); - verify(transferService).terminate(dataFlow); - } - - @Test - void shouldReturnFatalError_whenDataFlowDoesNotExist() { - when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.notFound("not found")); - - var result = manager.suspend("dataFlowId"); - - assertThat(result).isFailed().extracting(ResponseFailure::status).isEqualTo(FATAL_ERROR); - verify(store, never()).save(any()); - verifyNoInteractions(transferService); - } - - @Test - void shouldReturnRetryError_whenEntityCannotBeLeased() { - when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.alreadyLeased("already leased")); - - var result = manager.suspend("dataFlowId"); - - assertThat(result).isFailed().extracting(ResponseFailure::status).isEqualTo(ERROR_RETRY); - verify(store, never()).save(any()); - verifyNoInteractions(transferService); - } - - @Test - void shouldReturnFatalError_whenTransferServiceNotFound() { - var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); - when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); - when(registry.resolveTransferService(any())).thenReturn(null); - - var result = manager.suspend("dataFlowId"); - - assertThat(result).isFailed().extracting(ResponseFailure::status).isEqualTo(FATAL_ERROR); - verify(store, never()).save(any()); - verifyNoInteractions(transferService); - } - - @Test - void shouldReturnFatalError_whenDataFlowCannotBeSuspended() { - var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); - when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); - when(registry.resolveTransferService(any())).thenReturn(transferService); - when(transferService.terminate(any())).thenReturn(StreamResult.error("cannot be suspended")); - - var result = manager.suspend("dataFlowId"); - - assertThat(result).isFailed().extracting(ResponseFailure::status).isEqualTo(FATAL_ERROR); - verify(store, never()).save(any()); - } - - @Test - void shouldStillSuspend_whenDataFlowHasNoSource() { - var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); - when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); - when(registry.resolveTransferService(any())).thenReturn(transferService); - when(transferService.terminate(any())).thenReturn(StreamResult.notFound()); - - var result = manager.suspend("dataFlowId"); - - assertThat(result).isSucceeded(); - verify(store).save(argThat(f -> f.getState() == SUSPENDED.code())); - } - } - @Test void received_shouldStartTransferTransitionAndTransitionToStarted() { var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); @@ -537,4 +472,96 @@ private DataFlowStartMessage createRequest() { .build(); } + @Nested + class Suspend { + @Test + void shouldSuspendDataFlow() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); + when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); + when(registry.resolveTransferService(any())).thenReturn(transferService); + when(transferService.terminate(any())).thenReturn(StreamResult.success()); + + var result = manager.suspend("dataFlowId"); + + assertThat(result).isSucceeded(); + verify(store).save(argThat(d -> d.getState() == SUSPENDED.code())); + verify(transferService).terminate(dataFlow); + } + + @Test + void shouldSuspendDataFlow_withReason() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); + when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); + when(registry.resolveTransferService(any())).thenReturn(transferService); + when(transferService.terminate(any())).thenReturn(StreamResult.success()); + + var result = manager.suspend("dataFlowId"); + + assertThat(result).isSucceeded(); + verify(store).save(argThat(d -> d.getState() == SUSPENDED.code())); + verify(transferService).terminate(dataFlow); + } + + @Test + void shouldReturnFatalError_whenDataFlowDoesNotExist() { + when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.notFound("not found")); + + var result = manager.suspend("dataFlowId"); + + assertThat(result).isFailed().extracting(ResponseFailure::status).isEqualTo(FATAL_ERROR); + verify(store, never()).save(any()); + verifyNoInteractions(transferService); + } + + @Test + void shouldReturnRetryError_whenEntityCannotBeLeased() { + when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.alreadyLeased("already leased")); + + var result = manager.suspend("dataFlowId"); + + assertThat(result).isFailed().extracting(ResponseFailure::status).isEqualTo(ERROR_RETRY); + verify(store, never()).save(any()); + verifyNoInteractions(transferService); + } + + @Test + void shouldReturnFatalError_whenTransferServiceNotFound() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); + when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); + when(registry.resolveTransferService(any())).thenReturn(null); + + var result = manager.suspend("dataFlowId"); + + assertThat(result).isFailed().extracting(ResponseFailure::status).isEqualTo(FATAL_ERROR); + verify(store, never()).save(any()); + verifyNoInteractions(transferService); + } + + @Test + void shouldReturnFatalError_whenDataFlowCannotBeSuspended() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); + when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); + when(registry.resolveTransferService(any())).thenReturn(transferService); + when(transferService.terminate(any())).thenReturn(StreamResult.error("cannot be suspended")); + + var result = manager.suspend("dataFlowId"); + + assertThat(result).isFailed().extracting(ResponseFailure::status).isEqualTo(FATAL_ERROR); + verify(store, never()).save(any()); + } + + @Test + void shouldStillSuspend_whenDataFlowHasNoSource() { + var dataFlow = dataFlowBuilder().state(RECEIVED.code()).build(); + when(store.findByIdAndLease("dataFlowId")).thenReturn(StoreResult.success(dataFlow)); + when(registry.resolveTransferService(any())).thenReturn(transferService); + when(transferService.terminate(any())).thenReturn(StreamResult.notFound()); + + var result = manager.suspend("dataFlowId"); + + assertThat(result).isSucceeded(); + verify(store).save(argThat(f -> f.getState() == SUSPENDED.code())); + } + } + } diff --git a/extensions/control-plane/edr/edr-store-receiver/src/main/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiver.java b/extensions/control-plane/edr/edr-store-receiver/src/main/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiver.java index b7f6547954a..ea504363157 100644 --- a/extensions/control-plane/edr/edr-store-receiver/src/main/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiver.java +++ b/extensions/control-plane/edr/edr-store-receiver/src/main/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiver.java @@ -20,6 +20,7 @@ import org.eclipse.edc.connector.transfer.spi.event.TransferProcessCompleted; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessEvent; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessSuspended; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated; import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; import org.eclipse.edc.edr.spi.store.EndpointDataReferenceStore; @@ -59,6 +60,7 @@ public EndpointDataReferenceStoreReceiver(EndpointDataReferenceStore dataReferen this.monitor = monitor; registerHandler(TransferProcessStarted.class, this::handleTransferStarted); registerHandler(TransferProcessTerminated.class, this::handleTransferTerminated); + registerHandler(TransferProcessSuspended.class, this::handleTransferSuspended); registerHandler(TransferProcessCompleted.class, this::handleTransferCompleted); } @@ -124,6 +126,10 @@ private Result handleTransferTerminated(TransferProcessTerminated transfer return removeCachedEdr(transferProcessTerminated.getTransferProcessId()); } + private Result handleTransferSuspended(TransferProcessSuspended transferProcessSuspended) { + return removeCachedEdr(transferProcessSuspended.getTransferProcessId()); + } + private Result handleTransferCompleted(TransferProcessCompleted transferProcessCompleted) { return removeCachedEdr(transferProcessCompleted.getTransferProcessId()); } diff --git a/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiverTest.java b/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiverTest.java index db917cffefa..893f5a36fa1 100644 --- a/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiverTest.java +++ b/extensions/control-plane/edr/edr-store-receiver/src/test/java/org/eclipse/edc/connector/edr/store/receiver/EndpointDataReferenceStoreReceiverTest.java @@ -26,6 +26,7 @@ import org.eclipse.edc.connector.transfer.spi.event.TransferProcessProvisioningRequested; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessRequested; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessStarted; +import org.eclipse.edc.connector.transfer.spi.event.TransferProcessSuspended; import org.eclipse.edc.connector.transfer.spi.event.TransferProcessTerminated; import org.eclipse.edc.edr.spi.store.EndpointDataReferenceStore; import org.eclipse.edc.edr.spi.types.EndpointDataReferenceEntry; @@ -219,7 +220,8 @@ public Stream provideArguments(ExtensionContext context) { var eventBuilders = Stream.of( TransferProcessTerminated.Builder.newInstance(), - TransferProcessCompleted.Builder.newInstance() + TransferProcessCompleted.Builder.newInstance(), + TransferProcessSuspended.Builder.newInstance() ); return eventBuilders diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java index d7bc60e1581..0cc84537c9c 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/DataFlow.java @@ -48,7 +48,7 @@ public class DataFlow extends StatefulEntity { private URI callbackAddress; private Map properties = new HashMap<>(); - private FlowType flowType = FlowType.PULL; + private FlowType flowType = FlowType.PUSH; @Override public DataFlow copy() { diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/iam/DataPlaneAccessTokenService.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/iam/DataPlaneAccessTokenService.java index fae72dc24e9..28e63b6bc4a 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/iam/DataPlaneAccessTokenService.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/iam/DataPlaneAccessTokenService.java @@ -51,4 +51,14 @@ public interface DataPlaneAccessTokenService { * @return A {@link AccessTokenData} that contains the original claims and the data resource ({@link DataAddress}. If the token could not be restored, a failure is returned. */ Result resolve(String token); + + /** + * Revokes the {@link AccessTokenData} associated with the id + * + * @param transferProcessId The id of the {@link AccessTokenData} + * @param reason The reason for the revocation + * @return Success if revoked, failure otherwise + */ + Result revoke(String transferProcessId, String reason); + } diff --git a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/iam/DataPlaneAuthorizationService.java b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/iam/DataPlaneAuthorizationService.java index b14358b763f..b3637161deb 100644 --- a/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/iam/DataPlaneAuthorizationService.java +++ b/spi/data-plane/data-plane-spi/src/main/java/org/eclipse/edc/connector/dataplane/spi/iam/DataPlaneAuthorizationService.java @@ -78,4 +78,14 @@ public interface DataPlaneAuthorizationService { * @return The {@link DataAddress} that was encapsulated in the original {@link DataFlowStartMessage} */ Result authorize(String token, Map requestData); + + + /** + * Revokes the {@link DataAddress} created with {@link #createEndpointDataReference(DataFlowStartMessage)} + * + * @param transferProcessId The id of the transfer process associated to the {@link DataAddress} + * @param reason The reason of the revocation + * @return Successful if revoked, fails otherwise + */ + Result revokeEndpointDataReference(String transferProcessId, String reason); } diff --git a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java index ebd42091b6f..6189d4c18d7 100644 --- a/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java +++ b/spi/data-plane/data-plane-spi/src/testFixtures/java/org/eclipse/edc/connector/dataplane/spi/store/AccessTokenDataTestBase.java @@ -104,6 +104,19 @@ void query_byDataAddressProperty() { .containsExactly(atd); } + @Test + void query_byAdditionalProperty() { + var ct = ClaimToken.Builder.newInstance().claim("foo", "bar").build(); + var atd = new AccessTokenData("test-id", ct, DataAddress.Builder.newInstance().type("foo-type").property("qux", "quz").build(), Map.of("participant_id", "participantId")); + getStore().store(atd); + getStore().store(new AccessTokenData("another-id", ClaimToken.Builder.newInstance().build(), DataAddress.Builder.newInstance().type("foo-type").build())); + + assertThat(getStore().query(QuerySpec.Builder.newInstance().filter(new Criterion("additionalProperties.participant_id", "=", "participantId")).build())) + .hasSize(1) + .usingRecursiveFieldByFieldElementComparator() + .containsExactly(atd); + } + @Test void query_byMultipleCriteria() { var ct = ClaimToken.Builder.newInstance().claim("foo", "bar").build(); diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTransferTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTransferTest.java index ac7b5a684e9..430630a1ffc 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTransferTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingEndToEndTransferTest.java @@ -163,9 +163,46 @@ void httpPull_dataTransfer_withEdrCache() { await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr.get(), Map.of("message", msg), equalTo(msg))); // checks that the EDR is gone once the contract expires + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId))); + + // checks that transfer fails + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr.get(), Map.of("message", msg), equalTo(msg)))); + } + + @Test + void suspend_httpPull_dataTransfer_withEdrCache() { + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); + var dynamicReceiverProps = CONSUMER.dynamicReceiverPrivateProperties(); + + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, dynamicReceiverProps, syncDataAddress(), "HttpData-PULL"); + await().atMost(timeout).untilAsserted(() -> { - assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId)); + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(STARTED.name()); }); + + var edr = new AtomicReference(); + + // fetch the EDR from the cache + await().atMost(timeout).untilAsserted(() -> { + var returnedEdr = CONSUMER.getEdr(transferProcessId); + assertThat(returnedEdr).isNotNull(); + edr.set(returnedEdr); + }); + + // Do the transfer + var msg = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr.get(), Map.of("message", msg), equalTo(msg))); + + CONSUMER.suspendTransfer(transferProcessId, "supension"); + + // checks that the EDR is gone once the transfer has been suspended + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.getEdr(transferProcessId))); + + // checks that transfer fails + await().atMost(timeout).untilAsserted(() -> assertThatThrownBy(() -> CONSUMER.pullData(edr.get(), Map.of("message", msg), equalTo(msg)))); } @Test