From 2b205c770c53c1657f590bbd2022cc69a15a6522 Mon Sep 17 00:00:00 2001 From: Ray Liu Date: Tue, 10 Dec 2024 15:47:00 +1100 Subject: [PATCH 1/5] add deprecated state for state api --- .../workflow_manager/viewsets/state.py | 59 +++++++++++++------ 1 file changed, 40 insertions(+), 19 deletions(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py index 096fd134f..700ff4767 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py @@ -14,31 +14,46 @@ class StateViewSet(mixins.CreateModelMixin, mixins.UpdateModelMixin, mixins.List orcabus_id_prefix = State.orcabus_id_prefix http_method_names = ['get', 'post', 'patch'] pagination_class = None + + """ + valid_states_map for state creation, update + refer: + "Resolved" -- https://github.com/umccr/orcabus/issues/593 + "Deprecated" -- https://github.com/umccr/orcabus/issues/695 + """ + valid_states_map = { + 'RESOLVED': ['FAILED'], + 'DEPRECATED': ['SUCCEED'] + } def get_queryset(self): return State.objects.filter(workflow_run=self.kwargs["orcabus_id"]) def create(self, request, *args, **kwargs): + """ + Create a customed new state for a workflow run. + Currently we support "Resolved", "Deprecated" + """ wfr_orcabus_id = self.kwargs.get("orcabus_id") workflow_run = WorkflowRun.objects.get(orcabus_id=wfr_orcabus_id) - # Check if the workflow run has a "Failed" or "Aborted" state latest_state = workflow_run.get_latest_state() - if latest_state.status not in ["FAILED"]: - return Response({"detail": "Can only create 'Resolved' state for workflow runs with 'Failed' states."}, + if not latest_state: + return Response({"detail": "No state found for workflow run '{}'".format(wfr_orcabus_id)}, status=status.HTTP_400_BAD_REQUEST) - - # Check if the new state is "Resolved" - if request.data.get('status', '').upper() != "RESOLVED": - return Response({"detail": "Can only create 'Resolved' state."}, + latest_status = latest_state.status + request_status = request.data.get('status', '').upper() + + # check if the state status is valid + if not self.check_state_status(latest_status, request_status): + return Response({"detail": "Invalid state request. Can't add state '{}' to '{}'".format(request_status, latest_status)}, status=status.HTTP_400_BAD_REQUEST) - # comment is required when status is "Resolved" + # comment is required when request change state if not request.data.get('comment'): - return Response({"detail": "Comment is required when status is 'Resolved'."}, + return Response({"detail": "Comment is required when request status is '{}'".format(request_status)}, status=status.HTTP_400_BAD_REQUEST) - # Prepare data for serializer data = request.data.copy() data['timestamp'] = timezone.now() @@ -46,20 +61,17 @@ def create(self, request, *args, **kwargs): serializer = self.get_serializer(data=data) serializer.is_valid(raise_exception=True) - self.perform_create(serializer) + serializer.save() headers = self.get_success_headers(serializer.data) return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers) - def perform_create(self, serializer): - serializer.save(workflow_run_id=self.kwargs["orcabus_id"], status="RESOLVED") - def update(self, request, *args, **kwargs): partial = kwargs.pop('partial', False) instance = self.get_object() # Check if the state being updated is "Resolved" - if instance.status != "RESOLVED": - return Response({"detail": "Can only update 'Resolved' state records."}, + if instance.status not in self.valid_states_map: + return Response({"detail": "Invalid state status."}, status=status.HTTP_400_BAD_REQUEST) # Check if only the comment field is being updated @@ -69,7 +81,7 @@ def update(self, request, *args, **kwargs): serializer = self.get_serializer(instance, data=request.data, partial=partial) serializer.is_valid(raise_exception=True) - self.perform_update(serializer) + serializer.save() if getattr(instance, '_prefetched_objects_cache', None): # If 'prefetch_related' has been applied to a queryset, we need to @@ -78,5 +90,14 @@ def update(self, request, *args, **kwargs): return Response(serializer.data) - def perform_update(self, serializer): - serializer.save(status="RESOLVED") + + def check_state_status(self, current_status, request_status): + """ + check if the state status is valid: + valid_states_map[request_state] == current_state.status + """ + if request_status not in self.valid_states_map: + return False + if current_status not in self.valid_states_map[request_status]: + return False + return True \ No newline at end of file From be82d91fd7e982d522383fb3fba132b4945b5412 Mon Sep 17 00:00:00 2001 From: Ray Liu Date: Tue, 10 Dec 2024 17:00:01 +1100 Subject: [PATCH 2/5] add deprecated in stats api --- .../workflow_manager/serializers/workflow_run.py | 1 + .../workflow-manager/workflow_manager/viewsets/state.py | 5 +++-- .../workflow_manager/viewsets/workflow_run_stats.py | 9 ++++++++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run.py index fcd4502c6..c1592a8bf 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run.py @@ -59,6 +59,7 @@ class WorkflowRunCountByStatusSerializer(serializers.Serializer): failed = serializers.IntegerField() resolved = serializers.IntegerField() ongoing = serializers.IntegerField() + deprecated = serializers.IntegerField() def update(self, instance, validated_data): pass diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py index 700ff4767..b208bae52 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py @@ -23,7 +23,7 @@ class StateViewSet(mixins.CreateModelMixin, mixins.UpdateModelMixin, mixins.List """ valid_states_map = { 'RESOLVED': ['FAILED'], - 'DEPRECATED': ['SUCCEED'] + 'DEPRECATED': ['SUCCEEDED'] } def get_queryset(self): @@ -58,7 +58,8 @@ def create(self, request, *args, **kwargs): data = request.data.copy() data['timestamp'] = timezone.now() data['workflow_run'] = wfr_orcabus_id - + data['status'] = request_status + serializer = self.get_serializer(data=data) serializer.is_valid(raise_exception=True) serializer.save() diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_stats.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_stats.py index 42c68527d..11c2d7ef7 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_stats.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_stats.py @@ -69,7 +69,8 @@ def exclude_params(params): ~Q(states__status="FAILED") & ~Q(states__status="ABORTED") & ~Q(states__status="SUCCEEDED") & - ~Q(states__status="RESOLVED") + ~Q(states__status="RESOLVED") & + ~Q(states__status="DEPRECATED") ) if status: @@ -131,6 +132,11 @@ def count_by_status(self, request): states__status="RESOLVED" ).count() + deprecated_count = annotate_queryset.filter( + states__timestamp=F('latest_state_time'), + states__status="DEPRECATED" + ).count() + ongoing_count = base_queryset.filter( ~Q(states__status="FAILED") & ~Q(states__status="ABORTED") & @@ -143,6 +149,7 @@ def count_by_status(self, request): 'aborted': aborted_count, 'failed': failed_count, 'resolved': resolved_count, + 'deprecated': deprecated_count, 'ongoing': ongoing_count }, status=200) From 4bf33f15da284dd195371b4cffe59a9b46a75d8d Mon Sep 17 00:00:00 2001 From: Ray Liu Date: Wed, 11 Dec 2024 11:32:00 +1100 Subject: [PATCH 3/5] add api for rerun/deprecated vlidation --- .../workflow_manager/serializers/workflow_run.py | 2 +- .../serializers/workflow_run_action.py | 5 ++++- .../workflow_manager/urls/base.py | 3 +-- .../workflow_manager/viewsets/state.py | 9 ++++++++- .../viewsets/workflow_run_action.py | 15 +++++++++++++-- 5 files changed, 27 insertions(+), 7 deletions(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run.py index c1592a8bf..a8fce4aa2 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run.py @@ -19,7 +19,7 @@ def get_current_state(self, obj) -> dict: class WorkflowRunListParamSerializer(OptionalFieldsMixin, WorkflowRunBaseSerializer): class Meta: model = WorkflowRun - fields = "__all__" + fields = ["orcabus_id", "workflow", "analysis_run", "workflow_run_name", "portal_run_id", "execution_id", "comment",] class WorkflowRunSerializer(WorkflowRunBaseSerializer): from .workflow import WorkflowMinSerializer diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run_action.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run_action.py index 568904a9a..d57f7cc66 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run_action.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run_action.py @@ -6,7 +6,10 @@ class AllowedRerunWorkflow(StrEnum): RNASUM = "rnasum" - + +class AllowedRerunWorkflowSerializer(serializers.Serializer): + is_valid = serializers.BooleanField() + valid_workflows = serializers.ListField(child=serializers.CharField()) class BaseRerunInputSerializer(serializers.Serializer): diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py index da1315d52..c83584485 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py @@ -20,7 +20,7 @@ router = OptionalSlashDefaultRouter() -router.register("workflowrun/stats", WorkflowRunStatsViewSet, basename="workflowrun_list_all") # put it before workflowrun, as it will match the workflowrun/list_all/ url +router.register(r"workflowrun/stats", WorkflowRunStatsViewSet, basename="workflowrun_list_all") # put it before workflowrun, as it will match the workflowrun/list_all/ url router.register(r"analysis", AnalysisViewSet, basename="analysis") router.register(r"analysisrun", AnalysisRunViewSet, basename="analysisrun") router.register(r"analysiscontext", AnalysisContextViewSet, basename="analysiscontext") @@ -29,7 +29,6 @@ router.register(r"workflowrun", WorkflowRunActionViewSet, basename="workflowrun-action") router.register(r"payload", PayloadViewSet, basename="payload") -# may no longer need this as it's currently included in the detail response for an individual WorkflowRun record router.register( "workflowrun/(?P[^/.]+)/state", StateViewSet, diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py index b208bae52..93bd9d631 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py @@ -1,4 +1,6 @@ - +from drf_spectacular.types import OpenApiTypes +from drf_spectacular.utils import extend_schema, PolymorphicProxySerializer +from rest_framework.decorators import action from rest_framework import mixins, status from rest_framework.response import Response from rest_framework.viewsets import GenericViewSet @@ -29,6 +31,11 @@ class StateViewSet(mixins.CreateModelMixin, mixins.UpdateModelMixin, mixins.List def get_queryset(self): return State.objects.filter(workflow_run=self.kwargs["orcabus_id"]) + @extend_schema(responses=OpenApiTypes.OBJECT, description="Valid states map for new state creation, update") + @action(detail=False, methods=['get'], url_name='valid_states_map', url_path='valid_states_map') + def get_valid_states_map(self, request, **kwargs): + return Response(self.valid_states_map) + def create(self, request, *args, **kwargs): """ Create a customed new state for a workflow run. diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py index d05e47c32..9440cd705 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py @@ -15,7 +15,7 @@ from workflow_manager.models.utils import create_portal_run_id from workflow_manager.serializers.library import LibrarySerializer from workflow_manager.serializers.payload import PayloadSerializer -from workflow_manager.serializers.workflow_run_action import AllowedRerunWorkflow, RERUN_INPUT_SERIALIZERS +from workflow_manager.serializers.workflow_run_action import AllowedRerunWorkflow, RERUN_INPUT_SERIALIZERS, AllowedRerunWorkflowSerializer from workflow_manager.models import ( WorkflowRun, State, @@ -27,6 +27,17 @@ class WorkflowRunActionViewSet(ViewSet): queryset = WorkflowRun.objects.prefetch_related('states').all() orcabus_id_prefix = WorkflowRun.orcabus_id_prefix + @extend_schema(responses=AllowedRerunWorkflowSerializer, description="Allowed rerun workflows") + @action(detail=True, methods=['get'], url_name='validate_rerun_workflows', url_path='validate_rerun_workflows') + def validate_rerun_workflows(self, request, *args, **kwargs): + wfl_run = get_object_or_404(self.queryset, pk=kwargs.get('pk')) + is_valid = wfl_run.workflow.workflow_name in AllowedRerunWorkflow + reponse = { + 'is_valid': is_valid, + 'valid_workflows': AllowedRerunWorkflow + } + return Response(reponse, status=status.HTTP_200_OK) + @extend_schema( request=PolymorphicProxySerializer( component_name='WorkflowRunRerun', @@ -63,7 +74,7 @@ def rerun(self, request, *args, **kwargs): status=status.HTTP_400_BAD_REQUEST) detail = construct_rerun_eb_detail(wfl_run, serializer.data) - emit_wrsc_api_event(detail) + # emit_wrsc_api_event(detail) return Response(detail, status=status.HTTP_200_OK) From c4c58ff0eed226a80c57a94140f6a6a3dbce7423 Mon Sep 17 00:00:00 2001 From: Ray Liu Date: Wed, 11 Dec 2024 16:34:04 +1100 Subject: [PATCH 4/5] add allowed_dataset_choice in rerun valid api --- .../serializers/workflow_run_action.py | 1 + .../workflow_manager/viewsets/workflow_run_action.py | 11 ++++++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run_action.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run_action.py index d57f7cc66..be317906f 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run_action.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run_action.py @@ -9,6 +9,7 @@ class AllowedRerunWorkflow(StrEnum): class AllowedRerunWorkflowSerializer(serializers.Serializer): is_valid = serializers.BooleanField() + allowed_dataset_choice = serializers.ListField(child=serializers.CharField()) valid_workflows = serializers.ListField(child=serializers.CharField()) class BaseRerunInputSerializer(serializers.Serializer): diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py index 9440cd705..721c8a7c9 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py @@ -32,9 +32,18 @@ class WorkflowRunActionViewSet(ViewSet): def validate_rerun_workflows(self, request, *args, **kwargs): wfl_run = get_object_or_404(self.queryset, pk=kwargs.get('pk')) is_valid = wfl_run.workflow.workflow_name in AllowedRerunWorkflow + + # Get allowed dataset choice for the workflow + wfl_name = wfl_run.workflow.workflow_name + allowed_dataset_choice = [] + if wfl_name == AllowedRerunWorkflow.RNASUM.value: + allowed_dataset_choice = RERUN_INPUT_SERIALIZERS[wfl_name].allowed_dataset_choice + reponse = { 'is_valid': is_valid, - 'valid_workflows': AllowedRerunWorkflow + 'allowed_dataset_choice': allowed_dataset_choice, + 'valid_workflows': AllowedRerunWorkflow, + } return Response(reponse, status=status.HTTP_200_OK) From 0ff1494acef58939781935367b583514e405c72d Mon Sep 17 00:00:00 2001 From: Ray Liu Date: Thu, 12 Dec 2024 14:26:04 +1100 Subject: [PATCH 5/5] bring the emit events back --- .../workflow_manager/viewsets/workflow_run_action.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py index 721c8a7c9..ec6c26450 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py @@ -43,7 +43,6 @@ def validate_rerun_workflows(self, request, *args, **kwargs): 'is_valid': is_valid, 'allowed_dataset_choice': allowed_dataset_choice, 'valid_workflows': AllowedRerunWorkflow, - } return Response(reponse, status=status.HTTP_200_OK) @@ -83,7 +82,7 @@ def rerun(self, request, *args, **kwargs): status=status.HTTP_400_BAD_REQUEST) detail = construct_rerun_eb_detail(wfl_run, serializer.data) - # emit_wrsc_api_event(detail) + emit_wrsc_api_event(detail) return Response(detail, status=status.HTTP_200_OK)