Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat (wfm): add workflow stats viewsets #725

Merged
merged 1 commit into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
from workflow_manager.viewsets.workflow_run_action import WorkflowRunActionViewSet
# from workflow_manager.viewsets.library import LibraryViewSet
from workflow_manager.viewsets.workflow_run_comment import WorkflowRunCommentViewSet
from workflow_manager.viewsets.workflow_run_stats import WorkflowRunStatsViewSet
from workflow_manager.settings.base import API_VERSION

# Base path for the API, built as "<namespace>/<version>/".
api_namespace = "api"
api_version = API_VERSION
api_base = f"{api_namespace}/{api_version}/"

router = OptionalSlashDefaultRouter()

# The stats viewset is registered first on purpose: its prefix
# ("workflowrun/stats") must be matched before the plain "workflowrun" route,
# otherwise URLs such as workflowrun/stats/list_all/ would be captured by it.
# NOTE(review): the basename "workflowrun_list_all" only names the generated
# URL patterns; it also covers the other stats actions — confirm the naming is intended.
router.register("workflowrun/stats", WorkflowRunStatsViewSet, basename="workflowrun_list_all")
router.register(r"analysis", AnalysisViewSet, basename="analysis")
router.register(r"analysisrun", AnalysisRunViewSet, basename="analysisrun")
router.register(r"analysiscontext", AnalysisContextViewSet, basename="analysiscontext")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from django.db.models import Q, Max, F
from drf_spectacular.utils import extend_schema
from rest_framework.decorators import action
from rest_framework.response import Response

from workflow_manager.models.workflow_run import WorkflowRun
from workflow_manager.serializers.workflow_run import WorkflowRunCountByStatusSerializer, WorkflowRunListParamSerializer
from workflow_manager.serializers.workflow_run import WorkflowRunDetailSerializer, WorkflowRunSerializer
from workflow_manager.serializers.workflow_run import WorkflowRunListParamSerializer, WorkflowRunDetailSerializer, WorkflowRunSerializer
from workflow_manager.viewsets.base import BaseViewSet


Expand All @@ -15,9 +13,7 @@ class WorkflowRunViewSet(BaseViewSet):
queryset = WorkflowRun.objects.prefetch_related("libraries").all()
orcabus_id_prefix = WorkflowRun.orcabus_id_prefix

@extend_schema(parameters=[
WorkflowRunListParamSerializer
])
@extend_schema(parameters=[WorkflowRunListParamSerializer])
def list(self, request, *args, **kwargs):
self.serializer_class = WorkflowRunSerializer # use simple view for record listing
return super().list(request, *args, **kwargs)
Expand Down Expand Up @@ -133,51 +129,4 @@ def unresolved(self, request):
)
pagw_qs = self.paginate_queryset(result_set)
serializer = self.get_serializer(pagw_qs, many=True)
return self.get_paginated_response(serializer.data)

@extend_schema(operation_id='/api/v1/workflow_run/count_by_status/', responses=WorkflowRunCountByStatusSerializer)
@action(detail=False, methods=['GET'])
def count_by_status(self, request):
    """
    Return the number of workflow runs per latest status.

    The response holds the keys 'all', 'succeeded', 'aborted', 'failed',
    'resolved' and 'ongoing'. Counts are taken over every WorkflowRun;
    request query parameters are not applied here.
    """
    base_queryset = WorkflowRun.objects.prefetch_related('states')

    all_count = base_queryset.count()

    # Annotate each run with the timestamp of its most recent state so the
    # filters below can match on the state carrying that timestamp.
    annotate_queryset = base_queryset.annotate(latest_state_time=Max('states__timestamp'))

    succeeded_count = annotate_queryset.filter(
        states__timestamp=F('latest_state_time'),
        states__status="SUCCEEDED"
    ).count()

    aborted_count = annotate_queryset.filter(
        states__timestamp=F('latest_state_time'),
        states__status="ABORTED"
    ).count()

    failed_count = annotate_queryset.filter(
        states__timestamp=F('latest_state_time'),
        states__status="FAILED"
    ).count()

    resolved_count = annotate_queryset.filter(
        states__timestamp=F('latest_state_time'),
        states__status="RESOLVED"
    ).count()

    # "Ongoing" is defined by the absence of FAILED / ABORTED / SUCCEEDED
    # states. NOTE(review): RESOLVED is not excluded here, so a resolved run
    # may be counted in both 'resolved' and 'ongoing' — confirm this is intended.
    ongoing_count = base_queryset.filter(
        ~Q(states__status="FAILED") &
        ~Q(states__status="ABORTED") &
        ~Q(states__status="SUCCEEDED")
    ).count()

    return Response({
        'all': all_count,
        'succeeded': succeeded_count,
        'aborted': aborted_count,
        'failed': failed_count,
        'resolved': resolved_count,
        'ongoing': ongoing_count
    }, status=200)
