diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py index 20bcffe2a..d0e995a1a 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/management/commands/generate_mock_workflow_run.py @@ -11,6 +11,7 @@ WORKFLOW_NAME = "TestWorkflow" +STATUS_DRAFT = "DRAFT" STATUS_START = "READY" STATUS_RUNNING = "RUNNING" STATUS_END = "SUCCEEDED" @@ -59,7 +60,7 @@ def create_primary(generic_payload, libraries): portal_run_id="1234", workflow=wf ) - for state in [STATUS_START, STATUS_RUNNING, STATUS_FAIL]: + for state in [STATUS_DRAFT, STATUS_START, STATUS_RUNNING, STATUS_FAIL]: StateFactory(workflow_run=wfr_1, status=state, payload=generic_payload) for i in [0, 1, 2, 3]: LibraryAssociation.objects.create( @@ -75,7 +76,7 @@ def create_primary(generic_payload, libraries): portal_run_id="1235", workflow=wf ) - for state in [STATUS_START, STATUS_RUNNING, STATUS_END]: + for state in [STATUS_DRAFT, STATUS_START, STATUS_RUNNING, STATUS_END]: StateFactory(workflow_run=wfr_2, status=state, payload=generic_payload) for i in [0, 1, 2, 3]: LibraryAssociation.objects.create( @@ -102,7 +103,7 @@ def create_secondary(generic_payload, libraries): portal_run_id="2345", workflow=wf_qc ) - for state in [STATUS_START, STATUS_RUNNING, STATUS_END]: + for state in [STATUS_DRAFT, STATUS_START, STATUS_RUNNING, STATUS_END]: StateFactory(workflow_run=wfr_qc_1, status=state, payload=generic_payload) LibraryAssociation.objects.create( workflow_run=wfr_qc_1, @@ -117,7 +118,7 @@ def create_secondary(generic_payload, libraries): portal_run_id="2346", workflow=wf_qc ) - for state in [STATUS_START, STATUS_RUNNING, STATUS_END]: + for state in [STATUS_DRAFT, STATUS_START, STATUS_RUNNING, 
STATUS_END]: StateFactory(workflow_run=wfr_qc_2, status=state, payload=generic_payload) LibraryAssociation.objects.create( workflow_run=wfr_qc_2, @@ -133,7 +134,7 @@ def create_secondary(generic_payload, libraries): portal_run_id="3456", workflow=wf_align ) - for state in [STATUS_START, STATUS_RUNNING, STATUS_END]: + for state in [STATUS_DRAFT, STATUS_START, STATUS_RUNNING, STATUS_END]: StateFactory(workflow_run=wfr_a, status=state, payload=generic_payload) for i in [0, 1]: LibraryAssociation.objects.create( @@ -150,7 +151,7 @@ def create_secondary(generic_payload, libraries): portal_run_id="4567", workflow=wf_vc ) - for state in [STATUS_START, STATUS_RUNNING, STATUS_END]: + for state in [STATUS_DRAFT, STATUS_START, STATUS_RUNNING, STATUS_END]: StateFactory(workflow_run=wfr_vc, status=state, payload=generic_payload) for i in [0, 1]: LibraryAssociation.objects.create( diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py index c90b18c43..5e2e6da1d 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/__init__.py @@ -5,3 +5,5 @@ from .workflow_run import WorkflowRun, LibraryAssociation from .library import Library from .state import State +from .state import Status +from .utils import WorkflowRunUtil diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/state.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/state.py index 22dc8b016..de8c607a5 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/state.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/state.py @@ -1,10 +1,86 @@ from django.db import models +from enum import Enum +from typing import List from workflow_manager.models.base import OrcaBusBaseModel, 
class Status(Enum):
    """
    Controlled WorkflowRun statuses.

    Each member's value is a (convention, aliases) pair:
        convention: the canonical status string
        aliases: all (normalised) spellings that map onto the convention

    All lookups normalise their input the same way (upper case, '-' -> '_'),
    so the predicates agree with get_convention() for values such as
    "in-progress".
    """
    DRAFT = "DRAFT", ['DRAFT', 'INITIAL', 'CREATED']
    READY = "READY", ['READY']
    RUNNING = "RUNNING", ['RUNNING', 'IN_PROGRESS']
    SUCCEEDED = "SUCCEEDED", ['SUCCEEDED', 'SUCCESS']
    FAILED = "FAILED", ['FAILED', 'FAILURE']
    ABORTED = "ABORTED", ['ABORTED', 'CANCELLED', 'CANCELED']

    def __init__(self, convention: str, aliases: List[str]):
        self.convention = convention
        self.aliases = aliases

    def __str__(self):
        return self.convention

    @staticmethod
    def _normalise(status: str) -> str:
        # enforce upper case convention and unify separators
        # TODO: handle other characters?
        return status.upper().replace("-", "_")

    @staticmethod
    def _matches(status: str, members) -> bool:
        # True if the normalised status is an alias of any of the given members
        normalised = Status._normalise(status)
        return any(normalised in member.aliases for member in members)

    @staticmethod
    def get_convention(status: str):
        """
        Return the canonical status for a known alias.
        Uncontrolled statuses are retained (in their normalised form).
        """
        normalised = Status._normalise(status)
        for s in Status:
            if normalised in s.aliases:
                return s.convention

        # retain all uncontrolled states
        return normalised

    @staticmethod
    def is_supported(status: str) -> bool:
        """Return True if the status is an alias of any controlled status."""
        return Status._matches(status, Status)

    @staticmethod
    def is_terminal(status: str) -> bool:
        """Return True if the status is an alias of SUCCEEDED, FAILED or ABORTED."""
        return Status._matches(status, (Status.SUCCEEDED, Status.FAILED, Status.ABORTED))

    @staticmethod
    def is_draft(status: str) -> bool:
        """Return True if the status is an alias of DRAFT."""
        return Status._matches(status, (Status.DRAFT,))

    @staticmethod
    def is_running(status: str) -> bool:
        """Return True if the status is an alias of RUNNING."""
        return Status._matches(status, (Status.RUNNING,))

    @staticmethod
    def is_ready(status: str) -> bool:
        """Return True if the status is an alias of READY."""
        return Status._matches(status, (Status.READY,))
class WorkflowRunUtil:
    """
    Utility methods for a WorkflowRun.

    The states of the WorkflowRun are loaded once at construction time and
    kept in `self.states`; states persisted through this object are appended
    to that list so later checks see them.
    # TODO: this could be integrated into the WorkflowRun model class? (figure out performance / implications)
    """

    def __init__(self, workflow_run: "WorkflowRun"):
        self.workflow_run = workflow_run
        self.states: List["State"] = list(self.workflow_run.get_all_states())

    def get_current_state(self):
        """Return the state with the latest timestamp, or None if there is no state yet."""
        if not self.states:
            return None
        return WorkflowRunUtil.get_latest_state(self.states)

    def is_complete(self):
        """Return True if the current state is terminal (False if there is no state)."""
        current = self.get_current_state()
        return current is not None and current.is_terminal()

    def is_draft(self):
        """Return True if the current state is DRAFT (False if there is no state)."""
        # There may be multiple DRAFT states. We assume they are in order, e.g. no other state in between
        current = self.get_current_state()
        return current is not None and current.is_draft()

    def is_ready(self):
        """Return True if the current state is READY (False if there is no state)."""
        current = self.get_current_state()
        return current is not None and current.is_ready()

    def is_running(self):
        """Return True if the current state is RUNNING (False if there is no state)."""
        current = self.get_current_state()
        return current is not None and current.is_running()

    def contains_status(self, status: str):
        """Return True if any known state of the WorkflowRun carries exactly this status."""
        # NOTE: we assume status is following conventions
        return any(status == s.status for s in self.states)

    def transition_to(self, new_state: "State") -> bool:
        """
        Parameter:
            new_state: the new state to transition to
        Process:
            Transition to the new state if possible and update the WorkflowRun.
        Return:
            False: if the transition is not possible
            True: if the state was updated
        # TODO: consider race conditions?
        """
        # enforce status conventions on new state
        new_state.status = Status.get_convention(new_state.status)  # TODO: encapsulate into State ?!

        # look the current state up once; it cannot change before we persist
        current = self.get_current_state()

        # If it's a brand new WorkflowRun we expect the first state to be DRAFT
        # TODO: handle exceptions;
        #       BCL Convert may not create a DRAFT state
        if current is None:
            if not new_state.is_draft():
                logger.warning(f"WorkflowRun does not have state yet, but new state is not DRAFT: {new_state}")
                # FIXME: stop persisting non-DRAFT initial states once convention is enforced
            self.persist_state(new_state)
            return True

        # Ignore any state that's older than the current one
        if new_state.timestamp < current.timestamp:
            return False

        # Don't allow any changes once in terminal state
        if current.is_terminal():
            logger.info(f"WorkflowRun in terminal state, can't transition to: {new_state.status}")
            return False

        # Allowed transitions from DRAFT state:
        # "updates" of the DRAFT state and the transition from DRAFT to READY, nothing else
        if current.is_draft():
            if new_state.is_draft() or new_state.is_ready():
                self.persist_state(new_state)
                return True
            return False

        # Allowed transitions from READY state: no going back, no updates to READY
        # Transitions to other states are allowed (may not be controlled states though, so we can't control)
        if current.is_ready() and (new_state.is_draft() or new_state.is_ready()):
            return False

        # Allowed transitions from RUNNING state
        if current.is_running():
            if new_state.is_draft() or new_state.is_ready():  # no going back
                return False
            if new_state.is_running():
                # Only allow updates every so often, to avoid too frequent updates for RUNNING state
                if new_state.timestamp - current.timestamp < TIMEDELTA_1H:
                    return False
                self.persist_state(new_state)
                return True

        # Allowed transitions from other state
        if self.contains_status(new_state.status):
            # Don't allow updates/duplications of other states
            return False

        # Assume other state transitions are OK
        self.persist_state(new_state)
        return True

    def persist_state(self, new_state):
        """Attach the state to the WorkflowRun, save it (and its payload, if any) and remember it."""
        new_state.workflow_run = self.workflow_run
        # The payload is optional; when present it has to be saved before the State
        if new_state.payload is not None:
            new_state.payload.save()
        new_state.save()
        # keep the cached states in sync so follow-up checks see the new state
        self.states.append(new_state)

    @staticmethod
    def get_latest_state(states: List["State"]) -> "State":
        """
        Return the state with the greatest timestamp (the first one on ties).
        Raises IndexError for an empty list.
        """
        if not states:
            raise IndexError("cannot determine the latest state of an empty list")
        return max(states, key=lambda s: s.timestamp)
def handler(event, context):
    """
    Parameters:
        event: JSON event conform to <executionservice>.WorkflowRunStateChange
        context: ignored for now (only used to conform to Lambda handler conventions)
    Procedure:
        - Unpack AWS event
        - create new State for WorkflowRun if required
        - relay the state change as WorkflowManager WRSC event if applicable
    """
    print(f"Processing {event}, {context}")

    # strip the AWSEvent envelope; the WRSC itself is carried in its detail
    wrsc: srv.WorkflowRunStateChange = srv.Marshaller.unmarshall(event, srv.AWSEvent).detail

    relayed = create_workflow_run_state.handler(srv.Marshaller.marshall(wrsc), None)
    if not relayed:
        # ignore - state has not been updated
        print("WorkflowRun state not updated. No event to emit.")
    else:
        # new state resulted in state transition, we can relay the WRSC
        print("Emitting WRSC.")
        emit_workflow_run_state_change.handler(wfm.Marshaller.marshall(relayed), None)

    print(f"{__name__} done.")
def create_payload_stub_from_wrsc(wrsc: WorkflowRunStateChange):
    """
    Build an unsaved Payload record from the payload of a WRSC event.

    Returns None if the event carries no payload. The record gets a freshly
    generated payload_ref_id and is NOT persisted here.
    """
    # TODO: find better place for this
    source = wrsc.payload
    if not source:
        return None

    return Payload(
        payload_ref_id=str(uuid.uuid4()),
        version=source.version,
        data=source.data,
    )
import create_workflow_run_state - -ASSOCIATION_STATUS = "ACTIVE" - - -@transaction.atomic -def handler(event, context): - """ - event will be JSON conform to executionservice.WorkflowRunStateChange - """ - print(f"Processing {event}, {context}") - - wrsc: WorkflowRunStateChange = Marshaller.unmarshall(event, WorkflowRunStateChange) - print(wrsc) - - # We expect: a corresponding Workflow has to exist for each workflow run - # TODO: decide whether we allow dynamic workflow creation or expect them to exist and fail if not - try: - print(f"Looking for workflow ({wrsc.workflowName}:{wrsc.workflowVersion}).") - workflow: Workflow = Workflow.objects.get( - workflow_name=wrsc.workflowName, workflow_version=wrsc.workflowVersion - ) - except Exception: - print("No workflow found! Creating new entry.") - workflow = Workflow( - workflow_name=wrsc.workflowName, - workflow_version=wrsc.workflowVersion, - execution_engine="Unknown", - execution_engine_pipeline_id="Unknown", - approval_state="RESEARCH", - ) - print("Persisting Workflow record.") - workflow.save() - - # then create the actual workflow run entry if it does not exist - try: - wfr: WorkflowRun = WorkflowRun.objects.get(portal_run_id=wrsc.portalRunId) - except Exception: - print("No workflow found! 
Creating new entry.") - wfr = WorkflowRun( - workflow=workflow, - portal_run_id=wrsc.portalRunId, - execution_id=wrsc.executionId, # the execution service WRSC does carry the execution ID - workflow_run_name=wrsc.workflowRunName, - comment=None - ) - print("Persisting Workflow record.") - wfr.save() - - # create the related state & payload entries for the WRSC - # create_workflow_run_state(wrsc=wrsc, wfr=wfr) # FIXME State creation is "time window" WRSC timestamp dependant - - # if the workflow run is linked to library record(s), create the association(s) - input_libraries: list[LibraryRecord] = wrsc.linkedLibraries - if input_libraries: - for input_rec in input_libraries: - # check if the library has already a DB record - db_lib: Library = Library.objects.get_by_keyword(orcabus_id=input_rec.orcabusId) - # create it if not - if not db_lib: - # TODO: the library record should exist in the future - synced with metadata service on - # LibraryStateChange events - db_lib = Library.objects.create(orcabus_id=input_rec.orcabusId, library_id=input_rec.libraryId) - - # create the library association - LibraryAssociation.objects.create( - workflow_run=wfr, - library=db_lib, - association_date=make_aware(datetime.now()), - status=ASSOCIATION_STATUS, - ) - - print(f"{__name__} done.") - return wfr # FIXME: serialise in future (json.dumps) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run_state.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run_state.py new file mode 100644 index 000000000..7b4b21399 --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run_state.py @@ -0,0 +1,145 @@ +# import django + +# django.setup() + +# # --- keep ^^^ at top of the module +import datetime +import logging + +from django.db import transaction +import workflow_manager_proc.domain.executionservice.workflowrunstatechange as 
@transaction.atomic
def handler(event, context):
    """
    Parameters:
        event: JSON event conform to <executionservice>.WorkflowRunStateChange
        context: ignored for now (only used to conform to Lambda handler conventions)
    Procedure:
        - check whether a corresponding Workflow record exists (it should according to the pre-planning approach)
            - if not exist, create (support on-the-fly approach)
        - check whether a WorkflowRun record exists (it should if this is not the first/initial state)
            - if not exist, create
            - associate any libraries at this point (later updates/linking is not supported at this point)
        - check whether the state change event constitutes a new state
            - the DRAFT state allows payload updates, until it enters the READY state
            - the RUNNING state allows "infrequent" updates (i.e. that happen outside a certain time window)
            - other states will ignore updates of the same state
        - if we have new state, then persist it
        NOTE: all events that don't change any state value should be ignored
    Return:
        the WorkflowManager WRSC to relay, or None if the state was not updated
    """
    logger.info(f"Start processing {event}, {context}...")
    srv_wrsc: srv.WorkflowRunStateChange = srv.Marshaller.unmarshall(event, srv.WorkflowRunStateChange)

    workflow = _get_or_create_workflow(srv_wrsc)
    wfr = _get_or_create_workflow_run(srv_wrsc, workflow)

    wfr_util = WorkflowRunUtil(wfr)

    # Create a new State stub (not persisted)
    new_state = State(
        status=srv_wrsc.status,
        timestamp=srv_wrsc.timestamp,
        payload=create_payload_stub_from_wrsc(srv_wrsc)
    )

    # attempt to transition to new state (will persist new state if successful)
    success = wfr_util.transition_to(new_state)
    if not success:
        logger.warning(f"Could not apply new state: {new_state}")
        return None

    wfm_wrsc = map_srv_wrsc_to_wfm_wrsc(srv_wrsc)
    # Update payload ID. The payload is optional, so only do this when both
    # the persisted state and the outgoing event actually carry one.
    if new_state.payload is not None and wfm_wrsc.payload is not None:
        wfm_wrsc.payload.refId = new_state.payload.payload_ref_id

    logger.info(f"{__name__} done.")
    return wfm_wrsc


def _get_or_create_workflow(srv_wrsc):
    # Return the Workflow matching the event's name/version, creating a placeholder record if missing.
    # We expect: a corresponding Workflow has to exist for each workflow run
    # NOTE: for now we allow dynamic workflow creation
    # TODO: expect workflows to be pre-registered
    # TODO: could move that logic to caller and expect WF to exist here
    try:
        logger.info(f"Looking for Workflow ({srv_wrsc.workflowName}:{srv_wrsc.workflowVersion}).")
        return Workflow.objects.get(
            workflow_name=srv_wrsc.workflowName, workflow_version=srv_wrsc.workflowVersion
        )
    except Workflow.DoesNotExist:
        # only a genuinely missing record triggers creation; any other error
        # (e.g. duplicates, DB failures) must surface instead of adding records
        logger.warning("No Workflow record found! Creating new entry.")
        workflow = Workflow(
            workflow_name=srv_wrsc.workflowName,
            workflow_version=srv_wrsc.workflowVersion,
            execution_engine="Unknown",
            execution_engine_pipeline_id="Unknown",
            approval_state="RESEARCH",
        )
        logger.info("Persisting Workflow record.")
        workflow.save()
        return workflow


def _get_or_create_workflow_run(srv_wrsc, workflow):
    # Return the WorkflowRun for the event's portal run ID, creating it (and its library links) if missing.
    try:
        return WorkflowRun.objects.get(portal_run_id=srv_wrsc.portalRunId)
    except WorkflowRun.DoesNotExist:
        logger.info("No WorkflowRun record found! Creating new entry.")
        # NOTE: the library linking is expected to be established at workflow run creation time.
        #       Later changes will currently be ignored.
        wfr = WorkflowRun(
            workflow=workflow,
            portal_run_id=srv_wrsc.portalRunId,
            execution_id=srv_wrsc.executionId,  # the execution service WRSC does carry the execution ID
            workflow_run_name=srv_wrsc.workflowRunName,
            comment=None
        )
        logger.info(wfr)
        logger.info("Persisting WorkflowRun record.")
        wfr.save()
        _link_libraries(wfr, srv_wrsc.linkedLibraries)
        return wfr


def _link_libraries(wfr, input_libraries):
    # Create a LibraryAssociation for every library record linked in the event (if any).
    if not input_libraries:
        return
    for input_rec in input_libraries:
        # Fetch the library record, or create it if it does not exist yet.
        # get_or_create always yields a single model instance, which is what the
        # association's foreign key needs.
        # NOTE(review): assumes orcabus_id identifies at most one Library - confirm
        # TODO: the library record should exist in the future - synced with metadata service on
        #       LibraryStateChange events
        db_lib, _ = Library.objects.get_or_create(
            orcabus_id=input_rec.orcabusId,
            defaults={"library_id": input_rec.libraryId},
        )

        # create the library association
        LibraryAssociation.objects.create(
            workflow_run=wfr,
            library=db_lib,
            # timezone-aware timestamp (the previous implementation used make_aware)
            # NOTE(review): assumes USE_TZ is enabled for this project - confirm
            association_date=datetime.datetime.now(tz=datetime.timezone.utc),
            status=ASSOCIATION_STATUS,
        )


def map_srv_wrsc_to_wfm_wrsc(input_wrsc: srv.WorkflowRunStateChange) -> wfm.WorkflowRunStateChange:
    """
    Map an execution service WRSC onto a WorkflowManager WRSC.
    The status is converted to its convention; all other fields are passed through as they are.
    """
    out_wrsc = wfm.WorkflowRunStateChange(
        portalRunId=input_wrsc.portalRunId,
        timestamp=input_wrsc.timestamp,
        status=Status.get_convention(input_wrsc.status),  # ensure we follow conventions
        workflowName=input_wrsc.workflowName,
        workflowVersion=input_wrsc.workflowVersion,
        workflowRunName=input_wrsc.workflowRunName,
        linkedLibraries=input_wrsc.linkedLibraries,
        payload=input_wrsc.payload,  # requires payload ID
    )
    return out_wrsc
datetime import datetime, timedelta +from typing import List + +from django.db.models import QuerySet +from django.utils.timezone import make_aware + +from workflow_manager_proc.domain.workflowmanager.workflowrunstatechange import WorkflowRunStateChange +from workflow_manager_proc.services import create_workflow_run_state +from workflow_manager_proc.tests.case import WorkflowManagerProcUnitTestCase, logger +from workflow_manager.models import WorkflowRun, State, WorkflowRunUtil +from workflow_manager.tests.factories import WorkflowRunFactory + + +class WorkflowSrvUnitTests(WorkflowManagerProcUnitTestCase): + + def test_get_workflow_from_db(self): + """ + python manage.py test workflow_manager_proc.tests.test_create_workflow_run_state.WorkflowSrvUnitTests.test_get_workflow_from_db + """ + + test_event = { + "portalRunId": "202405012397gatc", + "executionId": "icav2.id.12345", + "timestamp": "2025-05-01T09:25:44Z", + "status": "DRAFT", + "workflowName": "ctTSO500", + "workflowVersion": "4.2.7", + "workflowRunName": "ctTSO500-L000002", + "payload": { + "version": "0.1.0", + "data": { + "projectId": "bxxxxxxxx-dxxx-4xxxx-adcc-xxxxxxxxx", + "analysisId": "12345678-238c-4200-b632-d5dd8c8db94a", + "userReference": "540424_A01001_0193_BBBBMMDRX5_c754de_bd822f", + "timeCreated": "2024-05-01T10:11:35Z", + "timeModified": "2024-05-01T11:24:29Z", + "pipelineId": "bfffffff-cb27-4dfa-846e-acd6eb081aca", + "pipelineCode": "CTTSO500 v4_2_7", + "pipelineDescription": "This is an ctTSO500 workflow execution", + "pipelineUrn": "urn:ilmn:ica:pipeline:bfffffff-cb27-4dfa-846e-acd6eb081aca#CTTSO500_v4_2_7" + } + } + } + + logger.info("Test the created WRSC event...") + result_wrsc: WorkflowRunStateChange = create_workflow_run_state.handler(test_event, None) + logger.info(result_wrsc) + self.assertIsNotNone(result_wrsc) + self.assertEqual("ctTSO500-L000002", result_wrsc.workflowRunName) + # We don't expect any library associations here! 
+ self.assertIsNone(result_wrsc.linkedLibraries) + + logger.info("Test the persisted DB record...") + wfr_qs: QuerySet = WorkflowRun.objects.all() + self.assertEqual(1, wfr_qs.count()) + db_wfr: WorkflowRun = wfr_qs.first() + self.assertEqual("ctTSO500-L000002", db_wfr.workflow_run_name) + # We don't expect any library associations here! + self.assertEqual(0, db_wfr.libraries.count()) + + def test_get_workflow_from_db2(self): + """ + python manage.py test workflow_manager_proc.tests.test_create_workflow_run_state.WorkflowSrvUnitTests.test_get_workflow_from_db2 + """ + library_ids = ["L000001", "L000002"] + lib_ids = [ + { + "libraryId": library_ids[0], + "orcabusId": "lib.01J5M2J44HFJ9424G7074NKTGN" + }, + { + "libraryId": library_ids[1], + "orcabusId": "lib.01J5M2JFE1JPYV62RYQEG99CP5" + } + ] + + test_event = { + "portalRunId": "202405012397gatc", + "executionId": "icav2.id.12345", + "timestamp": "2025-05-01T09:25:44Z", + "status": "DRAFT", + "workflowName": "ctTSO500", + "workflowVersion": "4.2.7", + "workflowRunName": "ctTSO500-L000002", + "linkedLibraries": lib_ids, + "payload": { + "version": "0.1.0", + "data": { + "projectId": "bxxxxxxxx-dxxx-4xxxx-adcc-xxxxxxxxx", + "analysisId": "12345678-238c-4200-b632-d5dd8c8db94a", + "userReference": "540424_A01001_0193_BBBBMMDRX5_c754de_bd822f", + "timeCreated": "2024-05-01T10:11:35Z", + "timeModified": "2024-05-01T11:24:29Z", + "pipelineId": "bfffffff-cb27-4dfa-846e-acd6eb081aca", + "pipelineCode": "CTTSO500 v4_2_7", + "pipelineDescription": "This is an ctTSO500 workflow execution", + "pipelineUrn": "urn:ilmn:ica:pipeline:bfffffff-cb27-4dfa-846e-acd6eb081aca#CTTSO500_v4_2_7" + } + } + } + + logger.info("Test the created WRSC event...") + result_wrsc: WorkflowRunStateChange = create_workflow_run_state.handler(test_event, None) + logger.info(result_wrsc) + self.assertIsNotNone(result_wrsc) + self.assertEqual("ctTSO500-L000002", result_wrsc.workflowRunName) + # We do expect 2 library associations here! 
+ self.assertIsNotNone(result_wrsc.linkedLibraries) + self.assertEqual(2, len(result_wrsc.linkedLibraries)) + for lib in result_wrsc.linkedLibraries: + self.assertTrue(lib.libraryId in library_ids) + + logger.info("Test the persisted DB record...") + wfr_qs: QuerySet = WorkflowRun.objects.all() + self.assertEqual(1, wfr_qs.count()) + db_wfr: WorkflowRun = wfr_qs.first() + self.assertEqual("ctTSO500-L000002", db_wfr.workflow_run_name) + # We do expect 2 library associations here! + self.assertEqual(2, db_wfr.libraries.count()) + for lib in db_wfr.libraries.all(): + self.assertTrue(lib.library_id in library_ids) + + def test_get_last_state(self): + """ + python manage.py test workflow_manager_proc.tests.test_create_workflow_run_state.WorkflowSrvUnitTests.test_get_last_state + """ + + wfr: WorkflowRun = WorkflowRunFactory() + s1: State = State( + timestamp=make_aware(datetime(2024, 1, 3, 23, 55, 59, 342380)), + workflow_run=wfr, + status='DRAFT' + ) + s2: State = State( + timestamp=make_aware(datetime(2024, 1, 1, 23, 55, 59, 342380)), + workflow_run=wfr, + status='DRAFT' + ) + s3: State = State( + timestamp=make_aware(datetime(2024, 1, 4, 23, 55, 59, 342380)), + workflow_run=wfr, + status='DRAFT' + ) + s4: State = State( + timestamp=make_aware(datetime(2024, 1, 2, 23, 55, 59, 342380)), + workflow_run=wfr, + status='DRAFT' + ) + + # Test different orders, they all have to come to the same conclusion + states: List[State] = [s1, s2, s3, s4] + latest: State = WorkflowRunUtil.get_latest_state(states) + self.assertEqual(s3.timestamp, latest.timestamp) + + states: List[State] = [s4, s1, s2, s3] + latest: State = WorkflowRunUtil.get_latest_state(states) + self.assertEqual(s3.timestamp, latest.timestamp) + + states: List[State] = [s3, s2, s1, s4] + latest: State = WorkflowRunUtil.get_latest_state(states) + self.assertEqual(s3.timestamp, latest.timestamp) + + # Now test from WorkflowRun level (need to persist DB objects though) + s1.save() + s2.save() + s3.save() + s4.save() + 
wfr.save() + util = WorkflowRunUtil(wfr) + latest = util.get_current_state() + self.assertEqual(s3.timestamp, latest.timestamp) + + # Test we can correctly apply a time delta + t1 = s1.timestamp + t2 = s2.timestamp + delta = t1 - t2 # = 2 days + print(f"delta sec: {abs(delta.total_seconds())}") + window = timedelta(hours=1) + print(f"window sec: {window.total_seconds()}") + self.assertTrue(delta > window, "delta > 1h") diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py deleted file mode 100644 index a810e8bef..000000000 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_workflow_srv.py +++ /dev/null @@ -1,101 +0,0 @@ -from unittest import skip - -from django.db.models import QuerySet - -from workflow_manager_proc.services import create_workflow_run -from workflow_manager_proc.tests.case import WorkflowManagerProcUnitTestCase, logger -from workflow_manager.models import WorkflowRun - - -class WorkflowSrvUnitTests(WorkflowManagerProcUnitTestCase): - - # @skip - def test_get_workflow_from_db(self): - """ - # python manage.py test workflow_manager_proc.tests.test_workflow_srv.WorkflowSrvUnitTests.test_get_workflow_from_db - """ - - test_event = { - "portalRunId": "202405012397gatc", - "executionId": "icav2.id.12345", - "timestamp": "2025-05-01T09:25:44Z", - "status": "SUCCEEDED", - "workflowName": "ctTSO500", - "workflowVersion": "4.2.7", - "workflowRunName": "ctTSO500-L000002", - "payload": { - "version": "0.1.0", - "data": { - "projectId": "bxxxxxxxx-dxxx-4xxxx-adcc-xxxxxxxxx", - "analysisId": "12345678-238c-4200-b632-d5dd8c8db94a", - "userReference": "540424_A01001_0193_BBBBMMDRX5_c754de_bd822f", - "timeCreated": "2024-05-01T10:11:35Z", - "timeModified": "2024-05-01T11:24:29Z", - "pipelineId": "bfffffff-cb27-4dfa-846e-acd6eb081aca", - "pipelineCode": "CTTSO500 v4_2_7", - 
"pipelineDescription": "This is an ctTSO500 workflow execution", - "pipelineUrn": "urn:ilmn:ica:pipeline:bfffffff-cb27-4dfa-846e-acd6eb081aca#CTTSO500_v4_2_7" - } - } - } - - test_wfr = create_workflow_run.handler(test_event, None) - logger.info(test_wfr) - self.assertIsNotNone(test_wfr) - self.assertEqual("ctTSO500-L000002", test_wfr.workflow_run_name) - - logger.info("Retrieve persisted DB records") - wfr_qs: QuerySet = WorkflowRun.objects.all() - self.assertEqual(1, wfr_qs.count()) - db_wfr: WorkflowRun = wfr_qs.first() - self.assertEqual("ctTSO500-L000002", db_wfr.workflow_run_name) - - def test_get_workflow_from_db2(self): - """ - python manage.py test workflow_manager_proc.tests.test_workflow_srv.WorkflowSrvUnitTests.test_get_workflow_from_db2 - """ - library_ids = ["L000001", "L000002"] - lib_ids = [ - { - "libraryId": library_ids[0], - "orcabusId": "lib.01J5M2J44HFJ9424G7074NKTGN" - }, - { - "libraryId": library_ids[1], - "orcabusId": "lib.01J5M2JFE1JPYV62RYQEG99CP5" - } - ] - - test_event = { - "portalRunId": "202405012397gatc", - "executionId": "icav2.id.12345", - "timestamp": "2025-05-01T09:25:44Z", - "status": "SUCCEEDED", - "workflowName": "ctTSO500", - "workflowVersion": "4.2.7", - "workflowRunName": "ctTSO500-L000002", - "linkedLibraries": lib_ids, - "payload": { - "version": "0.1.0", - "data": { - "projectId": "bxxxxxxxx-dxxx-4xxxx-adcc-xxxxxxxxx", - "analysisId": "12345678-238c-4200-b632-d5dd8c8db94a", - "userReference": "540424_A01001_0193_BBBBMMDRX5_c754de_bd822f", - "timeCreated": "2024-05-01T10:11:35Z", - "timeModified": "2024-05-01T11:24:29Z", - "pipelineId": "bfffffff-cb27-4dfa-846e-acd6eb081aca", - "pipelineCode": "CTTSO500 v4_2_7", - "pipelineDescription": "This is an ctTSO500 workflow execution", - "pipelineUrn": "urn:ilmn:ica:pipeline:bfffffff-cb27-4dfa-846e-acd6eb081aca#CTTSO500_v4_2_7" - } - } - } - - test_wfl = create_workflow_run.handler(test_event, None) - logger.info(test_wfl) - self.assertIsNotNone(test_wfl) - 
self.assertEqual("ctTSO500-L000002", test_wfl.workflow_run_name) - libs = test_wfl.libraries.all() - for lib in libs: - logger.info(lib) - self.assertIn(lib.library_id, library_ids)