Skip to content

Commit

Permalink
Run a partition over the rgid first and then run merge fastq list csv…
Browse files Browse the repository at this point in the history
… per partition
  • Loading branch information
alexiswl committed Dec 1, 2024
1 parent 7b78311 commit 8a4a72a
Showing 1 changed file with 98 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,102 +16,127 @@
"rgids.$": "$.Item.rgid_list.SS"
},
"ResultPath": "$.get_rgids_from_db_step",
"Next": "Merge fastq list csv with rgids"
"Next": "Partition RGID array"
},
"Merge fastq list csv with rgids": {
"Type": "Task",
"Resource": "arn:aws:states:::lambda:invoke",
"Partition RGID array": {
"Type": "Pass",
"Next": "For each rgid partition",
"Parameters": {
"FunctionName": "${__merge_fastq_list_csv_with_rgid_lambda_function_arn__}",
"Payload": {
"rgids_list.$": "$.get_rgids_from_db_step.rgids",
"instrument_run_folder_uri.$": "$.instrument_run_folder_uri"
}
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
}
],
"ResultSelector": {
"fastq_ora_file_ora_by_rgid.$": "$.Payload"
"rgid_partitions.$": "States.ArrayPartition($.get_rgids_from_db_step.rgids, 4)"
},
"ResultPath": "$.merge_rgids_step",
"Next": "Validate Fastq Outputs"
"ResultPath": "$.partition_rgid_array_step"
},
"Validate Fastq Outputs": {
"For each rgid partition": {
"Type": "Map",
"ItemsPath": "$.merge_rgids_step.fastq_ora_file_ora_by_rgid",
"ItemsPath": "$.partition_rgid_array_step.rgid_partitions",
"ItemSelector": {
"rgids_list.$": "$$.Map.Item.Value",
"instrument_run_folder_uri.$": "$.instrument_run_folder_uri"
},
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "Get Raw md5sums for rgid",
"StartAt": "Merge fastq list csv with rgids",
"States": {
"Get Raw md5sums for rgid": {
"Merge fastq list csv with rgids": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Resource": "arn:aws:states:::lambda:invoke",
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.rgid",
"id_type": "${__fastq_list_row_table_partition_name__}"
"FunctionName": "${__merge_fastq_list_csv_with_rgid_lambda_function_arn__}",
"Payload": {
"rgids_list.$": "$.rgids_list",
"instrument_run_folder_uri.$": "$.instrument_run_folder_uri"
}
},
"Retry": [
{
"ErrorEquals": [
"Lambda.ServiceException",
"Lambda.AWSLambdaException",
"Lambda.SdkClientException",
"Lambda.TooManyRequestsException"
],
"IntervalSeconds": 1,
"MaxAttempts": 3,
"BackoffRate": 2,
"JitterStrategy": "FULL"
}
],
"ResultSelector": {
"fastq_ora_file_ora_by_rgid.$": "$.Payload"
},
"ResultPath": "$.get_raw_md5sums_for_rgid_step",
"Next": "Update fastq list row partition with ora outputs"
"ResultPath": "$.merge_rgids_step",
"Next": "Validate Fastq Outputs"
},
"Update fastq list row partition with ora outputs": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.rgid",
"id_type": "${__fastq_list_row_table_partition_name__}"
"Validate Fastq Outputs": {
"Type": "Map",
"ItemsPath": "$.merge_rgids_step.fastq_ora_file_ora_by_rgid",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"UpdateExpression": "SET read1_ora_file_uri = :read1OraFileUri, read2_ora_file_uri = :read2OraFileUri",
"ExpressionAttributeValues": {
":read1OraFileUri": {
"S.$": "$.read_1_file_uri"
"StartAt": "Get Raw md5sums for rgid",
"States": {
"Get Raw md5sums for rgid": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:getItem",
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.rgid",
"id_type": "${__fastq_list_row_table_partition_name__}"
}
},
"ResultPath": "$.get_raw_md5sums_for_rgid_step",
"Next": "Update fastq list row partition with ora outputs"
},
":read2OraFileUri": {
"S.$": "$.read_2_file_uri"
"Update fastq list row partition with ora outputs": {
"Type": "Task",
"Resource": "arn:aws:states:::dynamodb:updateItem",
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.rgid",
"id_type": "${__fastq_list_row_table_partition_name__}"
},
"UpdateExpression": "SET read1_ora_file_uri = :read1OraFileUri, read2_ora_file_uri = :read2OraFileUri",
"ExpressionAttributeValues": {
":read1OraFileUri": {
"S.$": "$.read_1_file_uri"
},
":read2OraFileUri": {
"S.$": "$.read_2_file_uri"
}
}
},
"ResultPath": null,
"Next": "Start ORA Validation"
},
"Start ORA Validation": {
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution.sync:2",
"Parameters": {
"StateMachineArn": "${__ora_validation_sfn_arn__}",
"Input": {
"read1OraFileUri.$": "$.read_1_file_uri",
"read2OraFileUri.$": "$.read_2_file_uri",
"read1RawMd5sum.$": "$.get_raw_md5sums_for_rgid_step.Item.read1_raw_md5sum.S",
"read2RawMd5sum.$": "$.get_raw_md5sums_for_rgid_step.Item.read2_raw_md5sum.S",
"validationOnly": true
}
},
"ResultPath": null,
"End": true
}
}
},
"ResultPath": null,
"Next": "Step Functions StartExecution"
},
"Step Functions StartExecution": {
"Type": "Task",
"Resource": "arn:aws:states:::states:startExecution.sync:2",
"Parameters": {
"StateMachineArn": "${__ora_validation_sfn_arn__}",
"Input": {
"read1OraFileUri.$": "$.read_1_file_uri",
"read2OraFileUri.$": "$.read_2_file_uri",
"read1RawMd5sum.$": "$.get_raw_md5sums_for_rgid_step.Item.read1_raw_md5sum.S",
"read2RawMd5sum.$": "$.get_raw_md5sums_for_rgid_step.Item.read2_raw_md5sum.S",
"validationOnly": true
}
},
"ResultPath": null,
"End": true
"End": true,
"ResultPath": null
}
}
},
"End": true,
"ResultPath": null
"End": true
}
}
}

0 comments on commit 8a4a72a

Please sign in to comment.