Skip to content

Commit

Permalink
Merge pull request #602 from umccr/enhancement/add-fastqc-stats-to-fqlr
Browse files Browse the repository at this point in the history
Add fastqc / sequali stats to fastq list row events for first 1 million reads
  • Loading branch information
alexiswl authored Oct 18, 2024
2 parents 35e81a5 + c6b7ad8 commit f9b52d4
Show file tree
Hide file tree
Showing 14 changed files with 1,725 additions and 312 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as ssm from 'aws-cdk-lib/aws-ssm';
import { NewSamplesheetEventShowerConstruct } from './part_1/samplesheet-event-shower';
import { NewFastqListRowsEventShowerConstruct } from './part_2/fastq-list-rows-event-shower';
import * as secretsManager from 'aws-cdk-lib/aws-secretsmanager';

/*
Provide the glue to push 'shower' events
Expand All @@ -13,6 +14,7 @@ When either new fastq list rows arrive or when a new samplesheet arrives
export interface showerGlueHandlerConstructProps {
eventBusObj: events.IEventBus;
instrumentRunTableObj: dynamodb.ITableV2;
icav2AccessTokenSecretObj: secretsManager.ISecret;
}

export class showerGlueHandlerConstruct extends Construct {
Expand Down Expand Up @@ -40,10 +42,12 @@ export class showerGlueHandlerConstruct extends Construct {
this,
'fastq_list_rows_shower',
{
// Event bus
/* Event bus */
eventBusObj: props.eventBusObj,
// Tables
/* Tables */
tableObj: props.instrumentRunTableObj,
/* Secrets */
icav2AccessTokenSecretObj: props.icav2AccessTokenSecretObj,
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,25 @@ import * as eventsTargets from 'aws-cdk-lib/aws-events-targets';
import path from 'path';
import { LambdaB64GzTranslatorConstruct } from '../../../../../../../components/python-lambda-b64gz-translator';
import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha';
import { Architecture, Runtime } from 'aws-cdk-lib/aws-lambda';
import {
Architecture,
DockerImageCode,
DockerImageFunction,
Runtime,
} from 'aws-cdk-lib/aws-lambda';
import { Duration } from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as secretsManager from 'aws-cdk-lib/aws-secretsmanager';

export interface NewFastqListRowsEventShowerConstructProps {
tableObj: dynamodb.ITableV2;
/* Event Bus */
eventBusObj: events.IEventBus;

/* Tables */
tableObj: dynamodb.ITableV2;

/* Secrets */
icav2AccessTokenSecretObj: secretsManager.ISecret;
}

