Skip to content

Commit

Permalink
Merge branch 'aviti' of https://github.com/ssjunnebo/TACA into aviti
Browse files Browse the repository at this point in the history
  • Loading branch information
ssjunnebo committed Sep 19, 2024
2 parents a9fc4f7 + c9df021 commit 8e3f05c
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 69 deletions.
7 changes: 6 additions & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,12 @@
"features": {},
"customizations": {
"vscode": {
"extensions": ["ms-python.python", "eamodio.gitlens"]
"extensions": [
"ms-python.python",
"eamodio.gitlens",
"charliermarsh.ruff",
"ms-python.mypy-type-checker"
]
}
},
// Features to add to the dev container. More info: https://containers.dev/features.
Expand Down
1 change: 0 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ COPY requirements-dev.txt requirements-dev.txt
RUN python -m pip install -r requirements-dev.txt

RUN mkdir /root/.taca/
COPY tests/data/taca_test_cfg.yaml /root/.taca/taca.yaml

FROM base AS testing
COPY . /taca
Expand Down
2 changes: 1 addition & 1 deletion taca/element/Aviti_Runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@

class Aviti_Run(Run):
    """An Element Aviti sequencing run."""

    def __init__(self, run_dir, configuration):
        # Set the sequencer type *before* delegating to Run.__init__:
        # the base class reads self.sequencer_type during initialization
        # (e.g. to look up sequencer-specific settings in the config).
        self.sequencer_type = "Aviti"
        super().__init__(run_dir, configuration)
142 changes: 87 additions & 55 deletions taca/element/Element_Runs.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
import csv
import json
import logging
import os
import re
import shutil
import subprocess
import zipfile
from datetime import datetime
from pathlib import Path

# NOTE(review): call sites use `glob.glob(...)`, so the module import must
# bind last (a bare `from glob import glob` would shadow the module and break
# those calls). Drop the from-import once the call sites are updated.
from glob import glob  # noqa: F401
import glob

import pandas as pd

from taca.utils import misc
from taca.utils.filesystem import chdir
from taca.utils.statusdb import ElementRunsConnection

Expand All @@ -23,6 +22,10 @@ class Run:
"""Defines an Element run"""

def __init__(self, run_dir, configuration):
if not hasattr(self, "sequencer_type"):
# Mostly for testing, since this class is not meant to be instantiated
self.sequencer_type = "GenericElement"

if not os.path.exists(run_dir):
raise RuntimeError(f"Could not locate run directory {run_dir}")
self.run_parameters_parsed = False
Expand All @@ -32,11 +35,15 @@ def __init__(self, run_dir, configuration):

self.demux_dir = os.path.join(self.run_dir, "Demultiplexing")
self.final_sequencing_file = os.path.join(self.run_dir, "RunUploaded.json")
self.demux_stats_file = "RunStats.json" # Assumes demux is finished when this file is created
self.demux_stats_file = (
"RunStats.json" # Assumes demux is finished when this file is created
)
self.transfer_file = (
self.CONFIG.get("Element").get(self.sequencer_type).get("transfer_log")
self.CONFIG.get("Element", {})
.get(self.sequencer_type, {})
.get("transfer_log")
) # TODO: change and add to taca.yaml
self.rsync_exit_file = os.path.join(self.run_dir, '.rsync_exit_status')
self.rsync_exit_file = os.path.join(self.run_dir, ".rsync_exit_status")

# Instrument generated files
self.run_parameters_file = os.path.join(self.run_dir, "RunParameters.json")
Expand All @@ -46,7 +53,9 @@ def __init__(self, run_dir, configuration):
)
self.run_uploaded_file = os.path.join(self.run_dir, "RunUploaded.json")

self.db = ElementRunsConnection(self.CONFIG["statusdb"], dbname="element_runs")
self.db = ElementRunsConnection(
self.CONFIG.get("statusdb", {}), dbname="element_runs"
)

