Skip to content

Commit

Permalink
ruff format
Browse files Browse the repository at this point in the history
  • Loading branch information
kedhammar committed Sep 2, 2024
1 parent fa473cd commit 7e84ac1
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 40 deletions.
57 changes: 32 additions & 25 deletions taca/analysis/analysis_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@

logger = logging.getLogger(__name__)


def _upload_to_statusdb(run):
"""Triggers the upload to statusdb.
:param Run run: the object run
"""
pass


def run_preprocessing(given_run):
"""Run demultiplexing in all data directories.
Expand All @@ -29,59 +31,64 @@ def _process(run):
:param taca.element.Run run: Run to be processed and transferred
"""
#TODO: Fetch statusdb document for run
#TODO: Get previous status of run from statusdb document
# TODO: Fetch statusdb document for run
# TODO: Get previous status of run from statusdb document
sequencing_done = run.check_sequencing_status()
demultiplexing_status = run.get_demultiplexing_status()
if not sequencing_done:
#TODO: compare previous status with current status and update statusdb document if different
# TODO: compare previous status with current status and update statusdb document if different
return
elif sequencing_done and demultiplexing_status == "not started":
if not run.manifest_exists(): # Assumes that we use the same manifest as for sequencing. TODO: demux settings need to be added to the original manifest by lims
#TODO: email operator that manifest is missing
if not run.manifest_exists(): # Assumes that we use the same manifest as for sequencing. TODO: demux settings need to be added to the original manifest by lims
# TODO: email operator that manifest is missing
return
# Start demux
run.start_demux()
#TODO: compare previous status with current status and update statusdb document if different
# TODO: compare previous status with current status and update statusdb document if different
return
elif sequencing_done and demultiplexing_status == "ongoing":
#TODO: compare previous status with current status and update statusdb document if different
# TODO: compare previous status with current status and update statusdb document if different
return
elif sequencing_done and demultiplexing_status == "finished":
# Sync metadata to ngi-data-ns
# check if run is transferred or transfer is ongoing
# if run has not been transferred and transfer is not ongoing
# make a hidden file to indicate that transfer has started
# compare previous status with current status and update statusdb document if different
# Also update statusdb with a timestamp of when the transfer started
# transfer run to miarka
# remove hidden file if transfer was successful
# Update transfer log
# update statusdb document
# archive run to nosync
# update statusdb document
# make a hidden file to indicate that transfer has started
# compare previous status with current status and update statusdb document if different
# Also update statusdb with a timestamp of when the transfer started
# transfer run to miarka
# remove hidden file if transfer was successful
# Update transfer log
# update statusdb document
# archive run to nosync
# update statusdb document
# elif run is being transferred (hidden file exists)
# compare previous status with current status and update statusdb document if different
# return
# compare previous status with current status and update statusdb document if different
# return
# elif run is already transferred (in transfer log)
# compare previous status with current status and update statusdb document if different
# warn that transferred run has not been archived
# compare previous status with current status and update statusdb document if different
# warn that transferred run has not been archived
pass


if given_run:
run = Aviti_Run(run) #TODO: Needs to change if more Element machines are aquired in the future
run = Aviti_Run(
run
) # TODO: Needs to change if more Element machines are aquired in the future
_process(runObj)
else:
data_dirs = CONFIG.get("element_analysis").get("data_dirs") #TODO: add to config
data_dirs = CONFIG.get("element_analysis").get(
"data_dirs"
) # TODO: add to config
for data_dir in data_dirs:
# Run folder looks like DATE_*_*_*, the last section is the FC name.
runs = glob.glob(os.path.join(data_dir, "[1-9]*_*_*_*")) #TODO: adapt to aviti format
runs = glob.glob(
os.path.join(data_dir, "[1-9]*_*_*_*")
) # TODO: adapt to aviti format
for run in runs:
runObj = Aviti_Run(run)
try:
_process(runObj)
except: #TODO: chatch error message and print it
except: # TODO: chatch error message and print it
# This function might throw and exception,
# it is better to continue processing other runs
logger.warning(f"There was an error processing the run {run}")
Expand Down
1 change: 1 addition & 0 deletions taca/analysis/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def updatedb(rundir, software):
"""Save the run to statusdb."""
an.upload_to_statusdb(rundir, software)


# Element analysis subcommands


Expand Down
37 changes: 22 additions & 15 deletions taca/element/Element_Runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ def __init__(self, run_dir, configuration):
self.CONFIG = configuration
self.demux_dir = os.path.join(self.run_dir, "Demultiplexing")
self.final_sequencing_file = os.path.join(self.run_dir, "RunUploaded.json")
self.demux_stats_file = os.path.join(self.demux_dir, "RunStats.json") #TODO: How to handle SideA/SideB?
self.demux_stats_file = os.path.join(
self.demux_dir, "RunStats.json"
) # TODO: How to handle SideA/SideB?
self.run_manifest_file = os.path.join(self.run_dir, "RunManifest.csv")

def check_sequencing_status(self):
if os.path.exists(self.final_sequencing_file):
with open(self.final_sequencing_file) as json_file:
Expand All @@ -32,26 +34,31 @@ def check_sequencing_status(self):
return True
else:
return False

def get_demultiplexing_status(self):
if not os.path.exists(self.demux_dir):
return "not started"
elif os.path.exists(self.demux_dir) and not os.path.isfile(self.demux_stats_file):
elif os.path.exists(self.demux_dir) and not os.path.isfile(
self.demux_stats_file
):
return "ongoing"
elif os.path.exists(self.demux_dir) and os.path.isfile(self.demux_stats_file):
return "finished"

def manifest_exists(self):
return os.path.isfile(self.run_manifest_file)

def generate_demux_command(self):
command = [self.CONFIG.get(self.software)["bin"], #TODO add path to bases2fastq executable to config
self.run_dir,
self.demux_dir, #TODO: how to handle SideA/SideB?
"-p 12"
]
command = [
self.CONFIG.get(self.software)[
"bin"
], # TODO add path to bases2fastq executable to config
self.run_dir,
self.demux_dir, # TODO: how to handle SideA/SideB?
"-p 12",
]
return command

def start_demux(self):
with chdir(self.run_dir):
cmd = self.generate_demux_command()
Expand All @@ -62,9 +69,9 @@ def start_demux(self):
"Bases2Fastq conversion and demultiplexing "
f"started for run {os.path.basename(self.run_dir)} on {datetime.now()}"
)

def is_transferred(self, transfer_file):
pass

def parse_rundir(self):
pass
pass

0 comments on commit 7e84ac1

Please sign in to comment.