From 3e03bcda328a69c6b0956d98675155a02205d60e Mon Sep 17 00:00:00 2001 From: Roman Valls Guimera Date: Mon, 20 Nov 2023 13:19:04 +1100 Subject: [PATCH] Remove _deprecated --- .../_deprecated/bcl_convert/component.ts | 55 ----- .../bcl_convert/runtime/bcl_convert.py | 71 ------- .../_deprecated/bcl_convert/tests/__init__.py | 0 .../bcl_convert/tests/test_bcl_convert.py | 6 - .../_deprecated/dragen_wgs_qc/component.ts | 1 - .../dragen_wgs_qc/runtime/dragen_wgs_qc.py | 46 ---- .../dragen_wgs_somatic/component.ts | 1 - .../runtime/dragen_wgs_somatic.py | 50 ----- .../ens_event_manager/component.ts | 1 - .../runtime/ens_event_manager.py | 147 ------------- .../_deprecated/gds_manager/component.ts | 1 - .../gds_manager/runtime/gds_manager.py | 23 -- .../_deprecated/layers/base/requirements.txt | 1 - .../stateless/_deprecated/layers/component.ts | 47 ----- .../layers/create_layer_package.sh | 37 ---- .../layers/domain/model/data_file.py | 7 - .../layers/domain/model/library.py | 3 - .../layers/domain/model/sequence_run.py | 4 - .../layers/domain/model/subject.py | 3 - .../layers/domain/requirements.txt | 0 .../_deprecated/layers/eb_util/eb_util.py | 120 ----------- .../layers/schema/requirements.txt | 2 - .../layers/schema/schema/__init__.py | 0 .../schema/sequencerunstatechange/Event.py | 140 ------------- .../schema/sequencerunstatechange/__init__.py | 6 - .../sequencerunstatechange/marshaller.py | 140 ------------- .../schema/schema/weslaunchrequest/Event.py | 140 ------------- .../schema/weslaunchrequest/__init__.py | 6 - .../schema/weslaunchrequest/marshaller.py | 140 ------------- .../schema/schema/workflowrequest/Event.py | 110 ---------- .../schema/schema/workflowrequest/__init__.py | 6 - .../schema/workflowrequest/marshaller.py | 140 ------------- .../schema/workflowrunstatechange/Event.py | 110 ---------- .../schema/workflowrunstatechange/__init__.py | 6 - .../workflowrunstatechange/marshaller.py | 140 ------------- .../_deprecated/orchestrator/component.ts | 1 - .../orchestrator/runtime/orchestrator.py | 197 ------------------ .../_deprecated/wes_launcher/component.ts | 1 - .../wes_launcher/runtime/wes_launcher.py | 164 --------------- 39 files changed, 2073 deletions(-) delete mode 100644 lib/workload/stateless/_deprecated/bcl_convert/component.ts delete mode 100644 lib/workload/stateless/_deprecated/bcl_convert/runtime/bcl_convert.py delete mode 100644 lib/workload/stateless/_deprecated/bcl_convert/tests/__init__.py delete mode 100644 lib/workload/stateless/_deprecated/bcl_convert/tests/test_bcl_convert.py delete mode 100644 lib/workload/stateless/_deprecated/dragen_wgs_qc/component.ts delete mode 100644 lib/workload/stateless/_deprecated/dragen_wgs_qc/runtime/dragen_wgs_qc.py delete mode 100644 lib/workload/stateless/_deprecated/dragen_wgs_somatic/component.ts delete mode 100644 lib/workload/stateless/_deprecated/dragen_wgs_somatic/runtime/dragen_wgs_somatic.py delete mode 100644 lib/workload/stateless/_deprecated/ens_event_manager/component.ts delete mode 100644 lib/workload/stateless/_deprecated/ens_event_manager/runtime/ens_event_manager.py delete mode 100644 lib/workload/stateless/_deprecated/gds_manager/component.ts delete mode 100644 lib/workload/stateless/_deprecated/gds_manager/runtime/gds_manager.py delete mode 100644 lib/workload/stateless/_deprecated/layers/base/requirements.txt delete mode 100644 lib/workload/stateless/_deprecated/layers/component.ts delete mode 100755 lib/workload/stateless/_deprecated/layers/create_layer_package.sh delete mode 100644 lib/workload/stateless/_deprecated/layers/domain/model/data_file.py delete mode 100644 lib/workload/stateless/_deprecated/layers/domain/model/library.py delete mode 100644 lib/workload/stateless/_deprecated/layers/domain/model/sequence_run.py delete mode 100644 lib/workload/stateless/_deprecated/layers/domain/model/subject.py delete mode 100644 lib/workload/stateless/_deprecated/layers/domain/requirements.txt delete mode 100644 lib/workload/stateless/_deprecated/layers/eb_util/eb_util.py delete mode 100644 lib/workload/stateless/_deprecated/layers/schema/requirements.txt delete mode 100644 lib/workload/stateless/_deprecated/layers/schema/schema/__init__.py delete mode 100644 lib/workload/stateless/_deprecated/layers/schema/schema/sequencerunstatechange/Event.py delete mode 100644 lib/workload/stateless/_deprecated/layers/schema/schema/sequencerunstatechange/__init__.py delete mode 100644 lib/workload/stateless/_deprecated/layers/schema/schema/sequencerunstatechange/marshaller.py delete mode 100644 lib/workload/stateless/_deprecated/layers/schema/schema/weslaunchrequest/Event.py delete mode 100644 lib/workload/stateless/_deprecated/layers/schema/schema/weslaunchrequest/__init__.py delete mode 100644 lib/workload/stateless/_deprecated/layers/schema/schema/weslaunchrequest/marshaller.py delete mode 100644 lib/workload/stateless/_deprecated/layers/schema/schema/workflowrequest/Event.py delete mode 100644 lib/workload/stateless/_deprecated/layers/schema/schema/workflowrequest/__init__.py delete mode 100644 lib/workload/stateless/_deprecated/layers/schema/schema/workflowrequest/marshaller.py delete mode 100644 lib/workload/stateless/_deprecated/layers/schema/schema/workflowrunstatechange/Event.py delete mode 100644 lib/workload/stateless/_deprecated/layers/schema/schema/workflowrunstatechange/__init__.py delete mode 100644 lib/workload/stateless/_deprecated/layers/schema/schema/workflowrunstatechange/marshaller.py delete mode 100644 lib/workload/stateless/_deprecated/orchestrator/component.ts delete mode 100644 lib/workload/stateless/_deprecated/orchestrator/runtime/orchestrator.py delete mode 100644 lib/workload/stateless/_deprecated/wes_launcher/component.ts delete mode 100644 lib/workload/stateless/_deprecated/wes_launcher/runtime/wes_launcher.py diff --git a/lib/workload/stateless/_deprecated/bcl_convert/component.ts b/lib/workload/stateless/_deprecated/bcl_convert/component.ts deleted file mode 100644 index c0b042128..000000000 --- a/lib/workload/stateless/_deprecated/bcl_convert/component.ts +++ /dev/null @@ -1,55 +0,0 @@ -import { Construct } from 'constructs'; -import { aws_events_targets, aws_lambda } from 'aws-cdk-lib'; -import * as path from 'path'; -import { ISecurityGroup, IVpc, SubnetType } from 'aws-cdk-lib/aws-ec2'; -import { IEventBus, Rule } from 'aws-cdk-lib/aws-events'; -import { ILayerVersion } from 'aws-cdk-lib/aws-lambda'; - -export interface BclConvertProps { - layers: ILayerVersion[]; - securityGroups: ISecurityGroup[]; - vpc: IVpc; - mainBus: IEventBus; - functionName: string; - lambdaRuntimePythonVersion: aws_lambda.Runtime; -} - -export class BclConvertConstruct extends Construct { - constructor(scope: Construct, id: string, props: BclConvertProps) { - super(scope, id); - - const bclConvertLambda = new aws_lambda.Function(this, 'BclConvertFunction', { - runtime: props.lambdaRuntimePythonVersion, - code: aws_lambda.Code.fromAsset(path.join(__dirname, 'runtime/')), - handler: 'handler', - vpc: props.vpc, - vpcSubnets: { - subnetType: SubnetType.PRIVATE_WITH_EGRESS, - }, - securityGroups: props.securityGroups, - layers: props.layers, - functionName: props.functionName, - environment: { - EVENT_BUS_NAME: props.mainBus.eventBusName, - }, - }); - - props.mainBus.grantPutEventsTo(bclConvertLambda); - - // --- - - // TODO also consider Aspect to cross cut creating "the same Rule for multiple consumers" scenario - - const bclConvertEventRule = new Rule(this, 'BclConvertEventRule', { - ruleName: 'BclConvertEventRule', - description: 'Rule to send {event_type.value} events to the {handler.function_name} Lambda', - eventBus: props.mainBus, - }); - - bclConvertEventRule.addTarget(new aws_events_targets.LambdaFunction(bclConvertLambda)); - bclConvertEventRule.addEventPattern({ - source: ['ORCHESTRATOR'], // FIXME how to impl? how to share a "code construct" between TS and Py... One way is jsii - detailType: ['SequenceRunStateChange'], - }); - } -} diff --git a/lib/workload/stateless/_deprecated/bcl_convert/runtime/bcl_convert.py b/lib/workload/stateless/_deprecated/bcl_convert/runtime/bcl_convert.py deleted file mode 100644 index f1402c755..000000000 --- a/lib/workload/stateless/_deprecated/bcl_convert/runtime/bcl_convert.py +++ /dev/null @@ -1,71 +0,0 @@ -import json -import logging -from datetime import datetime -import eb_util as util -import schema.sequencerunstatechange as srsc -import schema.weslaunchrequest as wlr - -logger = logging.getLogger() -logger.setLevel(logging.INFO) - - -def handler(event, context): - """ - Lambda to prepare and trigger BCL CONVERT workflow runs. - - An event payload (detail) of the following format is expected: - SequenceRunStateChange: - { - "sequence_run_name": "210525_A00100_0999_AHYYKD8MYX", - "sequence_run_id": "r.c1IvasRL4U2MubrbB13cI4", - 'gds_volume_name': "bssh.xxxx", - 'gds_folder_path': "/Runs/210525_A00100_0999_AHYYKD8MYX_r.c1IvasRL4U2MubrbB13cI4", - 'status': "", - 'timestamp': "" - } - - :param event: An AWSEvent with a SequenceRunStateChange event payload (detail) and SequenceRunStateChange detail-type - :param context: Not used. - :return: - """ - logger.info("Starting bcl_convert handler") - logger.info(json.dumps(event)) - - event_type = event.get(util.BusEventKey.DETAIL_TYPE.value) - if event_type != util.EventType.SRSC.value: - raise ValueError(f"Unsupported event type: {event_type}") - - payload = event.get(util.BusEventKey.DETAIL.value) - srsc_event: srsc.Event = srsc.Marshaller.unmarshall(payload, typeName=srsc.Event) - - workflow_input = { - "seq_run_name": srsc_event.sequence_run_name, - "seq_run_id": srsc_event.sequence_run_id, - "gds_volume": srsc_event.gds_volume_name, - "gds_path": srsc_event.gds_folder_path, - "sample_sheet": f"gds://{srsc_event.gds_volume_name}{srsc_event.gds_folder_path}/SampleSheet.csv", - } - logger.info(f"Created workflow input: {workflow_input}") - wf_name = f"{util.WorkflowType.BCL_CONVERT}_workflow_{srsc_event.sequence_run_name}_{srsc_event.sequence_run_id}" - - logger.info(f"Sending WES launch request for workflow {wf_name}") - wes_launch_request = wlr.Event( - workflow_run_name=wf_name, - workflow_id="wfl.w94tygian4p8g4", - workflow_version="3.7.5-34afe2c", - workflow_input=workflow_input, - timestamp=datetime.utcnow(), - workflow_engine_parameters={}, - ) - - logger.info( - f"Emitting {util.EventType.WES_LAUNCH} request event: {wes_launch_request}" - ) - # mock: just forward on the payload - util.send_event_to_bus_schema( - event_type=util.EventType.WES_LAUNCH, - event_source=util.EventSource.BCL_CONVERT, - event_payload=wes_launch_request, - ) - - logger.info("All done.") diff --git a/lib/workload/stateless/_deprecated/bcl_convert/tests/__init__.py b/lib/workload/stateless/_deprecated/bcl_convert/tests/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/lib/workload/stateless/_deprecated/bcl_convert/tests/test_bcl_convert.py b/lib/workload/stateless/_deprecated/bcl_convert/tests/test_bcl_convert.py deleted file mode 100644 index f012c1a46..000000000 --- a/lib/workload/stateless/_deprecated/bcl_convert/tests/test_bcl_convert.py +++ /dev/null @@ -1,6 +0,0 @@ -from unittest import TestCase - - -class BclConvertUnitTest(TestCase): - def test_handler(self): - self.assertTrue(True) diff --git a/lib/workload/stateless/_deprecated/dragen_wgs_qc/component.ts b/lib/workload/stateless/_deprecated/dragen_wgs_qc/component.ts deleted file mode 100644 index 4b34ca06f..000000000 --- a/lib/workload/stateless/_deprecated/dragen_wgs_qc/component.ts +++ /dev/null @@ -1 +0,0 @@ -// TODO impl diff --git a/lib/workload/stateless/_deprecated/dragen_wgs_qc/runtime/dragen_wgs_qc.py b/lib/workload/stateless/_deprecated/dragen_wgs_qc/runtime/dragen_wgs_qc.py deleted file mode 100644 index ed2e88658..000000000 --- a/lib/workload/stateless/_deprecated/dragen_wgs_qc/runtime/dragen_wgs_qc.py +++ /dev/null @@ -1,46 +0,0 @@ -import json -import logging -from datetime import datetime -import eb_util as util -import schema.workflowrequest as wfr -import schema.weslaunchrequest as wlr - -logger = logging.getLogger() -logger.setLevel(logging.INFO) - - -def handler(event, context): - logger.info("Starting dragen_wgs_qc handler") - logger.info(json.dumps(event)) - - event_type = event.get(util.BusEventKey.DETAIL_TYPE.value) - if event_type != util.EventType.DRAGEN_WGS_QC.value: - raise ValueError(f"Unsupported event type: {event_type}") - - payload = event.get(util.BusEventKey.DETAIL.value) - wfr_event: wfr.Event = wfr.Marshaller.unmarshall(payload, typeName=wfr.Event) - - workflow_input = {"library_id": wfr_event.library_id, "fastq_list_rows": {}} - logger.info(f"Created workflow input: {workflow_input}") - wf_name = f"{util.WorkflowType.DRAGEN_WGS_QC}_workflow_{wfr_event.library_id}" - - logger.info(f"Sending WES launch request for workflow {wf_name}") - wes_launch_request = wlr.Event( - workflow_run_name=wf_name, - workflow_id="wfl.w94tygian4p8g4", - workflow_version="3.7.5-34afe2c", - workflow_input=workflow_input, - timestamp=datetime.utcnow(), - workflow_engine_parameters={}, - ) - - logger.info( - f"Emitting {util.EventType.WES_LAUNCH} request event: {wes_launch_request}" - ) - util.send_event_to_bus_schema( - event_type=util.EventType.WES_LAUNCH, - event_source=util.EventSource.DRAGEN_WGS_QC, - event_payload=wes_launch_request, - ) - - logger.info("All done.") diff --git a/lib/workload/stateless/_deprecated/dragen_wgs_somatic/component.ts b/lib/workload/stateless/_deprecated/dragen_wgs_somatic/component.ts deleted file mode 100644 index 4b34ca06f..000000000 --- a/lib/workload/stateless/_deprecated/dragen_wgs_somatic/component.ts +++ /dev/null @@ -1 +0,0 @@ -// TODO impl diff --git a/lib/workload/stateless/_deprecated/dragen_wgs_somatic/runtime/dragen_wgs_somatic.py b/lib/workload/stateless/_deprecated/dragen_wgs_somatic/runtime/dragen_wgs_somatic.py deleted file mode 100644 index a8c066701..000000000 --- a/lib/workload/stateless/_deprecated/dragen_wgs_somatic/runtime/dragen_wgs_somatic.py +++ /dev/null @@ -1,50 +0,0 @@ -import json -import logging -from datetime import datetime -import eb_util as util -import schema.workflowrequest as wfr -import schema.weslaunchrequest as wlr - -logger = logging.getLogger() -logger.setLevel(logging.INFO) - - -def handler(event, context): - logger.info("Starting dragen_wgs_somatic handler") - logger.info(json.dumps(event)) - - event_type = event.get(util.BusEventKey.DETAIL_TYPE.value) - if event_type != util.EventType.DRAGEN_WGS_SOMATIC.value: - raise ValueError(f"Unsupported event type: {event_type}") - - payload = event.get(util.BusEventKey.DETAIL.value) - wfr_event: wfr.Event = wfr.Marshaller.unmarshall(payload, typeName=wfr.Event) - - workflow_input = { - "library_id_normal": wfr_event.library_id, - "library_id_tumor": "L210999", - "fastq_list_rows": {}, - } - logger.info(f"Created workflow input: {workflow_input}") - wf_name = f"{util.WorkflowType.DRAGEN_WGS_SOMATIC}_workflow_{wfr_event.library_id}" - - logger.info(f"Sending WES launch request for workflow {wf_name}") - wes_launch_request = wlr.Event( - workflow_run_name=wf_name, - workflow_id="wfl.w94tygian4p8g4", - workflow_version="3.7.5-34afe2c", - workflow_input=workflow_input, - timestamp=datetime.utcnow(), - workflow_engine_parameters={}, - ) - - logger.info( - f"Emitting {util.EventType.WES_LAUNCH} request event: {wes_launch_request}" - ) - util.send_event_to_bus_schema( - event_type=util.EventType.WES_LAUNCH, - event_source=util.EventSource.DRAGEN_WGS_SOMATIC, - event_payload=wes_launch_request, - ) - - logger.info("All done.") diff --git a/lib/workload/stateless/_deprecated/ens_event_manager/component.ts b/lib/workload/stateless/_deprecated/ens_event_manager/component.ts deleted file mode 100644 index 4b34ca06f..000000000 --- a/lib/workload/stateless/_deprecated/ens_event_manager/component.ts +++ /dev/null @@ -1 +0,0 @@ -// TODO impl diff --git a/lib/workload/stateless/_deprecated/ens_event_manager/runtime/ens_event_manager.py b/lib/workload/stateless/_deprecated/ens_event_manager/runtime/ens_event_manager.py deleted file mode 100644 index 0ab65fd1c..000000000 --- a/lib/workload/stateless/_deprecated/ens_event_manager/runtime/ens_event_manager.py +++ /dev/null @@ -1,147 +0,0 @@ -import json -from datetime import datetime -import logging -from enum import Enum -import eb_util as util -import schema.sequencerunstatechange as srsc -import schema.workflowrunstatechange as wrsc - -logger = logging.getLogger() -logger.setLevel(logging.INFO) -# TODO: split into separate lambdas? each responsible for s specific ENS event type, each with it's own ENS subscription - - -class ENSEventType(Enum): - """ - REF: - https://iap-docs.readme.io/docs/ens_available-events - https://github.com/umccr-illumina/stratus/issues/22#issuecomment-628028147 - https://github.com/umccr-illumina/stratus/issues/58 - https://iap-docs.readme.io/docs/upload-instrument-runs#run-upload-event - """ - - GDS_FILES = "gds.files" - BSSH_RUNS = "bssh.runs" - WES_RUNS = "wes.runs" - - def __str__(self): - return self.value - - def __repr__(self): - return f"{type(self).__name__}.{self}" - - -SUPPORTED_ENS_TYPES = [ - ENSEventType.BSSH_RUNS.value, - ENSEventType.GDS_FILES.value, - ENSEventType.WES_RUNS.value, -] - - -def handle_wes_runs_event(event): - logger.info("Handling wes.runs event") - event_action = event["messageAttributes"]["action"][ - "stringValue" - ] # TODO: check! should probably be 'updated' - message_body = json.loads(event["body"]) - - # TODO: convert SQS/ENS event into corresponding OrcaBus event - event_status = message_body["EventType"] # RunSucceeded - event_time = message_body["Timestamp"] - workflow_name = message_body["Name"] - workflow_id = message_body["WorkflowRun"]["Id"] # wfr.23487yq4508 - workflow_status = message_body["WorkflowRun"]["Status"] # Succeeded - - # Convert Pending status of WES launcher to Succeeded (simulating a successful WES run) - if workflow_status == "Pending": - workflow_status = "Succeeded" - else: - # We are receiving mock events from the WES launcher directly, those have all Pending status - raise ValueError( - "Received non Pending WES run status! Only expect Pending at this point." - ) - - wrsc_event = wrsc.Event( - workflow_run_name=workflow_name, - workflow_run_id=workflow_id, - status=workflow_status, - timestamp=event_time, - ) - - logger.info(f"Emitting {util.EventType.SRSC} event {wrsc_event}") - util.send_event_to_bus_schema( - event_type=util.EventType.WRSC, - event_source=util.EventSource.ENS_HANDLER, - event_payload=wrsc_event, - ) - - -def handle_gds_file_event(event): - logger.info("Handling gds.files event") - event_action = event["messageAttributes"]["action"]["stringValue"] - - # message_body = json.loads(event['body']) - # TODO: also check if report and create a report event - - if event_action == "deleted": - logger.info(f"A file was removed.") - # delete DB file record - else: - logger.info(f"A file was added/updated.") - # create/update DB file record - - -def handle_bssh_runs_event(event): - logger.info("Handling bssh.runs event") - event_action = event["messageAttributes"]["action"]["stringValue"] - if event_action != "statuschanged": - raise ValueError(f"Unexpected event action: {event_action}") - message_body = json.loads(event["body"]) - - # TODO: check difference between run name and instrument run ID - srn = message_body["name"] - iri = message_body["instrumentRunId"] - if srn != iri: - raise ValueError( - f"Sequence run name and instrumentRunId are not the same! {srn} != {iri}" - ) - - ev = srsc.Event( - sequence_run_name=srn, - sequence_run_id=message_body["id"], - gds_folder_path=message_body["gdsFolderPath"], - gds_volume_name=message_body["gdsVolumeName"], - status=message_body["status"], - timestamp=datetime.utcnow(), - ) - - logger.info(f"Emitting {util.EventType.SRSC} event {ev}") - util.send_event_to_bus_schema( - event_type=util.EventType.SRSC, - event_source=util.EventSource.ENS_HANDLER, - event_payload=ev, - ) - - -def handler(event, context): - # Log the received event in CloudWatch - logger.info("Starting ens_event_manager") - logger.info(json.dumps(event)) - - # An SQS event can carry multiple Records, each one may be a different ENS event - for message in event["Records"]: - event_type = message["messageAttributes"]["type"]["stringValue"] - if event_type not in SUPPORTED_ENS_TYPES: - logger.warning(f"Skipping unsupported IAP ENS type: {event_type}") - continue - - if event_type == ENSEventType.WES_RUNS.value: - handle_wes_runs_event(message) - - if event_type == ENSEventType.BSSH_RUNS.value: - handle_bssh_runs_event(message) - - if event_type == ENSEventType.GDS_FILES.value: - handle_gds_file_event(message) - - logger.info("All done.") diff --git a/lib/workload/stateless/_deprecated/gds_manager/component.ts b/lib/workload/stateless/_deprecated/gds_manager/component.ts deleted file mode 100644 index 4b34ca06f..000000000 --- a/lib/workload/stateless/_deprecated/gds_manager/component.ts +++ /dev/null @@ -1 +0,0 @@ -// TODO impl diff --git a/lib/workload/stateless/_deprecated/gds_manager/runtime/gds_manager.py b/lib/workload/stateless/_deprecated/gds_manager/runtime/gds_manager.py deleted file mode 100644 index a09ab0f39..000000000 --- a/lib/workload/stateless/_deprecated/gds_manager/runtime/gds_manager.py +++ /dev/null @@ -1,23 +0,0 @@ -import json -import eb_util as util - - -def is_report(event): - return True - - -def handler(event, context): - # Log the received event in CloudWatch - print("Starting GDS event handler") - print(f"Received event: {json.dumps(event)}") - - print("converting SQS-ENS-GDS event into OrcaBus GDS event") - - object_key = event.get("object_key", "test-file.txt") - payload = {"volume": "test-volume", "name": object_key} - - util.send_event_to_bus( - event_type=util.EventType.FSC, - event_source=util.EventSource.GDS, - event_payload=payload, - ) diff --git a/lib/workload/stateless/_deprecated/layers/base/requirements.txt b/lib/workload/stateless/_deprecated/layers/base/requirements.txt deleted file mode 100644 index 0c53911c4..000000000 --- a/lib/workload/stateless/_deprecated/layers/base/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -boto3==1.26.115 diff --git a/lib/workload/stateless/_deprecated/layers/component.ts b/lib/workload/stateless/_deprecated/layers/component.ts deleted file mode 100644 index 526433646..000000000 --- a/lib/workload/stateless/_deprecated/layers/component.ts +++ /dev/null @@ -1,47 +0,0 @@ -import { Construct } from 'constructs'; -import { aws_lambda } from 'aws-cdk-lib'; -import * as path from 'path'; - -export interface LambdaLayerProps { - lambdaRuntimePythonVersion: aws_lambda.Runtime; -} - -export class LambdaLayerConstruct extends Construct { - private readonly _eb_util: aws_lambda.LayerVersion; - private readonly _schema: aws_lambda.LayerVersion; - private _all: aws_lambda.LayerVersion[] = []; - - constructor(scope: Construct, id: string, props: LambdaLayerProps) { - super(scope, id); - this._eb_util = this.createLambdaLayer('eb_util', props); // FIXME refactor, externalise the deps dir, see todo in orcabus-stateless-stack.ts - this._schema = this.createLambdaLayer('schema', props); - - this._all.push(this._eb_util); - this._all.push(this._schema); - } - - private _build_deps() { - // TODO using docker SDK to build the deps as part of `cdk synth`, if some `build-auto` flag is true - // See https://github.com/umccr/infrastructure/blob/2a1d47c485d11f8a4a9bf0d2cd865f8450164876/cdk/apps/htsget/htsget/goserver.py#L374 - } - - private createLambdaLayer(name: string, props: LambdaLayerProps) { - return new aws_lambda.LayerVersion(this, 'OrcaBus_' + name + '_LayerVersion', { - code: aws_lambda.Code.fromAsset(path.join(__dirname, name + '.zip')), - compatibleRuntimes: [props.lambdaRuntimePythonVersion], - description: 'Lambda layer ' + name + ' for ' + props.lambdaRuntimePythonVersion.name, - }); - } - - get eb_util(): aws_lambda.LayerVersion { - return this._eb_util; - } - - get schema(): aws_lambda.LayerVersion { - return this._schema; - } - - get all(): aws_lambda.LayerVersion[] { - return this._all; - } -} diff --git a/lib/workload/stateless/_deprecated/layers/create_layer_package.sh b/lib/workload/stateless/_deprecated/layers/create_layer_package.sh deleted file mode 100755 index 598a3247d..000000000 --- a/lib/workload/stateless/_deprecated/layers/create_layer_package.sh +++ /dev/null @@ -1,37 +0,0 @@ -#!/bin/bash - -LAYER_NAME=$1 -SCRIPT_DIR=$(dirname $0) -SCRIPT_PATH="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )" - -if test -z "$LAYER_NAME"; then - echo "LAYER_NAME is not set! Specify the layer for which to create the Lambda package." - exit 1 -fi - -# specify the lib directory (according to AWS Lambda guidelines) -export PKG_DIR=$SCRIPT_DIR/"python" -export LAYER_DIR=$SCRIPT_DIR/${LAYER_NAME} - -# clean up any existing files -rm -rf ${PKG_DIR} && mkdir -p ${PKG_DIR} -cp -R ${LAYER_DIR}/ ${PKG_DIR}/ - -# install the python libraries (without dependencies) -docker run \ - --platform=linux/x86-64 \ - --rm \ - -v ${SCRIPT_PATH}/python:/foo \ - -w /foo \ - public.ecr.aws/sam/build-python3.10 \ - pip install -r requirements.txt --no-deps -t ./ - -# clean the lib directory -rm -rf ${PKG_DIR}/*.dist-info -find python -type d -name __pycache__ -exec rm -rf {} + - -# create the package zip -zip -r ${LAYER_DIR}.zip ${PKG_DIR}/ - -# remove the inflated directory -rm -rf ${PKG_DIR}/ diff --git a/lib/workload/stateless/_deprecated/layers/domain/model/data_file.py b/lib/workload/stateless/_deprecated/layers/domain/model/data_file.py deleted file mode 100644 index 639125fc6..000000000 --- a/lib/workload/stateless/_deprecated/layers/domain/model/data_file.py +++ /dev/null @@ -1,7 +0,0 @@ -class DataFile: - def __init__(self, name, url): - self.name = name - self.type = "" - self.url = url - self.container = "" - self.path = "" diff --git a/lib/workload/stateless/_deprecated/layers/domain/model/library.py b/lib/workload/stateless/_deprecated/layers/domain/model/library.py deleted file mode 100644 index 563699696..000000000 --- a/lib/workload/stateless/_deprecated/layers/domain/model/library.py +++ /dev/null @@ -1,3 +0,0 @@ -class Library: - def __init__(self, library_id): - self.library_id = library_id diff --git a/lib/workload/stateless/_deprecated/layers/domain/model/sequence_run.py b/lib/workload/stateless/_deprecated/layers/domain/model/sequence_run.py deleted file mode 100644 index 8d4cbad55..000000000 --- a/lib/workload/stateless/_deprecated/layers/domain/model/sequence_run.py +++ /dev/null @@ -1,4 +0,0 @@ -class SequenceRun: - def __init__(self, run_name, run_id): - self.run_name = run_name - self.run_id = run_id diff --git a/lib/workload/stateless/_deprecated/layers/domain/model/subject.py b/lib/workload/stateless/_deprecated/layers/domain/model/subject.py deleted file mode 100644 index 70442ea80..000000000 --- a/lib/workload/stateless/_deprecated/layers/domain/model/subject.py +++ /dev/null @@ -1,3 +0,0 @@ -class Subject: - def __init__(self, subject_id): - self.subject_id = subject_id diff --git a/lib/workload/stateless/_deprecated/layers/domain/requirements.txt b/lib/workload/stateless/_deprecated/layers/domain/requirements.txt deleted file mode 100644 index e69de29bb..000000000 diff --git a/lib/workload/stateless/_deprecated/layers/eb_util/eb_util.py b/lib/workload/stateless/_deprecated/layers/eb_util/eb_util.py deleted file mode 100644 index e2897e6e9..000000000 --- a/lib/workload/stateless/_deprecated/layers/eb_util/eb_util.py +++ /dev/null @@ -1,120 +0,0 @@ -import os -import boto3 -import time -import json -from enum import Enum - -event_bus = boto3.client("events") -event_bus_name = os.environ.get("EVENT_BUS_NAME") - -# TODO: split across multiple util modules? - - -class WorkflowType(Enum): - BCL_CONVERT = "bcl_convert" - DRAGEN_WGS_QC = "dragen_wgs_qc" - DRAGEN_WGS_SOMATIC = "dragen_wgs_somatic" - DRAGEN_WTS = "dragen_wts" - - def __str__(self): - return self.value - - def __repr__(self): - return f"{type(self).__name__}.{self}" - - -class BusEventKey(Enum): - DETAIL_TYPE = "detail-type" - DETAIL = "detail" - ID = "id" - version = "version" - SOURCE = "source" - ACCOUNT = "account" - time = "time" - region = "region" - resources = "resources" - - def __str__(self): - return self.value - - def __repr__(self): - return f"{type(self).__name__}.{self}" - - -class EventSource(Enum): - ENS_HANDLER = "ENS_HANDLER" - GDS = "GDS" - WES = "WES" - BSSH = "BSSH" - ORCHESTRATOR = "ORCHESTRATOR" - BCL_CONVERT = "BCL_CONVERT" - DRAGEN_WGS_QC = "DRAGEN_WGS_QC" - DRAGEN_WGS_SOMATIC = "DRAGEN_WGS_SOMATIC" - - def __str__(self): - return self.value - - def __repr__(self): - return f"{type(self).__name__}.{self}" - - -class EventType(Enum): - SRSC = "SequenceRunStateChange" - WRSC = "WorkflowRunStateChange" - FSC = "FileStateChange" - BCL_CONVERT = "BCL_CONVERT" - DRAGEN_WGS_QC = "DRAGEN_WGS_QC" - DRAGEN_WGS_SOMATIC = "DRAGEN_WGS_SOMATIC" - DRAGEN_TSO_CTDNA = "DRAGEN_TSO_CTDNA" - WES_LAUNCH = "WES_LAUNCH" - - def __str__(self): - return self.value - - def __repr__(self): - return f"{type(self).__name__}.{self}" - - -def send_event_to_bus( - event_source: EventSource, event_type: EventType, event_payload -) -> dict: - # TODO: figure out best timestamp handling - response = event_bus.put_events( - Entries=[ - { - "Time": time.time(), - "Source": event_source.value, - "Resources": [], - "DetailType": event_type.value, - "Detail": json.dumps(event_payload), - "EventBusName": event_bus_name, - }, - ] - ) - - return response - - -def send_event_to_bus_schema( - event_source: EventSource, event_type: EventType, event_payload -) -> dict: - # TODO: figure out best timestamp handling - # TODO: use default str encoding with json.dumps or use model specific marshaller? - response = event_bus.put_events( - Entries=[ - { - "Time": time.time(), - "Source": event_source.value, - "Resources": [], - "DetailType": event_type.value, - "Detail": json.dumps(event_payload.to_dict(), default=str), - "EventBusName": event_bus_name, - }, - ] - ) - - return response - - -def emit_event(event) -> dict: - return event_bus.put_events(Entries=[event]) diff --git a/lib/workload/stateless/_deprecated/layers/schema/requirements.txt b/lib/workload/stateless/_deprecated/layers/schema/requirements.txt deleted file mode 100644 index b85650be4..000000000 --- a/lib/workload/stateless/_deprecated/layers/schema/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -six==1.12.0 -regex==2019.11.1 diff --git a/lib/workload/stateless/_deprecated/layers/schema/schema/__init__.py b/lib/workload/stateless/_deprecated/layers/schema/schema/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/lib/workload/stateless/_deprecated/layers/schema/schema/sequencerunstatechange/Event.py b/lib/workload/stateless/_deprecated/layers/schema/schema/sequencerunstatechange/Event.py deleted file mode 100644 index 0c8db2690..000000000 --- a/lib/workload/stateless/_deprecated/layers/schema/schema/sequencerunstatechange/Event.py +++ /dev/null @@ -1,140 +0,0 @@ -# coding: utf-8 -import pprint -import re # noqa: F401 - -import six -from enum import Enum - - -class Event(object): - _types = { - "sequence_run_id": "str", - "sequence_run_name": "str", - "gds_volume_name": "str", - "gds_folder_path": "str", - "status": "str", - "timestamp": "datetime", - } - - _attribute_map = { - "sequence_run_id": "sequence_run_id", - "sequence_run_name": "sequence_run_name", - "gds_volume_name": "gds_volume_name", - "gds_folder_path": "gds_folder_path", - "status": "status", - "timestamp": "timestamp", - } - - def __init__( - self, - sequence_run_id=None, - sequence_run_name=None, - gds_volume_name=None, - gds_folder_path=None, - status=None, - timestamp=None, - ): # noqa: E501 - self._sequence_run_id = None - self._sequence_run_name = None - self._gds_volume_name = None - self._gds_folder_path = None - self._status = None - self._timestamp = None - self.discriminator = None - self.sequence_run_id = sequence_run_id - self.sequence_run_name = sequence_run_name - self.gds_volume_name = gds_volume_name - self.gds_folder_path = gds_folder_path - self.status = status - self.timestamp = timestamp - - @property - def sequence_run_id(self): - return self._sequence_run_id - - @sequence_run_id.setter - def sequence_run_id(self, sequence_run_id): - self._sequence_run_id = sequence_run_id - - @property - def sequence_run_name(self): - return self._sequence_run_name - - @sequence_run_name.setter - def sequence_run_name(self, sequence_run_name): - self._sequence_run_name = sequence_run_name - - @property - def gds_volume_name(self): - return self._gds_volume_name - - @gds_volume_name.setter - def gds_volume_name(self, gds_volume_name): - self._gds_volume_name = gds_volume_name - - @property - def gds_folder_path(self): - return self._gds_folder_path - - @gds_folder_path.setter - def gds_folder_path(self, gds_folder_path): - self._gds_folder_path = gds_folder_path - - @property - def status(self): - return self._status - - @status.setter - def status(self, status): - self._status = status - - @property - def timestamp(self): - return self._timestamp - - @timestamp.setter - def timestamp(self, timestamp): - self._timestamp = timestamp - - def to_dict(self): - result = {} - - for attr, _ in six.iteritems(self._types): - value = getattr(self, attr) - if isinstance(value, list): - result[attr] = list( - map(lambda x: x.to_dict() if hasattr(x, "to_dict") else x, value) - ) - elif hasattr(value, "to_dict"): - result[attr] = value.to_dict() - elif isinstance(value, dict): - result[attr] = dict( - map( - lambda item: (item[0], item[1].to_dict()) - if hasattr(item[1], "to_dict") - else item, - value.items(), - ) - ) - else: - result[attr] = value - if issubclass(Event, dict): - for key, value in self.items(): - result[key] = value - - return result - - def to_str(self): - return pprint.pformat(self.to_dict()) - - def __repr__(self): - return self.to_str() - - def __eq__(self, other): - if not isinstance(other, Event): - return False - - return self.__dict__ == other.__dict__ - - def __ne__(self, other): - return not self == other diff --git a/lib/workload/stateless/_deprecated/layers/schema/schema/sequencerunstatechange/__init__.py b/lib/workload/stateless/_deprecated/layers/schema/schema/sequencerunstatechange/__init__.py deleted file mode 100644 index aa6f01e79..000000000 --- a/lib/workload/stateless/_deprecated/layers/schema/schema/sequencerunstatechange/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# coding: utf-8 - -from __future__ import absolute_import - -from schema.sequencerunstatechange.marshaller import Marshaller -from schema.sequencerunstatechange.Event import Event diff --git a/lib/workload/stateless/_deprecated/layers/schema/schema/sequencerunstatechange/marshaller.py b/lib/workload/stateless/_deprecated/layers/schema/schema/sequencerunstatechange/marshaller.py deleted file mode 100644 index 97dcc9fdd..000000000 --- a/lib/workload/stateless/_deprecated/layers/schema/schema/sequencerunstatechange/marshaller.py +++ /dev/null @@ -1,140 +0,0 @@ -import datetime -import re -import six -import schema.sequencerunstatechange - - -class Marshaller: - PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types - - NATIVE_TYPES_MAPPING = { - "int": int, - "long": int if six.PY3 else long, - "float": float, - "str": str, - "bool": bool, - "date": datetime.date, - "datetime": datetime.datetime, - "object": object, - } - - @classmethod - def marshall(cls, obj): - if obj is None: - return None - elif isinstance(obj, cls.PRIMITIVE_TYPES): - return obj - elif isinstance(obj, list): - return [cls.marshall(sub_obj) for sub_obj in obj] - elif isinstance(obj, tuple): - return tuple(cls.marshall(sub_obj) for sub_obj in obj) - elif isinstance(obj, (datetime.datetime, datetime.date)): - return obj.isoformat() - - if isinstance(obj, dict): - obj_dict = obj - else: - obj_dict = { - obj._attribute_map[attr]: getattr(obj, attr) - for attr, _ in six.iteritems(obj._types) - if getattr(obj, attr) is not None - } - - return {key: cls.marshall(val) for key, val in six.iteritems(obj_dict)} - - @classmethod - def unmarshall(cls, data, typeName): - if data is None: - return None - - if type(typeName) == str: - if typeName.startswith("list["): - sub_kls = re.match(r"list\[(.*)\]", typeName).group(1) - return [cls.unmarshall(sub_data, sub_kls) for sub_data in data] - - if typeName.startswith("dict("): - sub_kls = re.match(r"dict\(([^,]*), (.*)\)", typeName).group(2) - return {k: cls.unmarshall(v, sub_kls) for k, v in six.iteritems(data)} - - if typeName in cls.NATIVE_TYPES_MAPPING: - typeName = cls.NATIVE_TYPES_MAPPING[typeName] - else: - typeName = getattr(schema.sequencerunstatechange, typeName) - - if typeName in cls.PRIMITIVE_TYPES: - return cls.__unmarshall_primitive(data, typeName) - elif typeName == object: - return cls.__unmarshall_object(data) - elif typeName == datetime.date: - return cls.__unmarshall_date(data) - elif typeName == datetime.datetime: - return cls.__unmarshall_datatime(data) - else: - return cls.__unmarshall_model(data, typeName) - - @classmethod - def __unmarshall_primitive(cls, data, typeName): - try: - return typeName(data) - except UnicodeEncodeError: - return six.text_type(data) - except TypeError: - return data - - @classmethod - def __unmarshall_object(cls, value): - return value - - @classmethod - def __unmarshall_date(cls, string): - try: - from dateutil.parser import parse - - return parse(string).date() - except ImportError: - return string - - @classmethod - def __unmarshall_datatime(cls, string): - try: - from dateutil.parser import parse - - return parse(string) - except ImportError: - return string - - @classmethod - def __unmarshall_model(cls, data, typeName): - if not typeName._types and not cls.__hasattr(typeName, "get_real_child_model"): - return data - - kwargs = {} - if typeName._types is not None: - for attr, attr_type in six.iteritems(typeName._types): - if ( - data is not None - and typeName._attribute_map[attr] in data - and isinstance(data, (list, dict)) - ): - value = data[typeName._attribute_map[attr]] - kwargs[attr] = cls.unmarshall(value, attr_type) - - instance = typeName(**kwargs) - - if ( - isinstance(instance, dict) - and typeName._types is not None - and isinstance(data, dict) - ): - for key, value in data.items(): - if key not in typeName._types: - instance[key] = value - if cls.__hasattr(instance, "get_real_child_model"): - type_name = instance.get_real_child_model(data) - if type_name: - instance = cls.unmarshall(data, type_name) - return instance - - @classmethod - def __hasattr(cls, object, name): - return name in object.__class__.__dict__ diff --git a/lib/workload/stateless/_deprecated/layers/schema/schema/weslaunchrequest/Event.py b/lib/workload/stateless/_deprecated/layers/schema/schema/weslaunchrequest/Event.py deleted file mode 100644 index d54b2b526..000000000 --- a/lib/workload/stateless/_deprecated/layers/schema/schema/weslaunchrequest/Event.py +++ /dev/null @@ -1,140 +0,0 @@ -# coding: utf-8 -import pprint -import re # noqa: F401 - -import six -from enum import Enum - - -class Event(object): - _types = { - "workflow_run_name": "str", - "workflow_id": "str", - "workflow_version": "str", - "workflow_input": "str", - "workflow_engine_parameters": "str", - "timestamp": "datetime", - } - - _attribute_map = { - "workflow_run_name": "workflow_run_name", - "workflow_id": "workflow_id", - "workflow_version": "workflow_version", - "workflow_input": "workflow_input", - "workflow_engine_parameters": "workflow_engine_parameters", - "timestamp": "timestamp", - } - - def __init__( - self, - workflow_run_name=None, - workflow_id=None, - workflow_version=None, - workflow_input=None, - workflow_engine_parameters=None, - timestamp=None, - ): # noqa: E501 - self._workflow_run_name = None - self._workflow_id = None - self._workflow_version = None - self._workflow_input = None - self._workflow_engine_parameters = None - self._timestamp = None - self.discriminator = None - self.workflow_run_name = workflow_run_name - self.workflow_id = workflow_id - self.workflow_version = workflow_version - self.workflow_input = workflow_input - self.workflow_engine_parameters = workflow_engine_parameters - self.timestamp = timestamp - - @property - def workflow_run_name(self): - return self._workflow_run_name - - @workflow_run_name.setter - def workflow_run_name(self, workflow_run_name): - self._workflow_run_name = workflow_run_name - - @property - def workflow_id(self): - return self._workflow_id - - @workflow_id.setter - def workflow_id(self, workflow_id): - self._workflow_id = workflow_id - - @property - def workflow_version(self): - return self._workflow_version - - @workflow_version.setter - def workflow_version(self, workflow_version): - self._workflow_version = workflow_version - - @property - def workflow_input(self): - return self._workflow_input - - @workflow_input.setter - def workflow_input(self, workflow_input): - self._workflow_input = workflow_input - - @property - def workflow_engine_parameters(self): - return self._workflow_engine_parameters - - @workflow_engine_parameters.setter - def workflow_engine_parameters(self, workflow_engine_parameters): - self._workflow_engine_parameters = workflow_engine_parameters - - @property - def timestamp(self): - return self._timestamp - - @timestamp.setter - def timestamp(self, timestamp): - self._timestamp = timestamp - - def to_dict(self): - result = {} - - for attr, _ in six.iteritems(self._types): - value = getattr(self, attr) - if isinstance(value, list): - result[attr] = list( - map(lambda x: x.to_dict() if hasattr(x, "to_dict") else x, value) - ) - elif hasattr(value, "to_dict"): - result[attr] = value.to_dict() - elif isinstance(value, dict): - result[attr] = dict( - map( - lambda item: (item[0], item[1].to_dict()) - if hasattr(item[1], "to_dict") - else item, - value.items(), - ) - ) - else: - result[attr] = value - if issubclass(Event, dict): - for key, value in self.items(): - result[key] = value - - return result - - def to_str(self): - return pprint.pformat(self.to_dict()) - - def __repr__(self): - return self.to_str() - - def __eq__(self, other): - if not isinstance(other, Event): - return False - - return self.__dict__ == other.__dict__ - - def __ne__(self, other): - return not self == other diff --git a/lib/workload/stateless/_deprecated/layers/schema/schema/weslaunchrequest/__init__.py b/lib/workload/stateless/_deprecated/layers/schema/schema/weslaunchrequest/__init__.py deleted file mode 100644 index 75d16a2a8..000000000 --- a/lib/workload/stateless/_deprecated/layers/schema/schema/weslaunchrequest/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# coding: utf-8 - -from __future__ import absolute_import - -from schema.weslaunchrequest.marshaller import Marshaller -from schema.weslaunchrequest.Event import Event diff --git a/lib/workload/stateless/_deprecated/layers/schema/schema/weslaunchrequest/marshaller.py b/lib/workload/stateless/_deprecated/layers/schema/schema/weslaunchrequest/marshaller.py deleted file mode 100644 index 63e1bebde..000000000 --- a/lib/workload/stateless/_deprecated/layers/schema/schema/weslaunchrequest/marshaller.py +++ /dev/null @@ -1,140 +0,0 @@ -import datetime -import re -import six -import schema.weslaunchrequest - - -class Marshaller: - PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types - - NATIVE_TYPES_MAPPING = { - "int": int, - "long": int if six.PY3 else long, - "float": float, - "str": str, - "bool": bool, - "date": datetime.date, - "datetime": datetime.datetime, - "object": object, - } - - @classmethod - def marshall(cls, obj): - if obj is None: - return None - elif isinstance(obj, cls.PRIMITIVE_TYPES): - return obj - elif isinstance(obj, list): - return [cls.marshall(sub_obj) for sub_obj in obj] - elif isinstance(obj, tuple): - return tuple(cls.marshall(sub_obj) for sub_obj in obj) - elif isinstance(obj, (datetime.datetime, datetime.date)): - return obj.isoformat() - - if isinstance(obj, dict): - obj_dict = obj - else: - obj_dict = { - obj._attribute_map[attr]: getattr(obj, attr) - for attr, _ in six.iteritems(obj._types) - if getattr(obj, attr) is not None - } - - return {key: cls.marshall(val) for key, val in six.iteritems(obj_dict)} - - @classmethod - def unmarshall(cls, data, typeName): - if data is None: - return None - - if type(typeName) == str: - if typeName.startswith("list["): - sub_kls = re.match(r"list\[(.*)\]", typeName).group(1) - return [cls.unmarshall(sub_data, sub_kls) for sub_data in data] - - if typeName.startswith("dict("): - sub_kls = re.match(r"dict\(([^,]*), (.*)\)", typeName).group(2) - return {k: cls.unmarshall(v, sub_kls) for k, v in six.iteritems(data)} - - if typeName in cls.NATIVE_TYPES_MAPPING: - typeName = cls.NATIVE_TYPES_MAPPING[typeName] - else: - typeName = getattr(schema.weslaunchrequest, typeName) - - if typeName in cls.PRIMITIVE_TYPES: - return cls.__unmarshall_primitive(data, typeName) - elif typeName == object: - return cls.__unmarshall_object(data) - elif typeName == datetime.date: - return cls.__unmarshall_date(data) - elif typeName == datetime.datetime: - return cls.__unmarshall_datatime(data) - else: - return cls.__unmarshall_model(data, typeName) - - @classmethod - def __unmarshall_primitive(cls, data, typeName): - try: - return typeName(data) - except UnicodeEncodeError: - return six.text_type(data) - except TypeError: - return data - - @classmethod - def __unmarshall_object(cls, value): - return value - - @classmethod - def __unmarshall_date(cls, string): - try: - from dateutil.parser import parse - - return parse(string).date() - except ImportError: - return string - - @classmethod - def __unmarshall_datatime(cls, string): - try: - from dateutil.parser import parse - - return parse(string) - except ImportError: - return string - - @classmethod - def __unmarshall_model(cls, data, typeName): - if not typeName._types and not cls.__hasattr(typeName, "get_real_child_model"): - return data - - kwargs = {} - if typeName._types is not None: - for attr, attr_type in six.iteritems(typeName._types): - if ( - data is not None - and typeName._attribute_map[attr] in data - and isinstance(data, (list, dict)) - ): - value = data[typeName._attribute_map[attr]] - kwargs[attr] = cls.unmarshall(value, attr_type) - - instance = typeName(**kwargs) - - if ( - isinstance(instance, dict) - and typeName._types is not None - and isinstance(data, dict) - ): - for key, value in data.items(): - if key not in typeName._types: - instance[key] = value - if cls.__hasattr(instance, "get_real_child_model"): - type_name = instance.get_real_child_model(data) - if type_name: - instance = cls.unmarshall(data, type_name) - return instance - - @classmethod - def __hasattr(cls, object, name): - return name in object.__class__.__dict__ diff --git a/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrequest/Event.py b/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrequest/Event.py deleted file mode 100644 index cc60238d0..000000000 --- a/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrequest/Event.py +++ /dev/null @@ -1,110 +0,0 @@ -# coding: utf-8 -import pprint -import re # noqa: F401 - -import six -from enum import Enum - - -class Event(object): - _types = { - "workflow_type": "str", - "subject_id": "str", - "library_id": "str", - "seq_run_name": "str", - } - - _attribute_map = { - "workflow_type": "workflow_type", - "subject_id": "subject_id", - "library_id": "library_id", - "seq_run_name": "seq_run_name", - } - - def __init__( - self, workflow_type=None, subject_id=None, library_id=None, seq_run_name=None - ): # noqa: E501 - self._workflow_type = None - self._subject_id = None - self._library_id = None - self._seq_run_name = None - self.discriminator = None - self.workflow_type = workflow_type - self.subject_id = subject_id - self.library_id = library_id - self.seq_run_name = seq_run_name - - @property - def workflow_type(self): - return self._workflow_type - - @workflow_type.setter - def workflow_type(self, workflow_type): - self._workflow_type = workflow_type - - @property - def subject_id(self): - return self._subject_id - - @subject_id.setter - def subject_id(self, subject_id): - self._subject_id = subject_id - - @property - def library_id(self): - return self._library_id - - @library_id.setter - def library_id(self, library_id): - self._library_id = library_id - - @property - def seq_run_name(self): - return self._seq_run_name - - @seq_run_name.setter - def seq_run_name(self, seq_run_name): - self._seq_run_name = seq_run_name - - def to_dict(self): - result = {} - - for attr, _ in six.iteritems(self._types): - value = getattr(self, attr) - if isinstance(value, list): - result[attr] = list( - map(lambda x: x.to_dict() if hasattr(x, "to_dict") else x, value) - ) - elif hasattr(value, "to_dict"): - result[attr] = value.to_dict() - elif isinstance(value, dict): - result[attr] = dict( - map( - lambda item: (item[0], item[1].to_dict()) - if hasattr(item[1], "to_dict") - else item, - value.items(), - ) - ) - else: - result[attr] = value - if issubclass(Event, dict): - for key, value in self.items(): - result[key] = value - - return result - - def to_str(self): - return pprint.pformat(self.to_dict()) - - def __repr__(self): - return self.to_str() - - def __eq__(self, other): - if not isinstance(other, Event): - return False - - return self.__dict__ == other.__dict__ - - def __ne__(self, other): - return not self == other diff --git a/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrequest/__init__.py b/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrequest/__init__.py deleted file mode 100644 index cda35aec9..000000000 --- a/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrequest/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# coding: utf-8 - -from __future__ import absolute_import - -from schema.workflowrequest.marshaller import Marshaller -from schema.workflowrequest.Event import Event diff --git a/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrequest/marshaller.py b/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrequest/marshaller.py deleted file mode 100644 index 0336fb0d8..000000000 --- a/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrequest/marshaller.py +++ /dev/null @@ -1,140 +0,0 @@ -import datetime -import re -import six -import schema.workflowrequest - - -class Marshaller: - PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types - - NATIVE_TYPES_MAPPING = { - "int": int, - "long": int if six.PY3 else long, - "float": float, - "str": str, - "bool": bool, - "date": datetime.date, - "datetime": datetime.datetime, - "object": object, - } - - @classmethod - def marshall(cls, obj): - if obj is None: - return None - elif isinstance(obj, cls.PRIMITIVE_TYPES): - return obj - elif isinstance(obj, list): - return [cls.marshall(sub_obj) for sub_obj in obj] - elif isinstance(obj, tuple): - return tuple(cls.marshall(sub_obj) for sub_obj in obj) - elif isinstance(obj, (datetime.datetime, datetime.date)): - return obj.isoformat() - - if isinstance(obj, dict): - obj_dict = obj - else: - obj_dict = { - obj._attribute_map[attr]: getattr(obj, attr) - for attr, _ in six.iteritems(obj._types) - if getattr(obj, attr) is not None - } - - return {key: cls.marshall(val) for key, val in six.iteritems(obj_dict)} - - @classmethod - def unmarshall(cls, data, typeName): - if data is None: - return None - - if type(typeName) == str: - if typeName.startswith("list["): - sub_kls = re.match(r"list\[(.*)\]", typeName).group(1) - return [cls.unmarshall(sub_data, sub_kls) for sub_data in data] - - if typeName.startswith("dict("): - sub_kls = re.match(r"dict\(([^,]*), (.*)\)", typeName).group(2) - return {k: cls.unmarshall(v, sub_kls) for k, v in six.iteritems(data)} - - if typeName in cls.NATIVE_TYPES_MAPPING: - typeName = cls.NATIVE_TYPES_MAPPING[typeName] - else: - typeName = getattr(schema.workflowrequest, typeName) - - if typeName in cls.PRIMITIVE_TYPES: - return cls.__unmarshall_primitive(data, typeName) - elif typeName == object: - return cls.__unmarshall_object(data) - elif typeName == datetime.date: - return cls.__unmarshall_date(data) - elif typeName == datetime.datetime: - return cls.__unmarshall_datatime(data) - else: - return cls.__unmarshall_model(data, typeName) - - @classmethod - def __unmarshall_primitive(cls, data, typeName): - try: - return typeName(data) - except UnicodeEncodeError: - return six.text_type(data) - except TypeError: - return data - - @classmethod - def __unmarshall_object(cls, value): - return value - - @classmethod - def __unmarshall_date(cls, string): - try: - from dateutil.parser import parse - - return parse(string).date() - except ImportError: - return string - - @classmethod - def __unmarshall_datatime(cls, string): - try: - from dateutil.parser import parse - - return parse(string) - except ImportError: - return string - - @classmethod - def __unmarshall_model(cls, data, typeName): - if not typeName._types and not cls.__hasattr(typeName, "get_real_child_model"): - return data - - kwargs = {} - if typeName._types is not None: - for attr, attr_type in six.iteritems(typeName._types): - if ( - data is not None - and typeName._attribute_map[attr] in data - and isinstance(data, (list, dict)) - ): - value = data[typeName._attribute_map[attr]] - kwargs[attr] = cls.unmarshall(value, attr_type) - - instance = typeName(**kwargs) - - if ( - isinstance(instance, dict) - and typeName._types is not None - and isinstance(data, dict) - ): - for key, value in data.items(): - if key not in typeName._types: - instance[key] = value - if cls.__hasattr(instance, "get_real_child_model"): - type_name = instance.get_real_child_model(data) - if type_name: - instance = cls.unmarshall(data, type_name) - return instance - - @classmethod - def __hasattr(cls, object, name): - return name in object.__class__.__dict__ diff --git a/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrunstatechange/Event.py b/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrunstatechange/Event.py deleted file mode 100644 index 7daf9f3d2..000000000 --- a/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrunstatechange/Event.py +++ /dev/null @@ -1,110 +0,0 @@ -# coding: utf-8 -import pprint -import re # noqa: F401 - -import six -from enum import Enum - - -class Event(object): - _types = { - "workflow_run_id": "str", - "workflow_run_name": "str", - "status": "str", - "timestamp": "datetime", - } - - _attribute_map = { - "workflow_run_id": "workflow_run_id", - "workflow_run_name": "workflow_run_name", - "status": "status", - "timestamp": "timestamp", - } - - def __init__( - self, workflow_run_id=None, workflow_run_name=None, status=None, timestamp=None - ): # noqa: E501 - self._workflow_run_id = None - self._workflow_run_name = None - self._status = None - self._timestamp = None - self.discriminator = None - self.workflow_run_id = workflow_run_id - self.workflow_run_name = workflow_run_name - self.status = status - self.timestamp = timestamp - - @property - def workflow_run_id(self): - return self._workflow_run_id - - @workflow_run_id.setter - def workflow_run_id(self, workflow_run_id): - self._workflow_run_id = workflow_run_id - - @property - def workflow_run_name(self): - return self._workflow_run_name - - @workflow_run_name.setter - def workflow_run_name(self, workflow_run_name): - self._workflow_run_name = workflow_run_name - - @property - def status(self): - return self._status - - @status.setter - def status(self, status): - self._status = status - - @property - def timestamp(self): - return self._timestamp - - @timestamp.setter - def timestamp(self, timestamp): - self._timestamp = timestamp - - def to_dict(self): - result = {} - - for attr, _ in six.iteritems(self._types): - value = getattr(self, attr) - if isinstance(value, list): - result[attr] = list( - map(lambda x: x.to_dict() if hasattr(x, "to_dict") else x, value) - ) - elif hasattr(value, "to_dict"): - result[attr] = value.to_dict() - elif isinstance(value, dict): - result[attr] = dict( - map( - lambda item: (item[0], item[1].to_dict()) - if hasattr(item[1], "to_dict") - else item, - value.items(), - ) - ) - else: - result[attr] = value - if issubclass(Event, dict): - for key, value in self.items(): - result[key] = value - - return result - - def to_str(self): - return pprint.pformat(self.to_dict()) - - def __repr__(self): - return self.to_str() - - def __eq__(self, other): - if not isinstance(other, Event): - return False - - return self.__dict__ == other.__dict__ - - def __ne__(self, other): - return not self == other diff --git a/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrunstatechange/__init__.py b/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrunstatechange/__init__.py deleted file mode 100644 index bf61c13ff..000000000 --- a/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrunstatechange/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -# coding: utf-8 - -from __future__ import absolute_import - -from schema.workflowrunstatechange.marshaller import Marshaller -from schema.workflowrunstatechange.Event import Event diff --git a/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrunstatechange/marshaller.py b/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrunstatechange/marshaller.py deleted file mode 100644 index a044b3f48..000000000 --- a/lib/workload/stateless/_deprecated/layers/schema/schema/workflowrunstatechange/marshaller.py +++ /dev/null @@ -1,140 +0,0 @@ -import datetime -import re -import six -import schema.workflowrunstatechange - - -class Marshaller: - PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types - - NATIVE_TYPES_MAPPING = { - "int": int, - "long": int if six.PY3 else long, - "float": float, - "str": str, - "bool": bool, - "date": datetime.date, - "datetime": datetime.datetime, - "object": object, - } - - @classmethod - def marshall(cls, obj): - if obj is None: - return None - elif isinstance(obj, cls.PRIMITIVE_TYPES): - return obj - elif isinstance(obj, list): - return [cls.marshall(sub_obj) for sub_obj in obj] - elif isinstance(obj, tuple): - return tuple(cls.marshall(sub_obj) for sub_obj in obj) - elif isinstance(obj, (datetime.datetime, datetime.date)): - return obj.isoformat() - - if isinstance(obj, dict): - obj_dict = obj - else: - obj_dict = { - obj._attribute_map[attr]: getattr(obj, attr) - for attr, _ in six.iteritems(obj._types) - if getattr(obj, attr) is not None - } - - return {key: cls.marshall(val) for key, val in six.iteritems(obj_dict)} - - @classmethod - def unmarshall(cls, data, typeName): - if data is None: - return None - - if type(typeName) == str: - if typeName.startswith("list["): - sub_kls = re.match(r"list\[(.*)\]", typeName).group(1) - return [cls.unmarshall(sub_data, sub_kls) for sub_data in data] - - if typeName.startswith("dict("): - sub_kls = re.match(r"dict\(([^,]*), (.*)\)", typeName).group(2) - return {k: cls.unmarshall(v, sub_kls) for k, v in six.iteritems(data)} - - if typeName in cls.NATIVE_TYPES_MAPPING: - typeName = cls.NATIVE_TYPES_MAPPING[typeName] - else: - typeName = getattr(schema.workflowrunstatechange, typeName) - - if typeName in cls.PRIMITIVE_TYPES: - return cls.__unmarshall_primitive(data, typeName) - elif typeName == object: - return cls.__unmarshall_object(data) - elif typeName == datetime.date: - return cls.__unmarshall_date(data) - elif typeName == datetime.datetime: - return cls.__unmarshall_datatime(data) - else: - return cls.__unmarshall_model(data, typeName) - - @classmethod - def __unmarshall_primitive(cls, data, typeName): - try: - return typeName(data) - except UnicodeEncodeError: - return six.text_type(data) - except TypeError: - return data - - @classmethod - def __unmarshall_object(cls, value): - return value - - @classmethod - def __unmarshall_date(cls, string): - try: - from dateutil.parser import parse - - return parse(string).date() - except ImportError: - return string - - @classmethod - def __unmarshall_datatime(cls, string): - try: - from dateutil.parser import parse - - return parse(string) - except ImportError: - return string - - @classmethod - def __unmarshall_model(cls, data, typeName): - if not typeName._types and not cls.__hasattr(typeName, "get_real_child_model"): - return data - - kwargs = {} - if typeName._types is not None: - for attr, attr_type in six.iteritems(typeName._types): - if ( - data is not None - and typeName._attribute_map[attr] in data - and isinstance(data, (list, dict)) - ): - value = data[typeName._attribute_map[attr]] - kwargs[attr] = cls.unmarshall(value, attr_type) - - instance = typeName(**kwargs) - - if ( - isinstance(instance, dict) - and typeName._types is not None - and isinstance(data, dict) - ): - for key, value in data.items(): - if key not in typeName._types: - instance[key] = value - if cls.__hasattr(instance, "get_real_child_model"): - type_name = instance.get_real_child_model(data) - if type_name: - instance = cls.unmarshall(data, type_name) - return instance - - @classmethod - def __hasattr(cls, object, name): - return name in object.__class__.__dict__ diff --git a/lib/workload/stateless/_deprecated/orchestrator/component.ts b/lib/workload/stateless/_deprecated/orchestrator/component.ts deleted file mode 100644 index 4b34ca06f..000000000 --- a/lib/workload/stateless/_deprecated/orchestrator/component.ts +++ /dev/null @@ -1 +0,0 @@ -// TODO impl diff --git a/lib/workload/stateless/_deprecated/orchestrator/runtime/orchestrator.py b/lib/workload/stateless/_deprecated/orchestrator/runtime/orchestrator.py deleted file mode 100644 index 00413c981..000000000 --- a/lib/workload/stateless/_deprecated/orchestrator/runtime/orchestrator.py +++ /dev/null @@ -1,197 +0,0 @@ -import json -import logging -import eb_util as util -import schema.workflowrequest as wfr -import schema.sequencerunstatechange as srsc -import schema.workflowrunstatechange as wrsc - -logger = logging.getLogger() -logger.setLevel(logging.INFO) - -# TODO: define workflow request event(s) to cater for all workflow types -# TODO: look into preventing endless/recursive loops -# (https://theburningmonk.com/2019/06/aws-lambda-how-to-detect-and-stop-accidental-infinite-recursions/) - - -def handler(event, context): - # Log the received event in CloudWatch - logger.info("Starting orchestratr handler") - logger.info(json.dumps(event)) - - if is_wrsc_event(event): - if is_bcl_convert_event(event): - handle_bcl_convert_event(event) - elif is_dragen_wgs_qc_event(event): - handle_dragen_wgs_qc_event(event) - elif is_dragen_wgs_somatic_event(event): - handle_dragen_wgs_somatic_event(event) - else: - raise ValueError(f"Unsupported workflow/event type: {event}") - elif is_srsc_event(event): - handle_srsc_event(event) - else: - raise ValueError(f"Unsupported event type: {event.get('detail-type')}") - - -def is_wrsc_event(event): - return event.get(util.BusEventKey.DETAIL_TYPE.value) == util.EventType.WRSC.value - - -def is_srsc_event(event): - return event.get(util.BusEventKey.DETAIL_TYPE.value) == util.EventType.SRSC.value - - -def is_bcl_convert_event(event): - payload = event.get(util.BusEventKey.DETAIL.value) - # TODO: unmarshall full object if we only need one value? - wrsc_event: wrsc.Event = wrsc.Marshaller.unmarshall(payload, typeName=wrsc.Event) - wf_name: str = wrsc_event.workflow_run_name - return wf_name.startswith(util.WorkflowType.BCL_CONVERT.value) - # if not payload: - # raise ValueError("No event payload!") - # return payload.get("workflow_run_name").startswith(util.WorkflowType.BCL_CONVERT.value) - - -def get_sequence_run_from_workflow(workflow_run_id): - pass - - -def get_libs_for_sequence_run(sequence_run) -> list: - pass - - -def submit_dragen_wgs_qc_requests(libraries: list): - # TODO: make sure that's not blocking other runs (i.e. a single WF failure should not prevent others) - for lib in libraries: - if lib.type == "WGS": # TODO: - wfr_event = wfr.Event(library_id=lib.library_id) - logger.info(f"Emitting DRAGEN_WGS_QC event with payload {wfr_event}") - util.send_event_to_bus_schema( - event_source=util.EventSource.ORCHESTRATOR, - event_type=util.EventType.DRAGEN_WGS_QC, - event_payload=wfr_event, - ) - - -def submit_dragen_tso_ctdna_requests(libraries: list): - # TODO: make sure that's not blocking other runs (i.e. a single WF failure should not prevent others) - for lib in libraries: - if lib.type == "TSO": # TODO: - wfr_event = wfr.Event(library_id=lib.library_id) - logger.info(f"Emitting DRAGEN_TSO_CTDNA event with payload {wfr_event}") - util.send_event_to_bus_schema( - event_source=util.EventSource.ORCHESTRATOR, - event_type=util.EventType.DRAGEN_TSO_CTDNA, - event_payload=wfr_event, - ) - - -def handle_bcl_convert_event(event): - logger.info("Handling BCL Convert event") - # TODO: this should be already established (remove at some point?) - event_type = event.get(util.BusEventKey.DETAIL_TYPE.value) - if event_type != util.EventType.WRSC.value: - raise ValueError(f"Unsupported event type: {event_type}") - - payload = event.get(util.BusEventKey.DETAIL.value) - wrsc_event: wrsc.Event = wrsc.Marshaller.unmarshall(payload, typeName=wrsc.Event) - - if wrsc_event.status == "Succeeded": - # retrieve SequenceRun from WES workflow run - sequence_run = get_sequence_run_from_workflow(wrsc_event.workflow_run_id) - # retrieve Library records from SequenceRun - libraries: list = get_libs_for_sequence_run(sequence_run) - # perform adequate operation(s) for libraries - submit_dragen_wgs_qc_requests(libraries) - submit_dragen_tso_ctdna_requests(libraries) - - else: - # ignore other status for now - logger.info(f"Received unsupported workflow status: {wrsc_event.status}") - - # Trigger a WGS QC run for all WGS libraries - - -def is_dragen_wgs_qc_event(event): - payload = event.get(util.BusEventKey.DETAIL.value) - if not payload: - raise ValueError("No event payload!") - return payload.get("workflow_run_name").startswith( - util.WorkflowType.DRAGEN_WGS_QC.value - ) - - -def get_lib_id_from_wrsc_event(event: wrsc.Event) -> str: - return event.workflow_run_name[-7:] - - -def handle_dragen_wgs_qc_event(event): - logger.info(f"Handling {util.EventType.DRAGEN_WGS_QC} event") - event_type = event.get(util.BusEventKey.DETAIL_TYPE.value) - if event_type != util.EventType.WRSC.value: - raise ValueError(f"Unsupported event type: {event_type}") - payload = event.get(util.BusEventKey.DETAIL.value) - wrsc_event: wrsc.Event = wrsc.Marshaller.unmarshall(payload, typeName=wrsc.Event) - - if wrsc_event.status == "Succeeded": - logger.info( - f"{util.EventType.DRAGEN_WGS_QC.value} workflow succeeded! Proceeding to T/N." - ) - # only progress some libraries to mock T/N case - lib_id = get_lib_id_from_wrsc_event(wrsc_event) - if lib_id in ["L210001", "L210003"]: - wfr_event = wfr.Event(library_id=lib_id) - logger.info(f"Emitting DRAGEN_WGS_QC event with payload {wfr_event}") - util.send_event_to_bus_schema( - event_source=util.EventSource.ORCHESTRATOR, - event_type=util.EventType.DRAGEN_WGS_SOMATIC, - event_payload=wfr_event, - ) - else: - # ignore other status for now - logger.info(f"Received unsupported workflow status: {wrsc_event.status}") - - -def is_dragen_wgs_somatic_event(event): - payload = event.get(util.BusEventKey.DETAIL.value) - if not payload: - raise ValueError("No event payload!") - return payload.get("workflow_run_name").startswith( - util.WorkflowType.DRAGEN_WGS_SOMATIC.value - ) - - -def handle_dragen_wgs_somatic_event(event): - logger.info(f"Handling {util.EventType.DRAGEN_WGS_SOMATIC} event") - event_type = event.get(util.BusEventKey.DETAIL_TYPE.value) - if event_type != util.EventType.WRSC.value: - raise ValueError(f"Unsupported event type: {event_type}") - payload = event.get(util.BusEventKey.DETAIL.value) - wrsc_event: wrsc.Event = wrsc.Marshaller.unmarshall(payload, typeName=wrsc.Event) - - if wrsc_event.status == "Succeeded": - logger.info( - f"{util.EventType.DRAGEN_WGS_SOMATIC} workflow succeeded! Analysis results available." - ) - else: - # ignore other status for now - logger.info(f"Received unsupported workflow status: {wrsc_event.status}") - - -def handle_srsc_event(event): - logger.info("Handling srsc event") - event_type = event.get(util.BusEventKey.DETAIL_TYPE.value) - if event_type != util.EventType.SRSC.value: - raise ValueError(f"Unsupported event type: {event_type}") - payload = event.get(util.BusEventKey.DETAIL.value) - srsc_event: srsc.Event = srsc.Marshaller.unmarshall(payload, typeName=srsc.Event) - # if payload.get("status") == "PendingAnalysis": - if srsc_event.status == "PendingAnalysis": - logger.info(f"Emitting event with payload {payload}") - # just forward payload (no need to convert) - util.send_event_to_bus( - event_source=util.EventSource.ORCHESTRATOR, - event_type=util.EventType.SRSC, - event_payload=payload, - ) - # ignore other SequenceRunStateChange events diff --git a/lib/workload/stateless/_deprecated/wes_launcher/component.ts b/lib/workload/stateless/_deprecated/wes_launcher/component.ts deleted file mode 100644 index 4b34ca06f..000000000 --- a/lib/workload/stateless/_deprecated/wes_launcher/component.ts +++ /dev/null @@ -1 +0,0 @@ -// TODO impl diff --git a/lib/workload/stateless/_deprecated/wes_launcher/runtime/wes_launcher.py b/lib/workload/stateless/_deprecated/wes_launcher/runtime/wes_launcher.py deleted file mode 100644 index d1b3d7b99..000000000 --- a/lib/workload/stateless/_deprecated/wes_launcher/runtime/wes_launcher.py +++ /dev/null @@ -1,164 +0,0 @@ -import boto3 -import json -import logging -import eb_util as util -import schema.weslaunchrequest as wlr - -logger = logging.getLogger() -logger.setLevel(logging.INFO) -lambda_client = boto3.client("lambda") - - -def create_sqs_event(workflow_name: str, status: str): - return { - "Records": [ - { - "messageId": "123456789", - "receiptHandle": "123456789/123/foo/bar", - "body": json.dumps( - create_wes_ens_body(workflow_name=workflow_name, status=status) - ), - "attributes": { - "ApproximateReceiveCount": "1", - "SentTimestamp": "1625004165382", - "SenderId": "123456789", - "ApproximateFirstReceiveTimestamp": "1625004165389", - }, - "messageAttributes": { - "subscription-urn": { - "stringValue": "urn:ilmn:igp:us-east-1:123456789:subscription:sub.1234", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "spring_json_header_types": { - "stringValue": "some_irrelevant_string", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "action": { - "stringValue": "Succeeded", # hardcoded as we simulate successful WES runs - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "type": { - "stringValue": "wes.runs", # hardcoded as we are simulating only this kind of event - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "actionDate": { - "stringValue": "2021-06-29T22:02:23.391Z", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "contentType": { - "stringValue": "application/json", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "contentVersion": { - "stringValue": "1", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "producedBy": { - "stringValue": "WorkflowExecutionService", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - "__TypeId__": { - "stringValue": "com.illumina.stratus.wes.worker.model.kafka.WesWorkflowRunEvent", - "stringListValues": [], - "binaryListValues": [], - "dataType": "String", - }, - }, - "md5OfMessageAttributes": "123456789", - "md5OfBody": "123456789", - "eventSource": "aws:sqs", - "eventSourceARN": "arn:aws:sqs:ap-southeast-2:123456789:some-sqs-queue", - "awsRegion": "ap-southeast-2", - } - ] - } - - -def create_wes_ens_body(workflow_name, status): - return { - "Timestamp": "2021-06-29T22:02:23.391Z", - "EventType": "RunSucceeded", - "EventDetails": {}, - "WorkflowRun": { - "TenantId": "123456789", - "Status": status, - "TimeModified": "2021-06-29T22:01:43.129128Z", - "Acl": ["cid:123456789", "tid:123456789"], - "WorkflowVersion": { - "Id": "wfv.123456789", - "TimeModified": "2021-04-30T14:38:21.777722Z", - "Language": {"Name": "CWL"}, - "TenantId": "123456789", - "Acl": ["tid:123456789", "cid:123456789", "wid:123456789"], - "Timestamp": "2021-04-30T14:38:21.777722Z", - "MessageKey": "wfv.123456789", - "Version": "3.7.5--1.3.5-43a0e8a", - "Urn": "urn:ilmn:iap:aps2:123456789:workflowversion:wfv.123456789#3.7.5--1.3.5-43a0e8a", - "Status": "Draft", - "TimeCreated": "2021-04-22T10:13:22.770106Z", - "CreatedByClientId": "iap-aps2", - "CreatedBy": "123456789", - "Href": "https://aps2.platform.illumina.com/v1/workflows/wfl.123456789/versions/3.7.5--1.3.5-43a0e8a", - "ModifiedBy": "123456789", - }, - "Id": "wfr.123456789", - "Urn": f"urn:ilmn:iap:aps2:123456789:workflowrun:wfr.123456789#{workflow_name}", - "TimeCreated": "2021-06-29T17:17:14.438741Z", - "StatusSummary": "", - "TimeStarted": "2021-06-29T17:17:16.404574Z", - "CreatedByClientId": "iap-aps2", - "CreatedBy": "123456789", - "Href": "https://aps2.platform.illumina.com/v1/workflows/runs/wfr.123456789", - "TimeStopped": "2021-06-29T22:02:23.391Z", - "ModifiedBy": "bc99b89c-3bb7-334b-80d1-20ef9e65f0b0", - "Name": workflow_name, - }, - "Acl": ["cid:123456789", "tid:123456789"], - "MessageKey": "wfr.123456789", - "PreviousEventId": 123456788, - "EventId": 123456789, - "Name": workflow_name, - } - - -def handler(event, context): - # Log the received event in CloudWatch - logger.info("Starting wes_launcher lambda") - logger.info(f"Received event: {json.dumps(event)}") - - # TODO: send fake WES-SQS event with real structure (simulates real ICA WES response) - payload = event.get(util.BusEventKey.DETAIL.value) - wlr_event: wlr.Event = wlr.Marshaller.unmarshall(payload, typeName=wlr.Event) - # ignore other values that would be used for a real WES launch - - logger.info("Creating ENS event") - ens_event = create_sqs_event( - workflow_name=wlr_event.workflow_run_name, status="Pending" - ) - - # forward event payload to ens lambda - logger.info("Sending ENS event to ens_event_manager:") - logger.info(json.dumps(ens_event)) - response = lambda_client.invoke( - FunctionName="OrcaBus_ens_event_manager", # TODO: hardcoded for mock impl - InvocationType="Event", - Payload=json.dumps(ens_event), - ) - logger.info(f"Lambda invocation response: {response}") - logger.info("All done.")