diff --git a/taca/element/Element_Runs.py b/taca/element/Element_Runs.py index c0c2c0c2..a338d691 100644 --- a/taca/element/Element_Runs.py +++ b/taca/element/Element_Runs.py @@ -7,6 +7,8 @@ from datetime import datetime from glob import glob +import pandas as pd + from taca.utils import misc from taca.utils.filesystem import chdir from taca.utils.statusdb import ElementRunsConnection @@ -174,6 +176,9 @@ def get_lims_step_id(self) -> str | None: """If the run was started using a LIMS-generated manifest, the ID of the LIMS step can be extracted from it. """ + + # TODO test me + assert self.manifest_exists(), "Run manifest not found" with open(self.run_manifest_file_from_instrument) as csv_file: manifest_lines = csv_file.readlines() @@ -186,6 +191,8 @@ def get_lims_step_id(self) -> str | None: def copy_manifests(self) -> bool: """Fetch the LIMS-generated run manifests from ngi-nas-ns and unzip them into a run subdir.""" + # TODO test me + # Specify dir in which LIMS drop the manifest zip files dir_to_search = os.path.join( self.CONFIG.get("Aviti").get( @@ -244,6 +251,114 @@ def copy_manifests(self) -> bool: return True + def make_demux_manifests( + self, manifest_to_split: os.PathLike, outdir: os.PathLike | None = None + ) -> list[os.PathLike]: + """Derive composite demultiplexing manifests (grouped by index duplicity and lengths) + from a single information-rich manifest. + """ + + # TODO test me + + # Read specified manifest + with open(manifest_to_split) as f: + manifest_contents = f.read() + + # Get '[SAMPLES]' section + split_contents = "[SAMPLES]".split(manifest_contents) + assert ( + len(split_contents) == 2 + ), f"Could not split sample rows out of manifest {manifest_contents}" + sample_section = split_contents[1].split("\n") + + # Split into header and rows + header = sample_section[0] + sample_rows = sample_section[1:] + + # Convert to list of dicts + sample_dicts = [] + for row in sample_rows: + row_dict = dict(zip(header.split(","), row.split(","))) + sample_dicts.append(row_dict) + + # Convert to dataframe + df = pd.DataFrame.from_dict(sample_dicts) + + # Separate samples from controls + df_samples = df[df["Project"] != "Control"].copy() + df_controls = df[df["Project"] == "Control"].copy() + + # Apply default dir path for output + if outdir is None: + outdir = self.run_dir + + ## Build composite manifests + + manifest_root_name = f"{self.NGI_run_id}_demux" + + # Get idx lengths for calculations + df_samples.loc[:, "len_idx1"] = df["Index1"].apply(len) + df_samples.loc[:, "len_idx2"] = df["Index2"].apply(len) + + # Break down by index lengths and lane, creating composite manifests + manifests = [] + n = 0 + for (len_idx1, len_idx2, lane), group in df_samples.groupby( + ["len_idx1", "len_idx2", "Lane"] + ): + file_name = f"{manifest_root_name}_{n}.csv" + runValues_section = "\n".join( + [ + "[RUNVALUES]", + "KeyName, Value", + f'manifest_file, "{file_name}"', + f"manifest_group, {n+1}/{len(df.groupby(['len_idx1', 'len_idx2', 'Lane']))}", + f"grouped_by, len_idx1:{len_idx1} len_idx2:{len_idx2} lane:{lane}", + ] + ) + + settings_section = "\n".join( + [ + "[SETTINGS]", + "SettingName, Value", + ] + ) + + # Add PhiX stratified by index length + if group["phix_loaded"].any(): + # Subset controls by lane + group_controls = df_controls[df_controls["Lane"] == lane].copy() + + # Trim PhiX indexes to match group + group_controls.loc[:, "Index1"] = group_controls.loc[:, "Index1"].apply( + lambda x: x[:len_idx1] + ) + group_controls.loc[:, "Index2"] = group_controls.loc[:, "Index2"].apply( + lambda x: x[:len_idx2] + ) + + # Add PhiX to group + group = pd.concat([group, group_controls], axis=0, ignore_index=True) + + samples_section = ( + f"[SAMPLES]\n{group.iloc[:, 0:6].to_csv(index=None, header=True)}" + ) + + manifest_contents = "\n\n".join( + [runValues_section, settings_section, samples_section] + ) + + file_path = os.path.join(outdir, file_name) + manifests.append((file_path, manifest_contents)) + n += 1 + + for manifest_path, manifest_contents in manifests: + with open(os.path.join(outdir, manifest_path), "w") as f: + f.write(manifest_contents) + + manifest_paths = [t[0] for t in manifests] + return manifest_paths + def generate_demux_command(self, run_manifest, demux_dir): command = [ self.CONFIG.get(self.software)[