Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fastqc / sequali stats to fastq list row events for first 1 million reads #602

Merged
merged 3 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as ssm from 'aws-cdk-lib/aws-ssm';
import { NewSamplesheetEventShowerConstruct } from './part_1/samplesheet-event-shower';
import { NewFastqListRowsEventShowerConstruct } from './part_2/fastq-list-rows-event-shower';
import * as secretsManager from 'aws-cdk-lib/aws-secretsmanager';

/*
Provide the glue to push 'shower' events
Expand All @@ -13,6 +14,7 @@ When either new fastq list rows arrive or when a new samplesheet arrives
export interface showerGlueHandlerConstructProps {
eventBusObj: events.IEventBus;
instrumentRunTableObj: dynamodb.ITableV2;
icav2AccessTokenSecretObj: secretsManager.ISecret;
}

export class showerGlueHandlerConstruct extends Construct {
Expand Down Expand Up @@ -40,10 +42,12 @@ export class showerGlueHandlerConstruct extends Construct {
this,
'fastq_list_rows_shower',
{
// Event bus
/* Event bus */
eventBusObj: props.eventBusObj,
// Tables
/* Tables */
tableObj: props.instrumentRunTableObj,
/* Secrets */
icav2AccessTokenSecretObj: props.icav2AccessTokenSecretObj,
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,25 @@ import * as eventsTargets from 'aws-cdk-lib/aws-events-targets';
import path from 'path';
import { LambdaB64GzTranslatorConstruct } from '../../../../../../../components/python-lambda-b64gz-translator';
import { PythonFunction } from '@aws-cdk/aws-lambda-python-alpha';
import { Architecture, Runtime } from 'aws-cdk-lib/aws-lambda';
import {
Architecture,
DockerImageCode,
DockerImageFunction,
Runtime,
} from 'aws-cdk-lib/aws-lambda';
import { Duration } from 'aws-cdk-lib';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as secretsManager from 'aws-cdk-lib/aws-secretsmanager';

export interface NewFastqListRowsEventShowerConstructProps {
tableObj: dynamodb.ITableV2;
/* Event Bus */
eventBusObj: events.IEventBus;

/* Tables */
tableObj: dynamodb.ITableV2;

/* Secrets */
icav2AccessTokenSecretObj: secretsManager.ISecret;
}

