Skip to content

Commit

Permalink
Merge branch 'aviti' of https://github.com/ssjunnebo/TACA into aviti
Browse files Browse the repository at this point in the history
  • Loading branch information
ssjunnebo committed Sep 17, 2024
2 parents eda9f3f + c840ea8 commit 918f16f
Showing 1 changed file with 206 additions and 22 deletions.
228 changes: 206 additions & 22 deletions taca/element/Element_Runs.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import json
import logging
import os
import re
import shutil
import zipfile
from datetime import datetime
from glob import glob

import pandas as pd

from taca.utils import misc
from taca.utils.filesystem import chdir
Expand All @@ -28,14 +33,9 @@ def __init__(self, run_dir, configuration):
self.demux_dir,
"RunStats.json", # Assumes demux is finished when this file is created
)
self.run_manifest_file = os.path.join(self.run_dir, "RunManifest.csv")
self.run_manifest_zip_file = os.path.join(
self.CONFIG.get("Aviti").get("manifest_zip_location"),
self.flowcell_id + ".tar.gz",
) # TODO: change and add to taca.yaml
# TODO, need to be real careful when using the flowcell_id as it is manually entered and can mean three different things
self.transfer_file = (
self.CONFIG.get("Element").get(self.sequencer_type).get("transfer_log")) # TODO: change and add to taca.yaml
self.CONFIG.get("Element").get(self.sequencer_type).get("transfer_log")
) # TODO: change and add to taca.yaml

# Instrument generated files
self.run_parameters_file = os.path.join(self.run_dir, "RunParameters.json")
Expand All @@ -49,6 +49,10 @@ def __init__(self, run_dir, configuration):

# Fields to be set by TACA
self.status = None
self.lims_step_id = None
self.lims_full_manifest = None
self.lims_start_manifest = None
self.lims_demux_manifests = None

# Fields that will be set when parsing run parameters
self.run_name = None
Expand Down Expand Up @@ -147,9 +151,9 @@ def get_demultiplexing_status(self):
elif os.path.exists(self.demux_dir) and not os.path.isfile(
self.demux_stats_file
):
return "ongoing" # TODO: check for exit status file instead
return "ongoing" # TODO: check for exit status file instead
elif os.path.exists(self.demux_dir) and os.path.isfile(self.demux_stats_file):
return "finished" # TODO: check exit status of demux in exit status file
return "finished" # TODO: check exit status of demux in exit status file
else:
return "unknown"

Expand All @@ -168,9 +172,192 @@ def update_statusdb(self):
def manifest_exists(self):
return os.path.isfile(self.run_manifest_zip_file)

def copy_manifests(self):
shutil.copy(self.run_manifest_zip_file, self.run_dir)
# TODO: unzip
def get_lims_step_id(self) -> str | None:
"""If the run was started using a LIMS-generated manifest,
the ID of the LIMS step can be extracted from it.
"""

# TODO test me

assert self.manifest_exists(), "Run manifest not found"
with open(self.run_manifest_file_from_instrument) as csv_file:
manifest_lines = csv_file.readlines()
for line in manifest_lines:
if "lims_step_id" in line:
lims_step_id = line.split(",")[1]
return lims_step_id
return None

def copy_manifests(self) -> bool:
"""Fetch the LIMS-generated run manifests from ngi-nas-ns and unzip them into a run subdir."""

# TODO test me

# Specify dir in which LIMS drop the manifest zip files
dir_to_search = os.path.join(
self.CONFIG.get("Aviti").get(
"manifest_zip_location"
), # TODO: change and add to taca.yaml
datetime.now().year,
)

# Use LIMS step ID if available, else flowcell ID, to make a query pattern
if self.lims_step_id:
logging.info(
f"Using LIMS step ID '{self.lims_step_id}' to find LIMS run manifests."
)
glob_pattern = f"{dir_to_search}/*{self.lims_step_id}*.zip"
else:
logging.warning(
"LIMS step ID not available, using flowcell ID to find LIMS run manifests."
)
glob_pattern = f"{dir_to_search}/*{self.flowcell_id}*.zip"

# Find paths matching the pattern
glob_results = glob(glob_pattern)
if len(glob_results) == 0:
logger.warning(
f"No manifest found for run '{self.run_dir}' with pattern '{glob_pattern}'."
)
return False # TODO determine whether to raise an error here instead
elif len(glob_results) > 1:
logger.warning(
f"Multiple manifests found for run '{self.run_dir}' with pattern '{glob_pattern}', using latest one."
)
glob_results.sort()
zip_src_path = glob_results[-1]
else:
zip_src_path = glob_results[0]

# Make a run subdir named after the zip file and extract manifests there
zip_name = os.path.basename(zip_src_path)
zip_dst_path = os.path.join(self.run_dir, zip_name)
os.mkdir(zip_dst_path)

with zipfile.ZipFile(zip_src_path, "r") as zip_ref:
zip_ref.extractall(zip_dst_path)

# Set the paths of the different manifests as attributes
manifests = os.listdir(zip_dst_path)
self.lims_full_manifest = [
m for m in manifests if re.match(r".*_untrimmed\.csv$", m)
][0]
self.lims_start_manifest = [
m for m in manifests if re.match(r".*_trimmed\.csv$", m)
][0]
self.lims_demux_manifests = [
m for m in manifests if re.match(r".*_\d+\.csv$", m)
]

return True

