Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change db names #422

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions VERSIONLOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# TACA Version Log

## 20240418.1

Removed the `dbname` option from classes where it was not used. Renamed StatusDB database variables to standardize them across modules.

## 20240410.1

Expand test coverage by starting and checking demultiplexing for a NovaSeqXPlus run.
Expand Down
4 changes: 2 additions & 2 deletions taca/analysis/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def _upload_to_statusdb(run):
"""
couch_conf = CONFIG["statusdb"]
couch_connection = statusdb.StatusdbSession(couch_conf).connection
db = couch_connection[couch_conf["xten_db"]]
x_flowcells_db = couch_connection[couch_conf["xten_db"]]
parser = run.runParserObj
# Check if I have NoIndex lanes
for element in parser.obj["samplesheet_csv"]:
Expand Down Expand Up @@ -155,7 +155,7 @@ def _upload_to_statusdb(run):
parser.obj["DemultiplexConfig"] = {
"Setup": {"Software": run.CONFIG.get("bcl2fastq", {})}
}
statusdb.update_doc(db, parser.obj, over_write_db_entry=True)
statusdb.update_doc(x_flowcells_db, parser.obj, over_write_db_entry=True)


def transfer_run(run_dir, software):
Expand Down
10 changes: 6 additions & 4 deletions taca/backup/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,14 @@ def _log_pdc_statusdb(self, run):
run_date = run_vals[0]
run_fc = f"{run_date}_{run_vals[-1]}"
couch_connection = statusdb.StatusdbSession(self.couch_info).connection
db = couch_connection[self.couch_info["db"]]
fc_names = {e.key: e.id for e in db.view("names/name", reduce=False)}
x_flowcells_db = couch_connection[self.couch_info["db"]]
fc_names = {
e.key: e.id for e in x_flowcells_db.view("names/name", reduce=False)
}
d_id = fc_names[run_fc]
doc = db.get(d_id)
doc = x_flowcells_db.get(d_id)
doc["pdc_archived"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
db.save(doc)
x_flowcells_db.save(doc)
logger.info(
f'Logged "pdc_archived" timestamp for fc {run} in statusdb doc "{d_id}"'
)
Expand Down
14 changes: 7 additions & 7 deletions taca/utils/bioinfo_tab.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def update_statusdb(run_dir):
statusdb_conf = CONFIG.get("statusdb")
couch_connection = statusdb.StatusdbSession(statusdb_conf).connection
valueskey = datetime.datetime.now().isoformat()
db = couch_connection["bioinfo_analysis"]
view = db.view("latest_data/sample_id")
bioinfo_db = couch_connection["bioinfo_analysis"]
view = bioinfo_db.view("latest_data/sample_id")
# Construction and sending of individual records, if samplesheet is incorrectly formatted the loop is skipped
if project_info:
for flowcell in project_info:
Expand Down Expand Up @@ -87,8 +87,8 @@ def update_statusdb(run_dir):
if len(view[[project, run_id, lane, sample]].rows) >= 1:
remote_id = view[[project, run_id, lane, sample]].rows[0].id
lane = str(lane)
remote_doc = db[remote_id]["values"]
remote_status = db[remote_id]["status"]
remote_doc = bioinfo_db[remote_id]["values"]
remote_status = bioinfo_db[remote_id]["status"]
# Only updates the listed statuses
if (
remote_status
Expand All @@ -110,16 +110,16 @@ def update_statusdb(run_dir):
)
)
# Update record cluster
obj["_rev"] = db[remote_id].rev
obj["_rev"] = bioinfo_db[remote_id].rev
obj["_id"] = remote_id
db.save(obj)
bioinfo_db.save(obj)
# Creates new entry
else:
logger.info(
f"Creating {run_id} {project} {flowcell} {lane} {sample} as {sample_status}"
)
# Creates record
db.save(obj)
bioinfo_db.save(obj)
# Sets FC error flag
if project_info[flowcell].value is not None:
if (
Expand Down
6 changes: 3 additions & 3 deletions taca/utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,11 @@ def run_is_demuxed(run, couch_info=None, seq_run_type=None):
run_name = f"{run_date}_{run_fc}"
try:
couch_connection = statusdb.StatusdbSession(couch_info).connection
fc_db = couch_connection[couch_info["xten_db"]]
for fc in fc_db.view("names/name", reduce=False, descending=True):
x_flowcells_db = couch_connection[couch_info["xten_db"]]
for fc in x_flowcells_db.view("names/name", reduce=False, descending=True):
if fc.key != run_name:
continue
fc_doc = fc_db.get(fc.id)
fc_doc = x_flowcells_db.get(fc.id)
if not fc_doc or not fc_doc.get("illumina", {}).get(
"Demultiplex_Stats", {}
):
Expand Down
42 changes: 23 additions & 19 deletions taca/utils/statusdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,57 +83,61 @@ def get_project_flowcell(


class ProjectSummaryConnection(StatusdbSession):
    """Connection to the StatusDB 'projects' database.

    Builds two lookup maps from the project views at construction time:
    project name -> doc id and project id -> doc id.
    """

    def __init__(self, config, dbname="projects"):
        super().__init__(config)
        # Keep the generic `self.db` as an alias of the descriptive attribute:
        # per the review thread, common methods in StatusdbSession (and
        # external callers such as taca_ngi_pipeline) still reach the
        # database through `self.db` / the `dbname` parameter — TODO confirm
        # before removing either.
        self.db = self.projects_db = self.connection[dbname]
        # Map: project name -> document id.
        self.name_view = {
            k.key: k.id
            for k in self.projects_db.view("project/project_name", reduce=False)
        }
        # Map: project id -> document id.
        self.id_view = {
            k.key: k.id
            for k in self.projects_db.view("project/project_id", reduce=False)
        }


class FlowcellRunMetricsConnection(StatusdbSession):
    """Connection to the StatusDB 'flowcells' database."""

    def __init__(self, config, dbname="flowcells"):
        super().__init__(config)
        # Keep `self.db` as an alias of the descriptive attribute for
        # backward compatibility with callers that use the generic name.
        self.db = self.flowcells_db = self.connection[dbname]
        # FIX: the rename to `flowcells_db` left this view built from
        # `self.db`; without the alias above that was an AttributeError.
        # Map: flowcell name -> document id.
        self.name_view = {
            k.key: k.id
            for k in self.flowcells_db.view("names/name", reduce=False)
        }
        # Map: flowcell name -> project id list, skipping rows with no key.
        self.proj_list = {
            k.key: k.value
            for k in self.flowcells_db.view("names/project_ids_list", reduce=False)
            if k.key
        }


class X_FlowcellRunMetricsConnection(StatusdbSession):
    """Connection to the StatusDB 'x_flowcells' database."""

    # NOTE(review): the `dbname` parameter is deliberately kept — per the PR
    # discussion it is reused by the common methods in the StatusdbSession
    # class (and by taca_ngi_pipeline), so dropping it would break those
    # callers.
    def __init__(self, config, dbname="x_flowcells"):
        super().__init__(config)
        # Keep `self.db` as an alias of the descriptive attribute for
        # backward compatibility with callers that use the generic name.
        self.db = self.x_flowcells_db = self.connection[dbname]
        # Map: run name -> document id.
        self.name_view = {
            k.key: k.id
            for k in self.x_flowcells_db.view("names/name", reduce=False)
        }
        # Map: run name -> project id list, skipping rows with no key.
        self.proj_list = {
            k.key: k.value
            for k in self.x_flowcells_db.view("names/project_ids_list", reduce=False)
            if k.key
        }


class NanoporeRunsConnection(StatusdbSession):
def __init__(self, config, dbname="nanopore_runs"):
    """Open a connection to the StatusDB 'nanopore_runs' database."""
    super().__init__(config)
    # Keep `self.db` as an alias of the descriptive attribute: shared
    # StatusdbSession helpers may still use the generic name — TODO confirm.
    self.db = self.nanopore_runs_db = self.connection[dbname]

def check_run_exists(self, ont_run) -> bool:
    """Return True if a document named after `ont_run.run_name` exists."""
    view_names = self.nanopore_runs_db.view("names/name")
    # Return the comparison directly instead of if/else True/False.
    return len(view_names[ont_run.run_name].rows) > 0

def check_run_status(self, ont_run) -> str:
    """Look up the run's document and return its 'run_status' field.

    Assumes the run exists (raises IndexError otherwise); callers can
    guard with check_run_exists first.
    """
    name_rows = self.nanopore_runs_db.view("names/name")[ont_run.run_name].rows
    matching_doc = self.nanopore_runs_db[name_rows[0].id]
    return matching_doc["run_status"]

def create_ongoing_run(
self, ont_run, run_path_file: str, pore_count_history_file: str
Expand All @@ -151,19 +155,19 @@ def create_ongoing_run(
"pore_count_history": pore_counts,
}

new_doc_id, new_doc_rev = self.db.save(new_doc)
new_doc_id, new_doc_rev = self.nanopore_runs_db.save(new_doc)
logger.info(
f"New database entry created: {ont_run.run_name}, id {new_doc_id}, rev {new_doc_rev}"
)

def finish_ongoing_run(self, ont_run, dict_json: dict):
    """Merge `dict_json` into the run's document and mark it finished."""
    name_view = self.nanopore_runs_db.view("names/name")
    target_id = name_view[ont_run.run_name].rows[0].id
    doc = self.nanopore_runs_db[target_id]
    # Fold in the supplied fields, then flip the status flag and save.
    doc.update(dict_json)
    doc["run_status"] = "finished"
    self.nanopore_runs_db[doc.id] = doc


def update_doc(db, obj, over_write_db_entry=False):
Expand Down
Loading