export class NewFastqListRowsEventShowerConstruct extends Construct {
Expand All @@ -34,6 +48,7 @@ export class NewFastqListRowsEventShowerConstruct extends Construct {
subject: 'subject',
library: 'library',
project: 'project',
fastqListRow: 'fastq_list_row',
},
// Set Event Triggers
triggerSource: 'orcabus.workflowmanager',
Expand All @@ -60,7 +75,6 @@ export class NewFastqListRowsEventShowerConstruct extends Construct {

constructor(scope: Construct, id: string, props: NewFastqListRowsEventShowerConstructProps) {
super(scope, id);

/*
Part 1: Build the lambdas
*/
Expand All @@ -73,6 +87,14 @@ export class NewFastqListRowsEventShowerConstruct extends Construct {
}
).lambdaObj;

const cleanupFastqListRowLambda = new PythonFunction(this, 'cleanup_fastq_list_rows_lambda', {
entry: path.join(__dirname, 'lambdas', 'clean_up_fastq_list_rows_py'),
index: 'clean_up_fastq_list_rows.py',
handler: 'handler',
runtime: Runtime.PYTHON_3_12,
architecture: Architecture.ARM_64,
});

// Generate Data Objects
// Translate the libraryrunstatechange event
const generateEventDataObjsLambda = new PythonFunction(
Expand All @@ -87,6 +109,66 @@ export class NewFastqListRowsEventShowerConstruct extends Construct {
}
);

// Add the demux stats
const generateDemuxStatsLambda = new PythonFunction(this, 'generate_demux_stats_py', {
entry: path.join(__dirname, 'lambdas', 'get_demultiplex_stats_py'),
index: 'get_demultiplex_stats.py',
handler: 'handler',
runtime: Runtime.PYTHON_3_12,
architecture: Architecture.ARM_64,
memorySize: 1024, // Don't want pandas to kill the lambda
environment: {
ICAV2_ACCESS_TOKEN_SECRET_ID: props.icav2AccessTokenSecretObj.secretName,
},
timeout: Duration.seconds(300),
});

// Give fastqc stats lambda permission to access the secret
props.icav2AccessTokenSecretObj.grantRead(generateDemuxStatsLambda.currentVersion);

// Get the fastqc stats
const architecture = lambda.Architecture.ARM_64;
const getFastqcStats = new DockerImageFunction(this, 'get_fastqc_stats', {
description: 'Get Fastqc stats from first 1 million reads',
code: DockerImageCode.fromImageAsset(path.join(__dirname, 'lambdas/get_fastqc_stats'), {
file: 'Dockerfile',
buildArgs: {
platform: architecture.dockerPlatform,
},
}),
// Pulling data from icav2 can take time
timeout: Duration.seconds(180), // Maximum length of lambda duration is 15 minutes
retryAttempts: 0, // Never perform a retry if it fails
memorySize: 2048, // Don't want pandas to kill the lambda
architecture: architecture,
environment: {
ICAV2_ACCESS_TOKEN_SECRET_ID: props.icav2AccessTokenSecretObj.secretName,
},
});

// Give fastqc stats lambda permission to access the secret
props.icav2AccessTokenSecretObj.grantRead(getFastqcStats.currentVersion);

// Get the sequali stats
const getSequaliStatsLambdaObj = new DockerImageFunction(this, 'get_sequali_stats', {
description: 'Get the sequali stats from first 1 million reads',
code: DockerImageCode.fromImageAsset(path.join(__dirname, 'lambdas/get_sequali_stats'), {
file: 'Dockerfile',
buildArgs: {
platform: architecture.dockerPlatform,
},
}),
memorySize: 2048, // Don't want pandas to kill the lambda
timeout: Duration.seconds(300),
architecture: Architecture.ARM_64,
environment: {
ICAV2_ACCESS_TOKEN_SECRET_ID: props.icav2AccessTokenSecretObj.secretName,
},
});

// Give the lambda permission to access the secret
props.icav2AccessTokenSecretObj.grantRead(getSequaliStatsLambdaObj.currentVersion);

/*
Part 2: Build state machine
*/
Expand Down Expand Up @@ -147,12 +229,21 @@ export class NewFastqListRowsEventShowerConstruct extends Construct {
this.newFastqListRowsEventShowerMap.tablePartition.instrumentRun,
__project_table_partition_name__:
this.newFastqListRowsEventShowerMap.tablePartition.project,
__fastq_list_row_table_partition_name__:
this.newFastqListRowsEventShowerMap.tablePartition.fastqListRow,

/* Lambda functions */
__decompress_fastq_list_rows_lambda_function_arn__:
decompressFastqListRowLambda.currentVersion.functionArn,
__clean_up_fastq_list_rows_lambda_function_arn__:
cleanupFastqListRowLambda.currentVersion.functionArn,
__generate_event_maps_lambda_function_arn__:
generateEventDataObjsLambda.currentVersion.functionArn,
__get_read_counts_per_rgid_lambda_function_arn__:
generateDemuxStatsLambda.currentVersion.functionArn,
__get_fastqc_stats_lambda_function_arn__: getFastqcStats.currentVersion.functionArn,
__get_sequali_stats_lambda_function_arn__:
getSequaliStatsLambdaObj.currentVersion.functionArn,
},
});

