From 8a4a72a317310e594b223c20d48ce83616936f09 Mon Sep 17 00:00:00 2001 From: Alexis Lucattini Date: Mon, 2 Dec 2024 07:12:34 +1100 Subject: [PATCH] Run a partition over the rgid first and then run merge fastq list csv per partition --- ...um_for_all_fastq_ora_sfn_template.asl.json | 171 ++++++++++-------- 1 file changed, 98 insertions(+), 73 deletions(-) diff --git a/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/get_raw_md5sum_for_all_fastq_ora_sfn_template.asl.json b/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/get_raw_md5sum_for_all_fastq_ora_sfn_template.asl.json index 6f8b16337..cb3e56262 100644 --- a/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/get_raw_md5sum_for_all_fastq_ora_sfn_template.asl.json +++ b/lib/workload/stateless/stacks/ora-compression-manager/step_functions_templates/get_raw_md5sum_for_all_fastq_ora_sfn_template.asl.json @@ -16,102 +16,127 @@ "rgids.$": "$.Item.rgid_list.SS" }, "ResultPath": "$.get_rgids_from_db_step", - "Next": "Merge fastq list csv with rgids" + "Next": "Partition RGID array" }, - "Merge fastq list csv with rgids": { - "Type": "Task", - "Resource": "arn:aws:states:::lambda:invoke", + "Partition RGID array": { + "Type": "Pass", + "Next": "For each rgid partition", "Parameters": { - "FunctionName": "${__merge_fastq_list_csv_with_rgid_lambda_function_arn__}", - "Payload": { - "rgids_list.$": "$.get_rgids_from_db_step.rgids", - "instrument_run_folder_uri.$": "$.instrument_run_folder_uri" - } - }, - "Retry": [ - { - "ErrorEquals": [ - "Lambda.ServiceException", - "Lambda.AWSLambdaException", - "Lambda.SdkClientException", - "Lambda.TooManyRequestsException" - ], - "IntervalSeconds": 1, - "MaxAttempts": 3, - "BackoffRate": 2, - "JitterStrategy": "FULL" - } - ], - "ResultSelector": { - "fastq_ora_file_ora_by_rgid.$": "$.Payload" + "rgid_partitions.$": "States.ArrayPartition($.get_rgids_from_db_step.rgids, 4)" }, - "ResultPath": "$.merge_rgids_step", - "Next": "Validate Fastq Outputs" + "ResultPath": "$.partition_rgid_array_step" }, - "Validate Fastq Outputs": { + "For each rgid partition": { "Type": "Map", - "ItemsPath": "$.merge_rgids_step.fastq_ora_file_ora_by_rgid", + "ItemsPath": "$.partition_rgid_array_step.rgid_partitions", + "ItemSelector": { + "rgids_list.$": "$$.Map.Item.Value", + "instrument_run_folder_uri.$": "$.instrument_run_folder_uri" + }, "ItemProcessor": { "ProcessorConfig": { "Mode": "INLINE" }, - "StartAt": "Get Raw md5sums for rgid", + "StartAt": "Merge fastq list csv with rgids", "States": { - "Get Raw md5sums for rgid": { + "Merge fastq list csv with rgids": { "Type": "Task", - "Resource": "arn:aws:states:::dynamodb:getItem", + "Resource": "arn:aws:states:::lambda:invoke", "Parameters": { - "TableName": "${__table_name__}", - "Key": { - "id.$": "$.rgid", - "id_type": "${__fastq_list_row_table_partition_name__}" + "FunctionName": "${__merge_fastq_list_csv_with_rgid_lambda_function_arn__}", + "Payload": { + "rgids_list.$": "$.rgids_list", + "instrument_run_folder_uri.$": "$.instrument_run_folder_uri" + } + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 1, + "MaxAttempts": 3, + "BackoffRate": 2, + "JitterStrategy": "FULL" } + ], + "ResultSelector": { + "fastq_ora_file_ora_by_rgid.$": "$.Payload" }, - "ResultPath": "$.get_raw_md5sums_for_rgid_step", - "Next": "Update fastq list row partition with ora outputs" + "ResultPath": "$.merge_rgids_step", + "Next": "Validate Fastq Outputs" }, - "Update fastq list row partition with ora outputs": { - "Type": "Task", - "Resource": "arn:aws:states:::dynamodb:updateItem", - "Parameters": { - "TableName": "${__table_name__}", - "Key": { - "id.$": "$.rgid", - "id_type": "${__fastq_list_row_table_partition_name__}" + "Validate Fastq Outputs": { + "Type": "Map", + "ItemsPath": "$.merge_rgids_step.fastq_ora_file_ora_by_rgid", + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" }, - "UpdateExpression": "SET read1_ora_file_uri = :read1OraFileUri, read2_ora_file_uri = :read2OraFileUri", - "ExpressionAttributeValues": { - ":read1OraFileUri": { - "S.$": "$.read_1_file_uri" + "StartAt": "Get Raw md5sums for rgid", + "States": { + "Get Raw md5sums for rgid": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:getItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.rgid", + "id_type": "${__fastq_list_row_table_partition_name__}" + } + }, + "ResultPath": "$.get_raw_md5sums_for_rgid_step", + "Next": "Update fastq list row partition with ora outputs" }, - ":read2OraFileUri": { - "S.$": "$.read_2_file_uri" + "Update fastq list row partition with ora outputs": { + "Type": "Task", + "Resource": "arn:aws:states:::dynamodb:updateItem", + "Parameters": { + "TableName": "${__table_name__}", + "Key": { + "id.$": "$.rgid", + "id_type": "${__fastq_list_row_table_partition_name__}" + }, + "UpdateExpression": "SET read1_ora_file_uri = :read1OraFileUri, read2_ora_file_uri = :read2OraFileUri", + "ExpressionAttributeValues": { + ":read1OraFileUri": { + "S.$": "$.read_1_file_uri" + }, + ":read2OraFileUri": { + "S.$": "$.read_2_file_uri" + } + } + }, + "ResultPath": null, + "Next": "Start ORA Validation" + }, + "Start ORA Validation": { + "Type": "Task", + "Resource": "arn:aws:states:::states:startExecution.sync:2", + "Parameters": { + "StateMachineArn": "${__ora_validation_sfn_arn__}", + "Input": { + "read1OraFileUri.$": "$.read_1_file_uri", + "read2OraFileUri.$": "$.read_2_file_uri", + "read1RawMd5sum.$": "$.get_raw_md5sums_for_rgid_step.Item.read1_raw_md5sum.S", + "read2RawMd5sum.$": "$.get_raw_md5sums_for_rgid_step.Item.read2_raw_md5sum.S", + "validationOnly": true + } + }, + "ResultPath": null, + "End": true } } }, - "ResultPath": null, - "Next": "Step Functions StartExecution" - }, - "Step Functions StartExecution": { - "Type": "Task", - "Resource": "arn:aws:states:::states:startExecution.sync:2", - "Parameters": { - "StateMachineArn": "${__ora_validation_sfn_arn__}", - "Input": { - "read1OraFileUri.$": "$.read_1_file_uri", - "read2OraFileUri.$": "$.read_2_file_uri", - "read1RawMd5sum.$": "$.get_raw_md5sums_for_rgid_step.Item.read1_raw_md5sum.S", - "read2RawMd5sum.$": "$.get_raw_md5sums_for_rgid_step.Item.read2_raw_md5sum.S", - "validationOnly": true - } - }, - "ResultPath": null, - "End": true + "End": true, + "ResultPath": null } } }, - "End": true, - "ResultPath": null + "End": true } } }