Commit

ORA Compression fixes for too large sfns, single indexed libraries
And single read libraries
alexiswl committed Nov 30, 2024
1 parent 948056e commit 5a15720
Showing 6 changed files with 317 additions and 153 deletions.
Changed file 1 of 6:
@@ -1,8 +1,8 @@
{
"Comment": "A description of my state machine",
"StartAt": "Convert Fastq List Row GZ URIs to Array",
"StartAt": "Convert Fastq List Row GZ Pair URIs to Array",
"States": {
"Convert Fastq List Row GZ URIs to Array": {
"Convert Fastq List Row GZ Pair URIs to Array": {
"Type": "Pass",
"Next": "Wait for Task Availability",
"Parameters": {
@@ -39,8 +39,27 @@
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "Validate ORA File",
"StartAt": "Check if fileUri is not Null",
"States": {
"Check if fileUri is not Null": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.fastqMap.input_uri",
"IsNull": true,
"Next": "Set null outputs"
}
],
"Default": "Validate ORA File"
},
"Set null outputs": {
"Type": "Pass",
"End": true,
"Result": {
"fileUri": null,
"md5sum": null
}
},
"Validate ORA File": {
"Type": "Task",
"Resource": "arn:aws:states:::ecs:runTask.sync",
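The new "Check if fileUri is not Null" / "Set null outputs" states let each map iteration skip validation when the fastq URI is null, as happens for the read-2 slot of a single-read library. A minimal Python sketch of the equivalent guard; the helper and field names below are hypothetical stand-ins, not part of the commit:

from typing import Dict, Optional

def validate_ora_file(input_uri: str) -> Dict[str, Optional[str]]:
    # Placeholder standing in for the ECS runTask.sync validation step
    return {"fileUri": input_uri, "md5sum": "placeholder-md5"}

def process_fastq_map_item(fastq_map: Dict) -> Dict[str, Optional[str]]:
    # "Check if fileUri is not Null": short-circuit when the input URI is null,
    # e.g. the read-2 slot of a single-read library.
    if fastq_map.get("input_uri") is None:
        # "Set null outputs": emit nulls instead of running the validation task
        return {"fileUri": None, "md5sum": None}
    # "Validate ORA File" would run the ECS task at this point
    return validate_ora_file(fastq_map["input_uri"])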
Changed file 2 of 6:
@@ -70,8 +70,25 @@
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": "Run Validation Only",
"StartAt": "Is read uri null",
"States": {
"Is read uri null": {
"Type": "Choice",
"Choices": [
{
"Variable": "$.fastqMap.input_uri",
"IsNull": true,
"Next": "Set Null outputs",
"Comment": "URI is null"
}
],
"Default": "Run Validation Only"
},
"Set Null outputs": {
"Type": "Pass",
"End": true,
"ResultPath": null
},
"Run Validation Only": {
"Type": "Choice",
"Choices": [
@@ -139,7 +156,8 @@
"JitterStrategy": "FULL"
}
],
"End": true
"End": true,
"ResultPath": null
},
"Decompress ORA File": {
"Type": "Task",
@@ -185,7 +203,8 @@
}
},
"TimeoutSeconds": 7200,
"End": true
"End": true,
"ResultPath": null
}
}
},
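Several states above now set "ResultPath": null, which discards the task's result and passes the state's input through unchanged; plausibly this is what addresses the "too large" state-machine payloads named in the commit title, though the commit does not say so explicitly. A simplified Python model of that behaviour, with a made-up URI:

def apply_result_path(state_input: dict, task_result: dict, result_path):
    # Simplified model: "ResultPath": null discards the task result entirely
    # and the state's input flows through unchanged.
    if result_path is None:
        return state_input
    # Real Step Functions accepts arbitrary JSONPath targets; this only
    # handles a single top-level key for illustration.
    return {**state_input, result_path.removeprefix("$."): task_result}

item = {"fastqMap": {"input_uri": "icav2://project/sample_R1_001.fastq.ora"}}
assert apply_result_path(item, {"md5sum": "abc123"}, None) == item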
Changed file 3 of 6:
@@ -26,7 +26,7 @@
convert_uri_to_project_data_obj,
ProjectData, convert_project_data_obj_to_uri
)
from wrapica.enums import DataType, UriType
from wrapica.enums import DataType

if typing.TYPE_CHECKING:
from mypy_boto3_ssm import SSMClient
@@ -87,6 +87,11 @@ def handler(event, context):

instrument_run_folder_uri = event["instrument_run_folder_uri"]
instrument_run_id = event["instrument_run_id"]
get_lanes_only = event.get("get_lanes_only", None)
lane_num = event.get("filter_lane", None)

if get_lanes_only is None:
get_lanes_only = False

# Get the project data obj
instrument_run_folder_obj: ProjectData = convert_uri_to_project_data_obj(
@@ -118,14 +123,10 @@ def handler(event, context):
fastq_gz_project_data_obj_list
))

# Check if the number of R1 and R2 files are the same
if len(r1_files) != len(r2_files):
raise ValueError("Number of R1 and R2 files are not the same")

# Create the fastq pair list
fastq_pair_list = []

for r1_file, r2_file in zip(r1_files, r2_files):
for r1_file in r1_files:
rgid_regex_match = FASTQ_REGEX_OBJ.fullmatch(r1_file.data.details.name)

sample_id = rgid_regex_match.group(1)
@@ -137,18 +138,49 @@
if lane is None:
lane = "1"

fastq_pair_list.append({
"rgid_partial": f"{lane}.{sample_id}",
"read_1_file_uri": convert_project_data_obj_to_uri(r1_file),
"read_2_file_uri": convert_project_data_obj_to_uri(r2_file)
})
# Find the corresponding R2 file
try:
r2_file = next(filter(
lambda x: x.data.details.name == r1_file.data.details.name.replace("_R1_001.fastq.gz", "_R2_001.fastq.gz"),
r2_files
))

fastq_pair_list.append({
"rgid_partial": f"{lane}.{sample_id}",
"read_1_file_uri": convert_project_data_obj_to_uri(r1_file),
"read_2_file_uri": convert_project_data_obj_to_uri(r2_file)
})
except StopIteration:
fastq_pair_list.append({
"rgid_partial": f"{lane}.{sample_id}",
"read_1_file_uri": convert_project_data_obj_to_uri(r1_file),
"read_2_file_uri": None
})

# Assert that the all rgid_partial are unique
assert \
len(pd.DataFrame(fastq_pair_list)['rgid_partial'].unique().tolist()) == len(fastq_pair_list), \
"rgid_partial are not unique"

return fastq_pair_list
if get_lanes_only:
return {
"lanes_list": list(
set(map(
lambda fastq_pair_iter_: int(fastq_pair_iter_['rgid_partial'].split('.')[0]),
fastq_pair_list
))
)
}

if lane_num is None:
return fastq_pair_list
else:
return list(
filter(
lambda fastq_pair_iter_: int(fastq_pair_iter_['rgid_partial'].split('.')[0]) == lane_num,
fastq_pair_list
)
)


# if __name__ == "__main__":
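The handler above no longer zips the R1 and R2 lists one-to-one; each R1 file now looks up its matching R2 by name and falls back to None when the library is single-read, and the new get_lanes_only / filter_lane options return either the distinct lane numbers or only the pairs from one lane. A self-contained sketch of the pairing fallback, using made-up file names:

r1_names = ["S1_L001_R1_001.fastq.gz", "S2_L001_R1_001.fastq.gz"]
r2_names = ["S1_L001_R2_001.fastq.gz"]  # S2 is a single-read library

pairs = []
for r1_name in r1_names:
    expected_r2 = r1_name.replace("_R1_001.fastq.gz", "_R2_001.fastq.gz")
    try:
        # Same next(filter(...)) / StopIteration pattern as the handler
        r2_name = next(filter(lambda name_: name_ == expected_r2, r2_names))
    except StopIteration:
        r2_name = None  # read_2_file_uri stays None downstream
    pairs.append({"read_1": r1_name, "read_2": r2_name})

print(pairs)
# [{'read_1': 'S1_L001_R1_001.fastq.gz', 'read_2': 'S1_L001_R2_001.fastq.gz'},
#  {'read_1': 'S2_L001_R1_001.fastq.gz', 'read_2': None}]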
Changed file 4 of 6:
@@ -17,7 +17,7 @@
import typing
from io import StringIO
import boto3
from typing import List, Dict
from typing import List, Dict, Optional
from os import environ
import pandas as pd

@@ -76,6 +76,20 @@ def set_icav2_env_vars():
environ["ICAV2_ACCESS_TOKEN_SECRET_ID"]
)

def get_rgid(
sample_id: str,
lane: int,
instrument_run_id: str,
index1: str,
index2: Optional[str] = None,
):
"""
Generate the rgid
"""
if index2 is None:
return f"{index1}.{lane}.{sample_id}.{instrument_run_id}"
return f"{index1}.{index2}.{lane}.{sample_id}.{instrument_run_id}"


def handler(event, context):
"""
@@ -85,6 +99,9 @@ def handler(event, context):

instrument_run_folder_uri = event["instrument_run_folder_uri"]
instrument_run_id = event["instrument_run_id"]
filter_lane = event.get("filter_lane", None)
if filter_lane is None:
filter_lane = 1

# Get the project data obj
instrument_run_folder_obj: ProjectData = convert_uri_to_project_data_obj(
@@ -121,10 +138,23 @@

# Convert each item in the bclconvert data section to rgids
for bclconvert_iter_ in samplesheet_data_dict["bclconvert_data"]:
# Add in rgids if the lane is not specified
if bclconvert_iter_.get('lane', None) is None:
bclconvert_iter_['lane'] = filter_lane
# Skip if the lane is specified and does not match the filter lane
elif bclconvert_iter_['lane'] != filter_lane:
continue

rgids_list.append(
{
"rgid": f"{bclconvert_iter_['index']}.{bclconvert_iter_['index2']}.{bclconvert_iter_.get('lane', 1)}.{bclconvert_iter_['sample_id']}.{instrument_run_id}",
"rgid_partial": f"{bclconvert_iter_.get('lane', 1)}.{bclconvert_iter_['sample_id']}",
"rgid": get_rgid(
index1=bclconvert_iter_["index"],
index2=bclconvert_iter_.get("index2", None),
lane=bclconvert_iter_["lane"],
sample_id=bclconvert_iter_["sample_id"],
instrument_run_id=instrument_run_id,
),
"rgid_partial": f"{bclconvert_iter_['lane']}.{bclconvert_iter_['sample_id']}",
}
)

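The new get_rgid helper drops the index2 component for single-indexed libraries, and samplesheet rows without a lane now default to filter_lane before filtering. A quick usage sketch of the resulting rgid formats; the index sequences, sample IDs, and run ID are made up:

from typing import Optional

def get_rgid(sample_id: str, lane: int, instrument_run_id: str,
             index1: str, index2: Optional[str] = None) -> str:
    # Same logic as the helper added in the diff above
    if index2 is None:
        return f"{index1}.{lane}.{sample_id}.{instrument_run_id}"
    return f"{index1}.{index2}.{lane}.{sample_id}.{instrument_run_id}"

run_id = "240101_A01052_0001_BHXXXXXXXX"
print(get_rgid("L2400001", 1, run_id, "AAAAAAAA", "CCCCCCCC"))
# AAAAAAAA.CCCCCCCC.1.L2400001.240101_A01052_0001_BHXXXXXXXX
print(get_rgid("L2400002", 1, run_id, "GGGGGGGG"))  # single-indexed library
# GGGGGGGG.1.L2400002.240101_A01052_0001_BHXXXXXXXX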
Changed file 5 of 6:
@@ -20,7 +20,7 @@
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.rgid_pair.rgid",
"id.$": "$.rgid_pair",
"id_type": "${__fastq_list_row_table_partition_name__}"
}
},
@@ -51,7 +51,7 @@
"Parameters": {
"TableName": "${__table_name__}",
"Key": {
"id.$": "$.rgid_pair.rgid",
"id.$": "$.rgid_pair",
"id_type": "${__fastq_list_row_table_partition_name__}"
},
"UpdateExpression": "SET read1_raw_md5sum = :read1RawMd5sum, read2_raw_md5sum = :read2RawMd5sum",
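Both DynamoDB states above now read the key from $.rgid_pair instead of $.rgid_pair.rgid, which suggests each map item now carries the rgid string directly rather than an object wrapping it. A small sketch of the assumed before/after item shapes; the rgid value is made up:

# Before: the key path "$.rgid_pair.rgid" implied an object per map item
old_item = {"rgid_pair": {"rgid": "AAAAAAAA.CCCCCCCC.1.L2400001.RUN_ID"}}
# After: the key path "$.rgid_pair" implies the item holds the rgid itself
new_item = {"rgid_pair": "AAAAAAAA.CCCCCCCC.1.L2400001.RUN_ID"}
assert old_item["rgid_pair"]["rgid"] == new_item["rgid_pair"]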
Changed file 6 of 6: diff not loaded on this page.