return self.get_paginated_response(serializer.data)
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from django.db.models import Q, Max, F
from rest_framework import mixins
from rest_framework.viewsets import GenericViewSet
from rest_framework.decorators import action
from drf_spectacular.utils import extend_schema
from rest_framework.response import Response

from workflow_manager.models import WorkflowRun
from workflow_manager.serializers.workflow_run import WorkflowRunDetailSerializer, WorkflowRunCountByStatusSerializer


class WorkflowRunStatsViewSet(mixins.ListModelMixin, GenericViewSet):
    """
    Read-only statistics endpoints for workflow runs.

    Provides an unpaginated listing (``list`` / ``list_all``) and a
    ``count_by_status`` summary. Both are driven by the filtering implemented
    in :meth:`get_queryset`, so they accept the same query parameters.
    """
    serializer_class = WorkflowRunDetailSerializer
    pagination_class = None  # No pagination by default
    http_method_names = ['get']

    # Query parameters interpreted by get_queryset() itself. They are removed
    # before the remaining parameters are handed to get_by_keyword().
    _custom_params = ('start_time', 'end_time', 'is_ongoing', 'status', 'search')

    @staticmethod
    def _ongoing_filter():
        """Return the condition selecting runs that have no terminal state."""
        return (
            ~Q(states__status="FAILED") &
            ~Q(states__status="ABORTED") &
            ~Q(states__status="SUCCEEDED") &
            ~Q(states__status="RESOLVED")
        )

    def get_queryset(self):
        """
        Build the workflow run queryset from the request query parameters.

        Custom filters:
            start_time, end_time : range of the latest state timestamp
                                   (applied only when both are given)
            is_ongoing           : 'true' keeps only runs without a terminal state
            status               : keep runs whose latest state has this status
            search               : case-insensitive match on workflow run name,
                                   comment, library id, library orcabus id or
                                   workflow name

        Any other query parameter is passed to ``get_by_keyword``.
        """
        # request.query_params is an immutable QueryDict during a normal
        # request, so work on a mutable copy instead of popping from it.
        params = self.request.query_params.copy()

        # default time is 0 (falsy -> no time range filter)
        start_time = params.get('start_time', 0)
        end_time = params.get('end_time', 0)
        is_ongoing = params.get('is_ongoing', 'false')
        status = params.get('status', '')
        search_params = params.get('search', '')

        # exclude the custom query params from the rest of the query params
        for name in self._custom_params:
            params.pop(name, None)

        # get all workflow runs with rest of the query params
        # add prefetch_related & select_related to reduce the number of queries
        result_set = WorkflowRun.objects.get_by_keyword(**params)\
            .prefetch_related('states')\
            .prefetch_related('libraries')\
            .select_related('workflow')

        if start_time and end_time:
            result_set = result_set.annotate(latest_state_time=Max('states__timestamp')).filter(
                latest_state_time__range=[start_time, end_time]
            )

        if is_ongoing.lower() == 'true':
            result_set = result_set.filter(self._ongoing_filter())

        if status:
            result_set = result_set.annotate(latest_state_time=Max('states__timestamp')).filter(
                states__timestamp=F('latest_state_time'),
                states__status=status.upper()
            )

        # Combine search across multiple fields
        # (workflow run name, comment, library_id, orcabus_id, workflow name)
        if search_params:
            result_set = result_set.filter(
                Q(workflow_run_name__icontains=search_params) |
                Q(comment__icontains=search_params) |
                Q(libraries__library_id__icontains=search_params) |
                Q(libraries__orcabus_id__icontains=search_params) |
                Q(workflow__workflow_name__icontains=search_params)
            ).distinct()  # the library join can yield duplicate runs

        return result_set

    @extend_schema(responses=WorkflowRunDetailSerializer(many=True))
    @action(detail=False, methods=['GET'], url_path='list_all')
    def list_all(self, request):
        """Return every workflow run matching the query params, unpaginated."""
        return self.list(request)

    @extend_schema(responses=WorkflowRunCountByStatusSerializer)
    @action(detail=False, methods=['GET'])
    def count_by_status(self, request):
        """
        Returns the count of records for each latest status ('SUCCEEDED',
        'ABORTED', 'FAILED', 'RESOLVED') and for the ongoing state, based on
        the query params.
        """
        base_queryset = self.get_queryset()

        # Annotate each run with the timestamp of its most recent state so a
        # run is counted under the status of that latest state only.
        annotate_queryset = base_queryset.annotate(latest_state_time=Max('states__timestamp'))

        def count_latest(status):
            return annotate_queryset.filter(
                states__timestamp=F('latest_state_time'),
                states__status=status
            ).count()

        return Response({
            'all': base_queryset.count(),
            'succeeded': count_latest("SUCCEEDED"),
            'aborted': count_latest("ABORTED"),
            'failed': count_latest("FAILED"),
            'resolved': count_latest("RESOLVED"),
            # Same definition of "ongoing" as the is_ongoing filter, so the
            # count agrees with what the listing returns for is_ongoing=true.
            'ongoing': base_queryset.filter(self._ongoing_filter()).count()
        }, status=200)


Loading