From 8b9dcb1d70d98edb2c4ac9b10982073683e100b0 Mon Sep 17 00:00:00 2001 From: Ray Liu Date: Tue, 10 Sep 2024 15:03:31 +1000 Subject: [PATCH] add custom query params for workflow run query --- .../workflow_manager/viewsets/workflow_run.py | 51 ++++++++++++++++--- 1 file changed, 45 insertions(+), 6 deletions(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run.py index 83071da07..0d6871666 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run.py @@ -1,4 +1,4 @@ -from django.db.models import Q +from django.db.models import Q, Max from rest_framework import filters from rest_framework.decorators import action from rest_framework.viewsets import ReadOnlyModelViewSet @@ -17,7 +17,46 @@ class WorkflowRunViewSet(ReadOnlyModelViewSet): search_fields = WorkflowRun.get_base_fields() def get_queryset(self): - return WorkflowRun.objects.get_by_keyword(**self.request.query_params).prefetch_related('states').prefetch_related('libraries').select_related('workflow') # add prefetch_related & select_related to reduce the number of queries + + """ + custom queryset to filter by: + start_time, end_time : range of latest state timestamp + is_ongoing : filter by ongoing workflow runs + """ + # default start time is 0 + start_time = self.request.query_params.get('start_time', 0) + # default is current time + end_time = self.request.query_params.get('end_time', 0) + + # get is ongoing flag + is_ongoing = self.request.query_params.get('is_ongoing', 'false') + + def exclude_params(params): + for param in params: + self.request.query_params.pop(param) if param in self.request.query_params.keys() else None + + exclude_params([ + 'start_time', + 'end_time', + 'is_ongoing' + ]) + + # get all workflow runs with rest (build-in) of the query params + result_set = WorkflowRun.objects.get_by_keyword(**self.request.query_params).prefetch_related('states').prefetch_related('libraries').select_related('workflow') # add prefetch_related & select_related to reduce the number of queries + + if start_time and end_time: + result_set = result_set.annotate(latest_state_time=Max('states__timestamp')).filter( + latest_state_time__range=[start_time, end_time] + ) + + if is_ongoing.lower() == 'true': + result_set = result_set.filter( + ~Q(states__status="FAILED") & + ~Q(states__status="ABORTED") & + ~Q(states__status="SUCCEEDED") + ) + + return result_set @action(detail=False, methods=['GET']) def ongoing(self, request): @@ -29,14 +68,14 @@ def ongoing(self, request): if "status" in self.request.query_params.keys(): print("found status!") status = self.request.query_params.get('status') - result_set = WorkflowRun.objects.get_by_keyword(state__status=status).order_by(ordering) + result_set = WorkflowRun.objects.get_by_keyword(states__status=status).order_by(ordering) else: result_set = WorkflowRun.objects.get_by_keyword(**self.request.query_params).order_by(ordering) result_set = result_set.filter( - ~Q(state__status="FAILED") & - ~Q(state__status="ABORTED") & - ~Q(state__status="SUCCEEDED") + ~Q(states__status="FAILED") & + ~Q(states__status="ABORTED") & + ~Q(states__status="SUCCEEDED") ) pagw_qs = self.paginate_queryset(result_set) serializer = self.get_serializer(pagw_qs, many=True)