diff --git a/config/constants.ts b/config/constants.ts index b496ecee9..23de60031 100644 --- a/config/constants.ts +++ b/config/constants.ts @@ -354,7 +354,7 @@ TN Stateless stack */ // Deployed under dev/stg/prod -export const tnIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/tumor_normal_4.2.4_pipeline_id'; // 6ce2b636-ba2f-4004-8065-f3557f286c98 +export const tnIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/tumor_normal_4.2.4_pipeline_id'; // 7d3cb608-80e0-4ecf-a67e-ef524e9bfb8b export const tnIcav2PipelineWorkflowType = 'tumor-normal'; export const tnIcav2PipelineWorkflowTypeVersion = '4.2.4'; export const tnIcav2ServiceVersion = '2024.07.01'; @@ -430,7 +430,7 @@ WTS Stateless stack */ // Deployed under dev/stg/prod -export const wtsIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/wts_4.2.4_pipeline_id'; // 1e53ae07-08a6-458b-9fa3-9cf7430409a0 +export const wtsIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/wts_4.2.4_pipeline_id'; // 73e21ce0-60d7-4e3e-b130-88aff78d500d export const wtsIcav2PipelineWorkflowType = 'wts'; export const wtsIcav2PipelineWorkflowTypeVersion = '4.2.4'; export const wtsIcav2ServiceVersion = '2024.07.01'; diff --git a/config/stacks/wtsPipelineManager.ts b/config/stacks/wtsPipelineManager.ts index 6e8d9ea2c..8c790059c 100644 --- a/config/stacks/wtsPipelineManager.ts +++ b/config/stacks/wtsPipelineManager.ts @@ -24,6 +24,7 @@ import { icav2FastaReferenceUriMappingSSMParameterPath, icav2GencodeAnnotationUriMappingSSMParameterPath, icav2WtsQcReferenceSamplesUriMappingSSMParameterPath, + dragenIcav2OraReferenceUriSSMParameterPath, } from '../constants'; import { WtsIcav2PipelineManagerConfig } from '../../lib/workload/stateless/stacks/transcriptome-pipeline-manager/deploy'; import { WtsIcav2PipelineTableConfig } from '../../lib/workload/stateful/stacks/wts-dynamo-db/deploy/stack'; @@ -65,6 +66,7 @@ export const getWtsIcav2PipelineManagerStackProps = ( fastaReferenceUriSsmPath: 
icav2FastaReferenceUriMappingSSMParameterPath, gencodeAnnotationUriSsmPath: icav2GencodeAnnotationUriMappingSSMParameterPath, wtsQcReferenceSamplesSsmPath: icav2WtsQcReferenceSamplesUriMappingSSMParameterPath, + oraReferenceUriSsmPath: dragenIcav2OraReferenceUriSSMParameterPath, /* Default Versions */ defaultArribaVersion: wtsDefaultArribaVersion, defaultDragenReferenceVersion: wtsDefaultDragenReferenceVersion, diff --git a/lib/workload/components/ora-file-decompression-fq-pair-sfn/tasks/ora_decompression/docker-entrypoint.sh b/lib/workload/components/ora-file-decompression-fq-pair-sfn/tasks/ora_decompression/docker-entrypoint.sh index 2c178947b..0ffb6a4f5 100644 --- a/lib/workload/components/ora-file-decompression-fq-pair-sfn/tasks/ora_decompression/docker-entrypoint.sh +++ b/lib/workload/components/ora-file-decompression-fq-pair-sfn/tasks/ora_decompression/docker-entrypoint.sh @@ -102,7 +102,8 @@ else "${INPUT_URI}" )" | \ /usr/local/bin/orad \ - --gzip \ + --gz \ + --gz-level 1 \ --stdout \ --ora-reference "${ORADATA_PATH}" \ - | \ @@ -139,7 +140,8 @@ else --gz \ --gz-level 1 \ --stdout \ - --ora-reference "${ORADATA_PATH}" | \ + --ora-reference "${ORADATA_PATH}" \ + - | \ ( AWS_ACCESS_KEY_ID="$( \ jq -r '.AWS_ACCESS_KEY_ID' <<< "${aws_s3_access_creds_json_str}" diff --git a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/constructs/cttsov2-icav2-manager/index.ts b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/constructs/cttsov2-icav2-manager/index.ts index 256d1634b..74a270023 100644 --- a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/constructs/cttsov2-icav2-manager/index.ts +++ b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/constructs/cttsov2-icav2-manager/index.ts @@ -122,6 +122,7 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct { props.icav2CopyFilesStateMachineObj.grantStartExecution(configureInputsSfn); 
props.icav2CopyFilesStateMachineObj.grantRead(configureInputsSfn); props.oraDecompressionStateMachineObj.grantStartExecution(configureInputsSfn); + props.oraDecompressionStateMachineObj.grantRead(configureInputsSfn); // Because we run a nested state machine, we need to add the permissions to the state machine role // See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr diff --git a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_inputs.asl.json b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_inputs.asl.json index 819a28acd..fe820a788 100644 --- a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_inputs.asl.json +++ b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_inputs.asl.json @@ -175,7 +175,8 @@ "read1GzOutputFileUri.$": "$.get_ora_cache_uri_step.read_1_gz_output_uri", "read2GzOutputFileUri.$": "$.get_ora_cache_uri_step.read_2_gz_output_uri", "read1EstimatedGzFileSize": -1, - "read2EstimatedGzFileSize": -1 + "read2EstimatedGzFileSize": -1, + "validationOnly": false } }, "End": true diff --git a/lib/workload/stateless/stacks/transcriptome-pipeline-manager/deploy/index.ts b/lib/workload/stateless/stacks/transcriptome-pipeline-manager/deploy/index.ts index 8b9d405db..b1fd28aeb 100644 --- a/lib/workload/stateless/stacks/transcriptome-pipeline-manager/deploy/index.ts +++ b/lib/workload/stateless/stacks/transcriptome-pipeline-manager/deploy/index.ts @@ -24,6 +24,7 @@ export interface WtsIcav2PipelineManagerConfig { gencodeAnnotationUriSsmPath: string; // "/icav2/umccr-prod/gencode-annotation-uri" // FIXME arribaUriSsmPath: string; // "/icav2/umccr-prod/arriba-uri" // FIXME wtsQcReferenceSamplesSsmPath: string; // "/icav2/umccr-prod/wts-qc-reference-samples" // FIXME + oraReferenceUriSsmPath: string; // 
// FIXME /* Defaults */ defaultDragenReferenceVersion: string; // v9-r3 defaultFastaReferenceVersion: string; // hg38 @@ -90,6 +91,13 @@ export class WtsIcav2PipelineManagerStack extends cdk.Stack { props.dragenReferenceUriSsmPath ); + // ORA Reference + const oraReferenceSsmObj = ssm.StringParameter.fromStringParameterName( + this, + props.oraReferenceUriSsmPath, + props.oraReferenceUriSsmPath + ); + // Fasta Reference const fastaReferenceSsmObj = ssm.StringParameter.fromStringParameterName( this, @@ -146,6 +154,21 @@ export class WtsIcav2PipelineManagerStack extends cdk.Stack { } ); + // Lambda function to check fastq list rows for ora inputs + const hasOraInputsLambdaObj = new PythonFunction( + this, + 'add_ora_reference_lambda_python_function', + { + entry: path.join(__dirname, '../lambdas/has_ora_inputs_py'), + runtime: lambda.Runtime.PYTHON_3_12, + architecture: lambda.Architecture.ARM_64, + index: 'has_ora_inputs.py', + handler: 'handler', + memorySize: 1024, + timeout: Duration.seconds(60), + } + ); + // Specify the statemachine and replace the arn placeholders with the lambda arns defined above const configureInputsSfn = new sfn.StateMachine( this, @@ -167,6 +190,7 @@ export class WtsIcav2PipelineManagerStack extends cdk.Stack { __annotation_version_uri_ssm_parameter_name__: annotationSsmObj.parameterName, __qc_reference_samples_version_uri_ssm_parameter_name__: wtsQcReferenceSamplesSsmObj.parameterName, + __ora_reference_uri_ssm_parameter_path__: oraReferenceSsmObj.parameterName, /* Defaults */ __default_reference_version__: props.defaultDragenReferenceVersion, @@ -180,6 +204,7 @@ export class WtsIcav2PipelineManagerStack extends cdk.Stack { convertFastqListRowsToCwlInputObjectsLambdaObj.currentVersion.functionArn, __get_boolean_parameters_lambda_function_arn__: getBooleanParametersFromEventInputLambdaObj.currentVersion.functionArn, + __has_ora_inputs_lambda_function_arn__: hasOraInputsLambdaObj.currentVersion.functionArn, }, } ); @@ -188,6 +213,7 @@ 
export class WtsIcav2PipelineManagerStack extends cdk.Stack { [ convertFastqListRowsToCwlInputObjectsLambdaObj, getBooleanParametersFromEventInputLambdaObj, + hasOraInputsLambdaObj, ].forEach((lambdaObj) => { lambdaObj.currentVersion.grantInvoke(configureInputsSfn); }); @@ -202,6 +228,7 @@ export class WtsIcav2PipelineManagerStack extends cdk.Stack { arribaSsmObj, annotationSsmObj, wtsQcReferenceSamplesSsmObj, + oraReferenceSsmObj, ].forEach((ssmObj) => { ssmObj.grantRead(configureInputsSfn); }); diff --git a/lib/workload/stateless/stacks/transcriptome-pipeline-manager/lambdas/has_ora_inputs_py/has_ora_inputs.py b/lib/workload/stateless/stacks/transcriptome-pipeline-manager/lambdas/has_ora_inputs_py/has_ora_inputs.py new file mode 100644 index 000000000..ba68b3065 --- /dev/null +++ b/lib/workload/stateless/stacks/transcriptome-pipeline-manager/lambdas/has_ora_inputs_py/has_ora_inputs.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 + +""" +Add ora reference +""" +from pathlib import Path +from typing import Dict, Optional, List + + +def handler(event, context) -> Dict[str, bool]: + """ + Get the boolean parameters from the event input + :param event: + :param context: + :return: Dictionary of boolean parameters + """ + + # Collect the event data input + event_data_input: Dict = event['event_data_input'] + + # Get the fastq list rows + tumor_fastq_list_rows: Optional[List] = event_data_input.get('tumorFastqListRows', None) + normal_fastq_list_rows: Optional[List] = event_data_input.get('fastqListRows', None) + + # If tumorFastqListRows is None and fastqListRows is None, return false + if tumor_fastq_list_rows is None and normal_fastq_list_rows is None: + return { + "add_ora_step": False, + "is_hybrid": False + } + + add_ora_step = False + for fastq_list_row_iter in [tumor_fastq_list_rows, normal_fastq_list_rows]: + if fastq_list_row_iter is not None: + # If fastqListRows is not None, return true + # Iterate over each of the fastq list rows, if one of the read1FileUri or 
read2FileUri end with .fastq.ora + # flag that the ORA reference step is needed + if any( + [ + row.get('read1FileUri', '').endswith('.fastq.ora') or + row.get('read2FileUri', '').endswith('.fastq.ora') + for row in fastq_list_row_iter + ] + ): + add_ora_step = True + + # Check if hybrid + endings = [] + for fastq_list_row_iter in [tumor_fastq_list_rows, normal_fastq_list_rows]: + if fastq_list_row_iter is not None: + endings.extend( + list(set(list(map( + lambda fastq_list_row_: Path(fastq_list_row_.get("read1FileUri")).suffix, + fastq_list_row_iter + )))) + ) + + if len(list(set(endings))) > 1: + # Don't need ora when hybrid since ora samples will be dropped + is_hybrid = True + add_ora_step = False + else: + is_hybrid = False + + # Return the computed add_ora_step / is_hybrid flags + return { + "add_ora_step": add_ora_step, + "is_hybrid": is_hybrid + } + + +# if __name__ == "__main__": +# import json +# +# print( +# json.dumps( +# handler( +# { +# "event_data_input": { +# "tumorFastqListRows": [ +# { +# "rgid": "ATGAGGCC.CAATTAAC.2", +# "rgsm": "L2400195", +# "rglb": "L2400195", +# "lane": 2, +# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_2/L2400195/L2400195_S9_L002_R1_001.fastq.gz", +# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_2/L2400195/L2400195_S9_L002_R2_001.fastq.gz" +# }, +# { +# "rgid": "ATGAGGCC.CAATTAAC.3", +# "rgsm": "L2400195", +# "rglb": "L2400195", +# "lane": 3, +# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_3/L2400195/L2400195_S9_L003_R1_001.fastq.gz", +# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_3/L2400195/L2400195_S9_L003_R2_001.fastq.gz" +# } +# ], +# "fastqListRows": [ +# { +# "rgid": "GCACGGAC.TGCGAGAC.4", +# "rgsm": "L2400191", +# "rglb": "L2400191", +# 
"lane": 4, +# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_4/L2400191/L2400191_S17_L004_R1_001.fastq.gz", +# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_4/L2400191/L2400191_S17_L004_R2_001.fastq.gz" +# } +# ], +# "dragenReferenceVersion": "v9-r3" +# } +# }, +# None +# ), +# indent=4 +# ) +# ) +# +# # { +# # "add_ora_step": false, +# # "is_hybrid": false +# # } +# +# if __name__ == "__main__": +# import json +# +# print( +# json.dumps( +# handler( +# { +# "event_data_input": { +# "tumorFastqListRows": [ +# { +# "rgid": "ATGAGGCC.CAATTAAC.2", +# "rgsm": "L2400195", +# "rglb": "L2400195", +# "lane": 2, +# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_2/L2400195/L2400195_S9_L002_R1_001.fastq.gz", +# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_2/L2400195/L2400195_S9_L002_R2_001.fastq.gz" +# }, +# { +# "rgid": "ATGAGGCC.CAATTAAC.3", +# "rgsm": "L2400195", +# "rglb": "L2400195", +# "lane": 3, +# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_3/L2400195/L2400195_S9_L003_R1_001.fastq.gz", +# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_3/L2400195/L2400195_S9_L003_R2_001.fastq.gz" +# } +# ], +# "fastqListRows": [ +# { +# "rgid": "GCACGGAC.TGCGAGAC.4", +# "rgsm": "L2400191", +# "rglb": "L2400191", +# "lane": 4, +# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_4/L2400191/L2400191_S17_L004_R1_001.fastq.ora", +# "read2FileUri": 
"icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_4/L2400191/L2400191_S17_L004_R2_001.fastq.ora" +# } +# ], +# "dragenReferenceVersion": "v9-r3" +# } +# }, +# None +# ), +# indent=4 +# ) +# ) +# +# # { +# # "add_ora_step": false, +# # "is_hybrid": true +# # } diff --git a/lib/workload/stateless/stacks/transcriptome-pipeline-manager/step_functions_templates/set_wts_cwl_inputs_sfn.asl.json b/lib/workload/stateless/stacks/transcriptome-pipeline-manager/step_functions_templates/set_wts_cwl_inputs_sfn.asl.json index 0d98e3e69..10301ff92 100644 --- a/lib/workload/stateless/stacks/transcriptome-pipeline-manager/step_functions_templates/set_wts_cwl_inputs_sfn.asl.json +++ b/lib/workload/stateless/stacks/transcriptome-pipeline-manager/step_functions_templates/set_wts_cwl_inputs_sfn.asl.json @@ -407,6 +407,72 @@ } } } + }, + { + "StartAt": "Has ORA Inputs", + "States": { + "Has ORA Inputs": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${__has_ora_inputs_lambda_function_arn__}", + "Payload": { + "event_data_input.$": "$.get_input_parameters_from_event_step.inputs" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2, + "JitterStrategy": "FULL" + } + ], + "Next": "Need Ora Reference", + "ResultSelector": { + "add_ora_step.$": "$.Payload.add_ora_step" + } + }, + "Need Ora Reference": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.add_ora_step", + "BooleanEquals": true, + "Next": "Get the ORA Reference Version", + "Comment": "Needs ora reference tar" + } + ], + "Default": "Set Output to null" + }, + "Get the ORA Reference Version": { + "Type": "Task", + "Parameters": { + "Name": "${__ora_reference_uri_ssm_parameter_path__}" + }, + "Resource": 
"arn:aws:states:::aws-sdk:ssm:getParameter", + "ResultSelector": { + "ora_reference_tar": { + "class": "File", + "location.$": "$.Parameter.Value" + } + }, + "End": true + }, + "Set Output to null": { + "Type": "Pass", + "End": true, + "Result": { + "ora_reference_tar": null + } + } + } } ], "ResultPath": "$.configure_inputs_step", @@ -422,7 +488,8 @@ "cl_config_sample_names_replace": { "sample_names_replace.$": "$.[5].get_qc_reference_samples_version_uri_step.cl_config_sample_names_replace" }, - "boolean_parameters.$": "$.[6].get_boolean_parameters_step.boolean_parameters" + "boolean_parameters.$": "$.[6].get_boolean_parameters_step.boolean_parameters", + "ora_reference_tar.$": "$.[7].ora_reference_tar" } }, "Set Input JSON": { @@ -439,7 +506,8 @@ "protein_domains.$": "$.configure_inputs_step.protein_domains_uri", "qc_reference_samples.$": "$.configure_inputs_step.qc_reference_samples_list", "reference_fasta.$": "$.configure_inputs_step.reference_fasta_uri", - "reference_tar.$": "$.configure_inputs_step.reference_tar_uri" + "reference_tar.$": "$.configure_inputs_step.reference_tar_uri", + "ora_reference_tar.$": "$.configure_inputs_step.ora_reference_tar" } }, "ResultPath": "$.set_input_json_step" diff --git a/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/deploy/index.ts b/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/deploy/index.ts index 35295f687..27dfa2588 100644 --- a/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/deploy/index.ts +++ b/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/deploy/index.ts @@ -14,6 +14,8 @@ import { DefinitionBody } from 'aws-cdk-lib/aws-stepfunctions'; import { PythonLambdaFastqListRowsToCwlInputConstruct } from '../../../../components/python-lambda-fastq-list-rows-to-cwl-input'; import { WfmWorkflowStateChangeIcav2ReadyEventHandlerConstruct } from '../../../../components/sfn-icav2-ready-event-handler'; import { Icav2AnalysisEventHandlerConstruct } from 
'../../../../components/sfn-icav2-state-change-event-handler'; +import { OraDecompressionConstruct } from '../../../../components/ora-file-decompression-fq-pair-sfn'; +import { NagSuppressions } from 'cdk-nag'; export interface TnIcav2PipelineManagerConfig { /* ICAv2 Pipeline analysis essentials */ @@ -127,6 +129,16 @@ export class TnIcav2PipelineManagerStack extends cdk.Stack { } ); + // Get the ora decompression construct + const oraDecompressionStateMachineObj = new OraDecompressionConstruct( + this, + 'ora_decompression_state_machine_obj', + { + icav2AccessTokenSecretId: this.icav2AccessTokenSecretObj.secretName, + sfnPrefix: `${props.stateMachinePrefix}-ora-to-gz`, + } + ).sfnObject; + // Specify the statemachine and replace the arn placeholders with the lambda arns defined above const configureInputsSfn = new sfn.StateMachine( this, @@ -153,6 +165,9 @@ export class TnIcav2PipelineManagerStack extends cdk.Stack { getBooleanParametersFromEventInputLambdaObj.currentVersion.functionArn, __add_ora_reference_lambda_function_arn__: addOraReferenceLambdaObj.currentVersion.functionArn, + /* Step functions */ + __ora_fastq_list_row_decompression_sfn_arn__: + oraDecompressionStateMachineObj.stateMachineArn, }, } ); @@ -174,6 +189,35 @@ export class TnIcav2PipelineManagerStack extends cdk.Stack { ssmObj.grantRead(configureInputsSfn); }); + // Allow state machine to invoke the ora decompression state machine + oraDecompressionStateMachineObj.grantStartExecution(configureInputsSfn); + oraDecompressionStateMachineObj.grantRead(configureInputsSfn); + + // Because we run a nested state machine, we need to add the permissions to the state machine role + // See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr + configureInputsSfn.addToRolePolicy( + new iam.PolicyStatement({ + resources: [ + `arn:aws:events:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule`, + ], 
+ actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'], + }) + ); + + // https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html#sync-async-iam-policies + // Polling requires permission for states:DescribeExecution + NagSuppressions.addResourceSuppressions( + configureInputsSfn, + [ + { + id: 'AwsSolutions-IAM5', + reason: + 'grantRead uses asterisk at the end of executions, as we need permissions for all execution invocations', + }, + ], + true + ); + /* Part 2: Configure the lambdas and outputs step function Quite a bit more complicated than regular ICAv2 workflow setup since we need to @@ -201,7 +245,7 @@ export class TnIcav2PipelineManagerStack extends cdk.Stack { ); // Add permissions to lambda - this.icav2AccessTokenSecretObj.grantRead(setOutputJsonLambdaObj.currentVersion.role); + this.icav2AccessTokenSecretObj.grantRead(setOutputJsonLambdaObj.currentVersion); const configureOutputsSfn = new sfn.StateMachine(this, 'sfn_configure_outputs_json', { stateMachineName: `${props.stateMachinePrefix}-configure-outputs-json`, diff --git a/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/lambdas/add_ora_reference_py/add_ora_reference.py b/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/lambdas/add_ora_reference_py/add_ora_reference.py index eb98093dc..ba68b3065 100644 --- a/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/lambdas/add_ora_reference_py/add_ora_reference.py +++ b/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/lambdas/add_ora_reference_py/add_ora_reference.py @@ -3,6 +3,7 @@ """ Add ora reference """ +from pathlib import Path from typing import Dict, Optional, List @@ -24,9 +25,11 @@ def handler(event, context) -> Dict[str, bool]: # If tumorFastqListRows is None and fastqListRows is None, return false if tumor_fastq_list_rows is None and normal_fastq_list_rows is None: return { - "add_ora_step": False + "add_ora_step": False, + "is_hybrid": False } + add_ora_step = 
False for fastq_list_row_iter in [tumor_fastq_list_rows, normal_fastq_list_rows]: if fastq_list_row_iter is not None: # If fastqListRows is not None, return true @@ -39,13 +42,30 @@ def handler(event, context) -> Dict[str, bool]: for row in fastq_list_row_iter ] ): - return { - "add_ora_step": True - } + add_ora_step = True + + # Check if hybrid + endings = [] + for fastq_list_row_iter in [tumor_fastq_list_rows, normal_fastq_list_rows]: + if fastq_list_row_iter is not None: + endings.extend( + list(set(list(map( + lambda fastq_list_row_: Path(fastq_list_row_.get("read1FileUri")).suffix, + fastq_list_row_iter + )))) + ) + + if len(list(set(endings))) > 1: + # Don't need ora when hybrid since ora samples will be dropped + is_hybrid = True + add_ora_step = False + else: + is_hybrid = False # Got to here? Return false return { - "add_ora_step": False + "add_ora_step": add_ora_step, + "is_hybrid": is_hybrid } @@ -95,9 +115,10 @@ def handler(event, context) -> Dict[str, bool]: # ) # # # { -# # "add_ora_step": false +# # "add_ora_step": false, +# # "is_hybrid": false # # } - +# # if __name__ == "__main__": # import json # @@ -144,5 +165,6 @@ def handler(event, context) -> Dict[str, bool]: # ) # # # { -# # "add_ora_step": true +# # "add_ora_step": false, +# # "is_hybrid": true # # } diff --git a/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/step_functions_templates/set_tn_cwl_inputs_sfn.asl.json b/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/step_functions_templates/set_tn_cwl_inputs_sfn.asl.json index ef3f033a2..8cdec24b3 100644 --- a/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/step_functions_templates/set_tn_cwl_inputs_sfn.asl.json +++ b/lib/workload/stateless/stacks/tumor-normal-pipeline-manager/step_functions_templates/set_tn_cwl_inputs_sfn.asl.json @@ -41,7 +41,7 @@ } ], "Comment": "Fastq List Row Inputs", - "Next": "Handle FQLR" + "Next": "Add ORA Reference Bool / Is Hybrid Input" }, { "And": [ @@ -74,6 +74,36 @@ ], 
"Default": "Fail" }, + "Add ORA Reference Bool / Is Hybrid Input": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "Payload": { + "event_data_input.$": "$.get_input_parameters_from_event_step.inputs" + }, + "FunctionName": "${__add_ora_reference_lambda_function_arn__}" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2, + "JitterStrategy": "FULL" + } + ], + "ResultSelector": { + "add_ora_step.$": "$.Payload.add_ora_step", + "is_hybrid.$": "$.Payload.is_hybrid" + }, + "ResultPath": "$.add_ora_step_path", + "Next": "Handle FQLR" + }, "Handle FQLR": { "Type": "Parallel", "Branches": [ @@ -84,15 +114,105 @@ "Type": "Parallel", "Branches": [ { - "StartAt": "Convert Fastq List Rows to CWL Input Objects (Tumor)", + "StartAt": "For each fastq list row (T)", "States": { - "Convert Fastq List Rows to CWL Input Objects (Tumor)": { + "For each fastq list row (T)": { + "Type": "Map", + "ItemsPath": "$.get_input_parameters_from_event_step.inputs.tumorFastqListRows", + "ItemSelector": { + "fastq_list_row.$": "$$.Map.Item.Value", + "cache_uri_prefix.$": "$.get_input_parameters_from_event_step.engineParameters.cacheUri", + "is_hybrid_run.$": "$.add_ora_step_path.is_hybrid" + }, + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": "Is ORA (T)", + "States": { + "Is ORA (T)": { + "Type": "Pass", + "Next": "Is Hybrid Run and Ora Fastq (T)", + "Parameters": { + "read_1_file_suffix.$": "States.ArrayGetItem(States.StringSplit($.fastq_list_row.read1FileUri, '.'), States.MathAdd(States.ArrayLength(States.StringSplit($.fastq_list_row.read1FileUri, '.')), -1))" + }, + "ResultPath": "$.get_r1_suffix_step" + }, + "Is Hybrid Run and Ora Fastq (T)": { + "Type": "Choice", + "Choices": [ + { + "And": [ + { + "Variable": "$.is_hybrid_run", + 
"BooleanEquals": true + }, + { + "Variable": "$.get_r1_suffix_step.read_1_file_suffix", + "StringEquals": "ora" + } + ], + "Next": "Set GZIP Outputs (T)", + "Comment": "Is ORA and hybrid fastq format inputs" + } + ], + "Default": "Pass (T)" + }, + "Set GZIP Outputs (T)": { + "Type": "Pass", + "Next": "Decompress ORA (T)", + "Parameters": { + "read1FileUri.$": "States.Format('{}{}/{}_read1_ora_decompressed.fastq.gz', $.cache_uri_prefix, States.Hash($.fastq_list_row.rgid, 'SHA-1'), $.fastq_list_row.rgsm)", + "read2FileUri.$": "States.Format('{}{}/{}_read2_ora_decompressed.fastq.gz', $.cache_uri_prefix, States.Hash($.fastq_list_row.rgid, 'SHA-1'), $.fastq_list_row.rgsm)" + }, + "ResultPath": "$.get_gzip_outputs_step" + }, + "Pass (T)": { + "Type": "Pass", + "End": true, + "Parameters": { + "fastq_list_row.$": "$.fastq_list_row" + } + }, + "Decompress ORA (T)": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "${__ora_fastq_list_row_decompression_sfn_arn__}", + "Input": { + "read1OraFileUri.$": "$.fastq_list_row.read1FileUri", + "read2OraFileUri.$": "$.fastq_list_row.read2FileUri", + "read1GzOutputFileUri.$": "$.get_gzip_outputs_step.read1FileUri", + "read2GzOutputFileUri.$": "$.get_gzip_outputs_step.read2FileUri", + "read1EstimatedGzFileSize": -1, + "read2EstimatedGzFileSize": -1, + "validationOnly": false + } + }, + "Next": "Set New FQLR (T)", + "ResultPath": null + }, + "Set New FQLR (T)": { + "Type": "Pass", + "Parameters": { + "fastq_list_row.$": "States.JsonMerge($.fastq_list_row, $.get_gzip_outputs_step, false)" + }, + "End": true + } + } + }, + "Next": "Convert Fastq List Rows to CWL Input Objects (T)", + "ResultSelector": { + "fastq_list_rows.$": "$[*].fastq_list_row" + } + }, + "Convert Fastq List Rows to CWL Input Objects (T)": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "FunctionName": "${__convert_fastq_list_rows_lambda_function_arn__}", "Payload": 
{ - "fastq_list_rows.$": "$.get_input_parameters_from_event_step.inputs.tumorFastqListRows" + "fastq_list_rows.$": "$.fastq_list_rows" } }, "Retry": [ @@ -109,23 +229,112 @@ "BackoffRate": 2 } ], - "End": true, "ResultSelector": { "fastq_list_rows.$": "$.Payload.fastq_list_rows" }, - "ResultPath": "$.convert_tumor_fastq_list_rows_to_cwl_input_objects_step" + "End": true } } }, { - "StartAt": "Convert Fastq List Rows to CWL Input Objects (Normal)", + "StartAt": "For each fastq list row (N)", "States": { - "Convert Fastq List Rows to CWL Input Objects (Normal)": { + "For each fastq list row (N)": { + "Type": "Map", + "ItemsPath": "$.get_input_parameters_from_event_step.inputs.fastqListRows", + "ItemSelector": { + "fastq_list_row.$": "$$.Map.Item.Value", + "cache_uri_prefix.$": "$.get_input_parameters_from_event_step.engineParameters.cacheUri", + "is_hybrid_run.$": "$.add_ora_step_path.is_hybrid" + }, + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": "Is ORA (N)", + "States": { + "Is ORA (N)": { + "Type": "Pass", + "Next": "Is Hybrid Run and Ora Fastq (N)", + "Parameters": { + "read_1_file_suffix.$": "States.ArrayGetItem(States.StringSplit($.fastq_list_row.read1FileUri, '.'), States.MathAdd(States.ArrayLength(States.StringSplit($.fastq_list_row.read1FileUri, '.')), -1))" + }, + "ResultPath": "$.get_r1_suffix_step" + }, + "Is Hybrid Run and Ora Fastq (N)": { + "Type": "Choice", + "Choices": [ + { + "And": [ + { + "Variable": "$.is_hybrid_run", + "BooleanEquals": true + }, + { + "Variable": "$.get_r1_suffix_step.read_1_file_suffix", + "StringEquals": "ora" + } + ], + "Next": "Set GZIP Outputs (N)", + "Comment": "Is ORA and hybrid fastq format inputs" + } + ], + "Default": "Pass (N)" + }, + "Set GZIP Outputs (N)": { + "Type": "Pass", + "Next": "Decompress ORA (N)", + "Parameters": { + "read1FileUri.$": "States.Format('{}{}/{}_read1_ora_decompressed.fastq.gz', $.cache_uri_prefix, States.Hash($.fastq_list_row.rgid, 'SHA-1'), 
$.fastq_list_row.rgsm)", + "read2FileUri.$": "States.Format('{}{}/{}_read2_ora_decompressed.fastq.gz', $.cache_uri_prefix, States.Hash($.fastq_list_row.rgid, 'SHA-1'), $.fastq_list_row.rgsm)" + }, + "ResultPath": "$.get_gzip_outputs_step" + }, + "Pass (N)": { + "Type": "Pass", + "End": true, + "Parameters": { + "fastq_list_row.$": "$.fastq_list_row" + } + }, + "Decompress ORA (N)": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "${__ora_fastq_list_row_decompression_sfn_arn__}", + "Input": { + "read1OraFileUri.$": "$.fastq_list_row.read1FileUri", + "read2OraFileUri.$": "$.fastq_list_row.read2FileUri", + "read1GzOutputFileUri.$": "$.get_gzip_outputs_step.read1FileUri", + "read2GzOutputFileUri.$": "$.get_gzip_outputs_step.read2FileUri", + "read1EstimatedGzFileSize": -1, + "read2EstimatedGzFileSize": -1, + "validationOnly": false + } + }, + "Next": "Set New FQLR (N)", + "ResultPath": null + }, + "Set New FQLR (N)": { + "Type": "Pass", + "Parameters": { + "fastq_list_row.$": "States.JsonMerge($.fastq_list_row, $.get_gzip_outputs_step, false)" + }, + "End": true + } + } + }, + "Next": "Convert Fastq List Rows to CWL Input Objects (N)", + "ResultSelector": { + "fastq_list_rows.$": "$[*].fastq_list_row" + } + }, + "Convert Fastq List Rows to CWL Input Objects (N)": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { "Payload": { - "fastq_list_rows.$": "$.get_input_parameters_from_event_step.inputs.fastqListRows" + "fastq_list_rows.$": "$.fastq_list_rows" }, "FunctionName": "${__convert_fastq_list_rows_lambda_function_arn__}" }, @@ -143,7 +352,6 @@ "BackoffRate": 2 } ], - "ResultPath": "$.convert_fastq_list_rows_to_cwl_input_objects_step", "ResultSelector": { "fastq_list_rows.$": "$.Payload.fastq_list_rows" }, @@ -155,45 +363,16 @@ "End": true, "ResultSelector": { "cwl_data_inputs": { - "tumor_fastq_list_rows.$": 
"$.[0].convert_tumor_fastq_list_rows_to_cwl_input_objects_step.fastq_list_rows", - "fastq_list_rows.$": "$.[1].convert_fastq_list_rows_to_cwl_input_objects_step.fastq_list_rows" + "tumor_fastq_list_rows.$": "$.[0].fastq_list_rows", + "fastq_list_rows.$": "$.[1].fastq_list_rows" } } } } }, { - "StartAt": "Add ORA Reference Bool", + "StartAt": "Need Ora Reference", "States": { - "Add ORA Reference Bool": { - "Type": "Task", - "Resource": "arn:aws:states:::lambda:invoke", - "Parameters": { - "Payload": { - "event_data_input.$": "$.get_input_parameters_from_event_step.inputs" - }, - "FunctionName": "${__add_ora_reference_lambda_function_arn__}" - }, - "Retry": [ - { - "ErrorEquals": [ - "Lambda.ServiceException", - "Lambda.AWSLambdaException", - "Lambda.SdkClientException", - "Lambda.TooManyRequestsException" - ], - "IntervalSeconds": 1, - "MaxAttempts": 3, - "BackoffRate": 2, - "JitterStrategy": "FULL" - } - ], - "ResultSelector": { - "add_ora_step.$": "$.Payload.add_ora_step" - }, - "ResultPath": "$.add_ora_step_path", - "Next": "Need Ora Reference" - }, "Need Ora Reference": { "Type": "Choice", "Choices": [