Skip to content

Commit

Permalink
Merge pull request #419 from kedhammar/fix-anglerfish
Browse files Browse the repository at this point in the history
Fix and improve TACA for handling QC runs
  • Loading branch information
kedhammar authored Mar 13, 2024
2 parents 802cc78 + 91f86ce commit 6d7d339
Show file tree
Hide file tree
Showing 26 changed files with 365 additions and 182 deletions.
8 changes: 8 additions & 0 deletions VERSIONLOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# TACA Version Log

## 20240304.1

- Make sure TACA can handle runs that generate NO sequencing data at all
- Refactor logic of control function to reduce complexity
- Introduce custom Exception for quiet skipping (waiting on) runs
- Improve documentation
- Minor polishing of tests to make them pass

## 20240229.1

Increase test coverage to 20%.
Expand Down
3 changes: 1 addition & 2 deletions taca/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
""" Main TACA module
"""
"""Main TACA module"""

__version__ = "1.0.0"
3 changes: 2 additions & 1 deletion taca/analysis/analysis.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Analysis methods for TACA."""

import glob
import logging
import os
Expand Down Expand Up @@ -34,7 +35,7 @@ def get_runObj(
logger.error(
f"Cannot find RunParameters.xml or runParameters.xml in the run folder for run {run}"
)
return
return None

run_parameters_path = os.path.join(run, run_parameters_file)
try:
Expand Down
194 changes: 108 additions & 86 deletions taca/analysis/analysis_nanopore.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Nanopore analysis methods for TACA."""

import logging
import os
import re
Expand Down Expand Up @@ -114,36 +115,34 @@ def process_user_run(ont_user_run: ONT_user_run):


