Skip to content

Commit

Permalink
Merge pull request #774 from umccr/enhancement/allow-gzip-and-ora-com…
Browse files Browse the repository at this point in the history
…pression

Decompress ora files prior to running if gzip files are also present
  • Loading branch information
alexiswl authored Dec 13, 2024
2 parents 8a6af8f + 1d304c2 commit e4e42c6
Show file tree
Hide file tree
Showing 11 changed files with 574 additions and 58 deletions.
4 changes: 2 additions & 2 deletions config/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ TN Stateless stack
*/

// Deployed under dev/stg/prod
export const tnIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/tumor_normal_4.2.4_pipeline_id'; // 6ce2b636-ba2f-4004-8065-f3557f286c98
export const tnIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/tumor_normal_4.2.4_pipeline_id'; // 7d3cb608-80e0-4ecf-a67e-ef524e9bfb8b
export const tnIcav2PipelineWorkflowType = 'tumor-normal';
export const tnIcav2PipelineWorkflowTypeVersion = '4.2.4';
export const tnIcav2ServiceVersion = '2024.07.01';
Expand Down Expand Up @@ -430,7 +430,7 @@ WTS Stateless stack
*/

// Deployed under dev/stg/prod
export const wtsIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/wts_4.2.4_pipeline_id'; // 1e53ae07-08a6-458b-9fa3-9cf7430409a0
export const wtsIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/wts_4.2.4_pipeline_id'; // 73e21ce0-60d7-4e3e-b130-88aff78d500d
export const wtsIcav2PipelineWorkflowType = 'wts';
export const wtsIcav2PipelineWorkflowTypeVersion = '4.2.4';
export const wtsIcav2ServiceVersion = '2024.07.01';
Expand Down
2 changes: 2 additions & 0 deletions config/stacks/wtsPipelineManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
icav2FastaReferenceUriMappingSSMParameterPath,
icav2GencodeAnnotationUriMappingSSMParameterPath,
icav2WtsQcReferenceSamplesUriMappingSSMParameterPath,
dragenIcav2OraReferenceUriSSMParameterPath,
} from '../constants';
import { WtsIcav2PipelineManagerConfig } from '../../lib/workload/stateless/stacks/transcriptome-pipeline-manager/deploy';
import { WtsIcav2PipelineTableConfig } from '../../lib/workload/stateful/stacks/wts-dynamo-db/deploy/stack';
Expand Down Expand Up @@ -65,6 +66,7 @@ export const getWtsIcav2PipelineManagerStackProps = (
fastaReferenceUriSsmPath: icav2FastaReferenceUriMappingSSMParameterPath,
gencodeAnnotationUriSsmPath: icav2GencodeAnnotationUriMappingSSMParameterPath,
wtsQcReferenceSamplesSsmPath: icav2WtsQcReferenceSamplesUriMappingSSMParameterPath,
oraReferenceUriSsmPath: dragenIcav2OraReferenceUriSSMParameterPath,
/* Default Versions */
defaultArribaVersion: wtsDefaultArribaVersion,
defaultDragenReferenceVersion: wtsDefaultDragenReferenceVersion,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ else
"${INPUT_URI}"
)" | \
/usr/local/bin/orad \
--gzip \
--gz \
--gz-level 1 \
--stdout \
--ora-reference "${ORADATA_PATH}" \
- | \
Expand Down Expand Up @@ -139,7 +140,8 @@ else
--gz \
--gz-level 1 \
--stdout \
--ora-reference "${ORADATA_PATH}" | \
--ora-reference "${ORADATA_PATH}" \
- | \
(
AWS_ACCESS_KEY_ID="$( \
jq -r '.AWS_ACCESS_KEY_ID' <<< "${aws_s3_access_creds_json_str}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct {
props.icav2CopyFilesStateMachineObj.grantStartExecution(configureInputsSfn);
props.icav2CopyFilesStateMachineObj.grantRead(configureInputsSfn);
props.oraDecompressionStateMachineObj.grantStartExecution(configureInputsSfn);
props.oraDecompressionStateMachineObj.grantRead(configureInputsSfn);

// Because we run a nested state machine, we need to add the permissions to the state machine role
// See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@
"read1GzOutputFileUri.$": "$.get_ora_cache_uri_step.read_1_gz_output_uri",
"read2GzOutputFileUri.$": "$.get_ora_cache_uri_step.read_2_gz_output_uri",
"read1EstimatedGzFileSize": -1,
"read2EstimatedGzFileSize": -1
"read2EstimatedGzFileSize": -1,
"validationOnly.$": false
}
},
"End": true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ export interface WtsIcav2PipelineManagerConfig {
gencodeAnnotationUriSsmPath: string; // "/icav2/umccr-prod/gencode-annotation-uri" // FIXME
arribaUriSsmPath: string; // "/icav2/umccr-prod/arriba-uri" // FIXME
wtsQcReferenceSamplesSsmPath: string; // "/icav2/umccr-prod/wts-qc-reference-samples" // FIXME
oraReferenceUriSsmPath: string; // // FIXME
/* Defaults */
defaultDragenReferenceVersion: string; // v9-r3
defaultFastaReferenceVersion: string; // hg38
Expand Down Expand Up @@ -90,6 +91,13 @@ export class WtsIcav2PipelineManagerStack extends cdk.Stack {
props.dragenReferenceUriSsmPath
);

// ORA Reference
const oraReferenceSsmObj = ssm.StringParameter.fromStringParameterName(
this,
props.oraReferenceUriSsmPath,
props.oraReferenceUriSsmPath
);

// Fasta Reference
const fastaReferenceSsmObj = ssm.StringParameter.fromStringParameterName(
this,
Expand Down Expand Up @@ -146,6 +154,21 @@ export class WtsIcav2PipelineManagerStack extends cdk.Stack {
}
);

// Lambda function to check fastq list rows for ora inputs
const hasOraInputsLambdaObj = new PythonFunction(
this,
'add_ora_reference_lambda_python_function',
{
entry: path.join(__dirname, '../lambdas/has_ora_inputs_py'),
runtime: lambda.Runtime.PYTHON_3_12,
architecture: lambda.Architecture.ARM_64,
index: 'has_ora_inputs.py',
handler: 'handler',
memorySize: 1024,
timeout: Duration.seconds(60),
}
);

// Specify the statemachine and replace the arn placeholders with the lambda arns defined above
const configureInputsSfn = new sfn.StateMachine(
this,
Expand All @@ -167,6 +190,7 @@ export class WtsIcav2PipelineManagerStack extends cdk.Stack {
__annotation_version_uri_ssm_parameter_name__: annotationSsmObj.parameterName,
__qc_reference_samples_version_uri_ssm_parameter_name__:
wtsQcReferenceSamplesSsmObj.parameterName,
__ora_reference_uri_ssm_parameter_path__: oraReferenceSsmObj.parameterName,

/* Defaults */
__default_reference_version__: props.defaultDragenReferenceVersion,
Expand All @@ -180,6 +204,7 @@ export class WtsIcav2PipelineManagerStack extends cdk.Stack {
convertFastqListRowsToCwlInputObjectsLambdaObj.currentVersion.functionArn,
__get_boolean_parameters_lambda_function_arn__:
getBooleanParametersFromEventInputLambdaObj.currentVersion.functionArn,
__has_ora_inputs_lambda_function_arn__: hasOraInputsLambdaObj.currentVersion.functionArn,
},
}
);
Expand All @@ -188,6 +213,7 @@ export class WtsIcav2PipelineManagerStack extends cdk.Stack {
[
convertFastqListRowsToCwlInputObjectsLambdaObj,
getBooleanParametersFromEventInputLambdaObj,
hasOraInputsLambdaObj,
].forEach((lambdaObj) => {
lambdaObj.currentVersion.grantInvoke(configureInputsSfn);
});
Expand All @@ -202,6 +228,7 @@ export class WtsIcav2PipelineManagerStack extends cdk.Stack {
arribaSsmObj,
annotationSsmObj,
wtsQcReferenceSamplesSsmObj,
oraReferenceSsmObj,
].forEach((ssmObj) => {
ssmObj.grantRead(configureInputsSfn);
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
#!/usr/bin/env python3

"""
Add ora reference
"""
from pathlib import Path
from typing import Dict, Optional, List


def handler(event, context) -> Dict[str, bool]:
"""
Get the boolean parameters from the event input
:param event:
:param context:
:return: Dictionary of boolean parameters
"""

# Collect the event data input
event_data_input: Dict = event['event_data_input']

# Get the fastq list rows
tumor_fastq_list_rows: Optional[List] = event_data_input.get('tumorFastqListRows', None)
normal_fastq_list_rows: Optional[List] = event_data_input.get('fastqListRows', None)

# If tumorFastqListRows is None and fastqListRows is None, return false
if tumor_fastq_list_rows is None and normal_fastq_list_rows is None:
return {
"add_ora_step": False,
"is_hybrid": False
}

add_ora_step = False
for fastq_list_row_iter in [tumor_fastq_list_rows, normal_fastq_list_rows]:
if fastq_list_row_iter is not None:
# If fastqListRows is not None, return true
# Iterate over each of the fastq list rows, if one of the read1FileUri or read2FileUri end with .fastq.ora
# return true
if any(
[
row.get('read1FileUri', '').endswith('.fastq.ora') or
row.get('read2FileUri', '').endswith('.fastq.ora')
for row in fastq_list_row_iter
]
):
add_ora_step = True

# Check if hybrid
endings = []
for fastq_list_row_iter in [tumor_fastq_list_rows, normal_fastq_list_rows]:
if fastq_list_row_iter is not None:
endings.extend(
list(set(list(map(
lambda fastq_list_row_: Path(fastq_list_row_.get("read1FileUri")).suffix,
fastq_list_row_iter
))))
)

if len(list(set(endings))) > 1:
# Don't need ora when hybrid since ora samples will be dropped
is_hybrid = True
add_ora_step = False
else:
is_hybrid = False

# Got to here? Return false
return {
"add_ora_step": add_ora_step,
"is_hybrid": is_hybrid
}


# if __name__ == "__main__":
# import json
#
# print(
# json.dumps(
# handler(
# {
# "event_data_input": {
# "tumorFastqListRows": [
# {
# "rgid": "ATGAGGCC.CAATTAAC.2",
# "rgsm": "L2400195",
# "rglb": "L2400195",
# "lane": 2,
# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_2/L2400195/L2400195_S9_L002_R1_001.fastq.gz",
# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_2/L2400195/L2400195_S9_L002_R2_001.fastq.gz"
# },
# {
# "rgid": "ATGAGGCC.CAATTAAC.3",
# "rgsm": "L2400195",
# "rglb": "L2400195",
# "lane": 3,
# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_3/L2400195/L2400195_S9_L003_R1_001.fastq.gz",
# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_3/L2400195/L2400195_S9_L003_R2_001.fastq.gz"
# }
# ],
# "fastqListRows": [
# {
# "rgid": "GCACGGAC.TGCGAGAC.4",
# "rgsm": "L2400191",
# "rglb": "L2400191",
# "lane": 4,
# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_4/L2400191/L2400191_S17_L004_R1_001.fastq.gz",
# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_4/L2400191/L2400191_S17_L004_R2_001.fastq.gz"
# }
# ],
# "dragenReferenceVersion": "v9-r3"
# }
# },
# None
# ),
# indent=4
# )
# )
#
# # {
# # "add_ora_step": false,
# # "is_hybrid": false
# # }
#
# if __name__ == "__main__":
# import json
#
# print(
# json.dumps(
# handler(
# {
# "event_data_input": {
# "tumorFastqListRows": [
# {
# "rgid": "ATGAGGCC.CAATTAAC.2",
# "rgsm": "L2400195",
# "rglb": "L2400195",
# "lane": 2,
# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_2/L2400195/L2400195_S9_L002_R1_001.fastq.gz",
# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_2/L2400195/L2400195_S9_L002_R2_001.fastq.gz"
# },
# {
# "rgid": "ATGAGGCC.CAATTAAC.3",
# "rgsm": "L2400195",
# "rglb": "L2400195",
# "lane": 3,
# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_3/L2400195/L2400195_S9_L003_R1_001.fastq.gz",
# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_3/L2400195/L2400195_S9_L003_R2_001.fastq.gz"
# }
# ],
# "fastqListRows": [
# {
# "rgid": "GCACGGAC.TGCGAGAC.4",
# "rgsm": "L2400191",
# "rglb": "L2400191",
# "lane": 4,
# "read1FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_4/L2400191/L2400191_S17_L004_R1_001.fastq.ora",
# "read2FileUri": "icav2://ea19a3f5-ec7c-4940-a474-c31cd91dbad4/primary/240229_A00130_0288_BH5HM2DSXC/2024071110689063/Samples/Lane_4/L2400191/L2400191_S17_L004_R2_001.fastq.ora"
# }
# ],
# "dragenReferenceVersion": "v9-r3"
# }
# },
# None
# ),
# indent=4
# )
# )
#
# # {
# # "add_ora_step": true,
# # "is_hybrid": true
# # }
Loading

0 comments on commit e4e42c6

Please sign in to comment.