export class NewFastqListRowsEventShowerConstruct extends Construct {
Expand All @@ -34,6 +48,7 @@ export class NewFastqListRowsEventShowerConstruct extends Construct {
subject: 'subject',
library: 'library',
project: 'project',
fastqListRow: 'fastq_list_row',
},
// Set Event Triggers
triggerSource: 'orcabus.workflowmanager',
Expand All @@ -60,7 +75,6 @@ export class NewFastqListRowsEventShowerConstruct extends Construct {

constructor(scope: Construct, id: string, props: NewFastqListRowsEventShowerConstructProps) {
super(scope, id);

/*
Part 1: Build the lambdas
*/
Expand All @@ -73,6 +87,14 @@ export class NewFastqListRowsEventShowerConstruct extends Construct {
}
).lambdaObj;

const cleanupFastqListRowLambda = new PythonFunction(this, 'cleanup_fastq_list_rows_lambda', {
entry: path.join(__dirname, 'lambdas', 'clean_up_fastq_list_rows_py'),
index: 'clean_up_fastq_list_rows.py',
handler: 'handler',
runtime: Runtime.PYTHON_3_12,
architecture: Architecture.ARM_64,
});

// Generate Data Objects
// Translate the libraryrunstatechange event
const generateEventDataObjsLambda = new PythonFunction(
Expand All @@ -87,6 +109,66 @@ export class NewFastqListRowsEventShowerConstruct extends Construct {
}
);

// Add the demux stats
const generateDemuxStatsLambda = new PythonFunction(this, 'generate_demux_stats_py', {
entry: path.join(__dirname, 'lambdas', 'get_demultiplex_stats_py'),
index: 'get_demultiplex_stats.py',
handler: 'handler',
runtime: Runtime.PYTHON_3_12,
architecture: Architecture.ARM_64,
memorySize: 1024, // Don't want pandas to kill the lambda
environment: {
ICAV2_ACCESS_TOKEN_SECRET_ID: props.icav2AccessTokenSecretObj.secretName,
},
timeout: Duration.seconds(300),
});

// Give fastqc stats lambda permission to access the secret
props.icav2AccessTokenSecretObj.grantRead(generateDemuxStatsLambda.currentVersion);

// Get the fastqc stats
const architecture = lambda.Architecture.ARM_64;
const getFastqcStats = new DockerImageFunction(this, 'get_fastqc_stats', {
description: 'Get Fastqc stats from first 1 million reads',
code: DockerImageCode.fromImageAsset(path.join(__dirname, 'lambdas/get_fastqc_stats'), {
file: 'Dockerfile',
buildArgs: {
platform: architecture.dockerPlatform,
},
}),
// Pulling data from icav2 can take time
timeout: Duration.seconds(180), // Maximum length of lambda duration is 15 minutes
retryAttempts: 0, // Never perform a retry if it fails
memorySize: 2048, // Don't want pandas to kill the lambda
architecture: architecture,
environment: {
ICAV2_ACCESS_TOKEN_SECRET_ID: props.icav2AccessTokenSecretObj.secretName,
},
});

// Give fastqc stats lambda permission to access the secret
props.icav2AccessTokenSecretObj.grantRead(getFastqcStats.currentVersion);

// Get the sequali stats
const getSequaliStatsLambdaObj = new DockerImageFunction(this, 'get_sequali_stats', {
description: 'Get the sequali stats from first 1 million reads',
code: DockerImageCode.fromImageAsset(path.join(__dirname, 'lambdas/get_sequali_stats'), {
file: 'Dockerfile',
buildArgs: {
platform: architecture.dockerPlatform,
},
}),
memorySize: 2048, // Don't want pandas to kill the lambda
timeout: Duration.seconds(300),
architecture: Architecture.ARM_64,
environment: {
ICAV2_ACCESS_TOKEN_SECRET_ID: props.icav2AccessTokenSecretObj.secretName,
},
});

// Give the lambda permission to access the secret
props.icav2AccessTokenSecretObj.grantRead(getSequaliStatsLambdaObj.currentVersion);

/*
Part 2: Build state machine
*/
Expand Down Expand Up @@ -147,12 +229,21 @@ export class NewFastqListRowsEventShowerConstruct extends Construct {
this.newFastqListRowsEventShowerMap.tablePartition.instrumentRun,
__project_table_partition_name__:
this.newFastqListRowsEventShowerMap.tablePartition.project,
__fastq_list_row_table_partition_name__:
this.newFastqListRowsEventShowerMap.tablePartition.fastqListRow,

/* Lambda functions */
__decompress_fastq_list_rows_lambda_function_arn__:
decompressFastqListRowLambda.currentVersion.functionArn,
__clean_up_fastq_list_rows_lambda_function_arn__:
cleanupFastqListRowLambda.currentVersion.functionArn,
__generate_event_maps_lambda_function_arn__:
generateEventDataObjsLambda.currentVersion.functionArn,
__get_read_counts_per_rgid_lambda_function_arn__:
generateDemuxStatsLambda.currentVersion.functionArn,
__get_fastqc_stats_lambda_function_arn__: getFastqcStats.currentVersion.functionArn,
__get_sequali_stats_lambda_function_arn__:
getSequaliStatsLambdaObj.currentVersion.functionArn,
},
});

