From 09b9f43ab1603eb820fc34e1c175fdf4f76afc36 Mon Sep 17 00:00:00 2001 From: Alexis Lucattini Date: Mon, 25 Nov 2024 13:39:12 +1100 Subject: [PATCH] Rate limit ora compression workflow 1. Rate limit the number of concurrent ecs tasks to 400 2. Rate limit the number of concurrent instrument runs either gzip -> md5sum or ora -> md5sum to 10 --- config/constants.ts | 2 +- .../gzip-raw-md5sum-fq-pair-sfn/index.ts | 69 +++++ ...s_task_rate_limiters_sfn_template.asl.json | 38 +++ ...5sum_for_fastq_gzip_pair_template.asl.json | 23 +- .../index.ts | 70 +++++ ...press_ora_fastq_pair_sfn_template.asl.json | 25 +- ...s_task_rate_limiters_sfn_template.asl.json | 38 +++ .../ora-compression-manager/deploy/index.ts | 242 ++++++++++++++++-- ..._for_all_fastq_gzips_sfn_template.asl.json | 75 ++++++ ...um_for_all_fastq_ora_sfn_template.asl.json | 117 +++++++++ ...d5sum_for_fastq_gzip_sfn_template.asl.json | 39 +++ ...md5sum_for_fastq_ora_sfn_template.asl.json | 39 +++ .../set_compression_inputs.asl.json | 86 ++----- .../set_compression_outputs.asl.json | 122 +-------- 14 files changed, 778 insertions(+), 207 deletions(-) create mode 100644 lib/workload/components/gzip-raw-md5sum-fq-pair-sfn/step_functions_templates/ecs_task_rate_limiters_sfn_template.asl.json create mode 100644 lib/workload/components/ora-file-decompression-fq-pair-sfn/step_functions_templates/ecs_task_rate_limiters_sfn_template.asl.json create mode 100644 lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/get_raw_md5sum_for_all_fastq_gzips_sfn_template.asl.json create mode 100644 lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/get_raw_md5sum_for_all_fastq_ora_sfn_template.asl.json create mode 100644 lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/rate_limit_get_raw_md5sum_for_fastq_gzip_sfn_template.asl.json create mode 100644 lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/rate_limit_get_raw_md5sum_for_fastq_ora_sfn_template.asl.json diff --git a/config/constants.ts b/config/constants.ts index f87033125..1d58cbfce 100644 --- a/config/constants.ts +++ b/config/constants.ts @@ -859,7 +859,7 @@ Resources used by the ora compression pipeline // Pipeline ID is: 5c1c2fa2-30dc-46ed-9e7f-dc4fefac77b6 // deployed to dev, stg and prod export const oraCompressionIcav2PipelineIdSSMParameterPath = - '/icav2/umccr-prod/ora_compression_pipeline_id'; // 5c1c2fa2-30dc-46ed-9e7f-dc4fefac77b6 + '/icav2/umccr-prod/ora_compression_pipeline_id'; // 0540fca4-cc40-45ac-88e2-d32df69c6954 // Default Reference Uri for compressing ORA files // Reference URI is icav2://reference-data/dragen-ora/v2/ora_reference_v2.tar.gz diff --git a/lib/workload/components/gzip-raw-md5sum-fq-pair-sfn/index.ts b/lib/workload/components/gzip-raw-md5sum-fq-pair-sfn/index.ts index 77778276a..c4a4e23c3 100644 --- a/lib/workload/components/gzip-raw-md5sum-fq-pair-sfn/index.ts +++ b/lib/workload/components/gzip-raw-md5sum-fq-pair-sfn/index.ts @@ -112,6 +112,45 @@ export class GzipRawMd5sumDecompressionConstruct extends Construct { icav2SecretObj.grantRead(lambdaObj.currentVersion); }); + // Set up the step function for rate limiting the number of tasks running simultaneously + const rateLimitStateMachine = new sfn.StateMachine(this, 'RateLimitStateMachine', { + stateMachineName: `${props.sfnPrefix}-ecs-limiter-sfn`, + definitionBody: sfn.DefinitionBody.fromFile( + path.join( + __dirname, + 'step_functions_templates/ecs_task_rate_limiters_sfn_template.asl.json' + ) + ), + definitionSubstitutions: { + __cluster_arn__: cluster.clusterArn, + }, + }); + + // Allow the rate limit state machine to list the ECS tasks + // in the cluster + rateLimitStateMachine.addToRolePolicy( + new iam.PolicyStatement({ + resources: ['*'], + actions: ['ecs:ListTasks'], + conditions: { + ArnEquals: { + 'ecs:cluster': cluster.clusterArn, + }, + }, + }) + ); + + NagSuppressions.addResourceSuppressions( + rateLimitStateMachine, + [ + { + id: 'AwsSolutions-IAM5', + reason: 'Give permissions to resources *, to list tasks with condition on ecs cluster', + }, + ], + true + ); + // Set up step function // Build state machine object this.sfnObject = new sfn.StateMachine(this, 'state_machine', { @@ -136,9 +175,39 @@ export class GzipRawMd5sumDecompressionConstruct extends Construct { readIcav2FileContentsLambdaObj.currentVersion.functionArn, __delete_icav2_cache_uri_lambda_function_arn__: deleteIcav2CacheUriLambdaObj.currentVersion.functionArn, + /* Child step functions */ + __ecs_task_rate_limit_sfn_arn__: rateLimitStateMachine.stateMachineArn, }, }); + rateLimitStateMachine.grantStartExecution(this.sfnObject); + rateLimitStateMachine.grantRead(this.sfnObject); + + // Because we run a nested state machine, we need to add the permissions to the state machine role + // See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr + this.sfnObject.addToRolePolicy( + new iam.PolicyStatement({ + resources: [ + `arn:aws:events:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule`, + ], + actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'], + }) + ); + + // https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html#sync-async-iam-policies + // Polling requires permission for states:DescribeExecution + NagSuppressions.addResourceSuppressions( + this.sfnObject, + [ + { + id: 'AwsSolutions-IAM5', + reason: + 'grantRead uses asterisk at the end of executions, as we need permissions for all execution invocations', + }, + ], + true + ); + // Allow the step function to invoke the lambda [readIcav2FileContentsLambdaObj, deleteIcav2CacheUriLambdaObj].forEach((lambdaObj) => { lambdaObj.currentVersion.grantInvoke(this.sfnObject); diff --git a/lib/workload/components/gzip-raw-md5sum-fq-pair-sfn/step_functions_templates/ecs_task_rate_limiters_sfn_template.asl.json b/lib/workload/components/gzip-raw-md5sum-fq-pair-sfn/step_functions_templates/ecs_task_rate_limiters_sfn_template.asl.json new file mode 100644 index 000000000..9f0eda22f --- /dev/null +++ b/lib/workload/components/gzip-raw-md5sum-fq-pair-sfn/step_functions_templates/ecs_task_rate_limiters_sfn_template.asl.json @@ -0,0 +1,38 @@ +{ + "Comment": "Task Backoff SFN", + "StartAt": "ListTasks", + "States": { + "ListTasks": { + "Type": "Task", + "Parameters": { + "Cluster": "${__cluster_arn__}" + }, + "Resource": "arn:aws:states:::aws-sdk:ecs:listTasks", + "ResultSelector": { + "num_tasks_running.$": "States.ArrayLength($.TaskArns)" + }, + "ResultPath": "$.get_num_tasks_step", + "Next": "Over 400 tasks running" + }, + "Over 400 tasks running": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.get_num_tasks_step.num_tasks_running", + "NumericGreaterThan": 400, + "Next": "Sleep a minute", + "Comment": "More than 400 tasks running" + } + ], + "Default": "Success" + }, + "Sleep a minute": { + "Type": "Wait", + "Seconds": 60, + "Next": "ListTasks" + }, + "Success": { + "Type": "Succeed" + } + } +} diff --git a/lib/workload/components/gzip-raw-md5sum-fq-pair-sfn/step_functions_templates/get_raw_md5sum_for_fastq_gzip_pair_template.asl.json b/lib/workload/components/gzip-raw-md5sum-fq-pair-sfn/step_functions_templates/get_raw_md5sum_for_fastq_gzip_pair_template.asl.json index 809b39116..95055ebdc 100644 --- a/lib/workload/components/gzip-raw-md5sum-fq-pair-sfn/step_functions_templates/get_raw_md5sum_for_fastq_gzip_pair_template.asl.json +++ b/lib/workload/components/gzip-raw-md5sum-fq-pair-sfn/step_functions_templates/get_raw_md5sum_for_fastq_gzip_pair_template.asl.json @@ -4,7 +4,7 @@ "States": { "Convert Fastq List Row GZ URIs to Array": { "Type": "Pass", - "Next": "Decompress GZIP Files", + "Next": "Wait for Task Availability", "Parameters": { "gzip_files_map": [ { @@ -19,6 +19,16 @@ }, "ResultPath": "$.fastq_list_row_as_map" }, + "Wait for Task Availability": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "${__ecs_task_rate_limit_sfn_arn__}", + "Input": {} + }, + "Next": "Decompress GZIP Files", + "ResultPath": null + }, "Decompress GZIP Files": { "Type": "Map", "ItemsPath": "$.fastq_list_row_as_map.gzip_files_map", @@ -71,8 +81,15 @@ { "ErrorEquals": ["ECS.AmazonECSException"], "BackoffRate": 2, - "IntervalSeconds": 300, - "MaxAttempts": 3, + "IntervalSeconds": 3600, + "MaxAttempts": 5, + "JitterStrategy": "FULL" + }, + { + "ErrorEquals": ["States.TaskFailed"], + "BackoffRate": 2, + "IntervalSeconds": 3600, + "MaxAttempts": 5, "JitterStrategy": "FULL" } ], diff --git a/lib/workload/components/ora-file-decompression-fq-pair-sfn/index.ts b/lib/workload/components/ora-file-decompression-fq-pair-sfn/index.ts index 1898abd61..279efda10 100644 --- a/lib/workload/components/ora-file-decompression-fq-pair-sfn/index.ts +++ b/lib/workload/components/ora-file-decompression-fq-pair-sfn/index.ts @@ -77,6 +77,45 @@ export class OraDecompressionConstruct extends Construct { }), }); + // Set up the step function for rate limiting the number of tasks running simultaneously + const rateLimitStateMachine = new sfn.StateMachine(this, 'RateLimitStateMachine', { + stateMachineName: `${props.sfnPrefix}-ecs-limiter-sfn`, + definitionBody: sfn.DefinitionBody.fromFile( + path.join( + __dirname, + 'step_functions_templates/ecs_task_rate_limiters_sfn_template.asl.json' + ) + ), + definitionSubstitutions: { + __cluster_arn__: cluster.clusterArn, + }, + }); + + // Allow the rate limit state machine to list the ECS tasks + // in the cluster + rateLimitStateMachine.addToRolePolicy( + new iam.PolicyStatement({ + resources: ['*'], + actions: ['ecs:ListTasks'], + conditions: { + ArnEquals: { + 'ecs:cluster': cluster.clusterArn, + }, + }, + }) + ); + + NagSuppressions.addResourceSuppressions( + rateLimitStateMachine, + [ + { + id: 'AwsSolutions-IAM5', + reason: 'Give permissions to resources *, to list tasks with condition on ecs cluster', + }, + ], + true + ); + // Set up step function // Build state machine object this.sfnObject = new sfn.StateMachine(this, 'state_machine', { @@ -96,9 +135,40 @@ export class OraDecompressionConstruct extends Construct { __ora_container_name__: oraDecompressionContainer.containerName, __subnets__: cluster.vpc.privateSubnets.map((subnet) => subnet.subnetId).join(','), __sg_group__: securityGroup.securityGroupId, + /* Child step functions */ + __ecs_task_rate_limit_sfn_arn__: rateLimitStateMachine.stateMachineArn, }, }); + // Grant permissions to the state machine + rateLimitStateMachine.grantStartExecution(this.sfnObject); + rateLimitStateMachine.grantRead(this.sfnObject); + + // Because we run a nested state machine, we need to add the permissions to the state machine role + // See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr + this.sfnObject.addToRolePolicy( + new iam.PolicyStatement({ + resources: [ + `arn:aws:events:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule`, + ], + actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'], + }) + ); + + // https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html#sync-async-iam-policies + // Polling requires permission for states:DescribeExecution + NagSuppressions.addResourceSuppressions( + this.sfnObject, + [ + { + id: 'AwsSolutions-IAM5', + reason: + 'grantRead uses asterisk at the end of executions, as we need permissions for all execution invocations', + }, + ], + true + ); + // Allow step function to run the ECS task taskDefinition.grantRun(this.sfnObject); diff --git a/lib/workload/components/ora-file-decompression-fq-pair-sfn/step_functions_templates/decompress_ora_fastq_pair_sfn_template.asl.json b/lib/workload/components/ora-file-decompression-fq-pair-sfn/step_functions_templates/decompress_ora_fastq_pair_sfn_template.asl.json index 436fea6a8..02bea38f3 100644 --- a/lib/workload/components/ora-file-decompression-fq-pair-sfn/step_functions_templates/decompress_ora_fastq_pair_sfn_template.asl.json +++ b/lib/workload/components/ora-file-decompression-fq-pair-sfn/step_functions_templates/decompress_ora_fastq_pair_sfn_template.asl.json @@ -15,7 +15,7 @@ }, "Convert Fastq List Row ORA Validation to Array": { "Type": "Pass", - "Next": "Decompress ORA Files", + "Next": "Wait for Task Availability", "Parameters": { "ora_files_map": [ { @@ -32,7 +32,7 @@ }, "Convert Fastq List Row ORA to Array": { "Type": "Pass", - "Next": "Decompress ORA Files", + "Next": "Wait for Task Availability", "Parameters": { "ora_files_map": [ { @@ -49,6 +49,16 @@ }, "ResultPath": "$.fastq_list_row_as_map" }, + "Wait for Task Availability": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "${__ecs_task_rate_limit_sfn_arn__}", + "Input": {} + }, + "Next": "Decompress ORA Files", + "ResultPath": null + }, "Decompress ORA Files": { "Type": "Map", "ItemsPath": "$.fastq_list_row_as_map.ora_files_map", @@ -117,8 +127,15 @@ { "ErrorEquals": ["ECS.AmazonECSException"], "BackoffRate": 2, - "IntervalSeconds": 300, - "MaxAttempts": 3, + "IntervalSeconds": 3600, + "MaxAttempts": 5, + "JitterStrategy": "FULL" + }, + { + "ErrorEquals": ["States.TaskFailed"], + "BackoffRate": 2, + "IntervalSeconds": 3600, + "MaxAttempts": 2, "JitterStrategy": "FULL" } ], diff --git a/lib/workload/components/ora-file-decompression-fq-pair-sfn/step_functions_templates/ecs_task_rate_limiters_sfn_template.asl.json b/lib/workload/components/ora-file-decompression-fq-pair-sfn/step_functions_templates/ecs_task_rate_limiters_sfn_template.asl.json new file mode 100644 index 000000000..9f0eda22f --- /dev/null +++ b/lib/workload/components/ora-file-decompression-fq-pair-sfn/step_functions_templates/ecs_task_rate_limiters_sfn_template.asl.json @@ -0,0 +1,38 @@ +{ + "Comment": "Task Backoff SFN", + "StartAt": "ListTasks", + "States": { + "ListTasks": { + "Type": "Task", + "Parameters": { + "Cluster": "${__cluster_arn__}" + }, + "Resource": "arn:aws:states:::aws-sdk:ecs:listTasks", + "ResultSelector": { + "num_tasks_running.$": "States.ArrayLength($.TaskArns)" + }, + "ResultPath": "$.get_num_tasks_step", + "Next": "Over 400 tasks running" + }, + "Over 400 tasks running": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.get_num_tasks_step.num_tasks_running", + "NumericGreaterThan": 400, + "Next": "Sleep a minute", + "Comment": "More than 400 tasks running" + } + ], + "Default": "Success" + }, + "Sleep a minute": { + "Type": "Wait", + "Seconds": 60, + "Next": "ListTasks" + }, + "Success": { + "Type": "Succeed" + } + } +} diff --git a/lib/workload/stateless/stacks/ora-compression-manager/deploy/index.ts b/lib/workload/stateless/stacks/ora-compression-manager/deploy/index.ts index 1112b8a1b..450729cac 100644 --- a/lib/workload/stateless/stacks/ora-compression-manager/deploy/index.ts +++ b/lib/workload/stateless/stacks/ora-compression-manager/deploy/index.ts @@ -78,7 +78,6 @@ export type OraCompressionIcav2PipelineManagerStackProps = OraCompressionIcav2PipelineManagerConfig & cdk.StackProps; export class OraCompressionIcav2PipelineManagerStack extends cdk.Stack { - public readonly OraCompressionLaunchStateMachineObj: string; private globals = { workflowManagerSource: 'orcabus.workflowmanager', outputCompressionDetailType: 'FastqListRowCompressed', @@ -86,6 +85,7 @@ export class OraCompressionIcav2PipelineManagerStack extends cdk.Stack { tablePartitionNames: { fastqListRow: 'fastq_list_row', instrumentRunId: 'instrument_run_id', + portalRunId: 'portal_run_id', }, }; @@ -167,10 +167,105 @@ export class OraCompressionIcav2PipelineManagerStack extends cdk.Stack { /* Generate an instance of the gzip raw md5sum sfn arn */ - const gzipRawMd5sumSfnObj = new GzipRawMd5sumDecompressionConstruct(this, 'gzip_raw_md5sum', { - sfnPrefix: props.stateMachinePrefix, - icav2AccessTokenSecretId: icav2AccessTokenSecretObj.secretName, - }).sfnObject; + const gzipRawMd5sumSingleSfnObj = new GzipRawMd5sumDecompressionConstruct( + this, + 'gzip_raw_md5sum_single', + { + sfnPrefix: props.stateMachinePrefix, + icav2AccessTokenSecretId: icav2AccessTokenSecretObj.secretName, + } + ).sfnObject; + + // Create the wrapper that runs this for all fastq files + const gzipRawMd5sumInstrumentRunScatterSfnObj = new sfn.StateMachine( + this, + 'gzip_raw_md5sum_run', + { + stateMachineName: `${props.stateMachinePrefix}-gzip-raw-md5sum-instrument-run`, + definitionBody: sfn.DefinitionBody.fromFile( + path.join( + __dirname, + '../step_functions_templates/get_raw_md5sum_for_all_fastq_gzips_sfn_template.asl.json' + ) + ), + definitionSubstitutions: { + /* Table */ + __table_name__: dynamodbTableObj.tableName, + __fastq_list_row_table_partition_name__: this.globals.tablePartitionNames.fastqListRow, + /* Step functions */ + __gzip_raw_md5sum_sfn_arn__: gzipRawMd5sumSingleSfnObj.stateMachineArn, + }, + } + ); + + // Configure step function write access to the dynamodb table + dynamodbTableObj.grantReadWriteData(gzipRawMd5sumInstrumentRunScatterSfnObj); + + // Configure step function invoke access to the gzip raw md5sum sfn + gzipRawMd5sumSingleSfnObj.grantStartExecution(gzipRawMd5sumInstrumentRunScatterSfnObj); + gzipRawMd5sumSingleSfnObj.grantRead(gzipRawMd5sumInstrumentRunScatterSfnObj); + + // Allow step function to call nested state machine + // See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr + gzipRawMd5sumInstrumentRunScatterSfnObj.addToRolePolicy( + new iam.PolicyStatement({ + resources: [ + `arn:aws:events:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule`, + ], + actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'], + }) + ); + + // https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html#sync-async-iam-policies + // Polling requires permission for states:DescribeExecution + NagSuppressions.addResourceSuppressions( + gzipRawMd5sumInstrumentRunScatterSfnObj, + [ + { + id: 'AwsSolutions-IAM5', + reason: + 'grantRead uses asterisk at the end of executions, as we need permissions for all execution invocations', + }, + ], + true + ); + + // Generate the rate limit sfn for the gzip raw md5sum sfn + const gzipRawMd5sumRateLimitSfnObj = new sfn.StateMachine( + this, + 'rate_limit_get_raw_md5sums_gzip_sfn', + { + stateMachineName: `${props.stateMachinePrefix}-rate-limit-gzip-raw-md5sum-sfn`, + definitionBody: sfn.DefinitionBody.fromFile( + path.join( + __dirname, + '../step_functions_templates/rate_limit_get_raw_md5sum_for_fastq_gzip_sfn_template.asl.json' + ) + ), + definitionSubstitutions: { + /* Step functions */ + __instrument_run_gzip_to_md5_sfn_arn__: + gzipRawMd5sumInstrumentRunScatterSfnObj.stateMachineArn, + }, + } + ); + + // Configure step function to have listexecutions access to the gzip raw md5sum sfn + gzipRawMd5sumInstrumentRunScatterSfnObj.grantRead(gzipRawMd5sumRateLimitSfnObj); + + // https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html#sync-async-iam-policies + // Polling requires permission for states:DescribeExecution + NagSuppressions.addResourceSuppressions( + gzipRawMd5sumRateLimitSfnObj, + [ + { + id: 'AwsSolutions-IAM5', + reason: + 'grantRead uses asterisk at the end of executions, as we need permissions for all execution invocations', + }, + ], + true + ); /* Generate the inputs sfn @@ -194,7 +289,9 @@ export class OraCompressionIcav2PipelineManagerStack extends cdk.Stack { __find_fastq_pairs_lambda_function_arn__: findAllFastqPairsInInstrumentRunLambdaObj.currentVersion.functionArn, /* Step functions */ - __gzip_raw_md5sum_sfn_arn__: gzipRawMd5sumSfnObj.stateMachineArn, + __rate_limit_get_raw_md5sums_gzip_sfn_arn__: gzipRawMd5sumRateLimitSfnObj.stateMachineArn, + __get_raw_md5sums_for_fastq_gzip_pair_sfn_arn__: + gzipRawMd5sumInstrumentRunScatterSfnObj.stateMachineArn, }, }); @@ -211,9 +308,11 @@ export class OraCompressionIcav2PipelineManagerStack extends cdk.Stack { } ); - // Configure step function invoke access to the gzip raw md5sum sfn - gzipRawMd5sumSfnObj.grantStartExecution(configureInputsSfn); - gzipRawMd5sumSfnObj.grantRead(configureInputsSfn); + // Configure step functions invoke access to the gzip raw md5sum sfn + [gzipRawMd5sumRateLimitSfnObj, gzipRawMd5sumInstrumentRunScatterSfnObj].forEach((sfnObj) => { + sfnObj.grantStartExecution(configureInputsSfn); + sfnObj.grantRead(configureInputsSfn); + }); // Configure the step function to have invoke access to the gzip raw md5sum sfn /* Allow step function to call nested state machine */ @@ -290,6 +389,105 @@ export class OraCompressionIcav2PipelineManagerStack extends cdk.Stack { icav2AccessTokenSecretId: icav2AccessTokenSecretObj.secretName, }).sfnObject; + // Create the wrapper that runs this for all fastq files + const oraRawMd5sumInstrumentRunScatterSfnObj = new sfn.StateMachine( + this, + 'ora_raw_md5sum_run', + { + stateMachineName: `${props.stateMachinePrefix}-ora-raw-md5sum-instrument-run`, + definitionBody: sfn.DefinitionBody.fromFile( + path.join( + __dirname, + '../step_functions_templates/get_raw_md5sum_for_all_fastq_ora_sfn_template.asl.json' + ) + ), + definitionSubstitutions: { + /* Table */ + __table_name__: dynamodbTableObj.tableName, + __fastq_list_row_table_partition_name__: this.globals.tablePartitionNames.fastqListRow, + __instrument_run_id_table_partition_name__: + this.globals.tablePartitionNames.instrumentRunId, + /* Lambdas */ + __merge_fastq_list_csv_with_rgid_lambda_function_arn__: + setMergeRgidsLambdaObj.currentVersion.functionArn, + /* Step functions */ + __ora_validation_sfn_arn__: oraDecompressionSfn.stateMachineArn, + }, + } + ); + + // Configure step function write access to the dynamodb table + dynamodbTableObj.grantReadWriteData(oraRawMd5sumInstrumentRunScatterSfnObj); + + // Configure step function invoke access to the lambda function + setMergeRgidsLambdaObj.currentVersion.grantInvoke(oraRawMd5sumInstrumentRunScatterSfnObj); + + // Configure step function invoke access to the ora raw md5sum sfn + oraDecompressionSfn.grantStartExecution(oraRawMd5sumInstrumentRunScatterSfnObj); + oraDecompressionSfn.grantRead(oraRawMd5sumInstrumentRunScatterSfnObj); + + // Allow step function to call nested state machine + // See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr + oraRawMd5sumInstrumentRunScatterSfnObj.addToRolePolicy( + new iam.PolicyStatement({ + resources: [ + `arn:aws:events:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule`, + ], + actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'], + }) + ); + + // https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html#sync-async-iam-policies + // Polling requires permission for states:DescribeExecution + NagSuppressions.addResourceSuppressions( + oraRawMd5sumInstrumentRunScatterSfnObj, + [ + { + id: 'AwsSolutions-IAM5', + reason: + 'grantRead uses asterisk at the end of executions, as we need permissions for all execution invocations', + }, + ], + true + ); + + // Generate the rate limit sfn for the ora raw md5sum sfn + const oraRawMd5sumRateLimitSfnObj = new sfn.StateMachine( + this, + 'rate_limit_get_raw_md5sums_ora_sfn', + { + stateMachineName: `${props.stateMachinePrefix}-rate-limit-ora-raw-md5sum-sfn`, + definitionBody: sfn.DefinitionBody.fromFile( + path.join( + __dirname, + '../step_functions_templates/rate_limit_get_raw_md5sum_for_fastq_ora_sfn_template.asl.json' + ) + ), + definitionSubstitutions: { + /* Step functions */ + __instrument_run_ora_to_md5_sfn_arn__: + oraRawMd5sumInstrumentRunScatterSfnObj.stateMachineArn, + }, + } + ); + + // Configure step function to have listexecutions access to the ora raw md5sum sfn + oraRawMd5sumInstrumentRunScatterSfnObj.grantRead(oraRawMd5sumRateLimitSfnObj); + + // https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html#sync-async-iam-policies + // Polling requires permission for states:DescribeExecution + NagSuppressions.addResourceSuppressions( + oraRawMd5sumRateLimitSfnObj, + [ + { + id: 'AwsSolutions-IAM5', + reason: + 'grantRead uses asterisk at the end of executions, as we need permissions for all execution invocations', + }, + ], + true + ); + // Generate outputs const configureOutputsSfn = new sfn.StateMachine(this, 'configure_outputs_sfn', { stateMachineName: `${props.stateMachinePrefix}-configure-outputs-json`, @@ -299,16 +497,11 @@ export class OraCompressionIcav2PipelineManagerStack extends cdk.Stack { definitionSubstitutions: { /* Table */ __table_name__: dynamodbTableObj.tableName, - __fastq_list_row_table_partition_name__: this.globals.tablePartitionNames.fastqListRow, - __instrument_run_id_table_partition_name__: - this.globals.tablePartitionNames.instrumentRunId, - /* Lambda Functions */ - __set_outputs_json_lambda_function_arn__: - setOutputsJsonLambdaFunctionObj.currentVersion.functionArn, - __merge_fastq_list_csv_with_rgid_lambda_function_arn__: - setMergeRgidsLambdaObj.currentVersion.functionArn, + __portal_run_id_table_partition_name__: this.globals.tablePartitionNames.portalRunId, /* Step functions */ - __ora_validation_sfn_arn__: oraDecompressionSfn.stateMachineArn, + __rate_limit_get_raw_md5sums_ora_sfn_arn__: oraRawMd5sumRateLimitSfnObj.stateMachineArn, + __get_raw_md5sums_for_fastq_ora_pair_sfn_arn__: + oraRawMd5sumInstrumentRunScatterSfnObj.stateMachineArn, }, }); @@ -316,13 +509,14 @@ export class OraCompressionIcav2PipelineManagerStack extends cdk.Stack { dynamodbTableObj.grantReadWriteData(configureOutputsSfn); // Configure step function invoke access to the lambda function - [setMergeRgidsLambdaObj, setOutputsJsonLambdaFunctionObj].forEach((lambda_obj) => { - lambda_obj.currentVersion.grantInvoke(configureOutputsSfn); - }); + setOutputsJsonLambdaFunctionObj.currentVersion.grantInvoke(configureOutputsSfn); - // Configure step function invoke access to the ora decompression sfn - oraDecompressionSfn.grantStartExecution(configureOutputsSfn); - oraDecompressionSfn.grantRead(configureOutputsSfn); + // Configure step function invoke access to the ora raw md5sum sfn rate limiter + // And to the ora raw md5sum sfn + [oraRawMd5sumInstrumentRunScatterSfnObj, oraRawMd5sumRateLimitSfnObj].forEach((sfnObj) => { + sfnObj.grantStartExecution(configureOutputsSfn); + sfnObj.grantRead(configureOutputsSfn); + }); /* Allow step function to call nested state machine */ // Because we run a nested state machine, we need to add the permissions to the state machine role diff --git a/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/get_raw_md5sum_for_all_fastq_gzips_sfn_template.asl.json b/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/get_raw_md5sum_for_all_fastq_gzips_sfn_template.asl.json new file mode 100644 index 000000000..8a3a2bf28 --- /dev/null +++ b/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/get_raw_md5sum_for_all_fastq_gzips_sfn_template.asl.json @@ -0,0 +1,75 @@ +{ + "Comment": "Launch Icav2 Pipeline and log in db", + "StartAt": "For each rgid", + "States": { + "For each rgid": { + "Type": "Map", + "ItemsPath": "$.rgid_list", + "ItemSelector": { + "rgid_pair.$": "$$.Map.Item.Value" + }, + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": "DynamoDB Get Fastq RGID", + "States": { + "DynamoDB Get Fastq RGID": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.rgid_pair.rgid", + "id_type": "${__fastq_list_row_table_partition_name__}" + } + }, + "ResultPath": "$.get_fastq_rgid_from_dynamodb_step", + "Next": "Get Raw MD5sum for fastq gzip pair" + }, + "Get Raw MD5sum for fastq gzip pair": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "${__gzip_raw_md5sum_sfn_arn__}", + "Input": { + "read1GzFileUri.$": "$.get_fastq_rgid_from_dynamodb_step.Item.read1_gz_file_uri.S", + "read2GzFileUri.$": "$.get_fastq_rgid_from_dynamodb_step.Item.read2_gz_file_uri.S", + "cacheDir.$": "$.get_fastq_rgid_from_dynamodb_step.Item.cache_uri.S" + } + }, + "ResultSelector": { + "read1RawMd5sum.$": "$.Output.read1RawMd5sum", + "read2RawMd5sum.$": "$.Output.read2RawMd5sum" + }, + "ResultPath": "$.get_raw_md5sums_step", + "Next": "Add raw md5sums to db" + }, + "Add raw md5sums to db": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:updateItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.rgid_pair.rgid", + "id_type": "${__fastq_list_row_table_partition_name__}" + }, + "UpdateExpression": "SET read1_raw_md5sum = :read1RawMd5sum, read2_raw_md5sum = :read2RawMd5sum", + "ExpressionAttributeValues": { + ":read1RawMd5sum": { + "S.$": "$.get_raw_md5sums_step.read1RawMd5sum" + }, + ":read2RawMd5sum": { + "S.$": "$.get_raw_md5sums_step.read2RawMd5sum" + } + } + }, + "End": true + } + } + }, + "End": true, + "ResultPath": null + } + } +} diff --git a/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/get_raw_md5sum_for_all_fastq_ora_sfn_template.asl.json b/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/get_raw_md5sum_for_all_fastq_ora_sfn_template.asl.json new file mode 100644 index 000000000..6f8b16337 --- /dev/null +++ b/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/get_raw_md5sum_for_all_fastq_ora_sfn_template.asl.json @@ -0,0 +1,117 @@ +{ + "Comment": "Get CWL Outputs from BCLConvert InterOp QC pipeline", + "StartAt": "Get RGIDs from DB", + "States": { + "Get RGIDs from DB": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.instrument_run_id", + "id_type": "${__instrument_run_id_table_partition_name__}" + } + }, + "ResultSelector": { + "rgids.$": "$.Item.rgid_list.SS" + }, + "ResultPath": "$.get_rgids_from_db_step", + "Next": "Merge fastq list csv with rgids" + }, + "Merge fastq list csv with rgids": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "Parameters": { + "FunctionName": "${__merge_fastq_list_csv_with_rgid_lambda_function_arn__}", + "Payload": { + "rgids_list.$": "$.get_rgids_from_db_step.rgids", + "instrument_run_folder_uri.$": "$.instrument_run_folder_uri" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2, + "JitterStrategy": "FULL" + } + ], + "ResultSelector": { + "fastq_ora_file_ora_by_rgid.$": "$.Payload" + }, + "ResultPath": "$.merge_rgids_step", + "Next": "Validate Fastq Outputs" + }, + "Validate Fastq Outputs": { + "Type": "Map", + "ItemsPath": "$.merge_rgids_step.fastq_ora_file_ora_by_rgid", + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": "Get Raw md5sums for rgid", + "States": { + "Get Raw md5sums for rgid": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.rgid", + "id_type": "${__fastq_list_row_table_partition_name__}" + } + }, + "ResultPath": "$.get_raw_md5sums_for_rgid_step", + "Next": "Update fastq list row partition with ora outputs" + }, + "Update fastq list row partition with ora outputs": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:updateItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.rgid", + "id_type": "${__fastq_list_row_table_partition_name__}" + }, + "UpdateExpression": "SET read1_ora_file_uri = :read1OraFileUri, read2_ora_file_uri = :read2OraFileUri", + "ExpressionAttributeValues": { + ":read1OraFileUri": { + "S.$": "$.read_1_file_uri" + }, + ":read2OraFileUri": { + "S.$": "$.read_2_file_uri" + } + } + }, + "ResultPath": null, + "Next": "Step Functions StartExecution" + }, + "Step Functions StartExecution": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "${__ora_validation_sfn_arn__}", + "Input": { + "read1OraFileUri.$": "$.read_1_file_uri", + "read2OraFileUri.$": "$.read_2_file_uri", + "read1RawMd5sum.$": "$.get_raw_md5sums_for_rgid_step.Item.read1_raw_md5sum.S", + "read2RawMd5sum.$": "$.get_raw_md5sums_for_rgid_step.Item.read2_raw_md5sum.S", + "validationOnly": true + } + }, + "ResultPath": null, + "End": true + } + } + }, + "End": true, + "ResultPath": null + } + } +} diff --git a/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/rate_limit_get_raw_md5sum_for_fastq_gzip_sfn_template.asl.json b/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/rate_limit_get_raw_md5sum_for_fastq_gzip_sfn_template.asl.json new file mode 100644 index 000000000..d469d9f27 --- /dev/null +++ b/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/rate_limit_get_raw_md5sum_for_fastq_gzip_sfn_template.asl.json @@ -0,0 +1,39 @@ +{ + "Comment": "Task Backoff SFN", + "StartAt": "ListExecutions", + "States": { + "ListExecutions": { + "Type": "Task", + "Parameters": { + "StateMachineArn": "${__instrument_run_gzip_to_md5_sfn_arn__}", + "StatusFilter": "RUNNING" + }, + "Resource": "arn:aws:states:::aws-sdk:sfn:listExecutions", + "Next": "Over 10 executions running", + "ResultSelector": { + "num_executions_running.$": "States.ArrayLength(.Executions)" + }, + "ResultPath": "$.get_num_running_executions_step" + }, + "Over 10 executions running": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.get_num_running_executions_step.num_executions_running", + "NumericGreaterThan": 10, + "Comment": "More than 10 executions running", + "Next": "Sleep 5 mins" + } + ], + "Default": "Success" + }, + "Sleep 5 mins": { + "Type": "Wait", + "Seconds": 300, + "Next": "Over 10 executions running" + }, + "Success": { + "Type": "Succeed" + } + } +} diff --git a/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/rate_limit_get_raw_md5sum_for_fastq_ora_sfn_template.asl.json b/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/rate_limit_get_raw_md5sum_for_fastq_ora_sfn_template.asl.json new file mode 100644 index 000000000..f83a290ac --- /dev/null +++ b/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/rate_limit_get_raw_md5sum_for_fastq_ora_sfn_template.asl.json @@ -0,0 +1,39 @@ +{ + "Comment": "Task Backoff SFN", + "StartAt": "ListExecutions", + "States": { + "ListExecutions": { + "Type": "Task", + "Parameters": { + "StateMachineArn": "${__instrument_run_ora_to_md5_sfn_arn__}", + "StatusFilter": "RUNNING" + }, + "Resource": "arn:aws:states:::aws-sdk:sfn:listExecutions", + "Next": "Over 10 executions running", + "ResultSelector": { + "num_executions_running.$": "States.ArrayLength(.Executions)" + }, + "ResultPath": "$.get_num_running_executions_step" + }, + "Over 10 executions running": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.get_num_running_executions_step.num_executions_running", + "NumericGreaterThan": 10, + "Comment": "More than 10 executions running", + "Next": "Sleep 5 mins" + } + ], + "Default": "Success" + }, + "Sleep 5 mins": { + "Type": "Wait", + "Seconds": 300, + "Next": "Over 10 executions running" + }, + "Success": { + "Type": "Succeed" + } + } +} diff --git a/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/set_compression_inputs.asl.json b/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/set_compression_inputs.asl.json index 242dea127..8427ea4b8 100644 --- a/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/set_compression_inputs.asl.json +++ b/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/set_compression_inputs.asl.json @@ -259,77 +259,29 @@ } } }, - "Next": "For each rgid", - "ResultPath": null + "ResultPath": null, + "Next": "Rate Limit Get Raw MD5sums for fastq gzip run" }, - "For each rgid": { - "Type": "Map", - "ItemsPath": "$.get_rgid_pairs.rgids_in_samplesheet", - "ItemSelector": { - "rgid_pair.$": "$$.Map.Item.Value" + "Rate Limit Get Raw MD5sums for fastq gzip run": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "${__rate_limit_get_raw_md5sums_gzip_sfn_arn__}", + "Input": {} }, - "ItemProcessor": { - "ProcessorConfig": { - "Mode": "INLINE" - }, - "StartAt": "DynamoDB Get Fastq RGID", - "States": { - "DynamoDB Get Fastq RGID": { - "Type": "Task", - "Resource": "arn:aws:states:::dynamodb:getItem", - "Parameters": { - "TableName": "${__table_name__}", - "Key": { - "id.$": "$.rgid_pair.rgid", - "id_type": "${__fastq_list_row_table_partition_name__}" - } - }, - "ResultPath": "$.get_fastq_rgid_from_dynamodb_step", - "Next": "Get Raw MD5sum for fastq gzip pair" - }, - "Get Raw MD5sum for fastq gzip pair": { - "Type": "Task", - "Resource": "arn:aws:states:::states:startExecution.sync:2", - "Parameters": { - "StateMachineArn": "${__gzip_raw_md5sum_sfn_arn__}", - "Input": { - "read1GzFileUri.$": "$.get_fastq_rgid_from_dynamodb_step.Item.read1_gz_file_uri.S", - "read2GzFileUri.$": "$.get_fastq_rgid_from_dynamodb_step.Item.read2_gz_file_uri.S", - "cacheDir.$": "$.get_fastq_rgid_from_dynamodb_step.Item.cache_uri.S" - } - }, - "ResultSelector": { - "read1RawMd5sum.$": "$.Output.read1RawMd5sum", - "read2RawMd5sum.$": "$.Output.read2RawMd5sum" - }, - "ResultPath": "$.get_raw_md5sums_step", - "Next": "Add raw md5sums to db" - }, - "Add raw md5sums to db": { - "Type": "Task", - "Resource": "arn:aws:states:::dynamodb:updateItem", - "Parameters": { - "TableName": "${__table_name__}", - "Key": { - "id.$": "$.rgid_pair.rgid", - "id_type": "${__fastq_list_row_table_partition_name__}" - }, - "UpdateExpression": "SET read1_raw_md5sum = :read1RawMd5sum, read2_raw_md5sum = :read2RawMd5sum", - "ExpressionAttributeValues": { - ":read1RawMd5sum": { - "S.$": "$.get_raw_md5sums_step.read1RawMd5sum" - }, - ":read2RawMd5sum": { - "S.$": "$.get_raw_md5sums_step.read2RawMd5sum" - } - } - }, - "End": true - } + "ResultPath": null, + "Next": "Get Raw MD5sums for fastq gzip pair" + }, + "Get Raw MD5sums for fastq gzip pair": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "${__get_raw_md5sums_for_fastq_gzip_pair_sfn_arn__}", + "Input": { + "rgid_list.$": "$.get_rgid_pairs.rgids_in_samplesheet" } }, - "End": true, - "ResultPath": null + "End": true } } } diff --git a/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/set_compression_outputs.asl.json b/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/set_compression_outputs.asl.json index bfe2fa77f..d7dfef23c 100644 --- a/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/set_compression_outputs.asl.json +++ b/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/set_compression_outputs.asl.json @@ -9,7 +9,7 @@ "TableName": "${__table_name__}", "Key": { "id.$": "$.portal_run_id", - "id_type": "portal_run_id" + "id_type": "${__portal_run_id_table_partition_name__}" } }, "ResultSelector": { @@ -58,7 +58,7 @@ "TableName": "${__table_name__}", "Key": { "id.$": "$.portal_run_id", - "id_type": "portal_run_id" + "id_type": "${__portal_run_id_table_partition_name__}" }, "UpdateExpression": "SET analysis_output = :output_json", "ExpressionAttributeValues": { @@ -68,122 +68,28 @@ } }, "ResultPath": "$.update_entry_post_launch_step", - "Next": "Get RGIDs from DB" + "Next": "Rate Limit Get Raw MD5sums for fastq ora run" }, - "Get RGIDs from DB": { + "Rate Limit Get Raw MD5sums for fastq ora run": { "Type": "Task", - "Resource": "arn:aws:states:::dynamodb:getItem", + "Resource": "arn:aws:states:::states:startExecution.sync:2", "Parameters": { - "TableName": "${__table_name__}", - "Key": { - "id.$": "$.get_db_attributes_step.ready_event_data_inputs.instrumentRunId", - "id_type": "${__instrument_run_id_table_partition_name__}" - } + "StateMachineArn": "${__rate_limit_get_raw_md5sums_ora_sfn_arn__}", + "Input": {} }, - "ResultSelector": { - "rgids.$": "$.Item.rgid_list.SS" - }, - "ResultPath": "$.get_rgids_from_db_step", - "Next": "Merge fastq list csv with rgids" + "ResultPath": null, + "Next": "Get Raw MD5sums for fastq ora pair" }, - "Merge fastq list csv with rgids": { + "Get Raw MD5sums for fastq ora pair": { "Type": "Task", - "Resource": "arn:aws:states:::lambda:invoke", + "Resource": "arn:aws:states:::states:startExecution.sync:2", "Parameters": { - "FunctionName": "${__merge_fastq_list_csv_with_rgid_lambda_function_arn__}", - "Payload": { - "rgids_list.$": "$.get_rgids_from_db_step.rgids", + "StateMachineArn": "${__get_raw_md5sums_for_fastq_ora_pair_sfn_arn__}", + "Input": { + "instrument_run_id.$": "$.get_db_attributes_step.ready_event_data_inputs.instrumentRunId", "instrument_run_folder_uri.$": "$.analysis_outputs_step.output_json.instrumentRunOraOutputUri" } }, - "Retry": [ - { - "ErrorEquals": [ - "Lambda.ServiceException", - "Lambda.AWSLambdaException", - "Lambda.SdkClientException", - "Lambda.TooManyRequestsException" - ], - "IntervalSeconds": 1, - "MaxAttempts": 3, - "BackoffRate": 2, - "JitterStrategy": "FULL" - } - ], - "ResultSelector": { - "fastq_ora_file_ora_by_rgid.$": "$.Payload" - }, - "ResultPath": "$.merge_rgids_step", - "Next": "Validate Fastq Outputs" - }, - "Validate Fastq Outputs": { - "Type": "Map", - "ItemsPath": "$.merge_rgids_step.fastq_ora_file_ora_by_rgid", - "ItemProcessor": { - "ProcessorConfig": { - "Mode": "INLINE" - }, - "StartAt": "Get Raw md5sums for rgid", - "States": { - "Get Raw md5sums for rgid": { - "Type": "Task", - "Resource": "arn:aws:states:::dynamodb:getItem", - "Parameters": { - "TableName": "${__table_name__}", - "Key": { - "id.$": "$.rgid", - "id_type": "${__fastq_list_row_table_partition_name__}" - } - }, - "ResultPath": "$.get_raw_md5sums_for_rgid_step", - "Next": "Update fastq list row partition with ora outputs" - }, - "Update fastq list row partition with ora outputs": { - "Type": "Task", - "Resource": "arn:aws:states:::dynamodb:updateItem", - "Parameters": { - "TableName": "${__table_name__}", - "Key": { - "id.$": "$.rgid", - "id_type": "${__fastq_list_row_table_partition_name__}" - }, - "UpdateExpression": "SET read1_ora_file_uri = :read1OraFileUri, read2_ora_file_uri = :read2OraFileUri", - "ExpressionAttributeValues": { - ":read1OraFileUri": { - "S.$": "$.read_1_file_uri" - }, - ":read2OraFileUri": { - "S.$": "$.read_2_file_uri" - } - } - }, - "ResultPath": null, - "Next": "Step Functions StartExecution" - }, - "Step Functions StartExecution": { - "Type": "Task", - "Resource": "arn:aws:states:::states:startExecution.sync:2", - "Parameters": { - "StateMachineArn": "${__ora_validation_sfn_arn__}", - "Input": { - "read1OraFileUri.$": "$.read_1_file_uri", - "read2OraFileUri.$": "$.read_2_file_uri", - "read1RawMd5sum.$": "$.get_raw_md5sums_for_rgid_step.Item.read1_raw_md5sum.S", - "read2RawMd5sum.$": "$.get_raw_md5sums_for_rgid_step.Item.read2_raw_md5sum.S", - "validationOnly": true - } - }, - "ResultPath": null, - "End": true - } - } - }, - "Next": "Wait 1 Second (Post-update)" - }, - "Wait 1 Second (Post-update)": { - "Type": "Wait", - "Seconds": 1, - "Comment": "Wait for databases to sync before continuing", "End": true } }