# Fields to be set by TACA
self.status = None
Expand Down Expand Up @@ -149,19 +158,19 @@ def check_sequencing_status(self):
def get_demultiplexing_status(self):
if not os.path.exists(self.demux_dir):
return "not started"
demux_dirs = glob.glob(
os.path.join(self.run_dir, "Delmultiplexing*")
)
demux_dirs = glob.glob(os.path.join(self.run_dir, "Delmultiplexing*"))
finished_count = 0
for demux_dir in demux_dirs:
if os.path.exists(self.demux_dir) and not os.path.isfile(
os.path.join(demux_dir, self.demux_stats_file)
):
):
return "ongoing"
elif os.path.exists(self.demux_dir) and os.path.isfile(
os.path.join(demux_dir, self.demux_stats_file)
):
finished_count += 1 # TODO: check exit status of demux in exit status file
):
finished_count += (
1 # TODO: check exit status of demux in exit status file
)
if finished_count == len(demux_dirs):
return "finished"
else:
Expand Down Expand Up @@ -373,58 +382,65 @@ def make_demux_manifests(
return manifest_paths

def generate_demux_command(self, run_manifest, demux_dir):
    """Build the Bases2Fastq shell command for demultiplexing this run.

    Args:
        run_manifest: Path to the run manifest, passed via ``-r``.
        demux_dir: Output directory for the demultiplexed data.

    Returns:
        The complete command line as a single string.
    """
    command = (
        f"{self.CONFIG.get(self.software)['bin']}"  # TODO: add path to bases2fastq executable to config
        + f" {self.run_dir}"
        + f" {demux_dir}"
        + " -p 8"
        + f" -r {run_manifest}"
        + " --legacy-fastq"  # TODO: except if Smart-seq3
        + " --force-index-orientation"
    )  # TODO: any other options?
    return command

def start_demux(self, run_manifest, demux_dir):
    """Launch Bases2Fastq demultiplexing as a detached background process.

    The subprocess is started with Popen and not waited on; a failure to
    *launch* is logged as a warning and swallowed.
    """
    with chdir(self.run_dir):
        cmd = self.generate_demux_command(run_manifest, demux_dir)
        # TODO: handle multiple composite manifests for demux
        try:
            subprocess.Popen(
                cmd, stdout=subprocess.PIPE, shell=True, cwd=self.run_dir
            )
            logger.info(
                "Bases2Fastq conversion and demultiplexing "
                f"started for run {self} on {datetime.now()}"
            )
        except subprocess.CalledProcessError:
            # NOTE(review): Popen never raises CalledProcessError (that is
            # raised by run/check_call with check=True); launch failures
            # raise OSError. Consider catching OSError/SubprocessError —
            # kept as-is to preserve existing behavior.
            logger.warning(
                "An error occurred while starting demultiplexing for "
                f"{self} on {datetime.now()}."
            )
            return


def get_transfer_status(self):
    """Classify this run's transfer state from its indicator files.

    Returns:
        One of "not started", "ongoing", "rsync done" or "unknown"
        (the last when the run is already in the transfer log).
        Falls through to None if no condition matches.
    """
    if (
        not self.in_transfer_log()
        and not self.transfer_ongoing()
        and not self.rsync_complete()
    ):
        return "not started"
    elif self.transfer_ongoing() and not self.rsync_complete():
        return "ongoing"
    elif self.rsync_complete() and not self.in_transfer_log():
        return "rsync done"
    elif self.in_transfer_log():
        return "unknown"

def in_transfer_log(self):
    """Return True if this run's NGI run id appears in the transfer log.

    Bug fix: the previous version looped over ``transfer_file.read()``,
    which yields single *characters*, so a multi-character run id could
    never match. Iterate over the file's lines instead.
    """
    with open(self.transfer_file) as transfer_file:
        for line in transfer_file:
            if self.NGI_run_id in line:
                return True
    return False

def transfer_ongoing(self):
    """Return True while an rsync transfer of this run is in progress.

    The transfer is flagged by a ``.rsync_ongoing`` indicator file that
    make_transfer_indicator() creates in the run directory.
    """
    # Fix: the diff residue left two stacked return statements (the old
    # single-quoted and new double-quoted variants); keep one.
    return os.path.isfile(os.path.join(self.run_dir, ".rsync_ongoing"))

def rsync_complete(self):
    """True once rsync has finished and written its exit-status file."""
    exit_status_path = Path(self.rsync_exit_file)
    return exit_status_path.is_file()

def rsync_successful(self):
with open(os.path.join(self.run_dir, '.rsync_exit_status')) as rsync_exit_file:
with open(os.path.join(self.run_dir, ".rsync_exit_status")) as rsync_exit_file:
rsync_exit_status = rsync_exit_file.readlines()
if rsync_exit_status[0].strip() == 0:
return True
Expand All @@ -434,27 +450,35 @@ def rsync_successful(self):
def aggregate_demux_results(self, demux_results_dirs):
    """Collect per-demux sample directories into self.demux_dir.

    Moves every sample directory found under ``<demux_dir>/Samples``
    into the run-level Demultiplexing directory, skipping PhiX controls.

    Args:
        demux_results_dirs: Iterable of finished demultiplexing output
            directories, each containing a "Samples" subdirectory.
    """
    # TODO: Correct this based on comments from Chuan
    for demux_dir in demux_results_dirs:
        sample_dirs = [
            entry.path
            for entry in os.scandir(os.path.join(demux_dir, "Samples"))
            if entry.is_dir()
        ]
        for sample_dir in sample_dirs:
            # Fix: the original `not "PhiX" in x in x` chained comparison
            # only filtered PhiX by accident; state the intent directly.
            if "PhiX" not in sample_dir:
                shutil.move(sample_dir, self.demux_dir)

def upload_demux_results_to_statusdb(self):
doc_obj = self.db.get_db_entry(self.NGI_run_id)
index_assignement_file = os.path.join(self.run_dir, "Demultiplexing", "IndexAssignment.csv")
with open(index_assignement_file, 'r') as index_file:
index_assignement_file = os.path.join(
self.run_dir, "Demultiplexing", "IndexAssignment.csv"
)
with open(index_assignement_file) as index_file:
reader = csv.DictReader(index_file)
index_assignments = [row for row in reader]
unassigned_sequences_file = os.path.join(self.run_dir, "Demultiplexing", "UnassignedSequences.csv")
with open(unassigned_sequences_file, 'r') as unassigned_file:
unassigned_sequences_file = os.path.join(
self.run_dir, "Demultiplexing", "UnassignedSequences.csv"
)
with open(unassigned_sequences_file) as unassigned_file:
reader = csv.DictReader(unassigned_file)
unassigned_sequences = [row for row in reader]
dirs = os.scandir("Demultiplexing")
project_dirs = []
for directory in dirs:
if os.path.isdir(directory.path) and not "Unassigned" in directory.path:
if os.path.isdir(directory.path) and "Unassigned" not in directory.path:
project_dirs.append(directory.path)
for project_dir in project_dirs: # TODO: remove this block when q30 is added to IndexAssignment.csv by Element
for project_dir in project_dirs: # TODO: remove this block when q30 is added to IndexAssignment.csv by Element
run_stats_file = glob.glob(os.path.join(project_dir, "*_RunStats.json"))
with open(run_stats_file) as stats_json:
project_sample_stats_raw = json.load(stats_json)
Expand All @@ -467,21 +491,25 @@ def upload_demux_results_to_statusdb(self):
collected_sample_stats[sample_name] = {
"PercentQ30": percent_q30,
"QualityScoreMean": quality_score_mean,
"PercentMismatch": percent_mismatch
}
"PercentMismatch": percent_mismatch,
}
for assignment in index_assignments:
sample = assignment.get("SampleName")
if sample != "PhiX":
sample_stats_to_add = collected_sample_stats.get(sample)
assignment["PercentQ30"] = sample_stats_to_add.get("PercentQ30")
assignment["QualityScoreMean"] = sample_stats_to_add.get("QualityScoreMean")
assignment["PercentMismatch"] = sample_stats_to_add.get("PercentMismatch")
assignment["QualityScoreMean"] = sample_stats_to_add.get(
"QualityScoreMean"
)
assignment["PercentMismatch"] = sample_stats_to_add.get(
"PercentMismatch"
)
demultiplex_stats = {
"Demultiplex_Stats": {
"Index_Assignment": index_assignments,
"Unassigned_Sequences": unassigned_sequences
}
"Unassigned_Sequences": unassigned_sequences,
}
}
doc_obj["Aviti"] = demultiplex_stats
self.db.upload_to_statusdb(doc_obj)

