From 4e454d7fe372749398119be4bdeaef9ec20ecebd Mon Sep 17 00:00:00 2001 From: Alexis Lucattini Date: Tue, 20 Aug 2024 15:41:02 +1000 Subject: [PATCH] Stacky updates for generating umccrise events Added the stateful umccrise stacky database Added the logic parts for generating a umccrise ready event --- config/constants.ts | 2 +- config/stacks/stackyMcStackFace.ts | 3 + .../stacky-mcstackface-dynamodb/index.ts | 10 + .../glue-constructs/index.ts | 39 +++- .../generate_draft_event_payload.py | 4 +- ...complete_to_tn_draft_sfn_template.asl.json | 1 + .../generate_draft_event_payload.py | 7 - ...omplete_to_wts_draft_sfn_template.asl.json | 3 +- .../glue-constructs/pva/index.ts | 148 ++++++++++++++ .../initialise-umccrise-subject-dbs/index.ts | 81 ++++++++ ..._umccrise_subject_db_sfn_template.asl.json | 57 ++++++ .../initialise-umccrise-library-dbs/index.ts | 140 +++++++++++++ ..._umccrise_library_db_sfn_template.asl.json | 188 ++++++++++++++++++ .../update-fastq-list-rows-dbs/index.ts | 87 ++++++++ ...d_fastq_list_rows_db_sfn_template.asl.json | 61 ++++++ .../tn-complete-to-umccrise-draft/index.ts | 169 ++++++++++++++++ .../generate_draft_event_payload.py | 69 +++++++ ...te_to_umccrise_draft_sfn_template.asl.json | 122 ++++++++++++ .../part_5/umccrise-draft-to-ready/index.ts | 104 ++++++++++ 19 files changed, 1279 insertions(+), 16 deletions(-) create mode 100644 lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/index.ts create mode 100644 lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_1/initialise-umccrise-subject-dbs/index.ts create mode 100644 lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_1/initialise-umccrise-subject-dbs/step_functions_templates/initialise_umccrise_subject_db_sfn_template.asl.json create mode 100644 lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_2/initialise-umccrise-library-dbs/index.ts create mode 100644 lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_2/initialise-umccrise-library-dbs/step_functions_templates/initialise_umccrise_library_db_sfn_template.asl.json create mode 100644 lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_3/update-fastq-list-rows-dbs/index.ts create mode 100644 lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_3/update-fastq-list-rows-dbs/step_functions_templates/add_fastq_list_rows_db_sfn_template.asl.json create mode 100644 lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_4/tn-complete-to-umccrise-draft/index.ts create mode 100644 lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_4/tn-complete-to-umccrise-draft/lambdas/generate_draft_event_payload_py/generate_draft_event_payload.py create mode 100644 lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_4/tn-complete-to-umccrise-draft/step_functions_templates/tn_complete_to_umccrise_draft_sfn_template.asl.json create mode 100644 lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_5/umccrise-draft-to-ready/index.ts diff --git a/config/constants.ts b/config/constants.ts index 2c112b521..0a272f3ee 100644 --- a/config/constants.ts +++ b/config/constants.ts @@ -403,8 +403,8 @@ export const mockInputMakerTableName = 'stacky-input-maker-table'; export const mockCttsov2InputGlueTableName = 'stacky-cttsov2-workflow-glue-table'; export const mockWgtsQcGlueTableName = 'stacky-wgts-qc-glue-table'; export const mockTnGlueTableName = 'stacky-tn-glue-table'; - export const mockWtsGlueTableName = 'stacky-wts-glue-table'; +export const mockUmccriseGlueTableName = 'stacky-umccrise-glue-table'; export const mockWorkflowManagerTableName = 'stacky-workflow-manager-table'; diff --git a/config/stacks/stackyMcStackFace.ts b/config/stacks/stackyMcStackFace.ts index d53ebeb3b..28d919279 100644 --- a/config/stacks/stackyMcStackFace.ts +++ b/config/stacks/stackyMcStackFace.ts @@ -14,6 +14,7 @@ import { AppStage, mockTnGlueTableName, mockWtsGlueTableName, + mockUmccriseGlueTableName, } from '../constants'; import { GlueStackConfig } from '../../lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs'; import { StackyStatefulTablesConfig } from '../../lib/workload/stateful/stacks/stacky-mcstackface-dynamodb'; @@ -36,6 +37,7 @@ export const getGlueStackProps = (stage: AppStage): GlueStackConfig => { wgtsQcGlueTableName: mockWgtsQcGlueTableName, tnGlueTableName: mockTnGlueTableName, wtsGlueTableName: mockWtsGlueTableName, + umccriseGlueTableName: mockUmccriseGlueTableName, /* Secrets */ icav2AccessTokenSecretName: icav2AccessTokenSecretName[stage], }; @@ -50,5 +52,6 @@ export const getStatefulGlueStackProps = (): StackyStatefulTablesConfig => { dynamodbWgtsQcGlueTableName: mockWgtsQcGlueTableName, dynamodbTnGlueTableName: mockTnGlueTableName, dynamodbWtsGlueTableName: mockWtsGlueTableName, + dynamodbUmccriseGlueTableName: mockUmccriseGlueTableName, }; }; diff --git a/lib/workload/stateful/stacks/stacky-mcstackface-dynamodb/index.ts b/lib/workload/stateful/stacks/stacky-mcstackface-dynamodb/index.ts index 2fbccfddd..bba19a0bd 100644 --- a/lib/workload/stateful/stacks/stacky-mcstackface-dynamodb/index.ts +++ b/lib/workload/stateful/stacks/stacky-mcstackface-dynamodb/index.ts @@ -12,6 +12,7 @@ export interface StackyStatefulTablesConfig { dynamodbWgtsQcGlueTableName: string; dynamodbTnGlueTableName: string; dynamodbWtsGlueTableName: string; + dynamodbUmccriseGlueTableName: string; removalPolicy?: RemovalPolicy; } @@ -25,6 +26,7 @@ export class StackyStatefulTablesStack extends Stack { public readonly wgtsQcGlueTable: dynamodb.ITableV2; public readonly tnGlueTable: dynamodb.ITableV2; public readonly wtsGlueTable: dynamodb.ITableV2; + public readonly umccriseGlueTable: dynamodb.ITableV2; constructor(scope: Construct, id: string, props: StackProps & StackyStatefulTablesStackProps) { super(scope, id, props); @@ -95,5 +97,13 @@ export class StackyStatefulTablesStack extends Stack { tableName: props.dynamodbWtsGlueTableName, removalPolicy: props.removalPolicy, }).tableObj; + + /* + Initialise dynamodb table for the umccrise glue service + */ + this.umccriseGlueTable = new DynamodbPartitionedPipelineConstruct(this, 'umccriseGlueTable', { + tableName: props.dynamodbUmccriseGlueTableName, + removalPolicy: props.removalPolicy, + }).tableObj; } } diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/index.ts b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/index.ts index d4bb2003f..0095a8f27 100644 --- a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/index.ts +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/index.ts @@ -11,6 +11,7 @@ import { Cttsov2GlueHandlerConstruct } from './jb-weld'; import { WgtsQcGlueHandlerConstruct } from './kwik'; import { TnGlueHandlerConstruct } from './loctite'; import { WtsGlueHandlerConstruct } from './mod-podge'; +import { UmccriseGlueHandlerConstruct } from './pva'; /* Provide the glue to get from the bclconvertmanager success event @@ -28,6 +29,7 @@ export interface GlueConstructProps { wgtsQcGlueTableObj: dynamodb.ITableV2; tnGlueTableObj: dynamodb.ITableV2; wtsGlueTableObj: dynamodb.ITableV2; + umccriseGlueTableObj: dynamodb.ITableV2; /* SSM Parameters */ icav2ProjectIdSsmParameterObj: ssm.IStringParameter; bsshOutputFastqCopyOutputUriSsmParameterObj: ssm.IStringParameter; @@ -43,8 +45,8 @@ export class GlueConstruct extends Construct { super(scope, id); /* - Part A: Send 'showered' events if a new samplesheet arrives or new fastq list rows arrive - */ + Part A: Send 'showered' events if a new samplesheet arrives or new fastq list rows arrive + */ const clag = new showerGlueHandlerConstruct(this, 'clag', { /* Event Bus */ eventBusObj: props.eventBusObj, @@ -67,8 +69,8 @@ export class GlueConstruct extends Construct { }); /* - Part C: Connect the bclconvert interop qc - */ + Part C: Connect the bclconvert interop qc + */ const gorilla = new BsshFastqCopyToBclconvertInteropQcConstruct(this, 'gorilla', { /* Event Objects */ eventBusObj: props.eventBusObj, @@ -101,8 +103,8 @@ export class GlueConstruct extends Construct { }); /* - Part E: Plumber-up the WGTS QC Execution Service to the shower services - */ + Part E: Plumber-up the WGTS QC Execution Service to the shower services + */ const kwik = new WgtsQcGlueHandlerConstruct(this, 'kwik', { /* Event Bus */ eventBusObj: props.eventBusObj, @@ -153,6 +155,24 @@ export class GlueConstruct extends Construct { /* Secrets */ icav2AccessTokenSecretObj: props.icav2AccessTokenSecretObj, }); + + /* + Part H: Plumber-up the UMCCRise Execution Service to the shower services + */ + const pva = new UmccriseGlueHandlerConstruct(this, 'pva', { + /* Event Bus */ + eventBusObj: props.eventBusObj, + /* Tables */ + inputMakerTableObj: props.inputMakerTableObj, + umccriseGlueTableObj: props.umccriseGlueTableObj, + /* SSM Parameters */ + icav2ProjectIdSsmParameterObj: props.icav2ProjectIdSsmParameterObj, + analysisOutputUriSsmParameterObj: props.analysisOutputUriSsmParameterObj, + analysisLogsUriSsmParameterObj: props.analysisLogsUriSsmParameterObj, + analysisCacheUriSsmParameterObj: props.analysisCacheUriSsmParameterObj, + /* Secrets */ + icav2AccessTokenSecretObj: props.icav2AccessTokenSecretObj, + }); } } @@ -167,6 +187,7 @@ export interface GlueStackConfig { wgtsQcGlueTableName: string; tnGlueTableName: string; wtsGlueTableName: string; + umccriseGlueTableName: string; /* SSM Parameters */ icav2ProjectIdSsmParameterName: string; bsshOutputFastqCopyUriSsmParameterName: string; @@ -230,6 +251,11 @@ export class GlueStack extends cdk.Stack { 'wtsGlueTableObj', props.wtsGlueTableName ); + const umccriseGlueTableObj = dynamodb.Table.fromTableName( + this, + 'umccriseGlueTableObj', + props.umccriseGlueTableName + ); /* Get the SSM Parameters @@ -285,6 +311,7 @@ export class GlueStack extends cdk.Stack { cttsov2GlueTableObj: cttsov2GlueTableObj, tnGlueTableObj: tnGlueTableObj, wtsGlueTableObj: wtsGlueTableObj, + umccriseGlueTableObj: umccriseGlueTableObj, /* SSM Parameters */ icav2ProjectIdSsmParameterObj: icav2ProjectIdSsmParameterObj, bsshOutputFastqCopyOutputUriSsmParameterObj: bsshOutputFastqCopyUriSsmParameterObj, diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/loctite/part_5/library-qc-complete-db-to-tn-draft/lambdas/generate_draft_event_payload_py/generate_draft_event_payload.py b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/loctite/part_5/library-qc-complete-db-to-tn-draft/lambdas/generate_draft_event_payload_py/generate_draft_event_payload.py index be4cd3af4..87b2b11b4 100644 --- a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/loctite/part_5/library-qc-complete-db-to-tn-draft/lambdas/generate_draft_event_payload_py/generate_draft_event_payload.py +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/loctite/part_5/library-qc-complete-db-to-tn-draft/lambdas/generate_draft_event_payload_py/generate_draft_event_payload.py @@ -29,6 +29,7 @@ def handler(event, context) -> Dict: :param event: event object :return: draft event payload """ + subject_id = event['subject_id'] tumor_library_id = event['tumor_library_id'] normal_library_id = event['normal_library_id'] @@ -52,8 +53,9 @@ def handler(event, context) -> Dict: "dragenReferenceVersion": DEFAULT_DRAGEN_REFERENCE_VERSION }, "event_tags": { + "subjectId": subject_id, "tumorLibraryId": tumor_library_id, - "normalLibrary_id": normal_library_id, + "normalLibraryId": normal_library_id, "tumorFastqListRowIds": tumor_fastq_list_row_ids, "normalFastqListRowIds": fastq_list_row_ids } diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/loctite/part_5/library-qc-complete-db-to-tn-draft/step_functions_templates/add_library_qc_complete_to_tn_draft_sfn_template.asl.json b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/loctite/part_5/library-qc-complete-db-to-tn-draft/step_functions_templates/add_library_qc_complete_to_tn_draft_sfn_template.asl.json index ca765cf78..68be74908 100644 --- a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/loctite/part_5/library-qc-complete-db-to-tn-draft/step_functions_templates/add_library_qc_complete_to_tn_draft_sfn_template.asl.json +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/loctite/part_5/library-qc-complete-db-to-tn-draft/step_functions_templates/add_library_qc_complete_to_tn_draft_sfn_template.asl.json @@ -340,6 +340,7 @@ "Parameters": { "FunctionName": "${__generate_draft_event_payload_lambda_function_arn__}", "Payload": { + "subject_id.$": "$.get_library_item_step.Item.subject_id.S", "tumor_library_id.$": "$.get_tn_pair_step.tumor_library.id", "normal_library_id.$": "$.get_tn_pair_step.normal_library.id", "tumor_fastq_list_rows.$": "$.get_parameters_step.tumor_fastq_list_rows", diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/mod-podge/part_4/library-qc-complete-to-wts-draft/lambdas/generate_draft_event_payload_py/generate_draft_event_payload.py b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/mod-podge/part_4/library-qc-complete-to-wts-draft/lambdas/generate_draft_event_payload_py/generate_draft_event_payload.py index 90420ed4b..0c8eb659e 100644 --- a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/mod-podge/part_4/library-qc-complete-to-wts-draft/lambdas/generate_draft_event_payload_py/generate_draft_event_payload.py +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/mod-podge/part_4/library-qc-complete-to-wts-draft/lambdas/generate_draft_event_payload_py/generate_draft_event_payload.py @@ -1,12 +1,5 @@ #!/usr/bin/env python3 - -# Set the fastq list rows - -# Set the output file prefix - -#!/usr/bin/env python3 - """ Generate draft event payload for the event diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/mod-podge/part_4/library-qc-complete-to-wts-draft/step_functions_templates/add_library_qc_complete_to_wts_draft_sfn_template.asl.json b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/mod-podge/part_4/library-qc-complete-to-wts-draft/step_functions_templates/add_library_qc_complete_to_wts_draft_sfn_template.asl.json index 2e5b1829f..25aeefde9 100644 --- a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/mod-podge/part_4/library-qc-complete-to-wts-draft/step_functions_templates/add_library_qc_complete_to_wts_draft_sfn_template.asl.json +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/mod-podge/part_4/library-qc-complete-to-wts-draft/step_functions_templates/add_library_qc_complete_to_wts_draft_sfn_template.asl.json @@ -136,7 +136,8 @@ "ResultPath": "$.generate_draft_event_payload_data_step", "Next": "Push WTS Draft Event", "ResultSelector": { - "input_event_data.$": "$.Payload.input_event_data" + "input_event_data.$": "$.Payload.input_event_data", + "event_tags.$": "$.Payload.event_tags" } }, "Push WTS Draft Event": { diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/index.ts b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/index.ts new file mode 100644 index 000000000..85f3e5699 --- /dev/null +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/index.ts @@ -0,0 +1,148 @@ +import { Construct } from 'constructs'; +import * as events from 'aws-cdk-lib/aws-events'; +import * as dynamodb from 'aws-cdk-lib/aws-dynamodb'; +import * as ssm from 'aws-cdk-lib/aws-ssm'; +import * as secretsManager from 'aws-cdk-lib/aws-secretsmanager'; +import { UmccriseInitialiseSubjectDbRowConstruct } from './part_1/initialise-umccrise-subject-dbs'; +import { UmccriseInitialiseLibraryAndFastqListRowConstruct } from './part_2/initialise-umccrise-library-dbs'; +import { UmccrisePopulateFastqListRowConstruct } from './part_3/update-fastq-list-rows-dbs'; +import { TnCompleteToUmccriseDraftConstruct } from './part_4/tn-complete-to-umccrise-draft'; +import { UmccriseInputMakerConstruct } from './part_5/umccrise-draft-to-ready'; + +/* +Provide the glue to get from the bssh fastq copy manager to submitting wgts qc analyses +*/ + +export interface umccriseGlueHandlerConstructProps { + /* General */ + eventBusObj: events.IEventBus; + /* Tables */ + umccriseGlueTableObj: dynamodb.ITableV2; + inputMakerTableObj: dynamodb.ITableV2; + /* SSM Parameters */ + analysisOutputUriSsmParameterObj: ssm.IStringParameter; + analysisLogsUriSsmParameterObj: ssm.IStringParameter; + analysisCacheUriSsmParameterObj: ssm.IStringParameter; + icav2ProjectIdSsmParameterObj: ssm.IStringParameter; + /* Secrets */ + icav2AccessTokenSecretObj: secretsManager.ISecret; +} + +export class UmccriseGlueHandlerConstruct extends Construct { + constructor(scope: Construct, id: string, props: umccriseGlueHandlerConstructProps) { + super(scope, id); + + /* + Part 1 + + Input Event Source: `orcabus.instrumentrunmanager` + Input Event DetailType: `SamplesheetMetadataUnion` + Input Event status: `SubjectInSamplesheet` + + * Initialise umccrise instrument db construct + */ + const umccrise_initialise_subject = new UmccriseInitialiseSubjectDbRowConstruct( + this, + 'umccrise_initialise_subject', + { + eventBusObj: props.eventBusObj, + tableObj: props.umccriseGlueTableObj, + } + ); + + /* + Part 2 + + Input Event Source: `orcabus.instrumentrunmanager` + Input Event DetailType: `SamplesheetMetadataUnion` + Input Event status: `LibraryInSamplesheet` + + * Initialise umccrise instrument db construct + */ + const umccrise_initialise_library_and_fastq_list_row = + new UmccriseInitialiseLibraryAndFastqListRowConstruct( + this, + 'umccrise_initialise_library_and_fastq_list_row', + { + eventBusObj: props.eventBusObj, + tableObj: props.umccriseGlueTableObj, + } + ); + + /* + Part 3 + + Input Event Source: `orcabus.instrumentrunmanager` + Input Event DetailType: `FastqListRowStateChange` + Input Event status: `newFastqListRow` + + * Populate the fastq list row attributes for the rgid for this workflow + */ + + const umccrise_populate_fastq_list_row = new UmccrisePopulateFastqListRowConstruct( + this, + 'umccrise_populate_fastq_list_row', + { + eventBusObj: props.eventBusObj, + tableObj: props.umccriseGlueTableObj, + } + ); + + /* + Part 4 + + Input Event Source: `orcabus.workflowmanager` + Input Event DetailType: `WorkflowRunStateChange` + Input Event status: `succeeded` + + Output Event source: `orcabus.umccriseinputeventglue` + Output Event DetailType: `WorkflowDraftRunStateChange` + Output Event status: `draft` + + * Populate the fastq list row attributes for the rgid for this workflow + */ + + const tn_to_umccrise_draft = new TnCompleteToUmccriseDraftConstruct( + this, + 'tn_to_umccrise_draft', + { + eventBusObj: props.eventBusObj, + tableObj: props.umccriseGlueTableObj, + workflowsTableObj: props.inputMakerTableObj, + } + ); + + /* + Part 5 + + Input Event source: `orcabus.umccriseinputeventglue` + Input Event DetailType: `WorkflowDraftRunStateChange` + Input Event status: `draft` + + Output Event source: `orcabus.umccriseinputeventglue` + Output Event DetailType: `WorkflowRunStateChange` + Output Event status: `ready` + + * The umccriseInputMaker, subscribes to the umccrise input event glue (itself) and generates a ready event for the umccriseReadySfn + * However, in order to be 'ready' we need to use a few more variables such as + * icaLogsUri, + * analysisOutputUri + * cacheUri + * projectId + * userReference + */ + const umccriseInputMaker = new UmccriseInputMakerConstruct(this, 'fastq_list_row_qc_complete', { + /* Event bus */ + eventBusObj: props.eventBusObj, + /* Tables */ + inputMakerTableObj: props.inputMakerTableObj, + /* SSM Param objects */ + icav2ProjectIdSsmParameterObj: props.icav2ProjectIdSsmParameterObj, + outputUriSsmParameterObj: props.analysisOutputUriSsmParameterObj, + cacheUriSsmParameterObj: props.analysisCacheUriSsmParameterObj, + logsUriSsmParameterObj: props.analysisLogsUriSsmParameterObj, + /* Secrets Manager */ + icav2AccessTokenSecretObj: props.icav2AccessTokenSecretObj, + }); + } +} diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_1/initialise-umccrise-subject-dbs/index.ts b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_1/initialise-umccrise-subject-dbs/index.ts new file mode 100644 index 000000000..dfcc7e994 --- /dev/null +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_1/initialise-umccrise-subject-dbs/index.ts @@ -0,0 +1,81 @@ +import { Construct } from 'constructs'; +import * as dynamodb from 'aws-cdk-lib/aws-dynamodb'; +import path from 'path'; +import * as sfn from 'aws-cdk-lib/aws-stepfunctions'; +import * as events from 'aws-cdk-lib/aws-events'; +import * as eventsTargets from 'aws-cdk-lib/aws-events-targets'; + +/* +Part 1 + +Input Event Source: `orcabus.instrumentrunmanager` +Input Event DetailType: `SamplesheetMetadataUnion` +Input Event status: `SubjectInSamplesheet` + +* Initialise umccrise subject db construct +*/ + +export interface UmccriseInitialiseSubjectDbRowConstructProps { + tableObj: dynamodb.ITableV2; + eventBusObj: events.IEventBus; +} + +export class UmccriseInitialiseSubjectDbRowConstruct extends Construct { + public readonly UmccriseInitialiseSubjectDbRowMap = { + prefix: 'pva-make-subject-row', + tablePartition: 'subject', + triggerSource: 'orcabus.instrumentrunmanager', + triggerStatus: 'SubjectInSamplesheet', + triggerDetailType: 'SamplesheetMetadataUnion', + }; + + constructor(scope: Construct, id: string, props: UmccriseInitialiseSubjectDbRowConstructProps) { + super(scope, id); + + /* + Part 1: Build the internal sfn + */ + const inputMakerSfn = new sfn.StateMachine(this, 'initialise_subject_db_row', { + stateMachineName: `${this.UmccriseInitialiseSubjectDbRowMap.prefix}-initialise-subject`, + definitionBody: sfn.DefinitionBody.fromFile( + path.join( + __dirname, + 'step_functions_templates', + 'initialise_umccrise_subject_db_sfn_template.asl.json' + ) + ), + definitionSubstitutions: { + __table_name__: props.tableObj.tableName, + __subject_partition_name__: this.UmccriseInitialiseSubjectDbRowMap.tablePartition, + }, + }); + + /* + Part 2: Grant the internal sfn permissions + */ + // access the dynamodb table + props.tableObj.grantReadWriteData(inputMakerSfn.role); + + /* + Part 3: Subscribe to the event bus and trigger the internal sfn + */ + const rule = new events.Rule(this, 'umccrise_subscribe_to_samplesheet_shower_subject', { + ruleName: `stacky-${this.UmccriseInitialiseSubjectDbRowMap.prefix}-rule`, + eventBus: props.eventBusObj, + eventPattern: { + source: [this.UmccriseInitialiseSubjectDbRowMap.triggerSource], + detailType: [this.UmccriseInitialiseSubjectDbRowMap.triggerDetailType], + detail: { + status: [{ 'equals-ignore-case': this.UmccriseInitialiseSubjectDbRowMap.triggerStatus }], + }, + }, + }); + + // Add target of event to be the state machine + rule.addTarget( + new eventsTargets.SfnStateMachine(inputMakerSfn, { + input: events.RuleTargetInput.fromEventPath('$.detail'), + }) + ); + } +} diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_1/initialise-umccrise-subject-dbs/step_functions_templates/initialise_umccrise_subject_db_sfn_template.asl.json b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_1/initialise-umccrise-subject-dbs/step_functions_templates/initialise_umccrise_subject_db_sfn_template.asl.json new file mode 100644 index 000000000..796cf7c7d --- /dev/null +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_1/initialise-umccrise-subject-dbs/step_functions_templates/initialise_umccrise_subject_db_sfn_template.asl.json @@ -0,0 +1,57 @@ +{ + "Comment": "A description of my state machine", + "StartAt": "Move Inputs", + "States": { + "Move Inputs": { + "Type": "Pass", + "Parameters": { + "payload_data.$": "$.payload.data" + }, + "Next": "Get Subject Item" + }, + "Get Subject Item": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.payload_data.subject.internalId", + "id_type": "${__subject_partition_name__}" + } + }, + "ResultPath": "$.get_subject_item_step", + "Next": "Subject in Database" + }, + "Subject in Database": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.get_subject_item_step.Item", + "IsPresent": false, + "Comment": "Subject Not In Database", + "Next": "Initialise Subject" + } + ], + "Default": "Pass" + }, + "Pass": { + "Type": "Pass", + "End": true + }, + "Initialise Subject": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:putItem", + "Parameters": { + "TableName": "${__table_name__}", + "Item": { + "id.$": "$.payload_data.subject.internalId", + "id_type": "${__subject_partition_name__}", + "metadata_db_id": { + "N.$": "States.Format('{}', $.payload_data.subject.id)" + } + } + }, + "End": true + } + } +} diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_2/initialise-umccrise-library-dbs/index.ts b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_2/initialise-umccrise-library-dbs/index.ts new file mode 100644 index 000000000..2657edf6d --- /dev/null +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_2/initialise-umccrise-library-dbs/index.ts @@ -0,0 +1,140 @@ +import { Construct } from 'constructs'; +import * as dynamodb from 'aws-cdk-lib/aws-dynamodb'; +import path from 'path'; +import * as sfn from 'aws-cdk-lib/aws-stepfunctions'; +import * as events from 'aws-cdk-lib/aws-events'; +import * as eventsTargets from 'aws-cdk-lib/aws-events-targets'; + +/* +Part 2 + +Input Event Source: `orcabus.instrumentrunmanager` +Input Event DetailType: `SamplesheetMetadataUnion` +Input Event status: `LibraryInSamplesheet` + +* Initialise umccrise instrument db construct +*/ + +export interface UmccriseInitialiseLibraryAndFastqListRowConstructProps { + tableObj: dynamodb.ITableV2; + eventBusObj: events.IEventBus; +} + +export class UmccriseInitialiseLibraryAndFastqListRowConstruct extends Construct { + public readonly UmccriseInitialiseLibraryAndFastqListRowMap = { + prefix: 'pva-make-library-and-fqlr-row', + tablePartition: { + subject: 'subject', + library: 'library', + fastqListRow: 'fastq_list_row', + }, + triggerSource: 'orcabus.instrumentrunmanager', + triggerStatus: 'LibraryInSamplesheet', + triggerDetailType: 'SamplesheetMetadataUnion', + triggerSampleType: { + WGS: 'WGS', + }, + triggerWorkflowType: { + RESEARCH: 'research', + CLINICAL: 'clinical', + }, + triggerPhenotypeType: { + NORMAL: 'normal', + TUMOR: 'tumor', + }, + }; + + constructor( + scope: Construct, + id: string, + props: UmccriseInitialiseLibraryAndFastqListRowConstructProps + ) { + super(scope, id); + + /* + Part 1: Build the internal sfn + */ + const inputMakerSfn = new sfn.StateMachine(this, 'initialise_umccrise_library_db_row', { + stateMachineName: `${this.UmccriseInitialiseLibraryAndFastqListRowMap.prefix}-initialise-umccrise-library-db`, + definitionBody: sfn.DefinitionBody.fromFile( + path.join( + __dirname, + 'step_functions_templates', + 'initialise_umccrise_library_db_sfn_template.asl.json' + ) + ), + definitionSubstitutions: { + /* General */ + __table_name__: props.tableObj.tableName, + + /* Table Partitions */ + __subject_partition_name__: + this.UmccriseInitialiseLibraryAndFastqListRowMap.tablePartition.subject, + __library_partition_name__: + this.UmccriseInitialiseLibraryAndFastqListRowMap.tablePartition.library, + __fastq_list_row_partition_name__: + this.UmccriseInitialiseLibraryAndFastqListRowMap.tablePartition.fastqListRow, + }, + }); + + /* + Part 2: Grant the sfn permissions + */ + // access the dynamodb table + props.tableObj.grantReadWriteData(inputMakerSfn.role); + + /* + Part 3: Subscribe to the library events from the event bus where the library assay type + is WGS and the workflow is RESEARCH or CLINICAL + and where the phenotype is NORMAL or TUMOR + */ + const rule = new events.Rule(this, 'initialise_library_assay', { + eventBus: props.eventBusObj, + eventPattern: { + source: [this.UmccriseInitialiseLibraryAndFastqListRowMap.triggerSource], + detailType: [this.UmccriseInitialiseLibraryAndFastqListRowMap.triggerDetailType], + detail: { + payload: { + data: { + library: { + type: [ + { + 'equals-ignore-case': + this.UmccriseInitialiseLibraryAndFastqListRowMap.triggerSampleType.WGS, + }, + ], + workflow: [ + { + 'equals-ignore-case': + this.UmccriseInitialiseLibraryAndFastqListRowMap.triggerWorkflowType.RESEARCH, + }, + { + 'equals-ignore-case': + this.UmccriseInitialiseLibraryAndFastqListRowMap.triggerWorkflowType.CLINICAL, + }, + ], + phenotype: [ + { + 'equals-ignore-case': + this.UmccriseInitialiseLibraryAndFastqListRowMap.triggerPhenotypeType.NORMAL, + }, + { + 'equals-ignore-case': + this.UmccriseInitialiseLibraryAndFastqListRowMap.triggerPhenotypeType.TUMOR, + }, + ], + }, + }, + }, + }, + }, + }); + + // Add target of event to be the state machine + rule.addTarget( + new eventsTargets.SfnStateMachine(inputMakerSfn, { + input: events.RuleTargetInput.fromEventPath('$.detail'), + }) + ); + } +} diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_2/initialise-umccrise-library-dbs/step_functions_templates/initialise_umccrise_library_db_sfn_template.asl.json b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_2/initialise-umccrise-library-dbs/step_functions_templates/initialise_umccrise_library_db_sfn_template.asl.json new file mode 100644 index 000000000..6d65c99f9 --- /dev/null +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_2/initialise-umccrise-library-dbs/step_functions_templates/initialise_umccrise_library_db_sfn_template.asl.json @@ -0,0 +1,188 @@ +{ + "Comment": "A description of my state machine", + "StartAt": "Move Inputs", + "States": { + "Move Inputs": { + "Type": "Pass", + "Parameters": { + "payload_data.$": "$.payload.data" + }, + "Next": "Get Library Item" + }, + "Get Library Item": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.payload_data.library.internalId", + "id_type": "${__library_partition_name__}" + } + }, + "ResultPath": "$.get_library_item_step", + "Next": "Update Databases" + }, + "Update Databases": { + "Type": "Parallel", + "Branches": [ + { + "StartAt": "Add Library to Subject", + "States": { + "Add Library to Subject": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:updateItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.payload_data.library.subject.internalId", + "id_type": "${__subject_partition_name__}" + }, + "UpdateExpression": "ADD library_set :library_set", + "ExpressionAttributeValues": { + ":library_set": { + "SS.$": "States.Array($.payload_data.library.internalId)" + } + } + }, + "ResultPath": null, + "End": true + } + } + }, + { + "StartAt": "Library in Database", + "States": { + "Library in Database": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.get_library_item_step.Item", + "IsPresent": false, + "Comment": "Library Not In Database", + "Next": "Initialise Library" + } + ], + "Default": "No Need to Initialise Library" + }, + "Initialise Library": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:putItem", + "Parameters": { + "TableName": "${__table_name__}", + "Item": { + "id.$": "$.payload_data.library.internalId", + "id_type": "${__library_partition_name__}", + "metadata_db_id": { + "N.$": "States.Format('{}', $.payload_data.library.id)" + }, + "phenotype": { + "S.$": "$.payload_data.library.phenotype" + }, + "workflow": { + "S.$": "$.payload_data.library.workflow" + }, + "type": { + "S.$": "$.payload_data.library.type" + }, + "assay": { + "S.$": "$.payload_data.library.assay" + }, + "subject_id": { + "S.$": "$.payload_data.library.subject.internalId" + } + } + }, + "ResultPath": null, + "End": true + }, + "No Need to Initialise Library": { + "Type": "Pass", + "End": true, + "ResultPath": null + } + } + } + ], + "Next": "Initialise Fastq List Rows", + "ResultPath": null + }, + "Initialise Fastq List Rows": { + "Type": "Map", + "ItemsPath": "$.payload_data.bclconvertDataRows", + "ItemSelector": { + "bclconvert_data_row.$": "$$.Map.Item.Value", + "instrument_run_id.$": "$.payload_data.instrumentRunId", + "library_id.$": "$.payload_data.library.internalId" + }, + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": "Get Fastq List Row Id", + "States": { + "Get Fastq List Row Id": { + "Type": "Pass", + "Next": "Initialise Fastq List Row and Update Library", + "Parameters": { + "fastq_list_row_id.$": "States.Format('{}.{}.{}.{}.{}', $.bclconvert_data_row.index, $.bclconvert_data_row.index2, $.bclconvert_data_row.lane, $.instrument_run_id, $.bclconvert_data_row.sampleId)" + }, + "ResultPath": "$.get_fastq_list_row_id_step" + }, + "Initialise Fastq List Row and Update Library": { + "Type": "Parallel", + "Branches": [ + { + "StartAt": "Initialise Fastq List Row", + "States": { + "Initialise Fastq List Row": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:putItem", + "Parameters": { + "TableName": "${__table_name__}", + "Item": { + "id.$": "$.get_fastq_list_row_id_step.fastq_list_row_id", + "id_type": "${__fastq_list_row_partition_name__}", + "library_id": { + "S.$": "$.library_id" + } + } + }, + "ResultPath": null, + "End": true + } + } + }, + { + "StartAt": "Append Fastq List Row to Library", + "States": { + "Append Fastq List Row to Library": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:updateItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.library_id", + "id_type": "${__library_partition_name__}" + }, + "UpdateExpression": "ADD fastq_list_row_id_set :fastq_list_row_id_set", + "ExpressionAttributeValues": { + ":fastq_list_row_id_set": { + "SS.$": "States.Array($.get_fastq_list_row_id_step.fastq_list_row_id)" + } + } + }, + "End": true, + "ResultPath": null + } + } + } + ], + "End": true + } + } + }, + "End": true, + "ResultPath": null + } + } +} diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_3/update-fastq-list-rows-dbs/index.ts b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_3/update-fastq-list-rows-dbs/index.ts new file mode 100644 index 000000000..890bbbf92 --- /dev/null +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_3/update-fastq-list-rows-dbs/index.ts @@ -0,0 +1,87 @@ +import { Construct } from 'constructs'; +import * as dynamodb from 'aws-cdk-lib/aws-dynamodb'; +import path from 'path'; +import * as sfn from 'aws-cdk-lib/aws-stepfunctions'; +import * as events from 'aws-cdk-lib/aws-events'; +import * as eventsTargets from 'aws-cdk-lib/aws-events-targets'; + +/* +Part 3 + +Input Event Source: `orcabus.instrumentrunmanager` +Input Event DetailType: `FastqListRowStateChange` +Input Event status: `newFastqListRow` + +* Populate the fastq list row attributes for the rgid for this workflow +*/ + +export interface UmccrisePopulateFastqListRowDbRowConstructProps { + tableObj: dynamodb.ITableV2; + eventBusObj: events.IEventBus; +} + +export class UmccrisePopulateFastqListRowConstruct extends Construct { + public readonly UmccrisePopulateFastqListRowDbRowMap = { + prefix: 'pva-populate-fqlr-row', + tablePartition: 'fastq_list_row', + triggerSource: 'orcabus.instrumentrunmanager', + triggerStatus: 'newFastqListRow', + triggerDetailType: 'FastqListRowStateChange', + }; + + constructor( + scope: Construct, + id: string, + props: UmccrisePopulateFastqListRowDbRowConstructProps + ) { + super(scope, id); + + /* + Part 1: Build the internal sfn + */ + const inputMakerSfn = new sfn.StateMachine(this, 'update_fastq-list-row_db_row', { + stateMachineName: `${this.UmccrisePopulateFastqListRowDbRowMap.prefix}-sfn`, + definitionBody: sfn.DefinitionBody.fromFile( + path.join( + __dirname, + 'step_functions_templates', + 'add_fastq_list_rows_db_sfn_template.asl.json' + ) + ), + definitionSubstitutions: { + __table_name__: props.tableObj.tableName, + __fastq_list_row_partition_name__: this.UmccrisePopulateFastqListRowDbRowMap.tablePartition, + }, + }); + + /* + Part 2: Grant the internal sfn permissions + */ + // access the dynamodb table + props.tableObj.grantReadWriteData(inputMakerSfn.role); + + /* + Part 3: Subscribe to the event bus for this event type + */ + const rule = new events.Rule(this, 'umccrise_populate_fastq_list_row', { + ruleName: `stacky-${this.UmccrisePopulateFastqListRowDbRowMap.prefix}-event-rule`, + eventBus: props.eventBusObj, + eventPattern: { + source: [this.UmccrisePopulateFastqListRowDbRowMap.triggerSource], + detailType: [this.UmccrisePopulateFastqListRowDbRowMap.triggerDetailType], + detail: { + status: [ + { 'equals-ignore-case': this.UmccrisePopulateFastqListRowDbRowMap.triggerStatus }, + ], + }, + }, + }); + + // Add target of event to be the state machine + rule.addTarget( + new eventsTargets.SfnStateMachine(inputMakerSfn, { + input: events.RuleTargetInput.fromEventPath('$.detail'), + }) + ); + } +} diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_3/update-fastq-list-rows-dbs/step_functions_templates/add_fastq_list_rows_db_sfn_template.asl.json b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_3/update-fastq-list-rows-dbs/step_functions_templates/add_fastq_list_rows_db_sfn_template.asl.json new file mode 100644 index 000000000..45e63d437 --- /dev/null +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_3/update-fastq-list-rows-dbs/step_functions_templates/add_fastq_list_rows_db_sfn_template.asl.json @@ -0,0 +1,61 @@ +{ + "Comment": "A description of my state machine", + "StartAt": "Move Inputs", + "States": { + "Move Inputs": { + "Type": "Pass", + "Parameters": { + "payload_data.$": "$.payload.data" + }, + "Next": "Get Fastq List Row Item" + }, + "Get Fastq List Row Item": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.payload_data.id", + "id_type": "${__fastq_list_row_partition_name__}" + } + }, + "ResultPath": "$.get_fastq_list_row_item_step", + "Next": "Is Fastq List Row In Db" + }, + "Is Fastq List Row In Db": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.get_fastq_list_row_item_step.Item", + "IsPresent": true, + "Comment": "Fastq List Row In Database", + "Next": "Populate Fastq List Row" + } + ], + "Default": "Pass" + }, + "Populate Fastq List Row": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:updateItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.payload_data.id", + "id_type": "${__fastq_list_row_partition_name__}" + }, + "UpdateExpression": "SET fastq_list_row_json = :fastq_list_row_json", + "ExpressionAttributeValues": { + ":fastq_list_row_json": { + "S.$": "States.JsonToString($.payload_data.fastqListRow)" + } + } + }, + "ResultPath": null, + "End": true + }, + "Pass": { + "Type": "Pass", + "End": true + } + } +} diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_4/tn-complete-to-umccrise-draft/index.ts b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_4/tn-complete-to-umccrise-draft/index.ts new file mode 100644 index 000000000..603c4c628 --- /dev/null +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_4/tn-complete-to-umccrise-draft/index.ts @@ -0,0 +1,169 @@ +import { Construct } from 'constructs'; +import * as cdk from 'aws-cdk-lib'; +import * as iam from 'aws-cdk-lib/aws-iam'; +import * as dynamodb from 'aws-cdk-lib/aws-dynamodb'; +import path from 'path'; +import * as sfn from 'aws-cdk-lib/aws-stepfunctions'; +import * as events from 'aws-cdk-lib/aws-events'; +import * as eventsTargets from 'aws-cdk-lib/aws-events-targets'; +import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'; +import * as lambda from 'aws-cdk-lib/aws-lambda'; +import { WorkflowDraftRunStateChangeCommonPreambleConstruct } from '../../../../../../../components/sfn-workflowdraftrunstatechange-common-preamble'; + +/* +Part 4 + +Input Event Source: `orcabus.workflowmanager` +Input Event DetailType: `WorkflowRunStateChange` +Input Event WorkflowName: tumor_normal +Input Event status: `succeeded` + +Output Event Source: `orcabus.umccriseinputeventglue` +Output Event DetailType: `WorkflowDraftRunStateChange` +Output Event status: `draft` + +* Subscribe to the workflow manager succeeded event for tumor normal libraries. +* Launch a draft event for the umccrise pipeline +*/ + +export interface TnCompleteToUmccriseDraftConstructProps { + eventBusObj: events.IEventBus; + tableObj: dynamodb.ITableV2; + workflowsTableObj: dynamodb.ITableV2; +} + +export class TnCompleteToUmccriseDraftConstruct extends Construct { + public readonly UmccriseDraftMap = { + prefix: 'pva-tn-complete-to-umccrise-draft', + portalRunPartitionName: 'portal_run', + triggerSource: 'orcabus.workflowmanager', + triggerStatus: 'succeeded', + triggerWorkflowName: 'tumor_normal', + triggerDetailType: 'WorkflowRunStateChange', + outputSource: 'orcabus.umccriseinputeventglue', + outputDetailType: 'WorkflowDraftRunStateChange', + outputStatus: 'draft', + payloadVersion: '2024.07.23', + workflowName: 'umccrise', + workflowVersion: '4.2.4', + }; + + constructor(scope: Construct, id: string, props: TnCompleteToUmccriseDraftConstructProps) { + super(scope, id); + + /* + Part 1: Build the lambdas + */ + // Generate event data lambda object + const generateEventDataLambdaObj = new PythonFunction(this, 'generate_draft_event_payload_py', { + entry: path.join(__dirname, 'lambdas', 'generate_draft_event_payload_py'), + runtime: lambda.Runtime.PYTHON_3_12, + architecture: lambda.Architecture.ARM_64, + index: 'generate_draft_event_payload.py', + handler: 'handler', + memorySize: 1024, + }); + + /* + Part 1: Generate the preamble (sfn to generate the portal run id and the workflow run name) + */ + const sfn_preamble = new WorkflowDraftRunStateChangeCommonPreambleConstruct( + this, + `${this.UmccriseDraftMap.prefix}_sfn_preamble`, + { + portalRunTablePartitionName: this.UmccriseDraftMap.portalRunPartitionName, + stateMachinePrefix: this.UmccriseDraftMap.prefix, + tableObj: props.workflowsTableObj, + workflowName: this.UmccriseDraftMap.workflowName, + workflowVersion: this.UmccriseDraftMap.workflowVersion, + } + ).stepFunctionObj; + + /* + Part 2: Build the sfn + */ + const qcCompleteToDraftSfn = new sfn.StateMachine(this, 'tn_complete_to_umccrise_draft_sfn', { + stateMachineName: `${this.UmccriseDraftMap.prefix}-sfn`, + definitionBody: sfn.DefinitionBody.fromFile( + path.join( + __dirname, + 'step_functions_templates', + 'tn_complete_to_umccrise_draft_sfn_template.asl.json' + ) + ), + definitionSubstitutions: { + /* Events */ + __event_bus_name__: props.eventBusObj.eventBusName, + __event_source__: this.UmccriseDraftMap.outputSource, + __detail_type__: this.UmccriseDraftMap.outputDetailType, + __output_status__: this.UmccriseDraftMap.outputStatus, + __payload_version__: this.UmccriseDraftMap.payloadVersion, + __workflow_name__: this.UmccriseDraftMap.workflowName, + __workflow_version__: this.UmccriseDraftMap.workflowVersion, + + /* Lambdas */ + __generate_draft_event_payload_lambda_function_arn__: + generateEventDataLambdaObj.currentVersion.functionArn, + + /* Tables */ + __table_name__: props.tableObj.tableName, + + // State Machines + __sfn_preamble_state_machine_arn__: sfn_preamble.stateMachineArn, + }, + }); + + /* + Part 2: Grant the sfn permissions + */ + // access the dynamodb table + props.tableObj.grantReadWriteData(qcCompleteToDraftSfn); + + // allow the step function to submit events + props.eventBusObj.grantPutEventsTo(qcCompleteToDraftSfn); + + // allow the step function to invoke the lambdas + generateEventDataLambdaObj.currentVersion.grantInvoke(qcCompleteToDraftSfn); + + /* Allow step function to call nested state machine */ + // Because we run a nested state machine, we need to add the permissions to the state machine role + // See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr + qcCompleteToDraftSfn.addToRolePolicy( + new iam.PolicyStatement({ + resources: [ + `arn:aws:events:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule`, + ], + actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'], + }) + ); + // Allow the state machine to be able to invoke the preamble sfn + sfn_preamble.grantStartExecution(qcCompleteToDraftSfn); + + /* + Part 3: Subscribe to the event bus and trigger the internal sfn + */ + const rule = new events.Rule(this, 'tn_complete_to_umccrise_draft_rule', { + ruleName: `stacky-${this.UmccriseDraftMap.prefix}-rule`, + eventBus: props.eventBusObj, + eventPattern: { + source: [this.UmccriseDraftMap.triggerSource], + detailType: [this.UmccriseDraftMap.triggerDetailType], + detail: { + status: [{ 'equals-ignore-case': this.UmccriseDraftMap.triggerStatus }], + workflowName: [ + { + 'equals-ignore-case': this.UmccriseDraftMap.triggerWorkflowName, + }, + ], + }, + }, + }); + + // Add target of event to be the state machine + rule.addTarget( + new eventsTargets.SfnStateMachine(qcCompleteToDraftSfn, { + input: events.RuleTargetInput.fromEventPath('$.detail'), + }) + ); + } +} diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_4/tn-complete-to-umccrise-draft/lambdas/generate_draft_event_payload_py/generate_draft_event_payload.py b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_4/tn-complete-to-umccrise-draft/lambdas/generate_draft_event_payload_py/generate_draft_event_payload.py new file mode 100644 index 000000000..043bd1660 --- /dev/null +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_4/tn-complete-to-umccrise-draft/lambdas/generate_draft_event_payload_py/generate_draft_event_payload.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python3 + +""" +Generate draft event payload for the umccrise event + +Given the following parameters, generate the draft payload for the umccrise event + +{ + subject_id + tumor_library_id + normal_library_id + tumor_fastq_list_row_ids + normal_fastq_list_row_ids + dragen_somatic_output_s3_uri + dragen_germline_output_s3_uri +} + +We need the inputs + +subjectId +dragenSomaticLibraryId +dragenGermlineLibraryId +dragenSomaticOutputUri +dragenGermlineOutputUri + +And tags + +subjectId +tumorLibraryId +normalLibraryId +tumorFastqListRowIds +normalFastqListRowIds + + +""" +from typing import List, Dict + + +def handler(event, context) -> Dict: + """ + Generate draft event payload for the event + :param event: event object + :return: draft event payload + """ + + subject_id = event['subject_id'] + tumor_library_id = event['tumor_library_id'] + normal_library_id = event['normal_library_id'] + dragen_somatic_output_s3_uri = event['dragen_somatic_output_s3_uri'] + dragen_germline_output_s3_uri = event['dragen_germline_output_s3_uri'] + tumor_fastq_list_row_ids: List[str] = event['tumor_fastq_list_row_ids'] + normal_fastq_list_row_ids: List[str] = event['normal_fastq_list_row_ids'] + + return { + "input_event_data": { + "subjectId": subject_id, + "dragenSomaticLibraryId": tumor_library_id, + "dragenGermlineLibraryId": normal_library_id, + "dragenSomaticOutputUri": dragen_somatic_output_s3_uri, + "dragenGermlineOutputUri": dragen_germline_output_s3_uri, + }, + "event_tags": { + "subjectId": subject_id, + "tumorLibraryId": tumor_library_id, + "normalLibraryId": normal_library_id, + "tumorFastqListRowIds": tumor_fastq_list_row_ids, + "normalFastqListRowIds": normal_fastq_list_row_ids + } + } diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_4/tn-complete-to-umccrise-draft/step_functions_templates/tn_complete_to_umccrise_draft_sfn_template.asl.json b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_4/tn-complete-to-umccrise-draft/step_functions_templates/tn_complete_to_umccrise_draft_sfn_template.asl.json new file mode 100644 index 000000000..fbfa0a0ac --- /dev/null +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_4/tn-complete-to-umccrise-draft/step_functions_templates/tn_complete_to_umccrise_draft_sfn_template.asl.json @@ -0,0 +1,122 @@ +{ + "Comment": "A description of my state machine", + "StartAt": "Move Inputs", + "States": { + "Move Inputs": { + "Type": "Pass", + "Parameters": { + "payload_data.$": "$.payload.data" + }, + "Next": "Get Subject Item" + }, + "Get Subject Item": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.payload_data.tags.subjectId", + "id_type": "${__subject_partition_name__}" + } + }, + "ResultPath": "$.get_subject_item_step", + "Next": "Subject Item In DataBase" + }, + "Subject Item In DataBase": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.get_subject_item_step.Item", + "IsPresent": true, + "Comment": "Subject Item In DataBase", + "Next": "Get Portal Run Id and Workflow Run Name" + } + ], + "Default": "Not an automated library" + }, + "Get Portal Run Id and Workflow Run Name": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "${__sfn_preamble_state_machine_arn__}", + "Input": {} + }, + "ResultSelector": { + "portal_run_id.$": "$.Output.portal_run_id", + "workflow_run_name.$": "$.Output.workflow_run_name" + }, + "ResultPath": "$.get_portal_and_run_name_step", + "Next": "Generate Draft Event Payload" + }, + "Generate Draft Event Payload": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${__generate_draft_event_payload_lambda_function_arn__}", + "Payload": { + "subject_id.$": "$.payload_data.tags.subjectId", + "tumor_library_id.$": "$.payload_data.tags.tumorLibraryId", + "normal_library_id.$": "$.payload_data.tags.normalLibraryId", + "tumor_fastq_list_row_ids.$": "$.payload_data.tags.tumorFastqListRowIds", + "normal_fastq_list_row_ids.$": "$.payload_data.tags.normalFastqListRowIds", + "dragen_somatic_output_s3_uri.$": "$.payload_data.outputs.dragenSomaticOutputUri", + "dragen_germline_output_s3_uri.$": "$.payload_data.outputs.dragenGermlineOutputUri" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2 + } + ], + "ResultSelector": { + "input_event_data.$": "$.Payload.input_event_data", + "event_tags.$": "$.Payload.event_tags" + }, + "ResultPath": "$.generate_draft_event_payload_data_step", + "Next": "Push UMCCRise Draft Event" + }, + "Push UMCCRise Draft Event": { + "Type": "Task", + "Resource": "arn:aws:states:::events:putEvents", + "Parameters": { + "Entries": [ + { + "Detail": { + "portalRunId.$": "$.get_parameters_step.portal_run_id", + "timestamp.$": "$$.State.EnteredTime", + "status": "${__output_status__}", + "workflowName": "${__workflow_name__}", + "workflowVersion": "${__workflow_version__}", + "workflowRunName.$": "$.get_parameters_step.workflow_run_name", + "payload": { + "refId": null, + "version": "${__payload_version__}", + "data": { + "inputs.$": "$.generate_draft_event_payload_data_step.input_event_data", + "tags.$": "$.generate_draft_event_payload_data_step.event_tags" + } + } + }, + "DetailType": "${__detail_type__}", + "EventBusName": "${__event_bus_name__}", + "Source": "${__event_source__}" + } + ] + }, + "ResultPath": null, + "End": true + }, + "Not an automated library": { + "Type": "Pass", + "End": true + } + } +} diff --git a/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_5/umccrise-draft-to-ready/index.ts b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_5/umccrise-draft-to-ready/index.ts new file mode 100644 index 000000000..6ee442242 --- /dev/null +++ b/lib/workload/stateless/stacks/stacky-mcstackface/glue-constructs/pva/part_5/umccrise-draft-to-ready/index.ts @@ -0,0 +1,104 @@ +import { Construct } from 'constructs'; +import * as dynamodb from 'aws-cdk-lib/aws-dynamodb'; +import * as ssm from 'aws-cdk-lib/aws-ssm'; +import * as secretsManager from 'aws-cdk-lib/aws-secretsmanager'; +import * as events from 'aws-cdk-lib/aws-events'; +import { WorkflowDraftRunStateChangeToWorkflowRunStateChangeReadyConstruct } from '../../../../../../../components/event-workflowdraftrunstatechange-to-workflowrunstatechange-ready'; + +/* +Part 5 + +Input Event source: `orcabus.umccriseinputeventglue` +Input Event DetailType: `WorkflowDraftRunStateChange` +Input Event status: `draft` + +Output Event source: `orcabus.umccriseinputeventglue` +Output Event DetailType: `WorkflowRunStateChange` +Output Event status: `ready` + +* The umccriseInputMaker, subscribes to the umccrise input event glue (itself) and generates a ready event for the umccriseReadySfn + * However, in order to be 'ready' we need to use a few more variables such as + * icaLogsUri, + * analysisOutputUri + * cacheUri + * projectId + * userReference +*/ + +export interface UmccriseInputMakerConstructProps { + /* Event bus object */ + eventBusObj: events.IEventBus; + /* Tables */ + inputMakerTableObj: dynamodb.ITableV2; + /* SSM Parameter Objects */ + icav2ProjectIdSsmParameterObj: ssm.IStringParameter; + outputUriSsmParameterObj: ssm.IStringParameter; + logsUriSsmParameterObj: ssm.IStringParameter; + cacheUriSsmParameterObj: ssm.IStringParameter; + /* Secrets */ + icav2AccessTokenSecretObj: secretsManager.ISecret; +} + +export class UmccriseInputMakerConstruct extends Construct { + public readonly umccriseInputMakerEventMap = { + prefix: 'pva-umccrise', + tablePartition: 'umccrise', + triggerSource: 'orcabus.umccriseinputeventglue', + triggerStatus: 'draft', + triggerDetailType: 'WorkflowDraftRunStateChange', + outputSource: 'orcabus.umccriseinputeventglue', + outputStatus: 'ready', + payloadVersion: '2024.07.16', + workflowName: 'umccrise', + workflowVersion: '4.2.4', + }; + + constructor(scope: Construct, id: string, props: UmccriseInputMakerConstructProps) { + super(scope, id); + + /* + Part 3: Build the external sfn + */ + new WorkflowDraftRunStateChangeToWorkflowRunStateChangeReadyConstruct( + this, + 'umccrise_internal_input_maker', + { + /* + Set Input StateMachine Object + */ + lambdaPrefix: this.umccriseInputMakerEventMap.prefix, + payloadVersion: this.umccriseInputMakerEventMap.payloadVersion, + stateMachinePrefix: this.umccriseInputMakerEventMap.prefix, + + /* + Table objects + */ + tableObj: props.inputMakerTableObj, + tablePartitionName: this.umccriseInputMakerEventMap.tablePartition, + + /* + Event Triggers + */ + eventBusObj: props.eventBusObj, + triggerDetailType: this.umccriseInputMakerEventMap.triggerDetailType, + triggerSource: this.umccriseInputMakerEventMap.triggerSource, + triggerStatus: this.umccriseInputMakerEventMap.triggerStatus, + outputSource: this.umccriseInputMakerEventMap.outputSource, + workflowName: this.umccriseInputMakerEventMap.workflowName, + workflowVersion: this.umccriseInputMakerEventMap.workflowVersion, + + /* + SSM Parameter Objects + */ + icav2ProjectIdSsmParameterObj: props.icav2ProjectIdSsmParameterObj, + outputUriSsmParameterObj: props.outputUriSsmParameterObj, + logsUriSsmParameterObj: props.logsUriSsmParameterObj, + + /* + Secrets + */ + icav2AccessTokenSecretObj: props.icav2AccessTokenSecretObj, + } + ); + } +}