From 1a4320750e8b13294cdc996658f3f36d3a649db1 Mon Sep 17 00:00:00 2001
From: Sara Sjunnebo
Date: Tue, 10 Sep 2024 15:20:00 +0200
Subject: [PATCH] Add status of a run

---
 taca/analysis/analysis_element.py | 66 ++++++++++++++++---------------
 taca/element/Element_Runs.py      | 25 ++++++++----
 2 files changed, 52 insertions(+), 39 deletions(-)

diff --git a/taca/analysis/analysis_element.py b/taca/analysis/analysis_element.py
index 50f95cef..433b2483 100755
--- a/taca/analysis/analysis_element.py
+++ b/taca/analysis/analysis_element.py
@@ -10,14 +10,6 @@
 logger = logging.getLogger(__name__)
 
 
-def _upload_to_statusdb(run):
-    """Triggers the upload to statusdb.
-
-    :param Run run: the object run
-    """
-    pass
-
-
 def run_preprocessing(given_run):
     """Run demultiplexing in all data directories.
 
@@ -29,20 +21,19 @@ def _process(run):
         :param taca.element.Run run: Run to be processed and transferred
         """
-        # TODO: Fetch statusdb document for run
-        # TODO: Get previous status of run from statusdb document
         sequencing_done = run.check_sequencing_status()
         demultiplexing_status = run.get_demultiplexing_status()
-        if not sequencing_done:
-            # TODO: compare previous status with current status and update statusdb document if different
-            return
-        elif sequencing_done and demultiplexing_status == "not started":
+        if not sequencing_done: # Sequencing ongoing
+            current_run_status = "sequencing"
+            if run.status_changed(current_run_status):
+                run.update_statusdb(current_run_status) #TODO: what info needs to be gathered and uploaded?
+        elif sequencing_done and demultiplexing_status == "not started": # Sequencing done. Start demux
             if not run.manifest_exists():
+                logger.warning(f"Run manifest is missing for {run.flowcell_id}") #TODO: email operator warning
                 return
             elif run.manifest_exists():
-                # Get sample info from manifest
-                sample_info = run.get_sample_info()
+                sample_info = run.get_sample_info_from_manifest()
                 sample_types = run.get_sample_types(sample_info)
                 if len(sample_types) == 1:
                     run.start_demux()
@@ -51,48 +42,59 @@ def _process(run):
                         run.make_manifest(sample_info, sample_type)
                     run.start_demux()
                 else:
-                    #TODO: warn that no samples were found in the run manifest
+                    logger.warning(f"No samples were found in the sample manifest for run {run.flowcell_id}.")
+                    #TODO: email operator warning
                     return
-            #TODO: compare previous status with current status and update statusdb document if different
+            current_run_status = "demultiplexing"
+            if run.status_changed(current_run_status):
+                run.update_statusdb(current_run_status)
         elif sequencing_done and demultiplexing_status == "ongoing":
-            # TODO: compare previous status with current status and update statusdb document if different
+            current_run_status = "demultiplexing"
+            if run.status_changed(current_run_status):
+                run.update_statusdb(current_run_status)
            return
         elif sequencing_done and demultiplexing_status == "finished":
             transfer_file = CONFIG.get('Element').get('Aviti').get('transfer_log')
             if not run.is_transferred(transfer_file) and not run.transfer_ongoing():
                 run.sync_metadata()
                 run.make_transfer_indicator()
-                #TODO: compare previous status with current status and update statusdb document if different
-                # Also update statusdb with a timestamp of when the transfer started
+                current_run_status = "transferring"
+                if run.status_changed(current_run_status):
+                    run.update_statusdb(current_run_status)
+                #TODO: Also update statusdb with a timestamp of when the transfer started
                 run.transfer()
                 run.remove_transfer_indicator()
                 run.update_transfer_log(transfer_file)
-                #TODO: update statusdb document
+                current_run_status = "transferred"
+                if run.status_changed(current_run_status):
+                    run.update_statusdb(current_run_status)
                 run.archive()
+                current_run_status = "archived"
+                if run.status_changed(current_run_status):
+                    run.update_statusdb(current_run_status)
             elif not run.is_transferred(transfer_file) and run.transfer_ongoing():
-                #TODO: compare previous status with current status and update statusdb document if different
-                logger.info("Run is being transferred. Skipping.")
+                current_run_status = "transferring"
+                if run.status_changed(current_run_status):
+                    run.update_statusdb(current_run_status)
+                logger.info(f"Run {run.flowcell_id} is being transferred. Skipping.")
                 return
             elif run.is_transferred(transfer_file):
-                #TODO: compare previous status with current status and update statusdb document if different
-                # warn that transferred run has not been archived
-                logger.warn("The run has already been transferred but has not been archived. Please investigate")
+                logger.warning(f"The run {run.flowcell_id} has already been transferred but has not been archived. Please investigate.")
+                #TODO: email operator warning
                 return
             else:
-                logger.warn("Unknown transfer status. Please investigate")
+                logger.warning(f"Unknown transfer status of run {run.flowcell_id}. Please investigate.")
 
     if given_run:
-        run = Aviti_Run(
-            run
-        ) # TODO: Needs to change if more Element machines are aquired in the future
+        run = Aviti_Run(given_run) # TODO: Needs to change if more Element machines are acquired in the future
         _process(run)
     else:
         data_dirs = CONFIG.get("element_analysis").get(
             "data_dirs"
         ) # TODO: add to config
-        for data_dir in data_dirs:
+        for data_dir in data_dirs: #TODO: make sure to look in both side A and B
             # Run folder looks like DATE_*_*_*, the last section is the FC name.
             runs = glob.glob(
                 os.path.join(data_dir, "[1-9]*_*_*_*")
diff --git a/taca/element/Element_Runs.py b/taca/element/Element_Runs.py
index b8c273b8..813e2ea4 100644
--- a/taca/element/Element_Runs.py
+++ b/taca/element/Element_Runs.py
@@ -15,15 +15,16 @@ class Run:
     def __init__(self, run_dir, configuration):
         if not os.path.exists(run_dir):
             raise RuntimeError(f"Could not locate run directory {run_dir}")
-        self.run_dir = os.path.abspath(run_dir)
+        self.flowcell_id = run_dir #TODO: get flowcell id from json instead
+        self.run_dir = os.path.abspath(run_dir) # TODO: How to handle SideA/SideB?
         self.CONFIG = configuration
         self.demux_dir = os.path.join(self.run_dir, "Demultiplexing")
         self.final_sequencing_file = os.path.join(self.run_dir, "RunUploaded.json")
         self.demux_stats_file = os.path.join(
-            self.demux_dir, "RunStats.json"
-        ) # TODO: How to handle SideA/SideB?
+            self.demux_dir, "RunStats.json" # Assumes demux is finished when this file is created
+        )
         self.run_manifest_file = os.path.join(self.run_dir, "RunManifest.csv")
-
+
     def check_sequencing_status(self):
         if os.path.exists(self.final_sequencing_file):
             with open(self.final_sequencing_file) as json_file:
@@ -34,7 +35,7 @@ def check_sequencing_status(self):
                 return True
             else:
                 return False
-
+
     def get_demultiplexing_status(self):
         if not os.path.exists(self.demux_dir):
             return "not started"
@@ -44,12 +45,22 @@ def get_demultiplexing_status(self):
             return "ongoing"
         elif os.path.exists(self.demux_dir) and os.path.isfile(self.demux_stats_file):
             return "finished"
+        else:
+            return "unknown"
+
+    def status_changed(self, current_run_status):
+        #TODO: get document from statusdb, check status field, return true if status of run changed
+        pass
+
+    def update_statusdb(self, current_run_status):
+        #TODO: Get document from statusdb. Gather data about run and update the statusdb document, then upload to statusdb
+        pass
 
     def manifest_exists(self):
         return os.path.isfile(self.run_manifest_file)
 
-    def get_sample_info(self):
-        sample_info = {} #TODO: populate
+    def get_sample_info_from_manifest(self):
+        sample_info = {} #TODO: populate with sample info from manifest
        return sample_info
 
     def get_sample_types(self, sample_info):
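
For reference, below is a minimal sketch of how the two new stubs, status_changed() and update_statusdb(), could be fleshed out, assuming statusdb is a CouchDB instance reached through the third-party "couchdb" client and that run documents carry a status field. The database name, the "info/by_flowcell_id" view and the "run_status" field are assumptions made for this sketch, not part of the patch.

# Illustrative sketch only, not part of the patch above. Assumes statusdb is a
# CouchDB server (third-party "couchdb" client) and that run documents can be
# looked up through a view keyed by flowcell id. Database name, view name and
# the "run_status" field are assumptions for this example.
import couchdb


class StatusdbRunMixin:
    """Hypothetical mixin for Run; relies on self.CONFIG and self.flowcell_id."""

    def _statusdb_db(self):
        # Open the (assumed) statusdb database configured under "statusdb" in CONFIG
        conf = self.CONFIG.get("statusdb", {})
        server = couchdb.Server(conf.get("url"))  # e.g. "https://user:pw@statusdb:6984"
        return server[conf.get("database", "element_runs")]  # assumed database name

    def _get_statusdb_doc(self, db):
        # Assumed view mapping flowcell id -> run document
        rows = db.view("info/by_flowcell_id", key=self.flowcell_id, include_docs=True)
        for row in rows:
            return row.doc
        return None

    def status_changed(self, current_run_status):
        """Return True if the status stored in statusdb differs from the current one."""
        doc = self._get_statusdb_doc(self._statusdb_db())
        if doc is None:
            return True  # no document yet, so it needs to be created/updated
        return doc.get("run_status") != current_run_status

    def update_statusdb(self, current_run_status):
        """Write the current run status back to the run document in statusdb."""
        db = self._statusdb_db()
        doc = self._get_statusdb_doc(db) or {"_id": self.flowcell_id}
        doc["run_status"] = current_run_status
        # The patch's TODO: gather additional run info here before saving
        db.save(doc)

In this sketch a missing document counts as a changed status, so the first update_statusdb() call creates the document; what extra run information to gather before saving is left open, matching the TODOs in the patch.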