From f3d2e39b3e5b05558b9f195e4d11137c664b32ba Mon Sep 17 00:00:00 2001 From: Florian Reisinger Date: Tue, 13 Aug 2024 16:46:55 +1000 Subject: [PATCH 01/14] Update API paths --- .../stateless/stacks/workflow-manager/README.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/README.md b/lib/workload/stateless/stacks/workflow-manager/README.md index 86f31ac98..922edefd6 100644 --- a/lib/workload/stateless/stacks/workflow-manager/README.md +++ b/lib/workload/stateless/stacks/workflow-manager/README.md @@ -40,6 +40,7 @@ make ps ``` python manage.py help python manage.py showmigrations +python manage.py makemigrations python manage.py migrate ``` @@ -73,17 +74,17 @@ python manage.py runserver_plus ``` ``` -curl -s http://localhost:8000/wfm/v1/workflow | jq +curl -s http://localhost:8000/api/v1/workflow | jq ``` ``` -curl -s http://localhost:8000/wfm/v1/workflow/1 | jq +curl -s http://localhost:8000/api/v1/workflow/1 | jq ``` Or visit in browser: -- http://localhost:8000/wfm/v1 -- http://localhost:8000/wfm/v1/workflow -- http://localhost:8000/wfm/v1/workflow/1 +- http://localhost:8000/api/v1 +- http://localhost:8000/api/v1/workflow +- http://localhost:8000/api/v1/workflow/1 ### API Doc From 71b7ea4df5d120eebd96710d4fb7f438616528f5 Mon Sep 17 00:00:00 2001 From: Florian Reisinger Date: Tue, 13 Aug 2024 16:47:41 +1000 Subject: [PATCH 02/14] Add library model --- .../workflow_manager/models/library.py | 26 +++++++++++++++++++ .../workflow_manager/models/workflow_run.py | 12 +++++++++ 2 files changed, 38 insertions(+) create mode 100644 lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/library.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/library.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/library.py new file mode 100644 index 000000000..940f03e69 --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/library.py @@ -0,0 +1,26 @@ +from django.core.serializers.json import DjangoJSONEncoder +from django.db import models + +from workflow_manager.models.base import OrcaBusBaseModel, OrcaBusBaseManager +from workflow_manager.models.workflow import Workflow + + +class LibraryManager(OrcaBusBaseManager): + pass + + +class Library(OrcaBusBaseModel): + id = models.BigAutoField(primary_key=True) + + library_id = models.CharField(max_length=255, unique=True) + + objects = LibraryManager() + + def __str__(self): + return f"ID: {self.id}, library_id: {self.library_id}" + + def to_dict(self): + return { + "id": self.id, + "library_id": self.library_id + } diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py index 32ba36480..768dec46f 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py @@ -3,6 +3,7 @@ from workflow_manager.models.base import OrcaBusBaseModel, OrcaBusBaseManager from workflow_manager.models.payload import Payload from workflow_manager.models.workflow import Workflow +from workflow_manager.models.library import Library class WorkflowRunManager(OrcaBusBaseManager): @@ -36,6 +37,10 @@ class Meta: # Link to workflow payload data payload = models.ForeignKey(Payload, null=True, blank=True, on_delete=models.SET_NULL) + # Link to library table + libraries = models.ManyToManyField(Library, through="LibraryAssociation") + + objects = WorkflowRunManager() def __str__(self): @@ -53,3 +58,10 @@ def to_dict(self): "payload": self.payload.to_dict() if (self.payload is not None) else None, "workflow": self.workflow.to_dict() if (self.workflow is not None) else None } + + +class LibraryAssociation(OrcaBusBaseModel): + workflow_run = models.ForeignKey(WorkflowRun, on_delete=models.CASCADE) + library = models.ForeignKey(Library, on_delete=models.CASCADE) + association_date = models.DateTimeField() + status = models.CharField(max_length=255) From 27096f04b675e0acd1b6569b92ad05959a90a96f Mon Sep 17 00:00:00 2001 From: Florian Reisinger Date: Wed, 14 Aug 2024 11:57:45 +1000 Subject: [PATCH 03/14] Add library to model and schema --- .../workflow_manager/serializers.py | 5 +++++ .../workflow_manager/urls/base.py | 1 + .../workflow_manager/viewsets/library.py | 18 ++++++++++++++++++ .../WorkflowRunStateChange.py | 18 +++++++++++++++++- .../WorkflowRunStateChange.py | 18 +++++++++++++++++- 5 files changed, 58 insertions(+), 2 deletions(-) create mode 100644 lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/library.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers.py index e606019ba..003d410a4 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers.py @@ -54,3 +54,8 @@ class PayloadModelSerializer(serializers.ModelSerializer): class Meta: model = Payload fields = '__all__' + +class LibraryModelSerializer(serializers.ModelSerializer): + class Meta: + model = Library + fields = '__all__' diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py index 22902fbd8..beb14bdc2 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py @@ -14,6 +14,7 @@ router.register(r"workflow", WorkflowViewSet, basename="workflow") router.register(r"workflowrun", WorkflowRunViewSet, basename="workflowrun") router.register(r"payload", PayloadViewSet, basename="payload") +router.register(r"library", PayloadViewSet, basename="library") urlpatterns = [ path(f"{api_base}", include(router.urls)), diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/library.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/library.py new file mode 100644 index 000000000..d529aa5a9 --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/library.py @@ -0,0 +1,18 @@ +from rest_framework import filters +from rest_framework.viewsets import ReadOnlyModelViewSet + +from workflow_manager.models.library import Library +from workflow_manager.pagination import StandardResultsSetPagination +from workflow_manager.serializers import LibraryModelSerializer + + +class LibraryViewSet(ReadOnlyModelViewSet): + serializer_class = LibraryModelSerializer + pagination_class = StandardResultsSetPagination + filter_backends = [filters.OrderingFilter, filters.SearchFilter] + ordering_fields = '__all__' + ordering = ['-id'] + search_fields = Library.get_base_fields() + + def get_queryset(self): + return Library.objects.get_by_keyword(**self.request.query_params) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/WorkflowRunStateChange.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/WorkflowRunStateChange.py index a80b1943f..a334306ca 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/WorkflowRunStateChange.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/WorkflowRunStateChange.py @@ -14,6 +14,7 @@ class WorkflowRunStateChange(object): 'workflowName': 'str', 'workflowVersion': 'str', 'workflowRunName': 'str', + 'linkedLibraries': 'list[str]', 'payload': 'Payload' } @@ -25,10 +26,11 @@ class WorkflowRunStateChange(object): 'workflowName': 'workflowName', 'workflowVersion': 'workflowVersion', 'workflowRunName': 'workflowRunName', + 'linkedLibraries': 'linkedLibraries', 'payload': 'payload' } - def __init__(self, portalRunId=None, executionId=None, timestamp=None, status=None, workflowName=None, workflowVersion=None, workflowRunName=None, payload=None): # noqa: E501 + def __init__(self, portalRunId=None, executionId=None, timestamp=None, status=None, workflowName=None, workflowVersion=None, workflowRunName=None, linkedLibraries=None, payload=None): # noqa: E501 self._portalRunId = None self._executionId = None self._timestamp = None @@ -36,6 +38,7 @@ def __init__(self, portalRunId=None, executionId=None, timestamp=None, status=No self._workflowName = None self._workflowVersion = None self._workflowRunName = None + self._linkedLibraries = None self._payload = None self.discriminator = None self.portalRunId = portalRunId @@ -45,6 +48,7 @@ def __init__(self, portalRunId=None, executionId=None, timestamp=None, status=No self.workflowName = workflowName self.workflowVersion = workflowVersion self.workflowRunName = workflowRunName + self.linkedLibraries = linkedLibraries self.payload = payload @@ -132,6 +136,18 @@ def workflowRunName(self, workflowRunName): self._workflowRunName = workflowRunName + @property + def linkedLibraries(self): + + return self._linkedLibraries + + @linkedLibraries.setter + def linkedLibraries(self, linkedLibraries): + + + self._linkedLibraries = linkedLibraries + + @property def payload(self): diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/WorkflowRunStateChange.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/WorkflowRunStateChange.py index 8037d7ec6..276da2935 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/WorkflowRunStateChange.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/WorkflowRunStateChange.py @@ -13,6 +13,7 @@ class WorkflowRunStateChange(object): 'workflowName': 'str', 'workflowVersion': 'str', 'workflowRunName': 'str', + 'linkedLibraries': 'list[str]', 'payload': 'Payload' } @@ -23,16 +24,18 @@ class WorkflowRunStateChange(object): 'workflowName': 'workflowName', 'workflowVersion': 'workflowVersion', 'workflowRunName': 'workflowRunName', + 'linkedLibraries': 'linkedLibraries', 'payload': 'payload' } - def __init__(self, portalRunId=None, timestamp=None, status=None, workflowName=None, workflowVersion=None, workflowRunName=None, payload=None): # noqa: E501 + def __init__(self, portalRunId=None, timestamp=None, status=None, workflowName=None, workflowVersion=None, workflowRunName=None, linkedLibraries=None, payload=None): # noqa: E501 self._portalRunId = None self._timestamp = None self._status = None self._workflowName = None self._workflowVersion = None self._workflowRunName = None + self._linkedLibraries = None self._payload = None self.discriminator = None self.portalRunId = portalRunId @@ -41,6 +44,7 @@ def __init__(self, portalRunId=None, timestamp=None, status=None, workflowName=N self.workflowName = workflowName self.workflowVersion = workflowVersion self.workflowRunName = workflowRunName + self.linkedLibraries = linkedLibraries self.payload = payload @@ -115,6 +119,18 @@ def workflowRunName(self, workflowRunName): self._workflowRunName = workflowRunName + + @property + def linkedLibraries(self): + + return self._linkedLibraries + + @linkedLibraries.setter + def linkedLibraries(self, linkedLibraries): + + + self._linkedLibraries = linkedLibraries + @property def payload(self): From f53a3f40222115f7520e13cb80cc89cff2a15a52 Mon Sep 17 00:00:00 2001 From: Florian Reisinger Date: Wed, 14 Aug 2024 12:02:49 +1000 Subject: [PATCH 04/14] Event schema update --- .../events/workflowmanager/WorkflowRunStateChange.schema.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/schemas/events/workflowmanager/WorkflowRunStateChange.schema.json b/docs/schemas/events/workflowmanager/WorkflowRunStateChange.schema.json index e30646448..e3932acd2 100644 --- a/docs/schemas/events/workflowmanager/WorkflowRunStateChange.schema.json +++ b/docs/schemas/events/workflowmanager/WorkflowRunStateChange.schema.json @@ -119,4 +119,4 @@ } } } -} +} \ No newline at end of file From 5419f3b4b99c4ec8de596cccfb4fc4fdd83432d0 Mon Sep 17 00:00:00 2001 From: Florian Reisinger Date: Thu, 15 Aug 2024 09:25:35 +1000 Subject: [PATCH 05/14] First workflowrun with lib ids test working --- .../stacks/workflow-manager/README.md | 2 +- .../commands/generate_mock_workflow_run.py | 23 ++++-- ...ibraryassociation_workflowrun_libraries.py | 42 +++++++++++ .../workflow_manager/models/__init__.py | 3 +- .../workflow_manager/models/workflow_run.py | 7 +- .../workflow_manager/serializers.py | 4 +- .../workflow_manager/tests/factories.py | 34 +++++---- .../services/create_workflow_run.py | 73 +++++++++++++------ .../tests/test_workflow_srv.py | 73 +++++++++++++++++-- 9 files changed, 206 insertions(+), 55 deletions(-) create mode 100644 lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py diff --git a/lib/workload/stateless/stacks/workflow-manager/README.md b/lib/workload/stateless/stacks/workflow-manager/README.md index 922edefd6..b64df4c92 100644 --- a/lib/workload/stateless/stacks/workflow-manager/README.md +++ b/lib/workload/stateless/stacks/workflow-manager/README.md @@ -51,7 +51,7 @@ _^^^ please make sure to run `python manage.py migrate` first! ^^^_ #### Generate Workflow Record ``` -python manage.py help generate_mock_data +python manage.py help generate_mock_workflow_run > Generate mock Workflow data into database for local development and testing ``` diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py index 5d27a5cb4..14c2beb76 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py @@ -1,10 +1,13 @@ from django.core.management import BaseCommand from django.db.models import QuerySet +from django.utils.timezone import make_aware import json +from datetime import datetime from libumccr import libjson -from workflow_manager.models import WorkflowRun -from workflow_manager.tests.factories import WorkflowRunFactory, WorkflowFactory, PayloadFactory +from workflow_manager.models import WorkflowRun, LibraryAssociation +from workflow_manager.tests.factories import WorkflowRunFactory, WorkflowFactory, PayloadFactory, LibraryFactory + # https://docs.djangoproject.com/en/5.0/howto/custom-management-commands/ class Command(BaseCommand): @@ -14,10 +17,18 @@ def handle(self, *args, **options): wf_payload = PayloadFactory() wf_workflow = WorkflowFactory() - wf:WorkflowRun = WorkflowRunFactory( - workflow_run_name = "MockWorkflowRun", - payload = wf_payload, - workflow = wf_workflow + wf: WorkflowRun = WorkflowRunFactory( + workflow_run_name="MockWorkflowRun", + payload=wf_payload, + workflow=wf_workflow + ) + + library = LibraryFactory() + LibraryAssociation.objects.create( + workflow_run=wf, + library=library, + association_date=make_aware(datetime.now()), + status="ACTIVE", ) print(libjson.dumps(wf.to_dict())) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py new file mode 100644 index 000000000..80a342d0e --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py @@ -0,0 +1,42 @@ +# Generated by Django 5.1 on 2024-08-14 04:07 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('workflow_manager', '0001_initial'), + ] + + operations = [ + migrations.CreateModel( + name='Library', + fields=[ + ('id', models.BigAutoField(primary_key=True, serialize=False)), + ('library_id', models.CharField(max_length=255, unique=True)), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='LibraryAssociation', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('association_date', models.DateTimeField()), + ('status', models.CharField(max_length=255)), + ('library', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='workflow_manager.library')), + ('workflow_run', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='workflow_manager.workflowrun')), + ], + options={ + 'abstract': False, + }, + ), + migrations.AddField( + model_name='workflowrun', + name='libraries', + field=models.ManyToManyField(through='workflow_manager.LibraryAssociation', to='workflow_manager.library'), + ), + ] diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py index ce81e5ceb..a1e68b280 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py @@ -2,4 +2,5 @@ from .workflow import Workflow from .payload import Payload -from .workflow_run import WorkflowRun +from .workflow_run import WorkflowRun, LibraryAssociation +from .library import Library diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py index 768dec46f..baba1278f 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py @@ -40,7 +40,6 @@ class Meta: # Link to library table libraries = models.ManyToManyField(Library, through="LibraryAssociation") - objects = WorkflowRunManager() def __str__(self): @@ -60,8 +59,14 @@ def to_dict(self): } +class LibraryAssociationManager(OrcaBusBaseManager): + pass + + class LibraryAssociation(OrcaBusBaseModel): workflow_run = models.ForeignKey(WorkflowRun, on_delete=models.CASCADE) library = models.ForeignKey(Library, on_delete=models.CASCADE) association_date = models.DateTimeField() status = models.CharField(max_length=255) + + objects = LibraryAssociationManager() diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers.py index 003d410a4..aaa783fde 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers.py @@ -3,9 +3,7 @@ from rest_framework import serializers from rest_framework.fields import empty -from workflow_manager.models.workflow import Workflow -from workflow_manager.models.workflow_run import WorkflowRun -from workflow_manager.models.payload import Payload +from workflow_manager.models import Workflow, WorkflowRun, Payload, Library READ_ONLY_SERIALIZER = "READ ONLY SERIALIZER" diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py index b5d0a94d6..916b0ccbe 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py @@ -6,12 +6,10 @@ import factory from django.utils.timezone import make_aware -from workflow_manager.models.workflow import Workflow -from workflow_manager.models.workflow_run import WorkflowRun -from workflow_manager.models.payload import Payload +from workflow_manager.models import Workflow, WorkflowRun, Payload, Library -class TestConstant(Enum): +class TestConstant(Enum): workflow_name = "TestWorkflow1" payload = { "key": "value", @@ -19,25 +17,26 @@ class TestConstant(Enum): "bar": datetime.now().astimezone(ZoneInfo('Australia/Sydney')), "sub": {"my": "sub"} } + library_id = "L000001" class WorkflowFactory(factory.django.DjangoModelFactory): class Meta: model = Workflow - workflow_name = "TestWorkflow" - workflow_version = "1.0" - execution_engine_pipeline_id = str(uuid.uuid4()) - execution_engine = "ICAv2" - approval_state = "NATA" + workflow_name = "TestWorkflow" + workflow_version = "1.0" + execution_engine_pipeline_id = str(uuid.uuid4()) + execution_engine = "ICAv2" + approval_state = "NATA" class PayloadFactory(factory.django.DjangoModelFactory): class Meta: model = Payload - version = "1.0.0" - payload_ref_id = str(uuid.uuid4()) + version = "1.0.0" + payload_ref_id = str(uuid.uuid4()) data = TestConstant.payload.value @@ -46,10 +45,10 @@ class Meta: model = WorkflowRun _uid = str(uuid.uuid4()) - portal_run_id = f"20240130{_uid[:8]}" - execution_id = _uid - workflow_run_name = f"TestWorkflowRun{_uid[:8]}" - status = "READY" + portal_run_id = f"20240130{_uid[:8]}" + execution_id = _uid + workflow_run_name = f"TestWorkflowRun{_uid[:8]}" + status = "READY" comment = "Lorem Ipsum" timestamp = make_aware(datetime.now()) # If required, set later @@ -57,3 +56,8 @@ class Meta: workflow = None +class LibraryFactory(factory.django.DjangoModelFactory): + class Meta: + model = Library + + library_id = TestConstant.library_id.value diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py index 9eb441154..76abca99c 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py @@ -4,9 +4,23 @@ # # --- keep ^^^ at top of the module import uuid +from datetime import datetime + from django.db import transaction -from workflow_manager_proc.domain.executionservice.workflowrunstatechange import WorkflowRunStateChange, Marshaller -from workflow_manager.models.workflow_run import WorkflowRun, Workflow, Payload +from django.utils.timezone import make_aware +from workflow_manager_proc.domain.executionservice.workflowrunstatechange import ( + WorkflowRunStateChange, + Marshaller, +) +from workflow_manager.models.workflow_run import ( + WorkflowRun, + Workflow, + Payload, + Library, + LibraryAssociation, +) + +ASSOCIATION_STATUS = "ACTIVE" @transaction.atomic @@ -17,45 +31,45 @@ def handler(event, context): print(f"Processing {event}, {context}") wrsc: WorkflowRunStateChange = Marshaller.unmarshall(event, WorkflowRunStateChange) + print(wrsc) - # We expect: a corresponding Workflow has to exist for each workflow run + # We expect: a corresponding Workflow has to exist for each workflow run # TODO: decide whether we allow dynamic workflow creation or expect them to exist and fail if not try: print(f"Looking for workflow ({wrsc.workflowName}:{wrsc.workflowVersion}).") workflow: Workflow = Workflow.objects.get( - workflow_name = wrsc.workflowName, - workflow_version = wrsc.workflowVersion + workflow_name=wrsc.workflowName, workflow_version=wrsc.workflowVersion ) except Exception: print("No workflow found! Creating new entry.") workflow = Workflow( - workflow_name = wrsc.workflowName, - workflow_version = wrsc.workflowVersion, - execution_engine = "Unknown", - execution_engine_pipeline_id = "Unknown", - approval_state = "RESEARCH" + workflow_name=wrsc.workflowName, + workflow_version=wrsc.workflowVersion, + execution_engine="Unknown", + execution_engine_pipeline_id="Unknown", + approval_state="RESEARCH", ) print("Persisting Workflow record.") workflow.save() # then create the actual workflow run state change entry wfr = WorkflowRun( - workflow = workflow, - portal_run_id = wrsc.portalRunId, - execution_id = wrsc.executionId, # the execution service WRSC does carry the execution ID - workflow_run_name = wrsc.workflowRunName, - status = wrsc.status, - comment = None, - timestamp = wrsc.timestamp - ) + workflow=workflow, + portal_run_id=wrsc.portalRunId, + execution_id=wrsc.executionId, # the execution service WRSC does carry the execution ID + workflow_run_name=wrsc.workflowRunName, + status=wrsc.status, + comment=None, + timestamp=wrsc.timestamp, + ) # if payload is not null, create a new payload entry and assign a unique reference ID for it input_payload: Payload = wrsc.payload if input_payload: pld = Payload( - payload_ref_id = str(uuid.uuid4()), - version = input_payload.version, - data = input_payload.data + payload_ref_id=str(uuid.uuid4()), + version=input_payload.version, + data=input_payload.data, ) print("Persisting Payload record.") pld.save() @@ -65,5 +79,22 @@ def handler(event, context): print("Persisting WorkflowRun record.") wfr.save() + # if the workflow run is linked to library record(s), create the association(s) + input_libraries: list[str] = wrsc.linkedLibraries + for input_lib in input_libraries: + # check if the library has already a DB record + db_lib: Library = Library.objects.get_by_keyword(library_id=input_lib) + # create it if not + if not db_lib: + db_lib = Library.objects.create(library_id=input_lib) + + # create the library association + LibraryAssociation.objects.create( + workflow_run=wfr, + library=db_lib, + association_date=make_aware(datetime.now()), + status=ASSOCIATION_STATUS, + ) + print(f"{__name__} done.") return wfr # FIXME: serialise in future (json.dumps) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py index c17b47026..a7c8a5407 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py @@ -6,17 +6,76 @@ class WorkflowSrvUnitTests(WorkflowManagerProcUnitTestCase): - @skip + # @skip def test_get_workflow_from_db(self): """ python manage.py test workflow_manager_proc.tests.test_workflow_srv.WorkflowSrvUnitTests.test_get_workflow_from_db """ - # TODO: implement - mock_wfl = Workflow() - mock_wfl.text = "Test Workflow" - mock_wfl.save() - test_wfl = create_workflow_run.handler() + test_event = { + "portalRunId": "202405012397gatc", + "executionId": "icav2.id.12345", + "timestamp": "2025-05-01T09:25:44Z", + "status": "SUCCEEDED", + "workflowName": "ctTSO500", + "workflowVersion": "4.2.7", + "workflowRunName": "ctTSO500-L000002", + "payload": { + "version": "0.1.0", + "data": { + "projectId": "bxxxxxxxx-dxxx-4xxxx-adcc-xxxxxxxxx", + "analysisId": "12345678-238c-4200-b632-d5dd8c8db94a", + "userReference": "540424_A01001_0193_BBBBMMDRX5_c754de_bd822f", + "timeCreated": "2024-05-01T10:11:35Z", + "timeModified": "2024-05-01T11:24:29Z", + "pipelineId": "bfffffff-cb27-4dfa-846e-acd6eb081aca", + "pipelineCode": "CTTSO500 v4_2_7", + "pipelineDescription": "This is an ctTSO500 workflow execution", + "pipelineUrn": "urn:ilmn:ica:pipeline:bfffffff-cb27-4dfa-846e-acd6eb081aca#CTTSO500_v4_2_7" + } + } + } + + test_wfl = create_workflow_run.handler(test_event, None) + logger.info(test_wfl) + self.assertIsNotNone(test_wfl) + self.assertEqual("ctTSO500-L000002", test_wfl.workflow_run_name) + + def test_get_workflow_from_db2(self): + """ + python manage.py test workflow_manager_proc.tests.test_workflow_srv.WorkflowSrvUnitTests.test_get_workflow_from_db2 + """ + lib_ids = ["L000001", "L000002"] + + test_event = { + "portalRunId": "202405012397gatc", + "executionId": "icav2.id.12345", + "timestamp": "2025-05-01T09:25:44Z", + "status": "SUCCEEDED", + "workflowName": "ctTSO500", + "workflowVersion": "4.2.7", + "workflowRunName": "ctTSO500-L000002", + "linkedLibraries": lib_ids, + "payload": { + "version": "0.1.0", + "data": { + "projectId": "bxxxxxxxx-dxxx-4xxxx-adcc-xxxxxxxxx", + "analysisId": "12345678-238c-4200-b632-d5dd8c8db94a", + "userReference": "540424_A01001_0193_BBBBMMDRX5_c754de_bd822f", + "timeCreated": "2024-05-01T10:11:35Z", + "timeModified": "2024-05-01T11:24:29Z", + "pipelineId": "bfffffff-cb27-4dfa-846e-acd6eb081aca", + "pipelineCode": "CTTSO500 v4_2_7", + "pipelineDescription": "This is an ctTSO500 workflow execution", + "pipelineUrn": "urn:ilmn:ica:pipeline:bfffffff-cb27-4dfa-846e-acd6eb081aca#CTTSO500_v4_2_7" + } + } + } + + test_wfl = create_workflow_run.handler(test_event, None) logger.info(test_wfl) self.assertIsNotNone(test_wfl) - self.assertIn("Workflow", test_wfl.portal_run_id) + self.assertEqual("ctTSO500-L000002", test_wfl.workflow_run_name) + libs = test_wfl.libraries.all() + for lib in libs: + self.assertIn(lib.library_id, lib_ids) From 7161c49202fca08a14a041edbd6c499928e3ae18 Mon Sep 17 00:00:00 2001 From: Florian Reisinger Date: Thu, 15 Aug 2024 09:57:37 +1000 Subject: [PATCH 06/14] Add .gitattributes --- .gitattributes | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .gitattributes diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 000000000..937c0eb37 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,2 @@ +/.yarn/releases/** binary +/.yarn/plugins/** binary From 3373929efbc23c33975b661fb7b71e94bdede6c0 Mon Sep 17 00:00:00 2001 From: Florian Reisinger Date: Mon, 19 Aug 2024 11:34:50 +1000 Subject: [PATCH 07/14] WRSC schema update --- ...ibraryassociation_workflowrun_libraries.py | 42 --------- .../workflowrunstatechange/LibraryRecord.py | 92 +++++++++++++++++++ .../WorkflowRunStateChange.py | 2 +- .../workflowrunstatechange/__init__.py | 1 + .../workflowrunstatechange/LibraryRecord.py | 92 +++++++++++++++++++ .../WorkflowRunStateChange.py | 4 +- .../workflowrunstatechange/__init__.py | 1 + 7 files changed, 189 insertions(+), 45 deletions(-) delete mode 100644 lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py create mode 100644 lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/LibraryRecord.py create mode 100644 lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/LibraryRecord.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py deleted file mode 100644 index 80a342d0e..000000000 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py +++ /dev/null @@ -1,42 +0,0 @@ -# Generated by Django 5.1 on 2024-08-14 04:07 - -import django.db.models.deletion -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ('workflow_manager', '0001_initial'), - ] - - operations = [ - migrations.CreateModel( - name='Library', - fields=[ - ('id', models.BigAutoField(primary_key=True, serialize=False)), - ('library_id', models.CharField(max_length=255, unique=True)), - ], - options={ - 'abstract': False, - }, - ), - migrations.CreateModel( - name='LibraryAssociation', - fields=[ - ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('association_date', models.DateTimeField()), - ('status', models.CharField(max_length=255)), - ('library', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='workflow_manager.library')), - ('workflow_run', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='workflow_manager.workflowrun')), - ], - options={ - 'abstract': False, - }, - ), - migrations.AddField( - model_name='workflowrun', - name='libraries', - field=models.ManyToManyField(through='workflow_manager.LibraryAssociation', to='workflow_manager.library'), - ), - ] diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/LibraryRecord.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/LibraryRecord.py new file mode 100644 index 000000000..db8e1dee7 --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/LibraryRecord.py @@ -0,0 +1,92 @@ +# coding: utf-8 +import pprint +import re # noqa: F401 + +import six +from enum import Enum + +class LibraryRecord(object): + + + _types = { + 'libraryId': 'str', + 'orcabusId': 'str' + } + + _attribute_map = { + 'libraryId': 'libraryId', + 'orcabusId': 'orcabusId' + } + + def __init__(self, libraryId=None, orcabusId=None): # noqa: E501 + self._libraryId = None + self._orcabusId = None + self.discriminator = None + self.libraryId = libraryId + self.orcabusId = orcabusId + + + @property + def libraryId(self): + + return self._libraryId + + @libraryId.setter + def libraryId(self, libraryId): + + + self._libraryId = libraryId + + + @property + def orcabusId(self): + + return self._orcabusId + + @orcabusId.setter + def orcabusId(self, orcabusId): + + + self._orcabusId = orcabusId + + def to_dict(self): + result = {} + + for attr, _ in six.iteritems(self._types): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list(map( + lambda x: x.to_dict() if hasattr(x, "to_dict") else x, + value + )) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") else item, + value.items() + )) + else: + result[attr] = value + if issubclass(LibraryRecord, dict): + for key, value in self.items(): + result[key] = value + + return result + + def to_str(self): + return pprint.pformat(self.to_dict()) + + def __repr__(self): + return self.to_str() + + def __eq__(self, other): + if not isinstance(other, LibraryRecord): + return False + + return self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not self == other + diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/WorkflowRunStateChange.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/WorkflowRunStateChange.py index a334306ca..8a3807708 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/WorkflowRunStateChange.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/WorkflowRunStateChange.py @@ -14,7 +14,7 @@ class WorkflowRunStateChange(object): 'workflowName': 'str', 'workflowVersion': 'str', 'workflowRunName': 'str', - 'linkedLibraries': 'list[str]', + 'linkedLibraries': 'list[LibraryRecord]', 'payload': 'Payload' } diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/__init__.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/__init__.py index 394eb38ee..bef47cdab 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/__init__.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/__init__.py @@ -4,5 +4,6 @@ from .marshaller import Marshaller from .AWSEvent import AWSEvent +from .LibraryRecord import LibraryRecord from .Payload import Payload from .WorkflowRunStateChange import WorkflowRunStateChange diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/LibraryRecord.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/LibraryRecord.py new file mode 100644 index 000000000..db8e1dee7 --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/LibraryRecord.py @@ -0,0 +1,92 @@ +# coding: utf-8 +import pprint +import re # noqa: F401 + +import six +from enum import Enum + +class LibraryRecord(object): + + + _types = { + 'libraryId': 'str', + 'orcabusId': 'str' + } + + _attribute_map = { + 'libraryId': 'libraryId', + 'orcabusId': 'orcabusId' + } + + def __init__(self, libraryId=None, orcabusId=None): # noqa: E501 + self._libraryId = None + self._orcabusId = None + self.discriminator = None + self.libraryId = libraryId + self.orcabusId = orcabusId + + + @property + def libraryId(self): + + return self._libraryId + + @libraryId.setter + def libraryId(self, libraryId): + + + self._libraryId = libraryId + + + @property + def orcabusId(self): + + return self._orcabusId + + @orcabusId.setter + def orcabusId(self, orcabusId): + + + self._orcabusId = orcabusId + + def to_dict(self): + result = {} + + for attr, _ in six.iteritems(self._types): + value = getattr(self, attr) + if isinstance(value, list): + result[attr] = list(map( + lambda x: x.to_dict() if hasattr(x, "to_dict") else x, + value + )) + elif hasattr(value, "to_dict"): + result[attr] = value.to_dict() + elif isinstance(value, dict): + result[attr] = dict(map( + lambda item: (item[0], item[1].to_dict()) + if hasattr(item[1], "to_dict") else item, + value.items() + )) + else: + result[attr] = value + if issubclass(LibraryRecord, dict): + for key, value in self.items(): + result[key] = value + + return result + + def to_str(self): + return pprint.pformat(self.to_dict()) + + def __repr__(self): + return self.to_str() + + def __eq__(self, other): + if not isinstance(other, LibraryRecord): + return False + + return self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not self == other + diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/WorkflowRunStateChange.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/WorkflowRunStateChange.py index 276da2935..b0a2b545d 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/WorkflowRunStateChange.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/WorkflowRunStateChange.py @@ -13,7 +13,7 @@ class WorkflowRunStateChange(object): 'workflowName': 'str', 'workflowVersion': 'str', 'workflowRunName': 'str', - 'linkedLibraries': 'list[str]', + 'linkedLibraries': 'list[LibraryRecord]', 'payload': 'Payload' } @@ -119,7 +119,7 @@ def workflowRunName(self, workflowRunName): self._workflowRunName = workflowRunName - + @property def linkedLibraries(self): diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/__init__.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/__init__.py index 394eb38ee..bef47cdab 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/__init__.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/__init__.py @@ -4,5 +4,6 @@ from .marshaller import Marshaller from .AWSEvent import AWSEvent +from .LibraryRecord import LibraryRecord from .Payload import Payload from .WorkflowRunStateChange import WorkflowRunStateChange From 5a947117041f388e6b11edd9ebf85d699a39f8c4 Mon Sep 17 00:00:00 2001 From: Florian Reisinger Date: Tue, 20 Aug 2024 10:49:26 +1000 Subject: [PATCH 08/14] Update linked library model to contain lab and metadata IDs * added library endpoint * adopt metadata library id for internal model * fix generator / test --- .../commands/generate_mock_workflow_run.py | 32 +++++++++++--- ...ibraryassociation_workflowrun_libraries.py | 42 +++++++++++++++++++ .../workflow_manager/models/library.py | 8 ++-- .../workflow_manager/tests/factories.py | 8 +++- .../workflow_manager/urls/base.py | 3 +- .../workflow_manager/viewsets/library.py | 2 +- .../workflow_manager/viewsets/workflow_run.py | 1 + .../services/create_workflow_run.py | 32 +++++++------- .../services/get_workflow_run.py | 4 +- .../tests/test_workflow_srv.py | 15 ++++++- 10 files changed, 116 insertions(+), 31 deletions(-) create mode 100644 lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py index 14c2beb76..ee8dcf680 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py @@ -5,9 +5,11 @@ import json from datetime import datetime from libumccr import libjson -from workflow_manager.models import WorkflowRun, LibraryAssociation +from workflow_manager.models import Workflow, WorkflowRun, LibraryAssociation from workflow_manager.tests.factories import WorkflowRunFactory, WorkflowFactory, PayloadFactory, LibraryFactory +WORKFLOW_NAME = "TestWorkflow" + # https://docs.djangoproject.com/en/5.0/howto/custom-management-commands/ class Command(BaseCommand): @@ -15,21 +17,41 @@ class Command(BaseCommand): def handle(self, *args, **options): wf_payload = PayloadFactory() - wf_workflow = WorkflowFactory() + qs: QuerySet = Workflow.objects.filter(workflow_name=WORKFLOW_NAME) + + if qs.exists(): + print("Mock data found, Skipping creation.") + return + wf = WorkflowFactory(workflow_name=WORKFLOW_NAME) - wf: WorkflowRun = WorkflowRunFactory( + wfr: WorkflowRun = WorkflowRunFactory( workflow_run_name="MockWorkflowRun", + portal_run_id="1234", payload=wf_payload, - workflow=wf_workflow + workflow=wf ) library = LibraryFactory() LibraryAssociation.objects.create( - workflow_run=wf, + workflow_run=wfr, library=library, association_date=make_aware(datetime.now()), status="ACTIVE", ) + wfr2: WorkflowRun = WorkflowRunFactory( + workflow_run_name="MockWorkflowRun2", + portal_run_id="1235", + payload=wf_payload, + workflow=wf + ) + library2 = LibraryFactory(orcabus_id="lib.01J5M2JFE1JPYV62RYQEG99CP5", library_id="L000002") + LibraryAssociation.objects.create( + workflow_run=wfr2, + library=library2, + association_date=make_aware(datetime.now()), + status="ACTIVE", + ) + print(libjson.dumps(wf.to_dict())) print("Done") diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py new file mode 100644 index 000000000..1d7e891f3 --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py @@ -0,0 +1,42 @@ +# Generated by Django 5.1 on 2024-08-19 01:52 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('workflow_manager', '0001_initial'), + ] + + operations = [ + migrations.CreateModel( + name='Library', + fields=[ + ('orcabus_id', models.CharField(max_length=255, primary_key=True, serialize=False)), + ('library_id', models.CharField(max_length=255)), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='LibraryAssociation', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('association_date', models.DateTimeField()), + ('status', models.CharField(max_length=255)), + ('library', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='workflow_manager.library')), + ('workflow_run', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='workflow_manager.workflowrun')), + ], + options={ + 'abstract': False, + }, + ), + migrations.AddField( + model_name='workflowrun', + name='libraries', + field=models.ManyToManyField(through='workflow_manager.LibraryAssociation', to='workflow_manager.library'), + ), + ] diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/library.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/library.py index 940f03e69..fc57ac4dc 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/library.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/library.py @@ -10,17 +10,17 @@ class LibraryManager(OrcaBusBaseManager): class Library(OrcaBusBaseModel): - id = models.BigAutoField(primary_key=True) - library_id = models.CharField(max_length=255, unique=True) + orcabus_id = models.CharField(primary_key=True, max_length=255) + library_id = models.CharField(max_length=255) objects = LibraryManager() def __str__(self): - return f"ID: {self.id}, library_id: {self.library_id}" + return f"orcabus_id: {self.orcabus_id}, library_id: {self.library_id}" def to_dict(self): return { - "id": self.id, + "orcabus_id": self.orcabus_id, "library_id": self.library_id } diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py index 916b0ccbe..a530cddaf 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py @@ -16,8 +16,11 @@ class TestConstant(Enum): "foo": uuid.uuid4(), "bar": datetime.now().astimezone(ZoneInfo('Australia/Sydney')), "sub": {"my": "sub"} + }, + library = { + "library_id": "L000001", + "orcabus_id": "lib.01J5M2J44HFJ9424G7074NKTGN" } - library_id = "L000001" class WorkflowFactory(factory.django.DjangoModelFactory): @@ -60,4 +63,5 @@ class LibraryFactory(factory.django.DjangoModelFactory): class Meta: model = Library - library_id = TestConstant.library_id.value + library_id = TestConstant.library.value["library_id"] + orcabus_id = TestConstant.library.value["orcabus_id"] diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py index beb14bdc2..e642cc8e2 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py @@ -4,6 +4,7 @@ from workflow_manager.viewsets.workflow import WorkflowViewSet from workflow_manager.viewsets.workflow_run import WorkflowRunViewSet from workflow_manager.viewsets.payload import PayloadViewSet +from workflow_manager.viewsets.library import LibraryViewSet from workflow_manager.settings.base import API_VERSION api_namespace = "api" @@ -14,7 +15,7 @@ router.register(r"workflow", WorkflowViewSet, basename="workflow") router.register(r"workflowrun", WorkflowRunViewSet, basename="workflowrun") router.register(r"payload", PayloadViewSet, basename="payload") -router.register(r"library", PayloadViewSet, basename="library") +router.register(r"library", LibraryViewSet, basename="library") urlpatterns = [ path(f"{api_base}", include(router.urls)), diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/library.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/library.py index d529aa5a9..bf2d5f150 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/library.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/library.py @@ -11,7 +11,7 @@ class LibraryViewSet(ReadOnlyModelViewSet): pagination_class = StandardResultsSetPagination filter_backends = [filters.OrderingFilter, filters.SearchFilter] ordering_fields = '__all__' - ordering = ['-id'] + ordering = ['-orcabus_id'] search_fields = Library.get_base_fields() def get_queryset(self): diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run.py index 842abc907..baac9093b 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run.py @@ -15,4 +15,5 @@ class WorkflowRunViewSet(ReadOnlyModelViewSet): search_fields = WorkflowRun.get_base_fields() def get_queryset(self): + print(self.request.query_params) return WorkflowRun.objects.get_by_keyword(**self.request.query_params) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py index 76abca99c..dc0295d8c 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py @@ -10,6 +10,7 @@ from django.utils.timezone import make_aware from workflow_manager_proc.domain.executionservice.workflowrunstatechange import ( WorkflowRunStateChange, + LibraryRecord, Marshaller, ) from workflow_manager.models.workflow_run import ( @@ -80,21 +81,24 @@ def handler(event, context): wfr.save() # if the workflow run is linked to library record(s), create the association(s) - input_libraries: list[str] = wrsc.linkedLibraries - for input_lib in input_libraries: - # check if the library has already a DB record - db_lib: Library = Library.objects.get_by_keyword(library_id=input_lib) - # create it if not - if not db_lib: - db_lib = Library.objects.create(library_id=input_lib) + input_libraries: list[LibraryRecord] = wrsc.linkedLibraries + if input_libraries: + for input_rec in input_libraries: + # check if the library has already a DB record + db_lib: Library = Library.objects.get_by_keyword(orcabus_id=input_rec.orcabusId) + # create it if not + if not db_lib: + # TODO: the library record should exist in the future - synced with metadata service on + # LibraryStateChange events + db_lib = Library.objects.create(orcabus_id=input_rec.orcabusId, library_id=input_rec.libraryId) - # create the library association - LibraryAssociation.objects.create( - workflow_run=wfr, - library=db_lib, - association_date=make_aware(datetime.now()), - status=ASSOCIATION_STATUS, - ) + # create the library association + LibraryAssociation.objects.create( + workflow_run=wfr, + library=db_lib, + association_date=make_aware(datetime.now()), + status=ASSOCIATION_STATUS, + ) print(f"{__name__} done.") return wfr # FIXME: serialise in future (json.dumps) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/get_workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/get_workflow_run.py index 13ac0a716..42a6d5f68 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/get_workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/get_workflow_run.py @@ -24,11 +24,11 @@ def handler(event, context): # time_window = event.get('time_window', None) # FIXME: make configurable later? qs = WorkflowRun.objects.filter( - portal_run_id = portal_run_id + portal_run_id=portal_run_id ) if status: qs = qs.filter( - status = status + status=status ) if timestamp: dt = datetime.datetime.fromisoformat(str(timestamp)) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py index a7c8a5407..829c1b0fb 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py @@ -45,7 +45,17 @@ def test_get_workflow_from_db2(self): """ python manage.py test workflow_manager_proc.tests.test_workflow_srv.WorkflowSrvUnitTests.test_get_workflow_from_db2 """ - lib_ids = ["L000001", "L000002"] + library_ids = ["L000001", "L000002"] + lib_ids = [ + { + "libraryId": library_ids[0], + "orcabusId": "lib.01J5M2J44HFJ9424G7074NKTGN" + }, + { + "libraryId": library_ids[1], + "orcabusId": "lib.01J5M2JFE1JPYV62RYQEG99CP5" + } + ] test_event = { "portalRunId": "202405012397gatc", @@ -78,4 +88,5 @@ def test_get_workflow_from_db2(self): self.assertEqual("ctTSO500-L000002", test_wfl.workflow_run_name) libs = test_wfl.libraries.all() for lib in libs: - self.assertIn(lib.library_id, lib_ids) + logger.info(lib) + self.assertIn(lib.library_id, library_ids) From 24a4c158ed0e7f8c471047c2292e16b1f111ac32 Mon Sep 17 00:00:00 2001 From: Florian Reisinger Date: Tue, 20 Aug 2024 14:10:36 +1000 Subject: [PATCH 09/14] Remove library endpoint --- .../stacks/workflow-manager/workflow_manager/urls/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py index e642cc8e2..ce9d7ca47 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py @@ -15,7 +15,7 @@ router.register(r"workflow", WorkflowViewSet, basename="workflow") router.register(r"workflowrun", WorkflowRunViewSet, basename="workflowrun") router.register(r"payload", PayloadViewSet, basename="payload") -router.register(r"library", LibraryViewSet, basename="library") +# router.register(r"library", LibraryViewSet, basename="library") urlpatterns = [ path(f"{api_base}", include(router.urls)), From 437742e08191ff2d43daab9f25a7396c3943a8ef Mon Sep 17 00:00:00 2001 From: Florian Reisinger Date: Tue, 20 Aug 2024 16:50:16 +1000 Subject: [PATCH 10/14] Fix newline --- .../events/workflowmanager/WorkflowRunStateChange.schema.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/schemas/events/workflowmanager/WorkflowRunStateChange.schema.json b/docs/schemas/events/workflowmanager/WorkflowRunStateChange.schema.json index e3932acd2..e30646448 100644 --- a/docs/schemas/events/workflowmanager/WorkflowRunStateChange.schema.json +++ b/docs/schemas/events/workflowmanager/WorkflowRunStateChange.schema.json @@ -119,4 +119,4 @@ } } } -} \ No newline at end of file +} From e12f0a8cf9eee78553b8fd193e4a168f08e7050b Mon Sep 17 00:00:00 2001 From: Florian Reisinger Date: Tue, 20 Aug 2024 18:00:12 +1000 Subject: [PATCH 11/14] Refactor service methods according to model changes --- .../workflow_manager/models/__init__.py | 1 + .../workflow_manager/models/state.py | 41 +++++++++++++ .../workflow_manager/models/workflow_run.py | 22 +++---- .../lambdas/handle_service_wrsc_event.py | 57 +++++++++++-------- .../services/__init__.py | 28 +++++++++ .../services/create_workflow_run.py | 48 ++++++++-------- .../services/get_workflow_run.py | 49 ---------------- 7 files changed, 135 insertions(+), 111 deletions(-) create mode 100644 lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/state.py delete mode 100644 lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/get_workflow_run.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py index a1e68b280..81f4f3d24 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py @@ -4,3 +4,4 @@ from .payload import Payload from .workflow_run import WorkflowRun, LibraryAssociation from .library import Library +from .state import State \ No newline at end of file diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/state.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/state.py new file mode 100644 index 000000000..76f927ae1 --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/state.py @@ -0,0 +1,41 @@ +from django.db import models + +from workflow_manager.models.base import OrcaBusBaseModel, OrcaBusBaseManager +from workflow_manager.models import WorkflowRun, Payload + + +class StateManager(OrcaBusBaseManager): + pass + + +class State(OrcaBusBaseModel): + class Meta: + unique_together = ["workflow_run", "status", "timestamp"] + + id = models.BigAutoField(primary_key=True) + + # --- mandatory fields + workflow_run = models.ForeignKey(WorkflowRun, on_delete=models.CASCADE) + status = models.CharField(max_length=255) + timestamp = models.DateTimeField() + + comment = models.CharField(max_length=255, null=True, blank=True) + + # Link to workflow run payload data + payload = models.ForeignKey(Payload, null=True, blank=True, on_delete=models.SET_NULL) + + objects = StateManager() + + def __str__(self): + return f"ID: {self.id}, status: {self.status}" + + def to_dict(self): + return { + "id": self.id, + "workflow_run_id": self.workflow_run.id, + "status": self.status, + "timestamp": str(self.timestamp), + "comment": self.comment, + "payload": self.payload.to_dict() if (self.payload is not None) else None, + } + diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py index baba1278f..26fc64a59 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py @@ -1,9 +1,7 @@ from django.db import models from workflow_manager.models.base import OrcaBusBaseModel, OrcaBusBaseManager -from workflow_manager.models.payload import Payload -from workflow_manager.models.workflow import Workflow -from workflow_manager.models.library import Library +from workflow_manager.models import Workflow, Library, State class WorkflowRunManager(OrcaBusBaseManager): @@ -11,16 +9,15 @@ class WorkflowRunManager(OrcaBusBaseManager): class WorkflowRun(OrcaBusBaseModel): - class Meta: - unique_together = ["portal_run_id", "status", "timestamp"] id = models.BigAutoField(primary_key=True) # --- mandatory fields - portal_run_id = models.CharField(max_length=255) - status = models.CharField(max_length=255) - timestamp = models.DateTimeField() + portal_run_id = models.CharField(max_length=255, unique=True) + current_status = models.CharField(max_length=255) + created = models.DateTimeField() + last_modified = models.DateTimeField() # --- optional fields @@ -34,9 +31,6 @@ class Meta: # Link to workflow table workflow = models.ForeignKey(Workflow, null=True, blank=True, on_delete=models.SET_NULL) - # Link to workflow payload data - payload = models.ForeignKey(Payload, null=True, blank=True, on_delete=models.SET_NULL) - # Link to library table libraries = models.ManyToManyField(Library, through="LibraryAssociation") @@ -49,12 +43,12 @@ def to_dict(self): return { "id": self.id, "portal_run_id": self.portal_run_id, - "status": self.status, - "timestamp": str(self.timestamp), + "current_status": self.current_status, + "created": str(self.created), + "last_modified": str(self.last_modified), "execution_id": self.execution_id, "workflow_run_name": self.workflow_run_name, "comment": self.comment, - "payload": self.payload.to_dict() if (self.payload is not None) else None, "workflow": self.workflow.to_dict() if (self.workflow is not None) else None } diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/lambdas/handle_service_wrsc_event.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/lambdas/handle_service_wrsc_event.py index e19b1c120..cba142950 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/lambdas/handle_service_wrsc_event.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/lambdas/handle_service_wrsc_event.py @@ -3,10 +3,14 @@ django.setup() # --- keep ^^^ at top of the module -from workflow_manager.models.workflow_run import WorkflowRun +import datetime +from workflow_manager.models import WorkflowRun, State import workflow_manager_proc.domain.executionservice.workflowrunstatechange as srv import workflow_manager_proc.domain.workflowmanager.workflowrunstatechange as wfm -from workflow_manager_proc.services import get_workflow_run, create_workflow_run, emit_workflow_run_state_change +from workflow_manager_proc.services import create_workflow_run, emit_workflow_run_state_change, \ + create_workflow_run_state + +default_time_window = datetime.timedelta(hours=1) def handler(event, context): @@ -16,49 +20,56 @@ def handler(event, context): input_event: srv.AWSEvent = srv.Marshaller.unmarshall(event, srv.AWSEvent) input_wrsc: srv.WorkflowRunStateChange = input_event.detail - query = { - "portal_run_id": input_wrsc.portalRunId, - "status": input_wrsc.status, - "timestamp": input_wrsc.timestamp - } - print(f"Finding WorkflowRun records for query:{query}") - wrsc_matches = get_workflow_run.handler(query, None) # FIXME: may only need to be a "exist" query + print(f"Finding WorkflowRun records for portal_run_id:{input_wrsc.portalRunId}") + try: + wfr: WorkflowRun = WorkflowRun.objects.get(portal_run_id=input_wrsc.portalRunId) + except Exception: + wfr: WorkflowRun = create_workflow_run.handler(srv.Marshaller.marshall(input_wrsc), None) + + state_matches = State.objects.filter(workflow_run=wfr) + if input_wrsc.status: + state_matches = state_matches.filter(status=input_wrsc.status) + if input_wrsc.timestamp: + dt = datetime.datetime.fromisoformat(str(input_wrsc.timestamp)) + start_t = dt - default_time_window + end_t = dt + default_time_window + state_matches = state_matches.filter(timestamp__range=(start_t, end_t)) - # check workflow run list - if len(wrsc_matches) == 0: - print(f"No matching WorkflowRun found. Creating...") - # create new entry - db_wfr: WorkflowRun = create_workflow_run.handler(srv.Marshaller.marshall(input_wrsc), None) + # check state list + if len(state_matches) == 0: + print(f"No matching WorkflowRun State found. Creating...") + # create new state entry + wfr_state: State = create_workflow_run_state(wrsc=input_wrsc, wfr=wfr) # create outgoing event - out_event = map_db_record_to_wrsc(db_wfr) + out_event = map_db_record_to_wrsc(wfr, wfr_state) # emit state change print("Emitting WRSC.") emit_workflow_run_state_change.handler(wfm.Marshaller.marshall(out_event), None) else: # ignore - status already exists - print(f"WorkflowRun already exists. Nothing to do.") + print(f"WorkflowRun state already exists. Nothing to do.") print(f"{__name__} done.") -def map_db_record_to_wrsc(db_record: WorkflowRun) -> wfm.WorkflowRunStateChange: +def map_db_record_to_wrsc(db_record: WorkflowRun, state: State) -> wfm.WorkflowRunStateChange: wrsc = wfm.WorkflowRunStateChange( portalRunId=db_record.portal_run_id, - timestamp=db_record.timestamp, - status=db_record.status, + timestamp=state.timestamp, + status=state.status, workflowName=db_record.workflow.workflow_name, workflowVersion=db_record.workflow.workflow_version, workflowRunName=db_record.workflow_run_name, ) # handle condition: Payload is optional - if db_record.payload: + if state.payload: wrsc.payload = wfm.Payload( - refId=db_record.payload.payload_ref_id, - version=db_record.payload.version, - data=db_record.payload.data + refId=state.payload.payload_ref_id, + version=state.payload.version, + data=state.payload.data ) return wrsc diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/__init__.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/__init__.py index e69de29bb..2775986e0 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/__init__.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/__init__.py @@ -0,0 +1,28 @@ +import uuid +from workflow_manager_proc.domain.executionservice.workflowrunstatechange import WorkflowRunStateChange +from workflow_manager.models import WorkflowRun, State, Payload + + +def create_workflow_run_state(wrsc: WorkflowRunStateChange, wfr: WorkflowRun): + input_payload: Payload = wrsc.payload + pld = None + if input_payload: + pld: Payload = Payload( + payload_ref_id=str(uuid.uuid4()), + version=input_payload.version, + data=input_payload.data, + ) + print("Persisting Payload record.") + pld.save() + + # create state for the workflow run + workflow_state: State = State( + workflow_run=wfr, + status=wrsc.status, + timestamp=wrsc.timestamp, + comment=None, + payload=pld + ) + workflow_state.save() + + return workflow_state diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py index dc0295d8c..5310f7472 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py @@ -13,13 +13,15 @@ LibraryRecord, Marshaller, ) -from workflow_manager.models.workflow_run import ( +from workflow_manager.models import ( WorkflowRun, Workflow, + State, Payload, Library, LibraryAssociation, ) +from . import create_workflow_run_state ASSOCIATION_STATUS = "ACTIVE" @@ -53,33 +55,29 @@ def handler(event, context): print("Persisting Workflow record.") workflow.save() - # then create the actual workflow run state change entry - wfr = WorkflowRun( - workflow=workflow, - portal_run_id=wrsc.portalRunId, - execution_id=wrsc.executionId, # the execution service WRSC does carry the execution ID - workflow_run_name=wrsc.workflowRunName, - status=wrsc.status, - comment=None, - timestamp=wrsc.timestamp, - ) - - # if payload is not null, create a new payload entry and assign a unique reference ID for it - input_payload: Payload = wrsc.payload - if input_payload: - pld = Payload( - payload_ref_id=str(uuid.uuid4()), - version=input_payload.version, - data=input_payload.data, + # then create the actual workflow run entry if it does not exist + try: + wfr: WorkflowRun = WorkflowRun.objects.get(portal_run_id=wrsc.portalRunId) + wfr.current_status = wrsc.status + wfr.last_modified = wrsc.timestamp + except Exception: + print("No workflow found! Creating new entry.") + wfr = WorkflowRun( + workflow=workflow, + portal_run_id=wrsc.portalRunId, + execution_id=wrsc.executionId, # the execution service WRSC does carry the execution ID + workflow_run_name=wrsc.workflowRunName, + current_status=wrsc.status, + comment=None, + last_modified=wrsc.timestamp, + created=wrsc.timestamp ) - print("Persisting Payload record.") - pld.save() - - wfr.payload = pld # Note: payload type depend on workflow + status and will carry a version in it - - print("Persisting WorkflowRun record.") + print("Persisting Workflow record.") wfr.save() + # create the related state & payload entries for the WRSC + create_workflow_run_state(wrsc=wrsc, wfr=wfr) + # if the workflow run is linked to library record(s), create the association(s) input_libraries: list[LibraryRecord] = wrsc.linkedLibraries if input_libraries: diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/get_workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/get_workflow_run.py deleted file mode 100644 index 42a6d5f68..000000000 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/get_workflow_run.py +++ /dev/null @@ -1,49 +0,0 @@ -# import django - -# django.setup() - -# # --- keep ^^^ at top of the module -import datetime -from workflow_manager.models.workflow_run import WorkflowRun - -default_time_window = datetime.timedelta(hours=1) - -def handler(event, context): - """event will be - { - portal_run_id: "", - status: "", # optional - timestamp: "" # optional - time_window: "" # currenty not used, defaults 1h - } - """ - print(f"Processing get_workflow_run with: {event}, {context}") - portal_run_id = event['portal_run_id'] - status = event.get('status', None) - timestamp = event.get('timestamp', None) - # time_window = event.get('time_window', None) # FIXME: make configurable later? - - qs = WorkflowRun.objects.filter( - portal_run_id=portal_run_id - ) - if status: - qs = qs.filter( - status=status - ) - if timestamp: - dt = datetime.datetime.fromisoformat(str(timestamp)) - print(f"Filter for time window around: {str(timestamp)}") - start_t = dt - default_time_window - end_t = dt + default_time_window - print(f"Time window from {start_t} to {end_t}.") - qs = qs.filter( - timestamp__range=(start_t, end_t) - ) - - workflow_runs = [] - for w in qs.all(): - workflow_runs.append(w) - print(w.to_dict()) - print(f"Found {len(workflow_runs)} WorkflowRun records.") - - return workflow_runs # FIXME: need to deserialise in future From b6dcd4efb76d917d37664cc94be4296d225f1a80 Mon Sep 17 00:00:00 2001 From: Florian Reisinger Date: Thu, 22 Aug 2024 14:54:01 +1000 Subject: [PATCH 12/14] Add state sub-endpoint to workflowrun API --- .../commands/generate_mock_workflow_run.py | 20 ++- .../migrations/0001_initial.py | 121 ++++++++++-------- ...ibraryassociation_workflowrun_libraries.py | 42 ------ .../workflow_manager/models/__init__.py | 2 +- .../workflow_manager/models/library.py | 1 - .../workflow_manager/models/state.py | 3 +- .../workflow_manager/models/workflow_run.py | 9 +- .../workflow_manager/serializers.py | 12 +- .../workflow_manager/tests/factories.py | 18 ++- .../workflow_manager/urls/base.py | 9 +- .../workflow_manager/viewsets/state.py | 22 ++++ .../services/create_workflow_run.py | 9 +- .../tests/test_workflow_srv.py | 21 ++- 13 files changed, 153 insertions(+), 136 deletions(-) delete mode 100644 lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py create mode 100644 lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py index ee8dcf680..e7ff462f0 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py @@ -5,8 +5,9 @@ import json from datetime import datetime from libumccr import libjson -from workflow_manager.models import Workflow, WorkflowRun, LibraryAssociation -from workflow_manager.tests.factories import WorkflowRunFactory, WorkflowFactory, PayloadFactory, LibraryFactory +from workflow_manager.models import Workflow, WorkflowRun, LibraryAssociation, State +from workflow_manager.tests.factories import WorkflowRunFactory, WorkflowFactory, PayloadFactory, LibraryFactory, \ + StateFactory WORKFLOW_NAME = "TestWorkflow" @@ -16,7 +17,6 @@ class Command(BaseCommand): help = "Generate mock Workflow data into database for local development and testing" def handle(self, *args, **options): - wf_payload = PayloadFactory() qs: QuerySet = Workflow.objects.filter(workflow_name=WORKFLOW_NAME) if qs.exists(): @@ -27,10 +27,15 @@ def handle(self, *args, **options): wfr: WorkflowRun = WorkflowRunFactory( workflow_run_name="MockWorkflowRun", portal_run_id="1234", - payload=wf_payload, workflow=wf ) + wf_payload = PayloadFactory() + StateFactory( + workflow_run=wfr, + payload=wf_payload + ) + library = LibraryFactory() LibraryAssociation.objects.create( workflow_run=wfr, @@ -42,9 +47,14 @@ def handle(self, *args, **options): wfr2: WorkflowRun = WorkflowRunFactory( workflow_run_name="MockWorkflowRun2", portal_run_id="1235", - payload=wf_payload, workflow=wf ) + StateFactory( + workflow_run=wfr2, + status="RUNNING", + payload=wf_payload + ) + library2 = LibraryFactory(orcabus_id="lib.01J5M2JFE1JPYV62RYQEG99CP5", library_id="L000002") LibraryAssociation.objects.create( workflow_run=wfr2, diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0001_initial.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0001_initial.py index 67baab19e..484ed0407 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0001_initial.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.0.6 on 2024-05-17 20:10 +# Generated by Django 5.1 on 2024-08-21 05:48 import django.core.serializers.json import django.db.models.deletion @@ -9,77 +9,90 @@ class Migration(migrations.Migration): initial = True - dependencies = [] + dependencies = [ + ] operations = [ migrations.CreateModel( - name="Payload", + name='Library', + fields=[ + ('orcabus_id', models.CharField(max_length=255, primary_key=True, serialize=False)), + ('library_id', models.CharField(max_length=255)), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='Payload', + fields=[ + ('id', models.BigAutoField(primary_key=True, serialize=False)), + ('payload_ref_id', models.CharField(max_length=255, unique=True)), + ('version', models.CharField(max_length=255)), + ('data', models.JSONField(encoder=django.core.serializers.json.DjangoJSONEncoder)), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='LibraryAssociation', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('association_date', models.DateTimeField()), + ('status', models.CharField(max_length=255)), + ('library', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='workflow_manager.library')), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='Workflow', fields=[ - ("id", models.BigAutoField(primary_key=True, serialize=False)), - ("payload_ref_id", models.CharField(max_length=255, unique=True)), - ("version", models.CharField(max_length=255)), - ( - "data", - models.JSONField( - encoder=django.core.serializers.json.DjangoJSONEncoder - ), - ), + ('id', models.BigAutoField(primary_key=True, serialize=False)), + ('workflow_name', models.CharField(max_length=255)), + ('workflow_version', models.CharField(max_length=255)), + ('execution_engine', models.CharField(max_length=255)), + ('execution_engine_pipeline_id', models.CharField(max_length=255)), + ('approval_state', models.CharField(max_length=255)), ], options={ - "abstract": False, + 'unique_together': {('workflow_name', 'workflow_version')}, }, ), migrations.CreateModel( - name="Workflow", + name='WorkflowRun', fields=[ - ("id", models.BigAutoField(primary_key=True, serialize=False)), - ("workflow_name", models.CharField(max_length=255)), - ("workflow_version", models.CharField(max_length=255)), - ("execution_engine", models.CharField(max_length=255)), - ("execution_engine_pipeline_id", models.CharField(max_length=255)), - ("approval_state", models.CharField(max_length=255)), + ('id', models.BigAutoField(primary_key=True, serialize=False)), + ('portal_run_id', models.CharField(max_length=255, unique=True)), + ('execution_id', models.CharField(blank=True, max_length=255, null=True)), + ('workflow_run_name', models.CharField(blank=True, max_length=255, null=True)), + ('comment', models.CharField(blank=True, max_length=255, null=True)), + ('libraries', models.ManyToManyField(through='workflow_manager.LibraryAssociation', to='workflow_manager.library')), + ('workflow', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to='workflow_manager.workflow')), ], options={ - "unique_together": {("workflow_name", "workflow_version")}, + 'abstract': False, }, ), + migrations.AddField( + model_name='libraryassociation', + name='workflow_run', + field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='workflow_manager.workflowrun'), + ), migrations.CreateModel( - name="WorkflowRun", + name='State', fields=[ - ("id", models.BigAutoField(primary_key=True, serialize=False)), - ("portal_run_id", models.CharField(max_length=255)), - ("status", models.CharField(max_length=255)), - ("timestamp", models.DateTimeField()), - ( - "execution_id", - models.CharField(blank=True, max_length=255, null=True), - ), - ( - "workflow_run_name", - models.CharField(blank=True, max_length=255, null=True), - ), - ("comment", models.CharField(blank=True, max_length=255, null=True)), - ( - "payload", - models.ForeignKey( - blank=True, - null=True, - on_delete=django.db.models.deletion.SET_NULL, - to="workflow_manager.payload", - ), - ), - ( - "workflow", - models.ForeignKey( - blank=True, - null=True, - on_delete=django.db.models.deletion.SET_NULL, - to="workflow_manager.workflow", - ), - ), + ('id', models.BigAutoField(primary_key=True, serialize=False)), + ('status', models.CharField(max_length=255)), + ('timestamp', models.DateTimeField()), + ('comment', models.CharField(blank=True, max_length=255, null=True)), + ('payload', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to='workflow_manager.payload')), + ('workflow_run', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='workflow_manager.workflowrun')), ], options={ - "unique_together": {("portal_run_id", "status", "timestamp")}, + 'unique_together': {('workflow_run', 'status', 'timestamp')}, }, ), ] diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py deleted file mode 100644 index 1d7e891f3..000000000 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/migrations/0002_library_libraryassociation_workflowrun_libraries.py +++ /dev/null @@ -1,42 +0,0 @@ -# Generated by Django 5.1 on 2024-08-19 01:52 - -import django.db.models.deletion -from django.db import migrations, models - - -class Migration(migrations.Migration): - - dependencies = [ - ('workflow_manager', '0001_initial'), - ] - - operations = [ - migrations.CreateModel( - name='Library', - fields=[ - ('orcabus_id', models.CharField(max_length=255, primary_key=True, serialize=False)), - ('library_id', models.CharField(max_length=255)), - ], - options={ - 'abstract': False, - }, - ), - migrations.CreateModel( - name='LibraryAssociation', - fields=[ - ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('association_date', models.DateTimeField()), - ('status', models.CharField(max_length=255)), - ('library', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='workflow_manager.library')), - ('workflow_run', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='workflow_manager.workflowrun')), - ], - options={ - 'abstract': False, - }, - ), - migrations.AddField( - model_name='workflowrun', - name='libraries', - field=models.ManyToManyField(through='workflow_manager.LibraryAssociation', to='workflow_manager.library'), - ), - ] diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py index 81f4f3d24..c90b18c43 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py @@ -4,4 +4,4 @@ from .payload import Payload from .workflow_run import WorkflowRun, LibraryAssociation from .library import Library -from .state import State \ No newline at end of file +from .state import State diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/library.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/library.py index fc57ac4dc..149947390 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/library.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/library.py @@ -2,7 +2,6 @@ from django.db import models from workflow_manager.models.base import OrcaBusBaseModel, OrcaBusBaseManager -from workflow_manager.models.workflow import Workflow class LibraryManager(OrcaBusBaseManager): diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/state.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/state.py index 76f927ae1..22dc8b016 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/state.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/state.py @@ -1,7 +1,8 @@ from django.db import models from workflow_manager.models.base import OrcaBusBaseModel, OrcaBusBaseManager -from workflow_manager.models import WorkflowRun, Payload +from workflow_manager.models.workflow_run import WorkflowRun +from workflow_manager.models.payload import Payload class StateManager(OrcaBusBaseManager): diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py index 26fc64a59..9c6fa6b00 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/workflow_run.py @@ -1,7 +1,8 @@ from django.db import models from workflow_manager.models.base import OrcaBusBaseModel, OrcaBusBaseManager -from workflow_manager.models import Workflow, Library, State +from workflow_manager.models.library import Library +from workflow_manager.models.workflow import Workflow class WorkflowRunManager(OrcaBusBaseManager): @@ -15,9 +16,6 @@ class WorkflowRun(OrcaBusBaseModel): # --- mandatory fields portal_run_id = models.CharField(max_length=255, unique=True) - current_status = models.CharField(max_length=255) - created = models.DateTimeField() - last_modified = models.DateTimeField() # --- optional fields @@ -43,9 +41,6 @@ def to_dict(self): return { "id": self.id, "portal_run_id": self.portal_run_id, - "current_status": self.current_status, - "created": str(self.created), - "last_modified": str(self.last_modified), "execution_id": self.execution_id, "workflow_run_name": self.workflow_run_name, "comment": self.comment, diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers.py index aaa783fde..da6ba63f0 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers.py @@ -1,9 +1,6 @@ -from typing import Dict, List - from rest_framework import serializers -from rest_framework.fields import empty -from workflow_manager.models import Workflow, WorkflowRun, Payload, Library +from workflow_manager.models import Workflow, WorkflowRun, Payload, Library, State READ_ONLY_SERIALIZER = "READ ONLY SERIALIZER" @@ -53,7 +50,14 @@ class Meta: model = Payload fields = '__all__' + class LibraryModelSerializer(serializers.ModelSerializer): class Meta: model = Library fields = '__all__' + + +class StateModelSerializer(serializers.ModelSerializer): + class Meta: + model = State + fields = '__all__' diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py index a530cddaf..dce8c2842 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py @@ -1,12 +1,12 @@ from enum import Enum -import uuid, json +import uuid from datetime import datetime from zoneinfo import ZoneInfo import factory from django.utils.timezone import make_aware -from workflow_manager.models import Workflow, WorkflowRun, Payload, Library +from workflow_manager.models import Workflow, WorkflowRun, Payload, Library, State class TestConstant(Enum): @@ -51,11 +51,8 @@ class Meta: portal_run_id = f"20240130{_uid[:8]}" execution_id = _uid workflow_run_name = f"TestWorkflowRun{_uid[:8]}" - status = "READY" comment = "Lorem Ipsum" - timestamp = make_aware(datetime.now()) # If required, set later - payload = None workflow = None @@ -65,3 +62,14 @@ class Meta: library_id = TestConstant.library.value["library_id"] orcabus_id = TestConstant.library.value["orcabus_id"] + + +class StateFactory(factory.django.DjangoModelFactory): + class Meta: + model = State + + status = "READY" + timestamp = make_aware(datetime.now()) + comment = "Comment" + payload = None + workflow_run = factory.SubFactory(WorkflowRunFactory) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py index ce9d7ca47..7232769bc 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py @@ -4,7 +4,7 @@ from workflow_manager.viewsets.workflow import WorkflowViewSet from workflow_manager.viewsets.workflow_run import WorkflowRunViewSet from workflow_manager.viewsets.payload import PayloadViewSet -from workflow_manager.viewsets.library import LibraryViewSet +from workflow_manager.viewsets.state import StateViewSet from workflow_manager.settings.base import API_VERSION api_namespace = "api" @@ -15,7 +15,12 @@ router.register(r"workflow", WorkflowViewSet, basename="workflow") router.register(r"workflowrun", WorkflowRunViewSet, basename="workflowrun") router.register(r"payload", PayloadViewSet, basename="payload") -# router.register(r"library", LibraryViewSet, basename="library") + +router.register( + "workflowrun/(?P[^/.]+)/state", + StateViewSet, + basename="workflowrun-state", +) urlpatterns = [ path(f"{api_base}", include(router.urls)), diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py new file mode 100644 index 000000000..660209330 --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/state.py @@ -0,0 +1,22 @@ +from rest_framework import filters +from rest_framework.viewsets import ReadOnlyModelViewSet + +from workflow_manager.models import State +from workflow_manager.pagination import StandardResultsSetPagination +from workflow_manager.serializers import StateModelSerializer + + +class StateViewSet(ReadOnlyModelViewSet): + serializer_class = StateModelSerializer + pagination_class = StandardResultsSetPagination + filter_backends = [filters.OrderingFilter, filters.SearchFilter] + ordering_fields = '__all__' + ordering = ['-id'] + search_fields = State.get_base_fields() + + def get_queryset(self): + qs = State.objects.filter(workflow_run=self.kwargs["workflowrun_id"]) + qs = State.objects.get_model_fields_query(qs, **self.request.query_params) + return qs + + diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py index 5310f7472..267da069a 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run.py @@ -16,8 +16,6 @@ from workflow_manager.models import ( WorkflowRun, Workflow, - State, - Payload, Library, LibraryAssociation, ) @@ -58,8 +56,6 @@ def handler(event, context): # then create the actual workflow run entry if it does not exist try: wfr: WorkflowRun = WorkflowRun.objects.get(portal_run_id=wrsc.portalRunId) - wfr.current_status = wrsc.status - wfr.last_modified = wrsc.timestamp except Exception: print("No workflow found! Creating new entry.") wfr = WorkflowRun( @@ -67,10 +63,7 @@ def handler(event, context): portal_run_id=wrsc.portalRunId, execution_id=wrsc.executionId, # the execution service WRSC does carry the execution ID workflow_run_name=wrsc.workflowRunName, - current_status=wrsc.status, - comment=None, - last_modified=wrsc.timestamp, - created=wrsc.timestamp + comment=None ) print("Persisting Workflow record.") wfr.save() diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py index 829c1b0fb..a810e8bef 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py @@ -1,7 +1,10 @@ from unittest import skip + +from django.db.models import QuerySet + from workflow_manager_proc.services import create_workflow_run from workflow_manager_proc.tests.case import WorkflowManagerProcUnitTestCase, logger -from workflow_manager.models.workflow import Workflow +from workflow_manager.models import WorkflowRun class WorkflowSrvUnitTests(WorkflowManagerProcUnitTestCase): @@ -9,7 +12,7 @@ class WorkflowSrvUnitTests(WorkflowManagerProcUnitTestCase): # @skip def test_get_workflow_from_db(self): """ - python manage.py test workflow_manager_proc.tests.test_workflow_srv.WorkflowSrvUnitTests.test_get_workflow_from_db + # python manage.py test workflow_manager_proc.tests.test_workflow_srv.WorkflowSrvUnitTests.test_get_workflow_from_db """ test_event = { @@ -36,10 +39,16 @@ def test_get_workflow_from_db(self): } } - test_wfl = create_workflow_run.handler(test_event, None) - logger.info(test_wfl) - self.assertIsNotNone(test_wfl) - self.assertEqual("ctTSO500-L000002", test_wfl.workflow_run_name) + test_wfr = create_workflow_run.handler(test_event, None) + logger.info(test_wfr) + self.assertIsNotNone(test_wfr) + self.assertEqual("ctTSO500-L000002", test_wfr.workflow_run_name) + + logger.info("Retrieve persisted DB records") + wfr_qs: QuerySet = WorkflowRun.objects.all() + self.assertEqual(1, wfr_qs.count()) + db_wfr: WorkflowRun = wfr_qs.first() + self.assertEqual("ctTSO500-L000002", db_wfr.workflow_run_name) def test_get_workflow_from_db2(self): """ From 5d90020c00387be7447923e78734dbe3523f6d78 Mon Sep 17 00:00:00 2001 From: Florian Reisinger Date: Thu, 22 Aug 2024 15:28:36 +1000 Subject: [PATCH 13/14] Add nested library endpoint to workflowrun API --- .../stacks/workflow-manager/workflow_manager/urls/base.py | 7 +++++++ .../workflow-manager/workflow_manager/viewsets/library.py | 4 +++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py index 7232769bc..207d2bbce 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py @@ -5,6 +5,7 @@ from workflow_manager.viewsets.workflow_run import WorkflowRunViewSet from workflow_manager.viewsets.payload import PayloadViewSet from workflow_manager.viewsets.state import StateViewSet +from workflow_manager.viewsets.library import LibraryViewSet from workflow_manager.settings.base import API_VERSION api_namespace = "api" @@ -22,6 +23,12 @@ basename="workflowrun-state", ) +router.register( + "workflowrun/(?P[^/.]+)/library", + LibraryViewSet, + basename="workflowrun-library", +) + urlpatterns = [ path(f"{api_base}", include(router.urls)), ] diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/library.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/library.py index bf2d5f150..36d1349e8 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/library.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/library.py @@ -15,4 +15,6 @@ class LibraryViewSet(ReadOnlyModelViewSet): search_fields = Library.get_base_fields() def get_queryset(self): - return Library.objects.get_by_keyword(**self.request.query_params) + qs = Library.objects.filter(workflowrun=self.kwargs["workflowrun_id"]) + qs = Library.objects.get_model_fields_query(qs, **self.request.query_params) + return qs From 1ccd835080a01a7d49e5fae0ffc6458b52c61727 Mon Sep 17 00:00:00 2001 From: Florian Reisinger Date: Thu, 22 Aug 2024 19:18:18 +1000 Subject: [PATCH 14/14] Update mock data --- .../stacks/workflow-manager/Makefile | 5 + .../commands/generate_mock_workflow_run.py | 152 ++++++++++++++---- 2 files changed, 128 insertions(+), 29 deletions(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/Makefile b/lib/workload/stateless/stacks/workflow-manager/Makefile index a910d004e..ee07e2122 100644 --- a/lib/workload/stateless/stacks/workflow-manager/Makefile +++ b/lib/workload/stateless/stacks/workflow-manager/Makefile @@ -30,6 +30,11 @@ migrate: start: migrate @python manage.py runserver_plus 0.0.0.0:8000 +mock: + @python manage.py generate_mock_workflow_run + +run-mock: reset-db migrate mock start + openapi: @python manage.py generateschema > orcabus.hlo.openapi.yaml diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py index e7ff462f0..20bcffe2a 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py @@ -11,57 +11,151 @@ WORKFLOW_NAME = "TestWorkflow" +STATUS_START = "READY" +STATUS_RUNNING = "RUNNING" +STATUS_END = "SUCCEEDED" +STATUS_FAIL = "FAILED" # https://docs.djangoproject.com/en/5.0/howto/custom-management-commands/ class Command(BaseCommand): - help = "Generate mock Workflow data into database for local development and testing" + help = """ + Generate mock data and populate DB for local testing. + """ def handle(self, *args, **options): - qs: QuerySet = Workflow.objects.filter(workflow_name=WORKFLOW_NAME) - - if qs.exists(): + # don't do anything if there is already mock data + if Workflow.objects.filter(workflow_name__startswith=WORKFLOW_NAME).exists(): print("Mock data found, Skipping creation.") return - wf = WorkflowFactory(workflow_name=WORKFLOW_NAME) - wfr: WorkflowRun = WorkflowRunFactory( - workflow_run_name="MockWorkflowRun", + # Common components: payload and libraries + generic_payload = PayloadFactory() # Payload content is not important for now + libraries = [ + LibraryFactory(orcabus_id="lib.01J5M2JFE1JPYV62RYQEG99CP1", library_id="L000001"), + LibraryFactory(orcabus_id="lib.02J5M2JFE1JPYV62RYQEG99CP2", library_id="L000002"), + LibraryFactory(orcabus_id="lib.03J5M2JFE1JPYV62RYQEG99CP3", library_id="L000003"), + LibraryFactory(orcabus_id="lib.04J5M2JFE1JPYV62RYQEG99CP4", library_id="L000004") + ] + + # First case: a primary workflow with two executions linked to 4 libraries + # The first execution failed and led to a repetition that succeeded + self.create_primary(generic_payload, libraries) + self.create_secondary(generic_payload, libraries) + + print("Done") + + @staticmethod + def create_primary(generic_payload, libraries): + """ + Case: a primary workflow with two executions linked to 4 libraries + The first execution failed and led to a repetition that succeeded + """ + + wf = WorkflowFactory(workflow_name=WORKFLOW_NAME + "Primary") + + # The first execution (workflow run 1) + wfr_1: WorkflowRun = WorkflowRunFactory( + workflow_run_name=WORKFLOW_NAME + "PrimaryRun1", portal_run_id="1234", workflow=wf ) + for state in [STATUS_START, STATUS_RUNNING, STATUS_FAIL]: + StateFactory(workflow_run=wfr_1, status=state, payload=generic_payload) + for i in [0, 1, 2, 3]: + LibraryAssociation.objects.create( + workflow_run=wfr_1, + library=libraries[i], + association_date=make_aware(datetime.now()), + status="ACTIVE", + ) - wf_payload = PayloadFactory() - StateFactory( - workflow_run=wfr, - payload=wf_payload + # The second execution (workflow run 2) + wfr_2: WorkflowRun = WorkflowRunFactory( + workflow_run_name=WORKFLOW_NAME + "PrimaryRun2", + portal_run_id="1235", + workflow=wf ) + for state in [STATUS_START, STATUS_RUNNING, STATUS_END]: + StateFactory(workflow_run=wfr_2, status=state, payload=generic_payload) + for i in [0, 1, 2, 3]: + LibraryAssociation.objects.create( + workflow_run=wfr_2, + library=libraries[i], + association_date=make_aware(datetime.now()), + status="ACTIVE", + ) + + @staticmethod + def create_secondary(generic_payload, libraries): + """ + Case: a secondary pipeline comprising 3 workflows with corresponding executions + First workflow: QC (2 runs for 2 libraries) + Second workflow: Alignment (1 run for 2 libraries) + Third workflow: VariantCalling (1 run for 2 libraries) + """ - library = LibraryFactory() + wf_qc = WorkflowFactory(workflow_name=WORKFLOW_NAME + "QC") + + # QC of Library 1 + wfr_qc_1: WorkflowRun = WorkflowRunFactory( + workflow_run_name=WORKFLOW_NAME + "QCRunLib1", + portal_run_id="2345", + workflow=wf_qc + ) + for state in [STATUS_START, STATUS_RUNNING, STATUS_END]: + StateFactory(workflow_run=wfr_qc_1, status=state, payload=generic_payload) LibraryAssociation.objects.create( - workflow_run=wfr, - library=library, + workflow_run=wfr_qc_1, + library=libraries[0], association_date=make_aware(datetime.now()), status="ACTIVE", ) - wfr2: WorkflowRun = WorkflowRunFactory( - workflow_run_name="MockWorkflowRun2", - portal_run_id="1235", - workflow=wf - ) - StateFactory( - workflow_run=wfr2, - status="RUNNING", - payload=wf_payload + # QC of Library 2 + wfr_qc_2: WorkflowRun = WorkflowRunFactory( + workflow_run_name=WORKFLOW_NAME + "QCRunLib2", + portal_run_id="2346", + workflow=wf_qc ) - - library2 = LibraryFactory(orcabus_id="lib.01J5M2JFE1JPYV62RYQEG99CP5", library_id="L000002") + for state in [STATUS_START, STATUS_RUNNING, STATUS_END]: + StateFactory(workflow_run=wfr_qc_2, status=state, payload=generic_payload) LibraryAssociation.objects.create( - workflow_run=wfr2, - library=library2, + workflow_run=wfr_qc_2, + library=libraries[1], association_date=make_aware(datetime.now()), status="ACTIVE", ) - print(libjson.dumps(wf.to_dict())) - print("Done") + # Alignment + wf_align = WorkflowFactory(workflow_name=WORKFLOW_NAME + "Alignment") + wfr_a: WorkflowRun = WorkflowRunFactory( + workflow_run_name=WORKFLOW_NAME + "AlignmentRun", + portal_run_id="3456", + workflow=wf_align + ) + for state in [STATUS_START, STATUS_RUNNING, STATUS_END]: + StateFactory(workflow_run=wfr_a, status=state, payload=generic_payload) + for i in [0, 1]: + LibraryAssociation.objects.create( + workflow_run=wfr_a, + library=libraries[i], + association_date=make_aware(datetime.now()), + status="ACTIVE", + ) + + # Variant Calling + wf_vc = WorkflowFactory(workflow_name=WORKFLOW_NAME + "VariantCalling") + wfr_vc: WorkflowRun = WorkflowRunFactory( + workflow_run_name=WORKFLOW_NAME + "VariantCallingRun", + portal_run_id="4567", + workflow=wf_vc + ) + for state in [STATUS_START, STATUS_RUNNING, STATUS_END]: + StateFactory(workflow_run=wfr_vc, status=state, payload=generic_payload) + for i in [0, 1]: + LibraryAssociation.objects.create( + workflow_run=wfr_vc, + library=libraries[i], + association_date=make_aware(datetime.now()), + status="ACTIVE", + )