def process_qc_run(ont_qc_run: ONT_qc_run):
"""This control function orchestrates the sequential execution of the ONT_qc_run class methods.
For a single ONT QC run...
- Ensure there is a database entry corresponding to an ongoing run
If not fully synced:
- Skip
If fully synced:
- Ensure all necessary files to proceed with processing are present
- Update the StatusDB entry
- Copy HTML report to GenStat
If Anglerfish has not been run:
If Anglerfish is ongoing:
- Skip
If Anglerfish is not ongoing:
- Run Anglerfish
If Anglerfish has been run:
If Anglerfish failed:
- Throw error
If Anglerfish finished successfully:
- Copy metadata
- Transfer run to cluster
- Update transfer log
- Archive run
f"""This control function orchestrates the sequential execution of the {ONT_qc_run} methods.
For a single ONT QC run...
├── Ensure there is a database entry corresponding to an ongoing run
├── If not fully synced
│ └── Skip run
├── Ensure all necessary files to proceed with processing are present
├── Update the StatusDB entry
├── Copy HTML report to GenStat
├── If there is sequencing raw data
│ ├── If no fastq output
│ │ └── Skip run
│ ├── If Anglerfish has not been run
│ │ ├── If Anglerfish is ongoing
│ │ │ └── Skip run
│ │ ├── If Anglerfish samplesheet could not be found
│ │ │ └── Skip run
│ │ ├── Run Anglerfish as subprocess
│ │ └── Skip run
│ └── If Anglerfish has failed
│ └── Throw error
├── If run has already been transferred
│ └── Skip run
├── Copy metadata
├── Transfer run to cluster
├── Update transfer log
└── Archive run
Any errors raised here-in should be sent with traceback as an email.
"""
Expand All @@ -153,19 +152,29 @@ def process_qc_run(ont_qc_run: ONT_qc_run):

# Is the run fully synced?
if not ont_qc_run.is_synced():
logger.info(f"{ont_qc_run.run_name}: Run is not fully synced, skipping.")
else:
# Assert all files are in place
logger.info(f"{ont_qc_run.run_name}: Asserting run contents...")
ont_qc_run.assert_contents()
raise WaitForRun(f"{ont_qc_run.run_name}: Run is not fully synced, skipping.")

# Assert all files are in place
logger.info(f"{ont_qc_run.run_name}: Asserting run contents...")
ont_qc_run.assert_contents()

# Update StatusDB
logger.info(f"{ont_qc_run.run_name}: Updating StatusDB...")
ont_qc_run.update_db_entry()

# Update StatusDB
logger.info(f"{ont_qc_run.run_name}: Updating StatusDB...")
ont_qc_run.update_db_entry()
# Copy HTML report
logger.info(f"{ont_qc_run.run_name}: Putting HTML report on GenStat...")
ont_qc_run.copy_html_report()

# Copy HTML report
logger.info(f"{ont_qc_run.run_name}: Putting HTML report on GenStat...")
ont_qc_run.copy_html_report()
# Look at seq data
if not ont_qc_run.has_raw_seq_output():
logger.info(f"{ont_qc_run.run_name}: Run has no sequencing output, continuing")

else:
if not ont_qc_run.has_fastq_output():
raise WaitForRun(
f"{ont_qc_run.run_name}: Run has no fastq output, skipping."
)

# Anglerfish
logger.info(
Expand All @@ -174,13 +183,8 @@ def process_qc_run(ont_qc_run: ONT_qc_run):

anglerfish_exit_code = ont_qc_run.get_anglerfish_exit_code()

if isinstance(anglerfish_exit_code, int) and anglerfish_exit_code > 0:
logger.warning(
f"{ont_qc_run.run_name}: Anglerfish has failed, throwing error."
)
raise AssertionError(f"{ont_qc_run.run_name}: Anglerfish failed.")

elif anglerfish_exit_code is None:
# Anglerfish not run
if anglerfish_exit_code is None:
logger.info(
f"{ont_qc_run.run_name}: Anglerfish has not been run, continuing."
)
Expand All @@ -190,58 +194,65 @@ def process_qc_run(ont_qc_run: ONT_qc_run):
)

anglerfish_pid = ont_qc_run.get_anglerfish_pid()

# Anglerfish ongoing
if anglerfish_pid:
logger.info(
f"{ont_qc_run.run_name}: Anglerfish is ongoing with process ID {anglerfish_pid}, skipping."
)
else:
logger.info(
f"{ont_qc_run.run_name}: Anglerfish is not ongoing, continuing."
)
raise WaitForRun("Anglerfish is ongoing, skipping.")

logger.info(
f"{ont_qc_run.run_name}: Fetching Anglerfish samplesheet..."
)
if not ont_qc_run.fetch_anglerfish_samplesheet():
logger.info(
f"{ont_qc_run.run_name}: Could not find Anglerfish sample sheet, skipping."
)
elif not ont_qc_run.has_fastq_output():
logger.info(
f"{ont_qc_run.run_name}: Run has no fastq output, skipping."
)
else:
logger.info(f"{ont_qc_run.run_name}: Starting Anglerfish...")
ont_qc_run.run_anglerfish()

elif isinstance(anglerfish_exit_code, int) and anglerfish_exit_code == 0:
logger.info(
f"{ont_qc_run.run_name}: Anglerfish has finished successfully, continuing."
f"{ont_qc_run.run_name}: Anglerfish is not ongoing, continuing."
)

if ont_qc_run.is_transferred():
logger.warning(
f"{ont_qc_run.run_name}: Run is already logged as transferred, skipping."
logger.info(f"{ont_qc_run.run_name}: Fetching Anglerfish samplesheet...")

if not ont_qc_run.fetch_anglerfish_samplesheet():
raise WaitForRun("Could not find Anglerfish sample sheet, skipping.")

logger.info(f"{ont_qc_run.run_name}: Starting Anglerfish...")
ont_qc_run.run_anglerfish()
raise WaitForRun("Anglerfish has been started, skipping.")

# Anglerfish run
elif isinstance(anglerfish_exit_code, int):
if anglerfish_exit_code == 0:
logger.info(
f"{ont_qc_run.run_name}: Anglerfish has finished successfully, continuing."
)
elif anglerfish_exit_code > 0:
logger.error(
f"{ont_qc_run.run_name}: Anglerfish has failed, throwing error."
)
raise AssertionError(f"{ont_qc_run.run_name}: Anglerfish failed.")
else:
raise AssertionError("Unexpected Anglerfish exit code.")

# Check transfer status
if ont_qc_run.is_transferred():
logger.warning(
f"{ont_qc_run.run_name}: Run is already logged as transferred, skipping."
)
raise WaitForRun("Run is already logged as transferred.")

else:
logger.info(f"{ont_qc_run.run_name}: Processing transfer...")
logger.info(f"{ont_qc_run.run_name}: Processing transfer...")

# Copy metadata
logger.info(f"{ont_qc_run.run_name}: Copying metadata...")
ont_qc_run.copy_metadata()
# Copy metadata
logger.info(f"{ont_qc_run.run_name}: Copying metadata...")
ont_qc_run.copy_metadata()

# Transfer run
logger.info(f"{ont_qc_run.run_name}: Transferring to cluster...")
ont_qc_run.transfer_run()
# Transfer run
logger.info(f"{ont_qc_run.run_name}: Transferring to cluster...")
ont_qc_run.transfer_run()

# Update transfer log
logger.info(f"{ont_qc_run.run_name}: Updating transfer log...")
ont_qc_run.update_transfer_log()
# Update transfer log
logger.info(f"{ont_qc_run.run_name}: Updating transfer log...")
ont_qc_run.update_transfer_log()

# Archive run
logger.info(f"{ont_qc_run.run_name}: Archiving run...")
ont_qc_run.archive_run()
# Archive run
logger.info(f"{ont_qc_run.run_name}: Archiving run...")
ont_qc_run.archive_run()


def ont_transfer(run_abspath: str | None, qc: bool = False):
Expand Down Expand Up @@ -276,10 +287,21 @@ def ont_transfer(run_abspath: str | None, qc: bool = False):
process_user_run(ONT_user_run(run_dir))
else:
process_qc_run(ONT_qc_run(run_dir))
except WaitForRun as e:
logger.info(f"Skipping run {os.path.basename(run_dir)}: {e}")
except BaseException as e:
send_error_mail(os.path.basename(run_dir), e)


class WaitForRun(Exception):
"""Exception defined to exit processing the current run and continue
with the next one without sending an error email.
"""

def __init__(self, message: str):
logging.info(message)


def ont_updatedb(run_abspath: str):
"""CLI entry function."""

Expand Down
1 change: 1 addition & 0 deletions taca/analysis/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""CLI for the analysis subcommand."""

