From 9dc91a170361fe40d658d3c910da91de02c8cb1a Mon Sep 17 00:00:00 2001 From: Alexis Lucattini Date: Wed, 20 Nov 2024 21:05:57 +1100 Subject: [PATCH] Add ora compression to cttsov2 workflow --- .../ora_decompression/docker-entrypoint.sh | 23 +- .../constructs/cttsov2-icav2-manager/index.ts | 13 + .../cttso-v2-pipeline-manager/deploy/stack.ts | 48 +++ .../check_fastq_list_row_is_ora.py | 30 ++ .../convert_ora_to_cache_uri_gz_path.py | 58 +++ .../set_cttso_v2_nf_inputs.asl.json | 343 ++++++++++++------ 6 files changed, 394 insertions(+), 121 deletions(-) create mode 100644 lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/check_fastq_list_row_is_ora_py/check_fastq_list_row_is_ora.py create mode 100644 lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/convert_ora_to_cache_uri_gz_path_py/convert_ora_to_cache_uri_gz_path.py diff --git a/lib/workload/components/ora-file-decompression-fq-pair-sfn/tasks/ora_decompression/docker-entrypoint.sh b/lib/workload/components/ora-file-decompression-fq-pair-sfn/tasks/ora_decompression/docker-entrypoint.sh index 36ab5e75b..2c178947b 100644 --- a/lib/workload/components/ora-file-decompression-fq-pair-sfn/tasks/ora_decompression/docker-entrypoint.sh +++ b/lib/workload/components/ora-file-decompression-fq-pair-sfn/tasks/ora_decompression/docker-entrypoint.sh @@ -89,6 +89,28 @@ else exit 1 fi + # If the estimated gz file size is -1, we need to do a double extraction + # Since we do not have the space to store the gz file and then upload it + if [[ "${ESTIMATED_GZ_FILE_SIZE}" == "-1" ]]; then + echo "$(date -Iseconds): Estimated file gz file size is -1, we need to do a double extraction to get the file size" 1>&2 + ESTIMATED_GZ_FILE_SIZE="$( \ + wget \ + --quiet \ + --output-document - \ + "$( \ + python3 scripts/get_icav2_download_url.py \ + "${INPUT_URI}" + )" | \ + /usr/local/bin/orad \ + --gzip \ + --stdout \ + --ora-reference "${ORADATA_PATH}" \ + - | \ + wc -c \ + )" + echo "$(date -Iseconds): Estimated gz file size is ${ESTIMATED_GZ_FILE_SIZE}" 1>&2 + fi + # Set AWS credentials access for aws s3 cp echo "$(date -Iseconds): Collecting the AWS S3 Access credentials" 1>&2 aws_s3_access_creds_json_str="$( \ @@ -145,4 +167,3 @@ else echo "$(date -Iseconds): Stream and upload of decompression complete" 1>&2 fi - diff --git a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/constructs/cttsov2-icav2-manager/index.ts b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/constructs/cttsov2-icav2-manager/index.ts index 1a938e25d..256d1634b 100644 --- a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/constructs/cttsov2-icav2-manager/index.ts +++ b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/constructs/cttsov2-icav2-manager/index.ts @@ -41,6 +41,8 @@ interface Cttsov2Icav2PipelineManagerConstructProps { generateCopyManifestDictLambdaObj: PythonFunction; checkNumRunningSfnsLambdaObj: PythonFunction; getRandomNumberLambdaObj: PythonFunction; + checkFastqListRowIsOraLambdaObj: PythonFunction; + convertOraToCacheUriGzPathLambdaObj: PythonFunction; // SFN Output lambdas deleteCacheUriLambdaObj: PythonFunction; setOutputJsonLambdaObj: PythonFunction; @@ -49,6 +51,8 @@ interface Cttsov2Icav2PipelineManagerConstructProps { checkSuccessSampleLambdaObj: PythonFunction; // ICAv2 Copy Batch State Machine Object icav2CopyFilesStateMachineObj: sfn.IStateMachine; + // ORA Decompression Statemachine Object + oraDecompressionStateMachineObj: sfn.IStateMachine; } export class Cttsov2Icav2PipelineManagerConstruct extends Construct { @@ -84,9 +88,15 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct { props.getRandomNumberLambdaObj.currentVersion.functionArn, __check_number_of_copy_jobs_running_lambda_function_arn__: props.checkNumRunningSfnsLambdaObj.currentVersion.functionArn, + __fastq_list_rows_are_ora_lambda_function_arn__: + props.checkFastqListRowIsOraLambdaObj.currentVersion.functionArn, + __convert_ora_uri_to_gz_cache_uri_lambda_function_arn__: + props.convertOraToCacheUriGzPathLambdaObj.currentVersion.functionArn, /* Subfunction state machines */ __copy_icav2_files_state_machine_arn__: props.icav2CopyFilesStateMachineObj.stateMachineArn, + __ora_fastq_list_row_decompression_sfn_arn__: + props.oraDecompressionStateMachineObj.stateMachineArn, /* Dynamodb tables */ __table_name__: props.dynamodbTableObj.tableName, }, @@ -99,6 +109,8 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct { props.uploadSamplesheetToCacheDirLambdaObj, props.getRandomNumberLambdaObj, props.checkNumRunningSfnsLambdaObj, + props.checkFastqListRowIsOraLambdaObj, + props.convertOraToCacheUriGzPathLambdaObj, ].forEach((lambda_obj) => { lambda_obj.currentVersion.grantInvoke(configureInputsSfn); }); @@ -109,6 +121,7 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct { // Add state machine execution permissions to stateMachine role props.icav2CopyFilesStateMachineObj.grantStartExecution(configureInputsSfn); props.icav2CopyFilesStateMachineObj.grantRead(configureInputsSfn); + props.oraDecompressionStateMachineObj.grantStartExecution(configureInputsSfn); // Because we run a nested state machine, we need to add the permissions to the state machine role // See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr diff --git a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/stack.ts b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/stack.ts index da35eabdb..0da4a4e8e 100644 --- a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/stack.ts +++ b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/deploy/stack.ts @@ -11,6 +11,7 @@ import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha'; import * as lambda from 'aws-cdk-lib/aws-lambda'; import { Duration } from 'aws-cdk-lib'; import { DockerImageCode, DockerImageFunction } from 'aws-cdk-lib/aws-lambda'; +import { OraDecompressionConstruct } from '../../../../components/ora-file-decompression-fq-pair-sfn'; export interface Cttsov2Icav2PipelineManagerConfig { /* ICAv2 Pipeline analysis essentials */ @@ -67,6 +68,16 @@ export class Cttsov2Icav2PipelineManagerStack extends cdk.Stack { } ); + // Get the ora decompression construct + const oraDecompressionStateMachineObj = new OraDecompressionConstruct( + this, + 'ora_decompression_state_machine_obj', + { + icav2AccessTokenSecretId: icav2AccessTokenSecretObj.secretName, + sfnPrefix: props.stateMachinePrefix, + } + ); + // Set ssm parameter object list const pipelineIdSsmObjList = ssm.StringParameter.fromStringParameterName( this, @@ -151,6 +162,40 @@ export class Cttsov2Icav2PipelineManagerStack extends cdk.Stack { } ); + const checkFastqListRowIsOraLambdaObj = new PythonFunction( + this, + 'check_fastq_list_row_is_ora_lambda_python_function', + { + entry: path.join(__dirname, '../lambdas/check_fastq_list_row_is_ora_py'), + runtime: lambda.Runtime.PYTHON_3_12, + architecture: lambda.Architecture.ARM_64, + index: 'check_fastq_list_row_is_ora.py', + handler: 'handler', + memorySize: 1024, + timeout: Duration.seconds(60), + environment: { + ICAV2_ACCESS_TOKEN_SECRET_ID: icav2AccessTokenSecretObj.secretName, + }, + } + ); + + const convertOraToCacheUriGzPathLambdaObj = new PythonFunction( + this, + 'convert_ora_to_cache_uri_gz_path_lambda_python_function', + { + entry: path.join(__dirname, '../lambdas/convert_ora_to_cache_uri_gz_path_py'), + runtime: lambda.Runtime.PYTHON_3_12, + architecture: lambda.Architecture.ARM_64, + index: 'convert_ora_to_cache_uri_gz_path.py', + handler: 'handler', + memorySize: 1024, + timeout: Duration.seconds(60), + environment: { + ICAV2_ACCESS_TOKEN_SECRET_ID: icav2AccessTokenSecretObj.secretName, + }, + } + ); + /* Part 2: Build lambdas for output json generation */ @@ -245,12 +290,15 @@ export class Cttsov2Icav2PipelineManagerStack extends cdk.Stack { dynamodbTableObj: dynamodbTableObj, icav2AccessTokenSecretObj: icav2AccessTokenSecretObj, icav2CopyFilesStateMachineObj: icav2CopyFilesStateMachineObj.icav2CopyFilesSfnObj, + oraDecompressionStateMachineObj: oraDecompressionStateMachineObj.sfnObject, pipelineIdSsmObj: pipelineIdSsmObjList, /* Lambdas paths */ uploadSamplesheetToCacheDirLambdaObj: uploadSamplesheetToCacheDirLambdaObj, // __dirname + '/../../../lambdas/upload_samplesheet_to_cache_dir_py' generateCopyManifestDictLambdaObj: generateCopyManifestDictLambdaObj, // __dirname + '/../../../lambdas/generate_copy_manifest_dict_py' getRandomNumberLambdaObj: getRandomNumberLambdaObj, checkNumRunningSfnsLambdaObj: checkNumRunningSfns, + convertOraToCacheUriGzPathLambdaObj: convertOraToCacheUriGzPathLambdaObj, + checkFastqListRowIsOraLambdaObj: checkFastqListRowIsOraLambdaObj, deleteCacheUriLambdaObj: deleteCacheUriLambdaFunction, setOutputJsonLambdaObj: setOutputJsonLambdaFunction, getVcfsLambdaObj: getVcfsLambdaFunction, diff --git a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/check_fastq_list_row_is_ora_py/check_fastq_list_row_is_ora.py b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/check_fastq_list_row_is_ora_py/check_fastq_list_row_is_ora.py new file mode 100644 index 000000000..f033d4b90 --- /dev/null +++ b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/check_fastq_list_row_is_ora_py/check_fastq_list_row_is_ora.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 + +""" +Check fastq list row is ora +""" + + +def handler(event, context): + """ + Collect the read1FileUri and read2FileUri from the fastq list and check if they are in the ora format, + return True if they are, False otherwise + :param event: + :param context: + :return: + """ + + # Get the fastq list from the event + fastq_list_row = event['fastq_list_row'] + + # Check if the read1FileUri and read2FileUri are in the ora format + if fastq_list_row.get("read1FileUri").endswith(".ora") and fastq_list_row.get("read2FileUri").endswith(".ora"): + return { + "is_ora": True + } + elif fastq_list_row.get("read1FileUri").endswith(".gz") and fastq_list_row.get("read2FileUri").endswith(".gz"): + return { + "is_ora": False + } + else: + raise ValueError("The read1FileUri and read2FileUri need to be in the same format") \ No newline at end of file diff --git a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/convert_ora_to_cache_uri_gz_path_py/convert_ora_to_cache_uri_gz_path.py b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/convert_ora_to_cache_uri_gz_path_py/convert_ora_to_cache_uri_gz_path.py new file mode 100644 index 000000000..f5e42a949 --- /dev/null +++ b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/lambdas/convert_ora_to_cache_uri_gz_path_py/convert_ora_to_cache_uri_gz_path.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python + +""" +Given a fastq list row in ora format, a cache uri and a sample id, + +Determine the output gzip path for the fastq files + +Returns read_1_gz_output_uri and read_2_gz_output_uri +""" + +from urllib.parse import (urlparse, urlunparse) +from pathlib import Path + +def extend_url(url, path_ext: str) -> str: + """ + Extend the url path with the path_ext + """ + url_obj = urlparse(url) + + return str( + urlunparse( + ( + url_obj.scheme, + url_obj.netloc, + str(Path(url_obj.path) / path_ext), + url_obj.params, + url_obj.query, + url_obj.fragment + ) + ) + ) + + +def handler(event, context): + # Get the input event + cache_uri = event['cache_uri'] + + # Get the input event + sample_id = event['sample_id'] + + # Get the input event + fastq_list_row = event['fastq_list_row'] + read_1_ora_file_uri = fastq_list_row['read1FileUri'] + read_2_ora_file_uri = fastq_list_row['read2FileUri'] + + # Extend the cache uri to include the sample id + sample_cache_uri = extend_url(cache_uri, sample_id) + + # Get the file name from the ora file uri + # And replace the .ora extension with .gz + read_1_file_name = Path(read_1_ora_file_uri).name.replace('.ora', '.gz') + read_2_file_name = Path(read_2_ora_file_uri).name.replace('.ora', '.gz') + + # Get the output uri for the gz files + return { + 'read_1_gz_output_uri': extend_url(sample_cache_uri, read_1_file_name), + 'read_2_gz_output_uri': extend_url(sample_cache_uri, read_2_file_name) + } diff --git a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_inputs.asl.json b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_inputs.asl.json index e1962f777..819a28acd 100644 --- a/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_inputs.asl.json +++ b/lib/workload/stateless/stacks/cttso-v2-pipeline-manager/step_functions_templates/set_cttso_v2_nf_inputs.asl.json @@ -79,130 +79,233 @@ } }, { - "StartAt": "Generate Copy Manifest", + "StartAt": "For each fastq list row", "States": { - "Generate Copy Manifest": { - "Type": "Task", - "Resource": "arn:aws:states:::lambda:invoke", - "Parameters": { - "FunctionName": "${__generate_copy_manifest_dict__}", - "Payload": { - "sample_id.$": "$.get_ready_event_step.inputs.sampleId", - "cache_uri.$": "$.get_ready_event_step.engine_parameters.cacheUri", - "fastq_list_rows.$": "$.get_ready_event_step.inputs.fastqListRows" - } - }, - "Retry": [ - { - "ErrorEquals": [ - "Lambda.ServiceException", - "Lambda.AWSLambdaException", - "Lambda.SdkClientException", - "Lambda.TooManyRequestsException", - "States.TaskFailed" - ], - "IntervalSeconds": 60, - "MaxAttempts": 3, - "BackoffRate": 2 - } - ], - "Next": "Get Variable number of seconds", - "Comment": "Generate a copy manifest object, ready to parse into the icav2 copy batch utility step function\n\nWe expect the following inputs:\n\n* cache_path\n* project_id\n* sample_id\n* fastq_list_rows\n\nAnd we expect the following outputs:\n\n* manifest", - "ResultSelector": { - "dest_uri.$": "$.Payload.dest_uri", - "source_uris.$": "$.Payload.source_uris" - }, - "ResultPath": "$.generate_copy_manifest_dict_step" - }, - "Get Variable number of seconds": { - "Type": "Task", - "Resource": "arn:aws:states:::lambda:invoke", - "Parameters": { - "FunctionName": "${__get_variable_number_of_seconds_lambda_function_arn__}" - }, - "Retry": [ - { - "ErrorEquals": [ - "Lambda.ServiceException", - "Lambda.AWSLambdaException", - "Lambda.SdkClientException", - "Lambda.TooManyRequestsException", - "States.TaskFailed" - ], - "IntervalSeconds": 60, - "MaxAttempts": 3, - "BackoffRate": 2 - } - ], - "ResultSelector": { - "num_seconds.$": "$.Payload.random_number" - }, - "ResultPath": "$.get_variable_seconds_step", - "Next": "Wait a variable amount of time" - }, - "Wait a variable amount of time": { - "Type": "Wait", - "SecondsPath": "$.get_variable_seconds_step.num_seconds", - "Next": "Check Running Copy Fastq Job Number" - }, - "Check Running Copy Fastq Job Number": { - "Type": "Task", - "Resource": "arn:aws:states:::lambda:invoke", - "Parameters": { - "FunctionName": "${__check_number_of_copy_jobs_running_lambda_function_arn__}" + "For each fastq list row": { + "Type": "Map", + "ItemsPath": "$.get_ready_event_step.inputs.fastqListRows", + "ItemSelector": { + "fastq_list_row.$": "$$.Map.Item.Value", + "sample_id.$": "$.get_ready_event_step.inputs.sampleId", + "cache_uri.$": "$.get_ready_event_step.engine_parameters.cacheUri" }, - "Retry": [ - { - "ErrorEquals": [ - "Lambda.ServiceException", - "Lambda.AWSLambdaException", - "Lambda.SdkClientException", - "Lambda.TooManyRequestsException", - "States.TaskFailed" - ], - "IntervalSeconds": 60, - "MaxAttempts": 3, - "BackoffRate": 2 + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": "Check fastq list rows are ora", + "States": { + "Check fastq list rows are ora": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${__fastq_list_rows_are_ora_lambda_function_arn__}", + "Payload": { + "fastq_list_row.$": "$.fastq_list_row" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2, + "JitterStrategy": "FULL" + } + ], + "ResultSelector": { + "is_ora.$": "$.Payload.is_ora" + }, + "ResultPath": "$.is_ora_step", + "Next": "Is ORA FQ Pair" + }, + "Is ORA FQ Pair": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.is_ora_step.is_ora", + "BooleanEquals": true, + "Next": "Convert ORA to Cache URI GZ", + "Comment": "Decompress ORA" + } + ], + "Default": "Generate Copy Manifest" + }, + "Convert ORA to Cache URI GZ": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${__convert_ora_uri_to_gz_cache_uri_lambda_function_arn__}", + "Payload": { + "sample_id.$": "$.sample_id", + "fastq_list_row.$": "$.fastq_list_row", + "cache_uri.$": "$.cache_uri" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2, + "JitterStrategy": "FULL" + } + ], + "ResultPath": "$.get_ora_cache_uri_step", + "Next": "ORA Decompress Fastqs" + }, + "ORA Decompress Fastqs": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "${__ora_fastq_list_row_decompression_sfn_arn__}", + "Input": { + "read1OraFileUri.$": "$.fastq_list_row.read1FileUri", + "read2OraFileUri.$": "$.fastq_list_row.read2FileUri", + "read1GzOutputFileUri.$": "$.get_ora_cache_uri_step.read_1_gz_output_uri", + "read2GzOutputFileUri.$": "$.get_ora_cache_uri_step.read_2_gz_output_uri", + "read1EstimatedGzFileSize": -1, + "read2EstimatedGzFileSize": -1 + } + }, + "End": true + }, + "Generate Copy Manifest": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${__generate_copy_manifest_dict__}", + "Payload": { + "sample_id.$": "$.sample_id", + "cache_uri.$": "$.cache_uri", + "fastq_list_rows.$": "States.Array($.fastq_list_row)" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException", + "States.TaskFailed" + ], + "IntervalSeconds": 60, + "MaxAttempts": 3, + "BackoffRate": 2 + } + ], + "Comment": "Generate a copy manifest object, ready to parse into the icav2 copy batch utility step function\n\nWe expect the following inputs:\n\n* cache_path\n* project_id\n* sample_id\n* fastq_list_rows\n\nAnd we expect the following outputs:\n\n* manifest", + "ResultSelector": { + "dest_uri.$": "$.Payload.dest_uri", + "source_uris.$": "$.Payload.source_uris" + }, + "ResultPath": "$.generate_copy_manifest_dict_step", + "Next": "Get Variable number of seconds" + }, + "Get Variable number of seconds": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${__get_variable_number_of_seconds_lambda_function_arn__}" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException", + "States.TaskFailed" + ], + "IntervalSeconds": 60, + "MaxAttempts": 3, + "BackoffRate": 2 + } + ], + "ResultSelector": { + "num_seconds.$": "$.Payload.random_number" + }, + "ResultPath": "$.get_variable_seconds_step", + "Next": "Wait a variable amount of time" + }, + "Wait a variable amount of time": { + "Type": "Wait", + "SecondsPath": "$.get_variable_seconds_step.num_seconds", + "Next": "Check Running Copy Fastq Job Number" + }, + "Check Running Copy Fastq Job Number": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${__check_number_of_copy_jobs_running_lambda_function_arn__}" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException", + "States.TaskFailed" + ], + "IntervalSeconds": 60, + "MaxAttempts": 3, + "BackoffRate": 2 + } + ], + "ResultSelector": { + "run_copy_job_step_bool.$": "$.Payload.run_copy_job_step_bool" + }, + "ResultPath": "$.run_copy_job_step", + "Next": "Allowed to run" + }, + "Allowed to run": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.run_copy_job_step.run_copy_job_step_bool", + "BooleanEquals": false, + "Next": "Wait A Minute", + "Comment": "Too Many jobs running already" + } + ], + "Default": "Copy Fastq Files to Cache Directory" + }, + "Wait A Minute": { + "Type": "Wait", + "Seconds": 60, + "Next": "Check Running Copy Fastq Job Number" + }, + "Copy Fastq Files to Cache Directory": { + "Type": "Task", + "End": true, + "Parameters": { + "StateMachineArn": "${__copy_icav2_files_state_machine_arn__}", + "Input": { + "dest_uri.$": "$.generate_copy_manifest_dict_step.dest_uri", + "source_uris.$": "$.generate_copy_manifest_dict_step.source_uris" + } + }, + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "ResultPath": "$.copy_fastq_files_to_cache_directory_step", + "ResultSelector": { + "job_id.$": "$.Output.job_id" + } + } } - ], - "ResultSelector": { - "run_copy_job_step_bool.$": "$.Payload.run_copy_job_step_bool" }, - "ResultPath": "$.run_copy_job_step", - "Next": "Allowed to run" - }, - "Allowed to run": { - "Type": "Choice", - "Choices": [ - { - "Variable": "$.run_copy_job_step.run_copy_job_step_bool", - "BooleanEquals": false, - "Next": "Wait A Minute", - "Comment": "Too Many jobs running already" - } - ], - "Default": "Copy Fastq Files to Cache Directory" - }, - "Wait A Minute": { - "Type": "Wait", - "Seconds": 60, - "Next": "Check Running Copy Fastq Job Number" - }, - "Copy Fastq Files to Cache Directory": { - "Type": "Task", - "End": true, - "Parameters": { - "StateMachineArn": "${__copy_icav2_files_state_machine_arn__}", - "Input": { - "dest_uri.$": "$.generate_copy_manifest_dict_step.dest_uri", - "source_uris.$": "$.generate_copy_manifest_dict_step.source_uris" - } - }, - "Resource": "arn:aws:states:::states:startExecution.sync:2", - "ResultPath": "$.copy_fastq_files_to_cache_directory_step", - "ResultSelector": { - "job_id.$": "$.Output.job_id" - } + "End": true } } }