diff --git a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/constructs/cttsov2-icav2-manager/index.ts b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/constructs/cttsov2-icav2-manager/index.ts index 54c7c96b0..7a73e9b8f 100644 --- a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/constructs/cttsov2-icav2-manager/index.ts +++ b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/constructs/cttsov2-icav2-manager/index.ts @@ -43,6 +43,7 @@ interface Cttsov2Icav2PipelineManagerConstructProps { setOutputJsonLambdaObj: PythonFunction; getVcfsLambdaObj: PythonFunction; compressVcfLambdaObj: DockerImageFunction; + checkSuccessSampleLambdaObj: PythonFunction; // ICAv2 Copy Batch State Machine Object icav2CopyFilesStateMachineObj: sfn.IStateMachine; } @@ -119,6 +120,7 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct { props.setOutputJsonLambdaObj, props.getVcfsLambdaObj, props.compressVcfLambdaObj, + props.checkSuccessSampleLambdaObj, ].forEach((lambda_obj) => { props.icav2AccessTokenSecretObj.grantRead(lambda_obj.currentVersion.role); }); @@ -139,6 +141,8 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct { props.getVcfsLambdaObj.currentVersion.functionArn, __compress_vcf_file_lambda_function_arn__: props.compressVcfLambdaObj.currentVersion.functionArn, + __check_successful_analysis_lambda_function_arn__: + props.checkSuccessSampleLambdaObj.currentVersion.functionArn, }, }); @@ -151,6 +155,7 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct { props.setOutputJsonLambdaObj, props.getVcfsLambdaObj, props.compressVcfLambdaObj, + props.checkSuccessSampleLambdaObj, ].forEach((lambda_obj) => { lambda_obj.currentVersion.grantInvoke(configure_outputs_sfn.role); }); diff --git a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/stack.ts b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/stack.ts index c901920cb..a12c9defb 100644 --- a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/stack.ts +++ b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/stack.ts @@ -197,6 +197,24 @@ export class Cttsov2Icav2PipelineManagerStack extends cdk.Stack { }, }); + // Check success lambda + const check_success_lambda_function = new PythonFunction( + this, + 'check_success_lambda_function', + { + entry: path.join(__dirname, '../lambdas/check_success_py'), + runtime: lambda.Runtime.PYTHON_3_12, + architecture: lambda.Architecture.ARM_64, + index: 'check_success.py', + handler: 'handler', + memorySize: 1024, + timeout: Duration.seconds(60), + environment: { + ICAV2_ACCESS_TOKEN_SECRET_ID: icav2_access_token_secret_obj.secretName, + }, + } + ); + // Create the state machine to launch the nextflow workflow on ICAv2 const cttso_v2_launch_state_machine = new Cttsov2Icav2PipelineManagerConstruct(this, id, { /* Stack Objects */ @@ -211,6 +229,7 @@ export class Cttsov2Icav2PipelineManagerStack extends cdk.Stack { setOutputJsonLambdaObj: set_output_json_lambda_function, getVcfsLambdaObj: get_vcfs_lambda_function, compressVcfLambdaObj: compress_vcf_lambda_function, + checkSuccessSampleLambdaObj: check_success_lambda_function, /* Step function templates */ generateInputJsonSfnTemplatePath: path.join( __dirname, diff --git a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/check_success_py/check_success.py b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/check_success_py/check_success.py new file mode 100644 index 000000000..6229d7be7 --- /dev/null +++ b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/check_success_py/check_success.py @@ -0,0 +1,323 @@ +#!/usr/bin/env python3 + +""" +Check success analysis results + +Success if: +1* Errors folder does not exist besides Logs_Intermediates and Results directories +2* If Results/MetricsOutput.tsv +""" + +# Standard imports +import json +import typing +from typing import Union +import logging +from pathlib import Path +from os import environ +import boto3 + +# Wrapica imports +from wrapica.project_data import ( + get_project_data_obj_from_project_id_and_path, + read_icav2_file_contents_to_string, + list_project_data_non_recursively, + convert_uri_to_project_data_obj, + ProjectData +) +from wrapica.enums import DataType + +# Set logger +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +if typing.TYPE_CHECKING: + from mypy_boto3_secretsmanager.client import SecretsManagerClient + from mypy_boto3_ssm.client import SSMClient + + +# Globals +ICAV2_BASE_URL = "https://ica.illumina.com/ica/rest" + + +# AWS things +def get_ssm_client() -> 'SSMClient': + """ + Return SSM client + """ + return boto3.client("ssm") + + +def get_secrets_manager_client() -> 'SecretsManagerClient': + """ + Return Secrets Manager client + """ + return boto3.client("secretsmanager") + + +def get_ssm_parameter_value(parameter_path) -> str: + """ + Get the ssm parameter value from the parameter path + :param parameter_path: + :return: + """ + return get_ssm_client().get_parameter(Name=parameter_path)["Parameter"]["Value"] + + +def get_secret(secret_arn: str) -> str: + """ + Return secret value + """ + return get_secrets_manager_client().get_secret_value(SecretId=secret_arn)["SecretString"] + + +# Set the icav2 environment variables +def set_icav2_env_vars(): + """ + Set the icav2 environment variables + :return: + """ + environ["ICAV2_BASE_URL"] = ICAV2_BASE_URL + environ["ICAV2_ACCESS_TOKEN"] = get_secret( + environ["ICAV2_ACCESS_TOKEN_SECRET_ID"] + ) + + +# Functions related to this script +def get_metrics_output_tsv(output_obj: ProjectData) -> str: + """ + Convert the following to a json + + DRAGEN TruSight Oncology 500 ctDNA v2.6.0 Analysis Software - Metrics Output + For Research Use Only. Not for use in diagnostic procedures. + + [Header] + Output Date 2024-09-15 + Output Time 02:46:34 + Pipeline Version 2.6.0.22 + + [Run QC Metrics] + Metric (UOM) LSL Guideline USL Guideline Value + PCT_Q30_R1 (%) NA NA NA + PCT_Q30_R2 (%) NA NA NA + + [Analysis Status] + L2401294 + COMPLETED_ALL_STEPS FALSE + FAILED_STEPS DragenCaller + STEPS_NOT_EXECUTED CoverageReports,TmbAnnotation,Tmb,CDxAnnotation,Contamination,DnaFusionFiltering + + [DNA Library QC Metrics] + Metric (UOM) LSL Guideline USL Guideline L2401294 + CONTAMINATION_SCORE (NA) 0 1227 NA + + [DNA Library QC Metrics for Small Variant Calling and TMB] + Metric (UOM) LSL Guideline USL Guideline L2401294 + MEDIAN_EXON_COVERAGE (count) 1300 NA NA + PCT_EXON_1000X (%) 80.0 NA NA + + [DNA Library QC Metrics for MSI and Fusions] + Metric (UOM) LSL Guideline USL Guideline L2401294 + MEDIAN_EXON_COVERAGE (count) 1300 NA NA + + [DNA Library QC Metrics for CNV Calling] + Metric (UOM) LSL Guideline USL Guideline L2401294 + GENE_SCALED_MAD (count) 0.000 0.059 NA + MEDIAN_BIN_COUNT_CNV_TARGET (count) 6.0 NA NA + + [DNA Expanded Metrics] + Metric (UOM) LSL Guideline USL Guideline L2401294 + TOTAL_PF_READS (count) NA NA NA + MEAN_FAMILY_SIZE (count) NA NA NA + MEDIAN_TARGET_COVERAGE (count) NA NA NA + PCT_CHIMERIC_READS (%) NA NA NA + PCT_EXON_500X (%) NA NA NA + PCT_EXON_1500X (%) NA NA NA + PCT_READ_ENRICHMENT (%) NA NA NA + PCT_USABLE_UMI_READS (%) NA NA NA + MEAN_TARGET_COVERAGE (count) NA NA NA + PCT_ALIGNED_READS (%) NA NA NA + PCT_CONTAMINATION_EST (%) NA NA NA + PCT_TARGET_0.4X_MEAN (%) NA NA NA + PCT_TARGET_500X (%) NA NA NA + PCT_TARGET_1000X (%) NA NA NA + PCT_TARGET_1500X (%) NA NA NA + PCT_DUPLEXFAMILIES (%) NA NA NA + MEDIAN_INSERT_SIZE (bp) NA NA NA + MAX_SOMATIC_AF (NA) NA NA NA + PCT_SOFT_CLIPPED_BASES (%) NA NA NA + PCT_Q30_BASES (%) NA NA NA + + [Notes] + Run Metrics Run Metrics are not generated and values are reported as NA when starting analysis from FASTQ files. + DNA Library QC Metrics DNA library QC Metrics are evaluated using contamination score + DNA Library QC Metrics for CNV Calling GENE_SCALED_MAD LSL guideline only applies to real cell free DNA. + DNA Library QC Metrics for Small Variant Calling and TMB MEDIAN_EXON_COVERAGE is a Fusion QC Metric. + + :return: + """ + + # Extend projectdata object to the MetricsOutput.tsv + metrics_output_project_data_obj = get_project_data_obj_from_project_id_and_path( + project_id=output_obj.project_id, + data_path=Path(output_obj.data.details.path) / 'Results/MetricsOutput.tsv', + data_type=DataType.FILE + ) + + # Read the contents of the MetricsOutput.tsv + metrics_output_tsv = read_icav2_file_contents_to_string( + metrics_output_project_data_obj.project_id, + metrics_output_project_data_obj.data.id + ) + + return metrics_output_tsv + + +def check_failed_steps(output_obj: ProjectData) -> bool: + """ + Returns True if the analysis has failed steps + :param output_obj: + :return: + """ + metrics_output_tsv_str = get_metrics_output_tsv(output_obj) + + if 'FAILED_STEPS\tNA' in metrics_output_tsv_str: + return False + return True + + +def check_errors_folder(output_obj: ProjectData) -> Union[ProjectData, bool]: + """ + Returns True if the Errors folder exists + :param output_obj: + :return: + """ + + try: + errors_folder_project_data_obj = get_project_data_obj_from_project_id_and_path( + project_id=output_obj.project_id, + data_path=Path(output_obj.data.details.path) / 'errors', + data_type=DataType.FOLDER + ) + except NotADirectoryError: + return False + + return errors_folder_project_data_obj + + +def get_workflow_step_of_failure(error_folder_obj: ProjectData) -> str: + """ + Get the first file in the Errors folder that ends with .json + + Read the json file and return the 'step' attribute value + + :param error_folder_obj: + :return: + """ + + # Get the first file in the Errors folder that ends with .json + error_json_file = next( + filter( + lambda json_file_iter: json_file_iter.data.details.path.endswith('.json'), + list_project_data_non_recursively( + project_id=error_folder_obj.project_id, + parent_folder_id=error_folder_obj.data.id, + ) + ) + ) + + # Read the json file + error_json_file_contents_dict = json.loads( + read_icav2_file_contents_to_string( + error_json_file.project_id, + error_json_file.data.id + ) + ) + + # Return the 'step' attribute value + return error_json_file_contents_dict['step'] + + + +def handler(event, context): + """ + Check success analysis results - + + Passes if FAILED_STEPS\tNA in MetricsOutput.tsv and Errors folder does not exist + + :param event: + :param context: + :return: + """ + # Set icav2 environment variables + set_icav2_env_vars() + + # Get output uri from event + output_uri = event['output_uri'] + + # Convert uri to project data object + output_obj = convert_uri_to_project_data_obj(output_uri) + + # Check for failed steps in MetricsOutput.tsv + has_failed_steps = check_failed_steps(output_obj) + + # If failed steps is false, we can return success + if not has_failed_steps: + return { + 'success': True, + 'message': 'Analysis completed successfully' + } + + # If failed steps is true, find the errors folder + errors_folder = check_errors_folder(output_obj) + + if not errors_folder: + logger.error("Errors folder not found, but workflow failed") + raise Exception("Errors folder not found, but workflow failed") + + # Get the workflow step of failure + errors_folder: ProjectData + return { + 'success': False, + 'message': f"Workflow failed at '{get_workflow_step_of_failure(errors_folder)}' step" + } + +# Failed workflow +# if __name__ == "__main__": +# environ['ICAV2_ACCESS_TOKEN_SECRET_ID'] = "ICAv2JWTKey-umccr-prod-service-production" +# print( +# json.dumps( +# handler( +# { +# "output_uri": "s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/analysis/cttsov2/202409156f4e1c52/" +# }, +# None +# ), +# indent=4 +# ) +# ) +# +# # { +# # "success": false, +# # "message": "Workflow failed at 'DragenCaller' step" +# # } + +# Passing workflow +# if __name__ == "__main__": +# environ['ICAV2_ACCESS_TOKEN_SECRET_ID'] = "ICAv2JWTKey-umccr-prod-service-production" +# print( +# json.dumps( +# handler( +# { +# "output_uri": "s3://pipeline-prod-cache-503977275616-ap-southeast-2/byob-icav2/production/analysis/cttsov2/202409151d85b3c4/" +# }, +# None +# ), +# indent=4 +# ) +# ) +# +# # { +# # "success": true, +# # "message": "Analysis completed successfully" +# # } diff --git a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/check_success_py/requirements.txt b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/check_success_py/requirements.txt new file mode 100644 index 000000000..25d5e5f58 --- /dev/null +++ b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/check_success_py/requirements.txt @@ -0,0 +1 @@ +wrapica>=2.27.1.post20240830140737 diff --git a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_outputs.asl.json b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_outputs.asl.json index f4f5e201f..368797f11 100644 --- a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_outputs.asl.json +++ b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_outputs.asl.json @@ -28,7 +28,6 @@ "Delete Cache Uri": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", - "OutputPath": "$.Payload", "Parameters": { "FunctionName": "${__delete_cache_uri_lambda_function_arn__}", "Payload": { @@ -50,21 +49,21 @@ } ], "TimeoutSeconds": 60, + "ResultPath": null, "End": true } } }, { - "StartAt": "Set outputs from analysis uri", + "StartAt": "Check successful analysis", "States": { - "Set outputs from analysis uri": { + "Check successful analysis": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { - "FunctionName": "${__set_outputs_json_lambda_function_arn__}", + "FunctionName": "${__check_successful_analysis_lambda_function_arn__}", "Payload": { - "sample_id.$": "$.get_db_attributes_step.ready_event_data_inputs.sampleId", - "analysis_output_uri.$": "$.get_db_attributes_step.engine_parameters.outputUri" + "output_uri.$": "$.get_db_attributes_step.engine_parameters.outputUri" } }, "Retry": [ @@ -80,19 +79,172 @@ "BackoffRate": 2 } ], - "TimeoutSeconds": 60, "ResultSelector": { - "output_json": { - "resultsDir.$": "$.Payload.results_dir", - "logsIntermediatesDir.$": "$.Payload.logs_intermediates_dir", - "tso500NextflowLogs.$": "$.Payload.nextflow_logs_dir", - "samplePassed.$": "$.Payload.sample_passed" - } + "workflow_success.$": "$.Payload.success", + "error_message.$": "$.Payload.error_message" }, - "ResultPath": "$.analysis_outputs_step", - "Next": "Update Database entry" + "ResultPath": "$.check_successful_analysis_step", + "Next": "Is successful analysis" + }, + "Is successful analysis": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.check_successful_analysis_step.workflow_success", + "BooleanEquals": false, + "Next": "Update Database with error status" + } + ], + "Default": "Set Outputs JSON" + }, + "Set Outputs JSON": { + "Type": "Parallel", + "Branches": [ + { + "StartAt": "Set outputs from analysis uri", + "States": { + "Set outputs from analysis uri": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${__set_outputs_json_lambda_function_arn__}", + "Payload": { + "sample_id.$": "$.get_db_attributes_step.ready_event_data_inputs.sampleId", + "analysis_output_uri.$": "$.get_db_attributes_step.engine_parameters.outputUri" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2 + } + ], + "TimeoutSeconds": 60, + "ResultSelector": { + "output_json": { + "resultsDir.$": "$.Payload.results_dir", + "logsIntermediatesDir.$": "$.Payload.logs_intermediates_dir", + "tso500NextflowLogs.$": "$.Payload.nextflow_logs_dir", + "samplePassed.$": "$.Payload.sample_passed" + } + }, + "ResultPath": "$.analysis_outputs_step", + "Next": "Update Database entry" + }, + "Update Database entry": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:updateItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.portal_run_id", + "id_type": "portal_run_id" + }, + "UpdateExpression": "SET analysis_output = :output_json", + "ExpressionAttributeValues": { + ":output_json": { + "S.$": "States.JsonToString($.analysis_outputs_step.output_json)" + } + } + }, + "ResultPath": "$.update_entry_post_launch_step", + "Next": "Wait 1 Second (Post database-updates)" + }, + "Wait 1 Second (Post database-updates)": { + "Type": "Wait", + "Seconds": 1, + "Comment": "Wait for databases to sync before continuing", + "End": true + } + } + }, + { + "StartAt": "Find all vcf files in the output directory", + "States": { + "Find all vcf files in the output directory": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "Payload": { + "icav2_uri.$": "States.Format('{}/Results/', $.get_db_attributes_step.engine_parameters.outputUri)" + }, + "FunctionName": "${__find_all_vcf_files_lambda_function_arn__}" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2 + } + ], + "ResultSelector": { + "vcf_files_list.$": "$.Payload.vcf_icav2_uri_list" + }, + "ResultPath": "$.get_vcf_files_step", + "Next": "For each vcf file" + }, + "For each vcf file": { + "Type": "Map", + "ItemsPath": "$.get_vcf_files_step.vcf_files_list", + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": "Compress vcf file", + "States": { + "Compress vcf file": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${__compress_vcf_file_lambda_function_arn__}", + "Payload": { + "vcf_icav2_uri.$": "$.vcf_icav2_uri" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2 + } + ], + "ResultPath": null, + "End": true + } + } + }, + "ResultPath": null, + "End": true, + "ItemSelector": { + "vcf_icav2_uri.$": "$$.Map.Item.Value" + } + } + } + } + ], + "ResultPath": null, + "End": true }, - "Update Database entry": { + "Update Database with error status": { "Type": "Task", "Resource": "arn:aws:states:::dynamodb:updateItem", "Parameters": { @@ -101,95 +253,16 @@ "id.$": "$.portal_run_id", "id_type": "portal_run_id" }, - "UpdateExpression": "SET analysis_output = :output_json", + "UpdateExpression": "SET error_message = :error_message, analysis_status = :analysis_status", "ExpressionAttributeValues": { - ":output_json": { - "S.$": "States.JsonToString($.analysis_outputs_step.output_json)" + ":error_message": { + "S.$": "$.check_successful_analysis_step.error_message" + }, + ":analysis_status": { + "S": "FAILED" } - } - }, - "ResultPath": "$.update_entry_post_launch_step", - "Next": "Wait 1 Second (Post database-updates)" - }, - "Wait 1 Second (Post database-updates)": { - "Type": "Wait", - "Seconds": 1, - "Comment": "Wait for databases to sync before continuing", - "End": true - } - } - }, - { - "StartAt": "Find all vcf files in the output directory", - "States": { - "Find all vcf files in the output directory": { - "Type": "Task", - "Resource": "arn:aws:states:::lambda:invoke", - "Parameters": { - "Payload": { - "icav2_uri.$": "States.Format('{}/Results/', $.get_db_attributes_step.engine_parameters.outputUri)" - }, - "FunctionName": "${__find_all_vcf_files_lambda_function_arn__}" - }, - "Retry": [ - { - "ErrorEquals": [ - "Lambda.ServiceException", - "Lambda.AWSLambdaException", - "Lambda.SdkClientException", - "Lambda.TooManyRequestsException" - ], - "IntervalSeconds": 1, - "MaxAttempts": 3, - "BackoffRate": 2 - } - ], - "ResultSelector": { - "vcf_files_list.$": "$.Payload.vcf_icav2_uri_list" - }, - "ResultPath": "$.get_vcf_files_step", - "Next": "For each vcf file" - }, - "For each vcf file": { - "Type": "Map", - "ItemsPath": "$.get_vcf_files_step.vcf_files_list", - "ItemProcessor": { - "ProcessorConfig": { - "Mode": "INLINE" }, - "StartAt": "Compress vcf file", - "States": { - "Compress vcf file": { - "Type": "Task", - "Resource": "arn:aws:states:::lambda:invoke", - "Parameters": { - "FunctionName": "${__compress_vcf_file_lambda_function_arn__}", - "Payload": { - "vcf_icav2_uri.$": "$.vcf_icav2_uri" - } - }, - "Retry": [ - { - "ErrorEquals": [ - "Lambda.ServiceException", - "Lambda.AWSLambdaException", - "Lambda.SdkClientException", - "Lambda.TooManyRequestsException" - ], - "IntervalSeconds": 1, - "MaxAttempts": 3, - "BackoffRate": 2 - } - ], - "ResultPath": null, - "End": true - } - } - }, - "ResultPath": null, - "End": true, - "ItemSelector": { - "vcf_icav2_uri.$": "$$.Map.Item.Value" + "End": true } } }