Skip to content

Commit

Permalink
Add status of a run
Browse files Browse the repository at this point in the history
  • Loading branch information
ssjunnebo committed Sep 10, 2024
1 parent 3521443 commit 1a43207
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 39 deletions.
66 changes: 34 additions & 32 deletions taca/analysis/analysis_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,6 @@
logger = logging.getLogger(__name__)

Check warning on line 10 in taca/analysis/analysis_element.py

View check run for this annotation

Codecov / codecov/patch

taca/analysis/analysis_element.py#L10

Added line #L10 was not covered by tests


def _upload_to_statusdb(run):
"""Triggers the upload to statusdb.
:param Run run: the object run
"""
pass


def run_preprocessing(given_run):

Check warning on line 13 in taca/analysis/analysis_element.py

View check run for this annotation

Codecov / codecov/patch

taca/analysis/analysis_element.py#L13

Added line #L13 was not covered by tests
"""Run demultiplexing in all data directories.
Expand All @@ -29,20 +21,19 @@ def _process(run):
:param taca.element.Run run: Run to be processed and transferred
"""
# TODO: Fetch statusdb document for run
# TODO: Get previous status of run from statusdb document
sequencing_done = run.check_sequencing_status()
demultiplexing_status = run.get_demultiplexing_status()
if not sequencing_done:
# TODO: compare previous status with current status and update statusdb document if different
return
elif sequencing_done and demultiplexing_status == "not started":
if not sequencing_done: # Sequencing ongoing
current_run_status = 'sequencing'
if run.status_changed(current_run_status):
run.update_statusdb(current_run_status) #TODO: what info needs to be gathered and uploaded?
elif sequencing_done and demultiplexing_status == "not started": # Sequencing done. Start demux
if not run.manifest_exists():
logger.warn(f"Run manifest is missing for {run.flowcell_id}")

Check warning on line 32 in taca/analysis/analysis_element.py

View check run for this annotation

Codecov / codecov/patch

taca/analysis/analysis_element.py#L24-L32

Added lines #L24 - L32 were not covered by tests
#TODO: email operator warning
return
elif run.manifest_exists():
# Get sample info from manifest
sample_info = run.get_sample_info()
sample_info = run.get_sample_info_from_manifest()
sample_types = run.get_sample_types(sample_info)
if len(sample_types) == 1:
run.start_demux()
Expand All @@ -51,48 +42,59 @@ def _process(run):
run.make_manifest(sample_info, sample_type)
run.start_demux()

Check warning on line 43 in taca/analysis/analysis_element.py

View check run for this annotation

Codecov / codecov/patch

taca/analysis/analysis_element.py#L34-L43

Added lines #L34 - L43 were not covered by tests
else:
#TODO: warn that no samples were found in the run manifest
logger.warn(f"No samples were found in the sample manifest for run {run.flowcell_id}.")

Check warning on line 45 in taca/analysis/analysis_element.py

View check run for this annotation

Codecov / codecov/patch

taca/analysis/analysis_element.py#L45

Added line #L45 was not covered by tests
#TODO: email operator warning
return
#TODO: compare previous status with current status and update statusdb document if different
current_run_status = "demultiplexing"
if run.status_changed(current_run_status):
run.update_statusdb(current_run_status)
elif sequencing_done and demultiplexing_status == "ongoing":
# TODO: compare previous status with current status and update statusdb document if different
current_run_status = "demultiplexing"
if run.status_changed(current_run_status):
run.update_statusdb(current_run_status)
return
elif sequencing_done and demultiplexing_status == "finished":
transfer_file = CONFIG.get('Element').get('Aviti').get('transfer_log')
if not run.is_transferred(transfer_file) and not run.transfer_ongoing():
run.sync_metadata()
run.make_transfer_indicator()
#TODO: compare previous status with current status and update statusdb document if different
# Also update statusdb with a timestamp of when the transfer started
current_run_status = "transferring"
if run.status_changed(current_run_status):
run.update_statusdb(current_run_status)

Check warning on line 63 in taca/analysis/analysis_element.py

View check run for this annotation

Codecov / codecov/patch

taca/analysis/analysis_element.py#L47-L63

Added lines #L47 - L63 were not covered by tests
#TODO: Also update statusdb with a timestamp of when the transfer started
run.transfer()
run.remove_transfer_indicator()
run.update_transfer_log(transfer_file)
#TODO: update statusdb document
current_run_status = "transferred"
if run.status_changed(current_run_status):
run.update_statusdb(current_run_status)
run.archive()
current_run_status = "archived"
if run.status_changed(current_run_status):
run.update_statusdb(current_run_status)
elif not run.is_transferred(transfer_file) and run.transfer_ongoing():
#TODO: compare previous status with current status and update statusdb document if different
logger.info("Run is being transferred. Skipping.")
current_run_status = "transferring"
if run.status_changed(current_run_status):
run.update_statusdb(current_run_status)
logger.info(f"Run {run.flowcell_id} is being transferred. Skipping.")
return
elif run.is_transferred(transfer_file):
#TODO: compare previous status with current status and update statusdb document if different
# warn that transferred run has not been archived
logger.warn("The run has already been transferred but has not been archived. Please investigate")
logger.warn(f"The run {run.flowcell_id} has already been transferred but has not been archived. Please investigate")

Check warning on line 82 in taca/analysis/analysis_element.py