def make_demux_manifests(
self, manifest_to_split: os.PathLike, outdir: os.PathLike | None = None
) -> list[os.PathLike]:
"""Derive composite demultiplexing manifests (grouped by index duplicity and lengths)
from a single information-rich manifest.
"""

# TODO test me

# Read specified manifest
with open(manifest_to_split) as f:
manifest_contents = f.read()

# Get '[SAMPLES]' section
split_contents = "[SAMPLES]".split(manifest_contents)
assert (
len(split_contents) == 2
), f"Could not split sample rows out of manifest {manifest_contents}"
sample_section = split_contents[1].split("\n")

# Split into header and rows
header = sample_section[0]
sample_rows = sample_section[1:]

# Convert to list of dicts
sample_dicts = []
for row in sample_rows:
row_dict = dict(zip(header.split(","), row.split(",")))
sample_dicts.append(row_dict)

# Convert to dataframe
df = pd.DataFrame.from_dict(sample_dicts)

# Separate samples from controls
df_samples = df[df["Project"] != "Control"].copy()
df_controls = df[df["Project"] == "Control"].copy()

# Apply default dir path for output
if outdir is None:
outdir = self.run_dir

## Build composite manifests

manifest_root_name = f"{self.NGI_run_id}_demux"

# Get idx lengths for calculations
df_samples.loc[:, "len_idx1"] = df["Index1"].apply(len)
df_samples.loc[:, "len_idx2"] = df["Index2"].apply(len)

# Break down by index lengths and lane, creating composite manifests
manifests = []
n = 0
for (len_idx1, len_idx2, lane), group in df_samples.groupby(
["len_idx1", "len_idx2", "Lane"]
):
file_name = f"{manifest_root_name}_{n}.csv"
runValues_section = "\n".join(
[
"[RUNVALUES]",
"KeyName, Value",
f'manifest_file, "{file_name}"',
f"manifest_group, {n+1}/{len(df.groupby(['len_idx1', 'len_idx2', 'Lane']))}",
f"grouped_by, len_idx1:{len_idx1} len_idx2:{len_idx2} lane:{lane}",
]
)

settings_section = "\n".join(
[
"[SETTINGS]",
"SettingName, Value",
]
)

# Add PhiX stratified by index length
if group["phix_loaded"].any():
# Subset controls by lane
group_controls = df_controls[df_controls["Lane"] == lane].copy()

# Trim PhiX indexes to match group
group_controls.loc[:, "Index1"] = group_controls.loc[:, "Index1"].apply(
lambda x: x[:len_idx1]
)
group_controls.loc[:, "Index2"] = group_controls.loc[:, "Index2"].apply(
lambda x: x[:len_idx2]
)

# Add PhiX to group
group = pd.concat([group, group_controls], axis=0, ignore_index=True)

samples_section = (
f"[SAMPLES]\n{group.iloc[:, 0:6].to_csv(index=None, header=True)}"
)

manifest_contents = "\n\n".join(
[runValues_section, settings_section, samples_section]
)

file_path = os.path.join(outdir, file_name)
manifests.append((file_path, manifest_contents))
n += 1

for manifest_path, manifest_contents in manifests:
with open(os.path.join(outdir, manifest_path), "w") as f:
f.write(manifest_contents)

manifest_paths = [t[0] for t in manifests]
return manifest_paths

def generate_demux_command(self, run_manifest, demux_dir):
command = [
Expand All @@ -181,15 +368,16 @@ def generate_demux_command(self, run_manifest, demux_dir):
demux_dir,
"-p 8",
f"-r {run_manifest}",
"--legacy-fastq", # TODO: except if Smart-seq3
"--legacy-fastq", # TODO: except if Smart-seq3
"--force-index-orientation",
] # TODO: any other options?
] # TODO: any other options?
# TODO: write exit status of command to file
return command

def start_demux(self, run_manifest, demux_dir):
with chdir(self.run_dir):
cmd = self.generate_demux_command(run_manifest, demux_dir)
# TODO handle multiple composite manifests for demux
misc.call_external_command_detached(
cmd, with_log_files=True, prefix="demux_"
)
Expand All @@ -209,17 +397,17 @@ def get_transfer_status(self):
return "unknown"

def in_transfer_log(self):
with open(self.transfer_file, 'r') as transfer_file:
with open(self.transfer_file, "r") as transfer_file:
for row in transfer_file.read():
if self.NGI_run_id in row:
return True
return False

    def transfer_ongoing(self):
        """Stub: report whether a transfer of this run is currently in progress."""
        # TODO: return true if hidden transfer file marker exists, else false
        # NOTE(review): not implemented — implicitly returns None (falsy),
        # which callers may read as "no transfer ongoing"; confirm intent.

        pass

    def rsync_complete(self):
        """Stub: report whether the rsync transfer of this run has finished."""
        # TODO: return true if .rsync_exit_status exists
        # NOTE(review): not implemented — implicitly returns None (falsy).
        pass
Expand All @@ -232,8 +420,6 @@ def aggregate_demux_results(self):
# TODO: aggregate demux results
pass



    def sync_metadata(self):
        """Stub: sync run metadata to ngi-nas-ns."""
        # TODO: copy metadata from demuxed run to ngi-nas-ns
        pass
Expand All @@ -257,5 +443,3 @@ def update_transfer_log(self):
    def archive(self):
        """Stub: archive the run directory."""
        # TODO: move run dir to nosync
        pass


0 comments on commit 918f16f

Please sign in to comment.