Skip to content

Commit

Permalink
Restructure transfer status
Browse files Browse the repository at this point in the history
  • Loading branch information
ssjunnebo committed Sep 17, 2024
1 parent 3be5484 commit eda9f3f
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 5 deletions.
9 changes: 5 additions & 4 deletions taca/analysis/analysis_element.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ def _process(run):
run.update_statusdb()
return
elif sequencing_done and demultiplexing_status == "finished":
if not run.in_transfer_log() and not run.transfer_ongoing() and not run.rsync_complete():
transfer_status = run.get_transfer_status()
if transfer_status == "not started":
run.aggregate_demux_results() # TODO: if multiple demux dirs, aggregate the results into Demultiplexing?
run.sync_metadata()
run.make_transfer_indicator()
Expand All @@ -81,13 +82,13 @@ def _process(run):
run.update_statusdb()
# TODO: Also update statusdb with a timestamp of when the transfer started
run.transfer() # I think this should be a detached command as well
elif run.transfer_ongoing() and not run.rsync_complete():
elif transfer_status == "ongoing":
run.status = "transferring"
if run.status_changed:
run.update_statusdb()
logger.info(f"{run} is being transferred. Skipping.")
return
elif run.rsync_complete() and not run.in_transfer_log():
elif transfer_status == "finished":
if run.rsync_success():
run.remove_transfer_indicator()
run.update_transfer_log()
Expand All @@ -102,7 +103,7 @@ def _process(run):
run.status = "transfer failed"
logger.warning(f"An issue occurred while transfering {run} to the analysis cluster." )
# TODO: email warning to operator
elif run.in_transfer_log():
elif transfer_status == "unknown":
logger.warning(
f"The run {run} has already been transferred but has not been archived. Please investigate"
)
Expand Down
12 changes: 11 additions & 1 deletion taca/element/Element_Runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def generate_demux_command(self, run_manifest, demux_dir):
], # TODO add path to bases2fastq executable to config
self.run_dir,
demux_dir,
"-p 12", # TODO: how many? Considering that we may start several demux runs at once
"-p 8",
f"-r {run_manifest}",
"--legacy-fastq", # TODO: except if Smart-seq3
"--force-index-orientation",
Expand All @@ -198,6 +198,16 @@ def start_demux(self, run_manifest, demux_dir):
f"started for run {self} on {datetime.now()}"
)

def get_transfer_status(self):
if not self.in_transfer_log() and not self.transfer_ongoing() and not self.rsync_complete():
return "not started"
elif self.transfer_ongoing() and not self.rsync_complete():
return "ongoing"
elif self.rsync_complete() and not self.in_transfer_log():
return "finished"
elif self.in_transfer_log():
return "unknown"

def in_transfer_log(self):
with open(self.transfer_file, 'r') as transfer_file:
for row in transfer_file.read():
Expand Down

0 comments on commit eda9f3f

Please sign in to comment.