Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change(WorkflowRunManager): update the workflowrun model #536

Merged
merged 8 commits into from
Sep 10, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Generated by Django 5.1 on 2024-09-09 05:55

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("workflow_manager", "0001_initial"),
]

operations = [
migrations.AlterField(
model_name="state",
name="workflow_run",
field=models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="states",
to="workflow_manager.workflowrun",
),
),
]
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class Meta:
id = models.BigAutoField(primary_key=True)

# --- mandatory fields
workflow_run = models.ForeignKey(WorkflowRun, on_delete=models.CASCADE)
workflow_run = models.ForeignKey(WorkflowRun, related_name='states', on_delete=models.CASCADE)
status = models.CharField(max_length=255) # TODO: How and where to enforce conventions?
timestamp = models.DateTimeField()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ def to_dict(self):

def get_all_states(self):
# retrieve all states (DB records rather than a queryset)
return list(self.state_set.all()) # TODO: ensure order by timestamp ?
return list(self.states.all()) # TODO: ensure order by timestamp ?

def get_latest_state(self):
# retrieve all related states and get the latest one
return self.states.order_by('-timestamp').first()

class LibraryAssociationManager(OrcaBusBaseManager):
pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,29 @@ class Meta:
fields = '__all__'


class LibraryModelSerializer(serializers.ModelSerializer):
class Meta:
model = Library
fields = '__all__'

class WorkflowRunModelSerializer(serializers.ModelSerializer):
current_state = serializers.SerializerMethodField()
libraries = LibraryModelSerializer(many=True, read_only=True)
workflow = WorkflowModelSerializer(read_only=True)
class Meta:
model = WorkflowRun
fields = '__all__'

def get_current_state(self, obj)->dict:
latest_state = obj.get_latest_state()
return StateModelSerializer(latest_state).data if latest_state else None

class PayloadModelSerializer(serializers.ModelSerializer):
class Meta:
model = Payload
fields = '__all__'


class LibraryModelSerializer(serializers.ModelSerializer):
class Meta:
model = Library
fields = '__all__'


class StateModelSerializer(serializers.ModelSerializer):
class Meta:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from django.db.models import Q
from django.db.models import Q, Max
from rest_framework import filters
from rest_framework.decorators import action
from rest_framework.viewsets import ReadOnlyModelViewSet
Expand All @@ -17,7 +17,46 @@ class WorkflowRunViewSet(ReadOnlyModelViewSet):
search_fields = WorkflowRun.get_base_fields()

def get_queryset(self):
return WorkflowRun.objects.get_by_keyword(**self.request.query_params)

"""
custom queryset to filter by:
start_time, end_time : range of latest state timestamp
is_ongoing : filter by ongoing workflow runs
"""
# default time is 0
start_time = self.request.query_params.get('start_time', 0)
end_time = self.request.query_params.get('end_time', 0)

# get is ongoing flag
is_ongoing = self.request.query_params.get('is_ongoing', 'false')

# exclude the custom query params from the rest of the query params
def exclude_params(params):
for param in params:
self.request.query_params.pop(param) if param in self.request.query_params.keys() else None

exclude_params([
'start_time',
'end_time',
'is_ongoing'
])

# get all workflow runs with rest of the query params
result_set = WorkflowRun.objects.get_by_keyword(**self.request.query_params).prefetch_related('states').prefetch_related('libraries').select_related('workflow') # add prefetch_related & select_related to reduce the number of queries

if start_time and end_time:
result_set = result_set.annotate(latest_state_time=Max('states__timestamp')).filter(
latest_state_time__range=[start_time, end_time]
)

if is_ongoing.lower() == 'true':
result_set = result_set.filter(
~Q(states__status="FAILED") &
~Q(states__status="ABORTED") &
~Q(states__status="SUCCEEDED")
)

return result_set

@action(detail=False, methods=['GET'])
def ongoing(self, request):
Expand All @@ -29,14 +68,14 @@ def ongoing(self, request):
if "status" in self.request.query_params.keys():
print("found status!")
status = self.request.query_params.get('status')
result_set = WorkflowRun.objects.get_by_keyword(state__status=status).order_by(ordering)
result_set = WorkflowRun.objects.get_by_keyword(states__status=status).order_by(ordering)
else:
result_set = WorkflowRun.objects.get_by_keyword(**self.request.query_params).order_by(ordering)

result_set = result_set.filter(
~Q(state__status="FAILED") &
~Q(state__status="ABORTED") &
~Q(state__status="SUCCEEDED")
~Q(states__status="FAILED") &
~Q(states__status="ABORTED") &
~Q(states__status="SUCCEEDED")
)
pagw_qs = self.paginate_queryset(result_set)
serializer = self.get_serializer(pagw_qs, many=True)
Expand Down