Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ora compression to cttsov2 workflow #763

Merged
merged 1 commit into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,28 @@ else
exit 1
fi

# If the estimated gz file size is -1, we need to do a double extraction
# Since we do not have the space to store the gz file and then upload it
if [[ "${ESTIMATED_GZ_FILE_SIZE}" == "-1" ]]; then
echo "$(date -Iseconds): Estimated file gz file size is -1, we need to do a double extraction to get the file size" 1>&2
ESTIMATED_GZ_FILE_SIZE="$( \
wget \
--quiet \
--output-document - \
"$( \
python3 scripts/get_icav2_download_url.py \
"${INPUT_URI}"
)" | \
/usr/local/bin/orad \
--gzip \
--stdout \
--ora-reference "${ORADATA_PATH}" \
- | \
wc -c \
)"
echo "$(date -Iseconds): Estimated gz file size is ${ESTIMATED_GZ_FILE_SIZE}" 1>&2
fi

# Set AWS credentials access for aws s3 cp
echo "$(date -Iseconds): Collecting the AWS S3 Access credentials" 1>&2
aws_s3_access_creds_json_str="$( \
Expand Down Expand Up @@ -145,4 +167,3 @@ else
echo "$(date -Iseconds): Stream and upload of decompression complete" 1>&2
fi


Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ interface Cttsov2Icav2PipelineManagerConstructProps {
generateCopyManifestDictLambdaObj: PythonFunction;
checkNumRunningSfnsLambdaObj: PythonFunction;
getRandomNumberLambdaObj: PythonFunction;
checkFastqListRowIsOraLambdaObj: PythonFunction;
convertOraToCacheUriGzPathLambdaObj: PythonFunction;
// SFN Output lambdas
deleteCacheUriLambdaObj: PythonFunction;
setOutputJsonLambdaObj: PythonFunction;
Expand All @@ -49,6 +51,8 @@ interface Cttsov2Icav2PipelineManagerConstructProps {
checkSuccessSampleLambdaObj: PythonFunction;
// ICAv2 Copy Batch State Machine Object
icav2CopyFilesStateMachineObj: sfn.IStateMachine;
// ORA Decompression Statemachine Object
oraDecompressionStateMachineObj: sfn.IStateMachine;
}

export class Cttsov2Icav2PipelineManagerConstruct extends Construct {
Expand Down Expand Up @@ -84,9 +88,15 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct {
props.getRandomNumberLambdaObj.currentVersion.functionArn,
__check_number_of_copy_jobs_running_lambda_function_arn__:
props.checkNumRunningSfnsLambdaObj.currentVersion.functionArn,
__fastq_list_rows_are_ora_lambda_function_arn__:
props.checkFastqListRowIsOraLambdaObj.currentVersion.functionArn,
__convert_ora_uri_to_gz_cache_uri_lambda_function_arn__:
props.convertOraToCacheUriGzPathLambdaObj.currentVersion.functionArn,
/* Subfunction state machines */
__copy_icav2_files_state_machine_arn__:
props.icav2CopyFilesStateMachineObj.stateMachineArn,
__ora_fastq_list_row_decompression_sfn_arn__:
props.oraDecompressionStateMachineObj.stateMachineArn,
/* Dynamodb tables */
__table_name__: props.dynamodbTableObj.tableName,
},
Expand All @@ -99,6 +109,8 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct {
props.uploadSamplesheetToCacheDirLambdaObj,
props.getRandomNumberLambdaObj,
props.checkNumRunningSfnsLambdaObj,
props.checkFastqListRowIsOraLambdaObj,
props.convertOraToCacheUriGzPathLambdaObj,
].forEach((lambda_obj) => {
lambda_obj.currentVersion.grantInvoke(configureInputsSfn);
});
Expand All @@ -109,6 +121,7 @@ export class Cttsov2Icav2PipelineManagerConstruct extends Construct {
// Add state machine execution permissions to stateMachine role
props.icav2CopyFilesStateMachineObj.grantStartExecution(configureInputsSfn);
props.icav2CopyFilesStateMachineObj.grantRead(configureInputsSfn);
props.oraDecompressionStateMachineObj.grantStartExecution(configureInputsSfn);

// Because we run a nested state machine, we need to add the permissions to the state machine role
// See https://stackoverflow.com/questions/60612853/nested-step-function-in-a-step-function-unknown-error-not-authorized-to-cr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { Duration } from 'aws-cdk-lib';
import { DockerImageCode, DockerImageFunction } from 'aws-cdk-lib/aws-lambda';
import { OraDecompressionConstruct } from '../../../../components/ora-file-decompression-fq-pair-sfn';

export interface Cttsov2Icav2PipelineManagerConfig {
/* ICAv2 Pipeline analysis essentials */
Expand Down Expand Up @@ -67,6 +68,16 @@ export class Cttsov2Icav2PipelineManagerStack extends cdk.Stack {
}
);

// Get the ora decompression construct
const oraDecompressionStateMachineObj = new OraDecompressionConstruct(
this,
'ora_decompression_state_machine_obj',
{
icav2AccessTokenSecretId: icav2AccessTokenSecretObj.secretName,
sfnPrefix: props.stateMachinePrefix,
}
);

// Set ssm parameter object list
const pipelineIdSsmObjList = ssm.StringParameter.fromStringParameterName(
this,
Expand Down Expand Up @@ -151,6 +162,40 @@ export class Cttsov2Icav2PipelineManagerStack extends cdk.Stack {
}
);

const checkFastqListRowIsOraLambdaObj = new PythonFunction(
this,
'check_fastq_list_row_is_ora_lambda_python_function',
{
entry: path.join(__dirname, '../lambdas/check_fastq_list_row_is_ora_py'),
runtime: lambda.Runtime.PYTHON_3_12,
architecture: lambda.Architecture.ARM_64,
index: 'check_fastq_list_row_is_ora.py',
handler: 'handler',
memorySize: 1024,
timeout: Duration.seconds(60),
environment: {
ICAV2_ACCESS_TOKEN_SECRET_ID: icav2AccessTokenSecretObj.secretName,
},
}
);

const convertOraToCacheUriGzPathLambdaObj = new PythonFunction(
this,
'convert_ora_to_cache_uri_gz_path_lambda_python_function',
{
entry: path.join(__dirname, '../lambdas/convert_ora_to_cache_uri_gz_path_py'),
runtime: lambda.Runtime.PYTHON_3_12,
architecture: lambda.Architecture.ARM_64,
index: 'convert_ora_to_cache_uri_gz_path.py',
handler: 'handler',
memorySize: 1024,
timeout: Duration.seconds(60),
environment: {
ICAV2_ACCESS_TOKEN_SECRET_ID: icav2AccessTokenSecretObj.secretName,
},
}
);

/*
Part 2: Build lambdas for output json generation
*/
Expand Down Expand Up @@ -245,12 +290,15 @@ export class Cttsov2Icav2PipelineManagerStack extends cdk.Stack {
dynamodbTableObj: dynamodbTableObj,
icav2AccessTokenSecretObj: icav2AccessTokenSecretObj,
icav2CopyFilesStateMachineObj: icav2CopyFilesStateMachineObj.icav2CopyFilesSfnObj,
oraDecompressionStateMachineObj: oraDecompressionStateMachineObj.sfnObject,
pipelineIdSsmObj: pipelineIdSsmObjList,
/* Lambdas paths */
uploadSamplesheetToCacheDirLambdaObj: uploadSamplesheetToCacheDirLambdaObj, // __dirname + '/../../../lambdas/upload_samplesheet_to_cache_dir_py'
generateCopyManifestDictLambdaObj: generateCopyManifestDictLambdaObj, // __dirname + '/../../../lambdas/generate_copy_manifest_dict_py'
getRandomNumberLambdaObj: getRandomNumberLambdaObj,
checkNumRunningSfnsLambdaObj: checkNumRunningSfns,
convertOraToCacheUriGzPathLambdaObj: convertOraToCacheUriGzPathLambdaObj,
checkFastqListRowIsOraLambdaObj: checkFastqListRowIsOraLambdaObj,
deleteCacheUriLambdaObj: deleteCacheUriLambdaFunction,
setOutputJsonLambdaObj: setOutputJsonLambdaFunction,
getVcfsLambdaObj: getVcfsLambdaFunction,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/usr/bin/env python3

"""
Check fastq list row is ora
"""


def handler(event, context):
"""
Collect the read1FileUri and read2FileUri from the fastq list and check if they are in the ora format,
return True if they are, False otherwise
:param event:
:param context:
:return:
"""

# Get the fastq list from the event
fastq_list_row = event['fastq_list_row']

# Check if the read1FileUri and read2FileUri are in the ora format
if fastq_list_row.get("read1FileUri").endswith(".ora") and fastq_list_row.get("read2FileUri").endswith(".ora"):
return {
"is_ora": True
}
elif fastq_list_row.get("read1FileUri").endswith(".gz") and fastq_list_row.get("read2FileUri").endswith(".gz"):
return {
"is_ora": False
}
else:
raise ValueError("The read1FileUri and read2FileUri need to be in the same format")
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/usr/bin/env python

"""
Given a fastq list row in ora format, a cache uri and a sample id,

Determine the output gzip path for the fastq files

Returns read_1_gz_output_uri and read_2_gz_output_uri
"""

from urllib.parse import (urlparse, urlunparse)
from pathlib import Path

def extend_url(url, path_ext: str) -> str:
"""
Extend the url path with the path_ext
"""
url_obj = urlparse(url)

return str(
urlunparse(
(
url_obj.scheme,
url_obj.netloc,
str(Path(url_obj.path) / path_ext),
url_obj.params,
url_obj.query,
url_obj.fragment
)
)
)


def handler(event, context):
# Get the input event
cache_uri = event['cache_uri']

# Get the input event
sample_id = event['sample_id']

# Get the input event
fastq_list_row = event['fastq_list_row']
read_1_ora_file_uri = fastq_list_row['read1FileUri']
read_2_ora_file_uri = fastq_list_row['read2FileUri']

# Extend the cache uri to include the sample id
sample_cache_uri = extend_url(cache_uri, sample_id)

# Get the file name from the ora file uri
# And replace the .ora extension with .gz
read_1_file_name = Path(read_1_ora_file_uri).name.replace('.ora', '.gz')
read_2_file_name = Path(read_2_ora_file_uri).name.replace('.ora', '.gz')

# Get the output uri for the gz files
return {
'read_1_gz_output_uri': extend_url(sample_cache_uri, read_1_file_name),
'read_2_gz_output_uri': extend_url(sample_cache_uri, read_2_file_name)
}
Loading
Loading