Skip to content

Commit

Permalink
Merge pull request #763 from umccr/enhancement/allow-cttsov2-ora-comp…
Browse files Browse the repository at this point in the history
…ression-inputs

Add ora compression to cttsov2 workflow
  • Loading branch information
alexiswl authored Dec 9, 2024
2 parents 4ce27c7 + 9dc91a1 commit da840b6
Show file tree
Hide file tree
Showing 6 changed files with 394 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,28 @@ else
exit 1
fi

# If the estimated gz file size is -1, we need to do a double extraction
# Since we do not have the space to store the gz file and then upload it
# The first extraction (here) only counts the bytes of the gzip stream so that
# ESTIMATED_GZ_FILE_SIZE holds a real value; nothing is written to disk.
if [[ "${ESTIMATED_GZ_FILE_SIZE}" == "-1" ]]; then
  echo "$(date -Iseconds): Estimated file gz file size is -1, we need to do a double extraction to get the file size" 1>&2
  # pipefail is switched on inside the command substitution only (it runs in a subshell),
  # so a failed url lookup, download or decompression is no longer masked by the
  # exit status of 'wc -c' reporting a bogus (usually zero) size.
  if ! ESTIMATED_GZ_FILE_SIZE="$( \
    set -o pipefail; \
    wget \
      --quiet \
      --output-document - \
      "$( \
        python3 scripts/get_icav2_download_url.py \
          "${INPUT_URI}"
      )" | \
    /usr/local/bin/orad \
      --gzip \
      --stdout \
      --ora-reference "${ORADATA_PATH}" \
      - | \
    wc -c \
  )"; then
    echo "$(date -Iseconds): Failed to stream and decompress '${INPUT_URI}' while estimating the gz file size" 1>&2
    exit 1
  fi
  echo "$(date -Iseconds): Estimated gz file size is ${ESTIMATED_GZ_FILE_SIZE}" 1>&2
fi