Expand All @@ -163,7 +254,14 @@ export class NewFastqListRowsEventShowerConstruct extends Construct {
props.tableObj.grantReadWriteData(this.stateMachineObj);

/* Allow state machine to invoke lambda */
[decompressFastqListRowLambda, generateEventDataObjsLambda].forEach((lambda) => {
[
decompressFastqListRowLambda,
generateEventDataObjsLambda,
generateDemuxStatsLambda,
getFastqcStats,
getSequaliStatsLambdaObj,
cleanupFastqListRowLambda,
].forEach((lambda) => {
lambda.currentVersion.grantInvoke(this.stateMachineObj.role);
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#!/usr/bin/env python3

"""
Clean up the fastq list rows
* convert uppercase to lowercase
* extend rgid to contain the instrument run and the sample name

* Otherwise very hard to match the fastq files to the sample names

# [
# {
# "RGID": "GAATTCGT.TTATGAGT.1",
# "RGSM": "L2400102",
# "RGLB": "L2400102",
# "Lane": 1,
# "Read1FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R1_001.fastq.gz",
# "Read2FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R2_001.fastq.gz"
# },
# {
# "RGID": "GTGACGTT.TCCCAGAT.4",
# "RGSM": "L2400257",
# "RGLB": "L2400257",
# "Lane": 4,
# "Read1FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R1_001.fastq.gz",
# "Read2FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R2_001.fastq.gz"
# }
# ]

To

# [
# {
# "rgid": "GAATTCGT.TTATGAGT.1.240229_A00130_0288_BH5HM2DSXC.L2400102",
# "rgsm": "L2400102",
# "rglb": "L2400102",
# "lane": 1,
# "read1fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R1_001.fastq.gz",
# "read2fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R2_001.fastq.gz"
# },
# {
# "rgid": "GTGACGTT.TCCCAGAT.4.240229_A00130_0288_BH5HM2DSXC.L2400257",
# "rgsm": "L2400257",
# "rglb": "L2400257",
# "lane": 4,
# "read1fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R1_001.fastq.gz",
# "read2fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R2_001.fastq.gz"
# }
# ]
"""

from typing import Dict


def handler(event, context) -> Dict:
"""

Given the instrument run id and fastq list rows, return the fastq list rows with the rgid
extended to contain the instrument run id and the sample name

All keys should be from UPPERCASE / PascalCase to camelCase

:param event:
:param context:
:return:
"""

# Get inputs
instrument_run_id = event["instrument_run_id"]
fastq_list_rows = event["fastq_list_rows"]

# Clean up the fastq list rows
fastq_list_rows = list(
map(
lambda fastq_list_row_iter_: {
"rgid": f"{fastq_list_row_iter_['RGID']}.{instrument_run_id}.{fastq_list_row_iter_['RGSM']}",
"rgsm": fastq_list_row_iter_["RGSM"],
"rglb": fastq_list_row_iter_["RGLB"],
"lane": fastq_list_row_iter_["Lane"],
"read1FileUri": fastq_list_row_iter_["Read1FileUri"],
"read2FileUri": fastq_list_row_iter_["Read2FileUri"]
},
fastq_list_rows
)
)

# Return the fastq list rows
return {
"fastq_list_rows": fastq_list_rows
}


# if __name__ == "__main__":
# import json
#
# print(
# json.dumps(
# handler(
# {
# "instrument_run_id": "240229_A00130_0288_BH5HM2DSXC",
# "fastq_list_rows": [
# {
# "RGID": "GAATTCGT.TTATGAGT.1",
# "RGSM": "L2400102",
# "RGLB": "L2400102",
# "Lane": 1,
# "Read1FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R1_001.fastq.gz",
# "Read2FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R2_001.fastq.gz"
# },
# {
# "RGID": "GTGACGTT.TCCCAGAT.4",
# "RGSM": "L2400257",
# "RGLB": "L2400257",
# "Lane": 4,
# "Read1FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R1_001.fastq.gz",
# "Read2FileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R2_001.fastq.gz"
# }
# ]
# },
# None
# ),
# indent=4
# )
# )
#
# # {
# # "fastq_list_rows": [
# # {
# # "rgid": "GAATTCGT.TTATGAGT.1.240229_A00130_0288_BH5HM2DSXC.L2400102",
# # "rgsm": "L2400102",
# # "rglb": "L2400102",
# # "lane": 1,
# # "read1fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R1_001.fastq.gz",
# # "read2fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_1/L2400102/L2400102_S1_L001_R2_001.fastq.gz"
# # },
# # {
# # "rgid": "GTGACGTT.TCCCAGAT.4.240229_A00130_0288_BH5HM2DSXC.L2400257",
# # "rgsm": "L2400257",
# # "rglb": "L2400257",
# # "lane": 4,
# # "read1fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R1_001.fastq.gz",
# # "read2fileUri": "s3://pipeline-dev-cache-503977275616-ap-southeast-2/byob-icav2/development/primary/240229_A00130_0288_BH5HM2DSXC/202409108ed29dcc/Samples/Lane_4/L2400257/L2400257_S29_L004_R2_001.fastq.gz"
# # }
# # ]
# # }
Loading