Expand All @@ -490,30 +518,34 @@ def sync_metadata(self):
pass

def make_transfer_indicator(self):
    """Touch the ``.rsync_ongoing`` indicator file in the run directory.

    While this file exists, transfer_ongoing() reports the transfer as
    in progress.
    """
    # Fix: the diff residue left two stacked assignments (old single-quoted
    # and new double-quoted variants); keep one.
    transfer_indicator = os.path.join(self.run_dir, ".rsync_ongoing")
    Path(transfer_indicator).touch()

def transfer(self):
    """Start an rsync of the run directory to the analysis cluster.

    Launched as a detached shell pipeline; the rsync exit code is echoed
    into ``.rsync_exit_status`` (read later by rsync_complete() /
    rsync_successful()). Launch failures are logged and swallowed.
    """
    transfer_details = (
        self.CONFIG.get("Element").get(self.sequencer_type).get("transfer_details")
    )  # TODO: Add section to taca.yaml
    command = (
        "rsync"
        + " -rLav"
        + f" --chown={transfer_details.get('owner')}"
        + f" --chmod={transfer_details.get('permissions')}"
        + " --exclude BaseCalls"  # TODO: check that we actually want to exclude these
        + " --exclude Alignment"
        + f" {self.run_dir}"
        # Fix: `{a.get('user')@a.get('host')}` is invalid f-string syntax;
        # the '@' must sit between two separate replacement fields.
        + f" {transfer_details.get('user')}@{transfer_details.get('host')}:/"
        + "; echo $? > .rsync_exit_status"
    )  # TODO: any other options?
    try:
        subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)
        logger.info(
            "Transfer to analysis cluster "
            f"started for run {self} on {datetime.now()}"
        )
    except subprocess.CalledProcessError:
        # NOTE(review): Popen never raises CalledProcessError; launch
        # failures raise OSError. Kept as-is to preserve behavior.
        logger.warning(
            "An error occurred while starting transfer to analysis cluster "
            f"for {self} on {datetime.now()}."
        )
        return

Expand Down
6 changes: 6 additions & 0 deletions tests/element/test_Aviti_Runs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import tempfile
from unittest.mock import patch

import pytest

Expand All @@ -10,6 +11,11 @@ class TestAviti_Run:
def test_init(self, create_dirs: pytest.fixture):
    """Aviti_Run should record its run dir and sequencer type on init."""
    tmp: tempfile.TemporaryDirectory = create_dirs
    run_dir = create_element_run_dir(tmp)

    # Mock the statusdb connection. Fix: patch(...).start() without a
    # matching .stop() leaks the mock into every subsequent test; the
    # context manager scopes it to this test only.
    with patch("taca.element.Element_Runs.ElementRunsConnection"):
        run = to_test.Aviti_Run(run_dir, {})
        assert run.run_dir == run_dir
        assert run.sequencer_type == "Aviti"
Loading

0 comments on commit 8e3f05c

Please sign in to comment.