View check run for this annotation

Codecov / codecov/patch

taca/analysis/analysis_element.py#L65-L82

Added lines #L65 - L82 were not covered by tests
#TODO: email operator warning
return

Check warning on line 84 in taca/analysis/analysis_element.py

View check run for this annotation

Codecov / codecov/patch

taca/analysis/analysis_element.py#L84

Added line #L84 was not covered by tests
else:
logger.warn("Unknown transfer status. Please investigate")
logger.warn(f"Unknown transfer status of run {run.flowcell_id}. Please investigate")

Check warning on line 86 in taca/analysis/analysis_element.py

View check run for this annotation

Codecov / codecov/patch

taca/analysis/analysis_element.py#L86

Added line #L86 was not covered by tests



if given_run:
run = Aviti_Run(
run
) # TODO: Needs to change if more Element machines are aquired in the future
run = Aviti_Run(given_run) # TODO: Needs to change if more Element machines are aquired in the future
_process(run)

Check warning on line 92 in taca/analysis/analysis_element.py

View check run for this annotation

Codecov / codecov/patch

taca/analysis/analysis_element.py#L90-L92

Added lines #L90 - L92 were not covered by tests
else:
data_dirs = CONFIG.get("element_analysis").get(

Check warning on line 94 in taca/analysis/analysis_element.py

View check run for this annotation

Codecov / codecov/patch

taca/analysis/analysis_element.py#L94

Added line #L94 was not covered by tests
"data_dirs"
) # TODO: add to config
for data_dir in data_dirs:
for data_dir in data_dirs: #TODO: make sure to look in both side A and B

Check warning on line 97 in taca/analysis/analysis_element.py

View check run for this annotation

Codecov / codecov/patch

taca/analysis/analysis_element.py#L97

Added line #L97 was not covered by tests
# Run folder looks like DATE_*_*_*, the last section is the FC name.
runs = glob.glob(

Check warning on line 99 in taca/analysis/analysis_element.py

View check run for this annotation

Codecov / codecov/patch

taca/analysis/analysis_element.py#L99

Added line #L99 was not covered by tests
os.path.join(data_dir, "[1-9]*_*_*_*")
Expand Down
25 changes: 18 additions & 7 deletions taca/element/Element_Runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ class Run:
def __init__(self, run_dir, configuration):
if not os.path.exists(run_dir):
raise RuntimeError(f"Could not locate run directory {run_dir}")

Check warning on line 17 in taca/element/Element_Runs.py

View check run for this annotation

Codecov / codecov/patch

taca/element/Element_Runs.py#L17

Added line #L17 was not covered by tests
self.run_dir = os.path.abspath(run_dir)
self.flowcell_id = run_dir #TODO: get flowcell id from json instead
self.run_dir = os.path.abspath(run_dir) # TODO: How to handle SideA/SideB?
self.CONFIG = configuration
self.demux_dir = os.path.join(self.run_dir, "Demultiplexing")
self.final_sequencing_file = os.path.join(self.run_dir, "RunUploaded.json")
self.demux_stats_file = os.path.join(
self.demux_dir, "RunStats.json"
) # TODO: How to handle SideA/SideB?
self.demux_dir, "RunStats.json" # Assumes demux is finished when this file is created
)
self.run_manifest_file = os.path.join(self.run_dir, "RunManifest.csv")

def check_sequencing_status(self):
if os.path.exists(self.final_sequencing_file):
with open(self.final_sequencing_file) as json_file:
Expand All @@ -34,7 +35,7 @@ def check_sequencing_status(self):
return True
else:
return False

def get_demultiplexing_status(self):
if not os.path.exists(self.demux_dir):
return "not started"
Expand All @@ -44,12 +45,22 @@ def get_demultiplexing_status(self):
return "ongoing"
elif os.path.exists(self.demux_dir) and os.path.isfile(self.demux_stats_file):
return "finished"
else:
return "unknown"

Check warning on line 49 in taca/element/Element_Runs.py

View check run for this annotation

Codecov / codecov/patch

taca/element/Element_Runs.py#L49

Added line #L49 was not covered by tests

def status_changed(self, current_run_status):
#TODO: get document from statusdb, check status field, return true if status of run changed
pass

Check warning on line 53 in taca/element/Element_Runs.py

View check run for this annotation

Codecov / codecov/patch

taca/element/Element_Runs.py#L53

Added line #L53 was not covered by tests

def update_statusdb(self, current_run_status):
#TODO: Get document from statusdb. Gather data about run and update the statusdb document, then upload to statusdb
pass

Check warning on line 57 in taca/element/Element_Runs.py

View check run for this annotation

Codecov / codecov/patch

taca/element/Element_Runs.py#L57

Added line #L57 was not covered by tests

def manifest_exists(self):
return os.path.isfile(self.run_manifest_file)

def get_sample_info(self):
sample_info = {} #TODO: populate
def get_sample_info_from_manifest(self):
sample_info = {} #TODO: populate with sample info from manifest
return sample_info

Check warning on line 64 in taca/element/Element_Runs.py

View check run for this annotation

Codecov / codecov/patch

taca/element/Element_Runs.py#L63-L64

Added lines #L63 - L64 were not covered by tests

def get_sample_types(self, sample_info):
Expand Down

0 comments on commit 1a43207

Please sign in to comment.