Expand All @@ -163,7 +254,14 @@ export class NewFastqListRowsEventShowerConstruct extends Construct {
props.tableObj.grantReadWriteData(this.stateMachineObj);

/* Allow state machine to invoke lambda */
[decompressFastqListRowLambda, generateEventDataObjsLambda].forEach((lambda) => {
[
decompressFastqListRowLambda,
generateEventDataObjsLambda,
generateDemuxStatsLambda,
getFastqcStats,
getSequaliStatsLambdaObj,
cleanupFastqListRowLambda,
].forEach((lambda) => {
lambda.currentVersion.grantInvoke(this.stateMachineObj.role);
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#!/usr/bin/env python3

"""
Clean up the fastq list rows
* convert uppercase to lowercase
* extend rgid to contain the instrument run and the sample name
* Otherwise very hard to match the fastq files to the sample names
# [
# {
# "RGID": "GAATTCGT.TTATGAGT.1",
# "RGSM": "L2400102",
# "RGLB": "L2400102",
# "Lane": 1,
# "Read1FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R1_001.fastq.gz",
# "Read2FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R2_001.fastq.gz"
# },
# {
# "RGID": "GTGACGTT.TCCCAGAT.4",
# "RGSM": "L2400257",
# "RGLB": "L2400257",
# "Lane": 4,
# "Read1FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R1_001.fastq.gz",
# "Read2FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R2_001.fastq.gz"
# }
# ]
To
# [
# {
# "rgid": "GAATTCGT.TTATGAGT.1.240229_A00130_0288_BH5HM2DSXC.L2400102",
# "rgsm": "L2400102",
# "rglb": "L2400102",
# "lane": 1,
# "read1fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R1_001.fastq.gz",
# "read2fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R2_001.fastq.gz"
# },
# {
# "rgid": "GTGACGTT.TCCCAGAT.4.240229_A00130_0288_BH5HM2DSXC.L2400257",
# "rgsm": "L2400257",
# "rglb": "L2400257",
# "lane": 4,
# "read1fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R1_001.fastq.gz",
# "read2fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R2_001.fastq.gz"
# }
# ]
"""

from typing import Dict


def handler(event, context) -> Dict:
"""
Given the instrument run id and fastq list rows, return the fastq list rows with the rgid
extended to contain the instrument run id and the sample name
All keys should be from UPPERCASE / PascalCase to camelCase
:param event:
:param context:
:return:
"""

# Get inputs
instrument_run_id = event["instrument_run_id"]
fastq_list_rows = event["fastq_list_rows"]

# Clean up the fastq list rows
fastq_list_rows = list(
map(
lambda fastq_list_row_iter_: {
"rgid": f"{fastq_list_row_iter_['RGID']}.{instrument_run_id}.{fastq_list_row_iter_['RGSM']}",
"rgsm": fastq_list_row_iter_["RGSM"],
"rglb": fastq_list_row_iter_["RGLB"],
"lane": fastq_list_row_iter_["Lane"],
"read1FileUri": fastq_list_row_iter_["Read1FileUri"],
"read2FileUri": fastq_list_row_iter_["Read2FileUri"]
},
fastq_list_rows
)
)

# Return the fastq list rows
return {
"fastq_list_rows": fastq_list_rows
}


# if __name__ == "__main__":
# import json
#
# print(
# json.dumps(
# handler(
# {
# "instrument_run_id": "240229_A00130_0288_BH5HM2DSXC",
# "fastq_list_rows": [
# {
# "RGID": "GAATTCGT.TTATGAGT.1",
# "RGSM": "L2400102",
# "RGLB": "L2400102",
# "Lane": 1,
# "Read1FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R1_001.fastq.gz",
# "Read2FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R2_001.fastq.gz"
# },
# {
# "RGID": "GTGACGTT.TCCCAGAT.4",
# "RGSM": "L2400257",
# "RGLB": "L2400257",
# "Lane": 4,
# "Read1FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R1_001.fastq.gz",
# "Read2FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R2_001.fastq.gz"
# }
# ]
# },
# None
# ),
# indent=4
# )
# )
#
# # {
# # "fastq_list_rows": [
# # {
# # "rgid": "GAATTCGT.TTATGAGT.1.240229_A00130_0288_BH5HM2DSXC.L2400102",
# # "rgsm": "L2400102",
# # "rglb": "L2400102",
# # "lane": 1,
# # "read1fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R1_001.fastq.gz",
# # "read2fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R2_001.fastq.gz"
# # },
# # {
# # "rgid": "GTGACGTT.TCCCAGAT.4.240229_A00130_0288_BH5HM2DSXC.L2400257",
# # "rgsm": "L2400257",
# # "rglb": "L2400257",
# # "lane": 4,
# # "read1fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R1_001.fastq.gz",
# # "read2fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R2_001.fastq.gz"
# # }
# # ]
# # }
Loading

0 comments on commit f9b52d4

Please sign in to comment.