import click

from taca.analysis import analysis as an
Expand Down
1 change: 1 addition & 0 deletions taca/backup/backup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Backup methods and utilities."""

import csv
import logging
import os
Expand Down
1 change: 1 addition & 0 deletions taca/backup/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""CLI for the backup subcommand."""

import click

from taca.backup.backup import backup_utils as bkut
Expand Down
1 change: 1 addition & 0 deletions taca/cleanup/cleanup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Storage methods and utilities"""

import logging
import os
import re
Expand Down
1 change: 1 addition & 0 deletions taca/cleanup/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""CLI for the storage subcommand."""

import click

from taca.cleanup import cleanup as cln
Expand Down
6 changes: 3 additions & 3 deletions taca/illumina/MiSeq_Runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ def _copy_samplesheet(self):
os.path.join(self.run_dir, "SampleSheet_copy.csv")
)
if not self.runParserObj.obj.get("samplesheet_csv"):
self.runParserObj.obj[
"samplesheet_csv"
] = self.runParserObj.samplesheet.data
self.runParserObj.obj["samplesheet_csv"] = (
self.runParserObj.samplesheet.data
)

def _generate_clean_samplesheet(
self,
Expand Down
42 changes: 21 additions & 21 deletions taca/illumina/Runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,20 +797,20 @@ def _fix_html_reports_for_complex_lanes(
self.NumberReads_Summary[entry["Lane"]]["total_sample_cluster"] = 0
self.NumberReads_Summary[entry["Lane"]]["total_sample_yield"] = 0
if entry["Project"] != "default":
self.NumberReads_Summary[entry["Lane"]][
"total_sample_cluster"
] += int(entry["PF Clusters"].replace(",", ""))
self.NumberReads_Summary[entry["Lane"]][
"total_sample_yield"
] += int(entry["Yield (Mbases)"].replace(",", ""))
self.NumberReads_Summary[entry["Lane"]]["total_sample_cluster"] += (
int(entry["PF Clusters"].replace(",", ""))
)
self.NumberReads_Summary[entry["Lane"]]["total_sample_yield"] += (
int(entry["Yield (Mbases)"].replace(",", ""))
)
else:
if entry["Project"] != "default":
self.NumberReads_Summary[entry["Lane"]][
"total_sample_cluster"
] += int(entry["PF Clusters"].replace(",", ""))
self.NumberReads_Summary[entry["Lane"]][
"total_sample_yield"
] += int(entry["Yield (Mbases)"].replace(",", ""))
self.NumberReads_Summary[entry["Lane"]]["total_sample_cluster"] += (
int(entry["PF Clusters"].replace(",", ""))
)
self.NumberReads_Summary[entry["Lane"]]["total_sample_yield"] += (
int(entry["Yield (Mbases)"].replace(",", ""))
)

# Calculate the numbers clusters/yields of undet reads
for key, value in self.NumberReads_Summary.items():
Expand Down Expand Up @@ -856,15 +856,15 @@ def _fix_html_reports_for_complex_lanes(
)

# Update the values in Flowcell Summary
html_report_laneBarcode_parser.flowcell_data[
"Clusters (Raw)"
] = f"{Clusters_Raw:,}"
html_report_laneBarcode_parser.flowcell_data[
"Clusters(PF)"
] = f"{Clusters_PF:,}"
html_report_laneBarcode_parser.flowcell_data[
"Yield (MBases)"
] = f"{Yield_Mbases:,}"
html_report_laneBarcode_parser.flowcell_data["Clusters (Raw)"] = (
f"{Clusters_Raw:,}"
)
html_report_laneBarcode_parser.flowcell_data["Clusters(PF)"] = (
f"{Clusters_PF:,}"
)
html_report_laneBarcode_parser.flowcell_data["Yield (MBases)"] = (
f"{Yield_Mbases:,}"
)
# Generate the new report for laneBarcode.html
new_html_report_laneBarcode = os.path.join(
new_html_report_lane_dir, "laneBarcode.html"
Expand Down
Loading

0 comments on commit 6d7d339

Please sign in to comment.