# Set AWS credentials access for aws s3 cp
echo "$(date -Iseconds): Collecting the AWS S3 Access credentials" 1>&2
aws_s3_access_creds_json_str="$( \
Expand Down Expand Up @@ -145,4 +167,3 @@ else
echo "$(date -Iseconds): Stream and upload of decompression complete" 1>&2
fi


Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ interface Cttsov2Icav2PipelineManagerConstructProps {
generateCopyManifestDictLambdaObj: PythonFunction;
checkNumRunningSfnsLambdaObj: PythonFunction;
getRandomNumberLambdaObj: PythonFunction;
checkFastqListRowIsOraLambdaObj: PythonFunction;
convertOraToCacheUriGzPathLambdaObj: PythonFunction;
// SFN Output lambdas
deleteCacheUriLambdaObj: PythonFunction;
setOutputJsonLambdaObj: PythonFunction;
Expand All @@ -49,6 +51,8 @@ interface Cttsov2Icav2PipelineManagerConstructProps {
checkSuccessSampleLambdaObj: PythonFunction;
// ICAv2 Copy Batch State Machine Object
icav2CopyFilesStateMachineObj: sfn.IStateMachine;
// ORA Decompression Statemachine Object
oraDecompressionStateMachineObj: sfn.IStateMachine;
}

export class Cttsov2Icav2PipelineManagerConstruct extends Construct {
Expand Down Expand Up @@ -84,9 +88,15 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct {
props.getRandomNumberLambdaObj.currentVersion.functionArn,
__check_number_of_copy_jobs_running_lambda_function_arn__:
props.checkNumRunningSfnsLambdaObj.currentVersion.functionArn,
__fastq_list_rows_are_ora_lambda_function_arn__:
props.checkFastqListRowIsOraLambdaObj.currentVersion.functionArn,
__convert_ora_uri_to_gz_cache_uri_lambda_function_arn__:
props.convertOraToCacheUriGzPathLambdaObj.currentVersion.functionArn,
/* Subfunction state machines */
__copy_icav2_files_state_machine_arn__:
props.icav2CopyFilesStateMachineObj.stateMachineArn,
__ora_fastq_list_row_decompression_sfn_arn__:
props.oraDecompressionStateMachineObj.stateMachineArn,
/* Dynamodb tables */
__table_name__: props.dynamodbTableObj.tableName,
},
Expand All @@ -99,6 +109,8 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct {
props.uploadSamplesheetToCacheDirLambdaObj,
props.getRandomNumberLambdaObj,
props.checkNumRunningSfnsLambdaObj,
props.checkFastqListRowIsOraLambdaObj,
props.convertOraToCacheUriGzPathLambdaObj,
].forEach((lambda_obj) => {
lambda_obj.currentVersion.grantInvoke(configureInputsSfn);
});
Expand All @@ -109,6 +121,7 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct {
// Add state machine execution permissions to stateMachine role
props.icav2CopyFilesStateMachineObj.grantStartExecution(configureInputsSfn);
props.icav2CopyFilesStateMachineObj.grantRead(configureInputsSfn);
props.oraDecompressionStateMachineObj.grantStartExecution(configureInputsSfn);

// Because we run a nested state machine, we need to add the permissions to the state machine role
// See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { Duration } from 'aws-cdk-lib';
import { DockerImageCode, DockerImageFunction } from 'aws-cdk-lib/aws-lambda';
import { OraDecompressionConstruct } from '../../../../components/ora-file-decompression-fq-pair-sfn';

export interface Cttsov2Icav2PipelineManagerConfig {
/* ICAv2 Pipeline analysis essentials */
Expand Down Expand Up @@ -67,6 +68,16 @@ export class Cttsov2Icav2PipelineManagerStack extends cdk.Stack {
}
);

// Get the ora decompression construct
const oraDecompressionStateMachineObj = new OraDecompressionConstruct(
this,
'ora_decompression_state_machine_obj',
{
icav2AccessTokenSecretId: icav2AccessTokenSecretObj.secretName,
sfnPrefix: props.stateMachinePrefix,
}
);

// Set ssm parameter object list
const pipelineIdSsmObjList = ssm.StringParameter.fromStringParameterName(
this,
Expand Down Expand Up @@ -151,6 +162,40 @@ export class Cttsov2Icav2PipelineManagerStack extends cdk.Stack {
}
);

const checkFastqListRowIsOraLambdaObj = new PythonFunction(
this,
'check_fastq_list_row_is_ora_lambda_python_function',
{
entry: path.join(__dirname, '../lambdas/check_fastq_list_row_is_ora_py'),
runtime: lambda.Runtime.PYTHON_3_12,
architecture: lambda.Architecture.ARM_64,
index: 'check_fastq_list_row_is_ora.py',
handler: 'handler',
memorySize: 1024,
timeout: Duration.seconds(60),
environment: {
ICAV2_ACCESS_TOKEN_SECRET_ID: icav2AccessTokenSecretObj.secretName,
},
}
);

const convertOraToCacheUriGzPathLambdaObj = new PythonFunction(
this,
'convert_ora_to_cache_uri_gz_path_lambda_python_function',
{
entry: path.join(__dirname, '../lambdas/convert_ora_to_cache_uri_gz_path_py'),
runtime: lambda.Runtime.PYTHON_3_12,
architecture: lambda.Architecture.ARM_64,
index: 'convert_ora_to_cache_uri_gz_path.py',
handler: 'handler',
memorySize: 1024,
timeout: Duration.seconds(60),
environment: {
ICAV2_ACCESS_TOKEN_SECRET_ID: icav2AccessTokenSecretObj.secretName,
},
}
);

/*
Part 2: Build lambdas for output json generation
*/
Expand Down Expand Up @@ -245,12 +290,15 @@ export class Cttsov2Icav2PipelineManagerStack extends cdk.Stack {
dynamodbTableObj: dynamodbTableObj,
icav2AccessTokenSecretObj: icav2AccessTokenSecretObj,
icav2CopyFilesStateMachineObj: icav2CopyFilesStateMachineObj.icav2CopyFilesSfnObj,
oraDecompressionStateMachineObj: oraDecompressionStateMachineObj.sfnObject,
pipelineIdSsmObj: pipelineIdSsmObjList,
/* Lambdas paths */
uploadSamplesheetToCacheDirLambdaObj: uploadSamplesheetToCacheDirLambdaObj, // __dirname + '/../../../lambdas/upload_samplesheet_to_cache_dir_py'
generateCopyManifestDictLambdaObj: generateCopyManifestDictLambdaObj, // __dirname + '/../../../lambdas/generate_copy_manifest_dict_py'
getRandomNumberLambdaObj: getRandomNumberLambdaObj,
checkNumRunningSfnsLambdaObj: checkNumRunningSfns,
convertOraToCacheUriGzPathLambdaObj: convertOraToCacheUriGzPathLambdaObj,
checkFastqListRowIsOraLambdaObj: checkFastqListRowIsOraLambdaObj,
deleteCacheUriLambdaObj: deleteCacheUriLambdaFunction,
setOutputJsonLambdaObj: setOutputJsonLambdaFunction,
getVcfsLambdaObj: getVcfsLambdaFunction,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env python3

"""
Check fastq list row is ora
"""


def handler(event, context):
    """
    Report whether the reads of a fastq list row are ORA-compressed or gzip-compressed.

    :param event: Dict with a 'fastq_list_row' key, whose value is a dict holding
                  the 'read1FileUri' and 'read2FileUri' strings
    :param context: Lambda context object (unused)
    :return: {"is_ora": True} when both uris end with '.ora',
             {"is_ora": False} when both uris end with '.gz'
    :raises KeyError: if the event has no 'fastq_list_row' key
    :raises ValueError: if a read uri is missing or not a string, or if the two uris
                        are not both '.ora' or both '.gz'
    """

    # Get the fastq list row from the event
    fastq_list_row = event['fastq_list_row']

    # Collect both read uris up front so a missing one is reported clearly
    # rather than surfacing as an AttributeError on None
    read_uris = {
        key: fastq_list_row.get(key)
        for key in ("read1FileUri", "read2FileUri")
    }

    invalid_keys = [key for key, uri in read_uris.items() if not isinstance(uri, str)]
    if invalid_keys:
        raise ValueError(
            f"The fastq list row is missing a valid value for: {', '.join(invalid_keys)}"
        )

    # Both reads must share one of the two supported compression formats
    if all(uri.endswith(".ora") for uri in read_uris.values()):
        return {
            "is_ora": True
        }

    if all(uri.endswith(".gz") for uri in read_uris.values()):
        return {
            "is_ora": False
        }

    # Covers mixed formats as well as a shared but unsupported extension
    raise ValueError(
        "The read1FileUri and read2FileUri must both end with '.ora' or both end with '.gz', "
        f"got '{read_uris['read1FileUri']}' and '{read_uris['read2FileUri']}'"
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python

"""
Given a fastq list row in ora format, a cache uri and a sample id,
Determine the output gzip path for the fastq files
Returns read_1_gz_output_uri and read_2_gz_output_uri
"""

from urllib.parse import (urlparse, urlunparse)
from pathlib import Path

def extend_url(url, path_ext: str) -> str:
    """
    Extend the url path with the path_ext

    Only the path component changes; scheme, netloc, params, query and fragment
    are carried over from the input url. A trailing slash on the url path is
    absorbed by the join, so 's3://bucket/cache/' and 's3://bucket/cache' give
    the same result.

    :param url: The url to extend, e.g. 's3://bucket/cache'
    :param path_ext: The path component to append, e.g. a sample id or file name
    :return: The url with path_ext appended to its path
    """
    # Function-scope import keeps this block self-contained.
    # URL paths always use '/', so join with PurePosixPath rather than the
    # OS-specific Path, which would use Windows path semantics on a non-POSIX host.
    from pathlib import PurePosixPath

    url_obj = urlparse(url)

    extended_path = str(PurePosixPath(url_obj.path) / path_ext)

    # ParseResult is a named tuple, so swap just the path and rebuild the url
    return urlunparse(url_obj._replace(path=extended_path))


def handler(event, context):
    """
    Work out where the gzip-compressed copies of an ora fastq pair should be written.

    The output uris are '<cache_uri>/<sample_id>/<file name with .ora swapped for .gz>'.

    :param event: Dict with the keys
                  'cache_uri' (base uri of the cache directory),
                  'sample_id' (used as a sub-directory of the cache uri), and
                  'fastq_list_row' (dict holding 'read1FileUri' and 'read2FileUri')
    :param context: Lambda context object (unused)
    :return: Dict with 'read_1_gz_output_uri' and 'read_2_gz_output_uri'
    :raises KeyError: if any of the expected keys is missing from the event
    """
    # Get the inputs from the event
    cache_uri = event['cache_uri']
    sample_id = event['sample_id']
    fastq_list_row = event['fastq_list_row']

    # Extend the cache uri to include the sample id
    sample_cache_uri = extend_url(cache_uri, sample_id)

    def _gz_output_uri(ora_file_uri: str) -> str:
        """Map one ora file uri to its gz output uri under the sample cache uri."""
        file_name = Path(ora_file_uri).name
        # Swap only the trailing extension: str.replace would also rewrite any
        # '.ora' found earlier in the name (e.g. 'x.orange.fastq.ora').
        # A name without the '.ora' suffix is passed through unchanged.
        if file_name.endswith('.ora'):
            file_name = file_name[:-len('.ora')] + '.gz'
        return extend_url(sample_cache_uri, file_name)

    # Get the output uri for the gz files
    return {
        'read_1_gz_output_uri': _gz_output_uri(fastq_list_row['read1FileUri']),
        'read_2_gz_output_uri': _gz_output_uri(fastq_list_row['read2FileUri'])
    }
Loading

0 comments on commit da840b6

Please sign in to comment.