Skip to content

Commit

Permalink
Decompress ora files prior to running if gzip files are also present
Browse files Browse the repository at this point in the history
  • Loading branch information
alexiswl committed Dec 13, 2024
1 parent 8a6af8f commit ba2a11c
Show file tree
Hide file tree
Showing 5 changed files with 294 additions and 50 deletions.
2 changes: 1 addition & 1 deletion config/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ TN Stateless stack
*/

// Deployed under dev/stg/prod
export const tnIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/tumor_normal_4.2.4_pipeline_id'; // 6ce2b636-ba2f-4004-8065-f3557f286c98
export const tnIcav2PipelineIdSSMParameterPath = '/icav2/umccr-prod/tumor_normal_4.2.4_pipeline_id'; // 7d3cb608-80e0-4ecf-a67e-ef524e9bfb8b
export const tnIcav2PipelineWorkflowType = 'tumor-normal';
export const tnIcav2PipelineWorkflowTypeVersion = '4.2.4';
export const tnIcav2ServiceVersion = '2024.07.01';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct {
props.icav2CopyFilesStateMachineObj.grantStartExecution(configureInputsSfn);
props.icav2CopyFilesStateMachineObj.grantRead(configureInputsSfn);
props.oraDecompressionStateMachineObj.grantStartExecution(configureInputsSfn);
props.oraDecompressionStateMachineObj.grantRead(configureInputsSfn);

// Because we run a nested state machine, we need to add the permissions to the state machine role
// See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import { DefinitionBody } from 'aws-cdk-lib/aws-stepfunctions';
import { PythonLambdaFastqListRowsToCwlInputConstruct } from '../../../../components/python-lambda-fastq-list-rows-to-cwl-input';
import { WfmWorkflowStateChangeIcav2ReadyEventHandlerConstruct } from '../../../../components/sfn-icav2-ready-event-handler';
import { Icav2AnalysisEventHandlerConstruct } from '../../../../components/sfn-icav2-state-change-event-handler';
import { OraDecompressionConstruct } from '../../../../components/ora-file-decompression-fq-pair-sfn';
import { NagSuppressions } from 'cdk-nag';

export interface TnIcav2PipelineManagerConfig {
/* ICAv2 Pipeline analysis essentials */
Expand Down Expand Up @@ -127,6 +129,16 @@ export class TnIcav2PipelineManagerStack extends cdk.Stack {
}
);

// Get the ora decompression construct
const oraDecompressionStateMachineObj = new OraDecompressionConstruct(
this,
'ora_decompression_state_machine_obj',
{
icav2AccessTokenSecretId: this.icav2AccessTokenSecretObj.secretName,
sfnPrefix: `${props.stateMachinePrefix}-ora-to-gz`,
}
).sfnObject;

// Specify the statemachine and replace the arn placeholders with the lambda arns defined above
const configureInputsSfn = new sfn.StateMachine(
this,
Expand All @@ -153,6 +165,9 @@ export class TnIcav2PipelineManagerStack extends cdk.Stack {
getBooleanParametersFromEventInputLambdaObj.currentVersion.functionArn,
__add_ora_reference_lambda_function_arn__:
addOraReferenceLambdaObj.currentVersion.functionArn,
/* Step functions */
__ora_fastq_list_row_decompression_sfn_arn__:
oraDecompressionStateMachineObj.stateMachineArn,
},
}
);
Expand All @@ -174,6 +189,35 @@ export class TnIcav2PipelineManagerStack extends cdk.Stack {
ssmObj.grantRead(configureInputsSfn);
});

// Allow state machine to invoke the ora decompression state machine
oraDecompressionStateMachineObj.grantStartExecution(configureInputsSfn);
oraDecompressionStateMachineObj.grantRead(configureInputsSfn);

// Because we run a nested state machine, we need to add the permissions to the state machine role
// See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr
configureInputsSfn.addToRolePolicy(
new iam.PolicyStatement({
resources: [
`arn:aws:events:${cdk.Aws.REGION}:${cdk.Aws.ACCOUNT_ID}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule`,
],
actions: ['events:PutTargets', 'events:PutRule', 'events:DescribeRule'],
})
);

// https://docs.aws.amazon.com/step-functions/latest/dg/connect-stepfunctions.html#sync-async-iam-policies
// Polling requires permission for states:DescribeExecution
NagSuppressions.addResourceSuppressions(
configureInputsSfn,
[
{
id: 'AwsSolutions-IAM5',
reason:
'grantRead uses asterisk at the end of executions, as we need permissions for all execution invocations',
},
],
true
);

/*
Part 2: Configure the lambdas and outputs step function
Quite a bit more complicated than regular ICAv2 workflow setup since we need to
Expand Down Expand Up @@ -201,7 +245,7 @@ export class TnIcav2PipelineManagerStack extends cdk.Stack {
);

// Add permissions to lambda
this.icav2AccessTokenSecretObj.grantRead(<iam.IRole>setOutputJsonLambdaObj.currentVersion.role);
this.icav2AccessTokenSecretObj.grantRead(setOutputJsonLambdaObj.currentVersion);

const configureOutputsSfn = new sfn.StateMachine(this, 'sfn_configure_outputs_json', {
stateMachineName: `${props.stateMachinePrefix}-configure-outputs-json`,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
Add ora reference
"""
from pathlib import Path
from typing import Dict, Optional, List


Expand All @@ -24,9 +25,11 @@ def handler(event, context) -> Dict[str, bool]:
# If tumorFastqListRows is None and fastqListRows is None, return false
if tumor_fastq_list_rows is None and normal_fastq_list_rows is None:
return {
"add_ora_step": False
"add_ora_step": False,
"is_hybrid": False
}

add_ora_step = False
for fastq_list_row_iter in [tumor_fastq_list_rows, normal_fastq_list_rows]:
if fastq_list_row_iter is not None:
# If fastqListRows is not None, flag the ora step when the check below passes
Expand All @@ -39,13 +42,28 @@ def handler(event, context) -> Dict[str, bool]:
for row in fastq_list_row_iter
]
):
return {
"add_ora_step": True
}
add_ora_step = True

# Check if hybrid: more than one distinct read1FileUri suffix across the tumor and normal rows
endings = []
for fastq_list_row_iter in [tumor_fastq_list_rows, normal_fastq_list_rows]:
if fastq_list_row_iter is not None:
endings.extend(
list(set(list(map(
lambda fastq_list_row_: Path(fastq_list_row_.get("read1FileUri")).suffix,
fastq_list_row_iter
))))
)

if len(list(set(endings))) > 1:
is_hybrid = True
else:
is_hybrid = False

# Got to here? Return the flags computed above
return {
"add_ora_step": False
"add_ora_step": add_ora_step,
"is_hybrid": is_hybrid
}


Expand Down Expand Up @@ -95,9 +113,10 @@ def handler(event, context) -> Dict[str, bool]:
# )
#
# # {
# # "add_ora_step": false
# # "add_ora_step": false,
# # "is_hybrid": false
# # }

#
# if __name__ == "__main__":
# import json
#
Expand Down Expand Up @@ -144,5 +163,6 @@ def handler(event, context) -> Dict[str, bool]:
# )
#
# # {
# # "add_ora_step": true
# # "add_ora_step": true,
# # "is_hybrid": true
# # }
Loading

0 comments on commit ba2a11c

Please sign in to comment.