Skip to content

Commit

Permalink
refact(db): move some db calls into methods in class
Browse files Browse the repository at this point in the history
  • Loading branch information
davidlougheed committed Apr 11, 2024
1 parent d15d755 commit 35af319
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 13 deletions.
11 changes: 2 additions & 9 deletions bento_wes/backends/_wes_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,7 @@ def _initialize_run_and_get_command(
self.log_error("Could not find workflow name in workflow file")
return self._finish_run_and_clean_up(run, states.STATE_SYSTEM_ERROR)

c = self.db.cursor()

# TODO: To avoid having multiple names, we should maybe only set this once?
c.execute("UPDATE runs SET run_log__name = ? WHERE id = ?", (workflow_name, run.run_id))
self.db.commit()
self.db.set_run_log_name(run, workflow_name)

# -- Store input for the workflow in a file in the temporary folder --------------------------------------------
with open(self._params_path(run), "w") as pf:
Expand All @@ -368,10 +364,7 @@ def _initialize_run_and_get_command(
cmd = self._get_command(self.workflow_path(run), self._params_path(run), self.run_dir(run))

# -- Update run log with command and Celery ID -----------------------------------------------------------------
c.execute(
"UPDATE runs SET run_log__cmd = ?, run_log__celery_id = ? WHERE id = ?",
(" ".join(cmd), celery_id, run.run_id))
self.db.commit()
self.db.set_run_log_command_and_celery_id(run, cmd, celery_id)

return cmd, workflow_params_with_secrets

Expand Down
18 changes: 14 additions & 4 deletions bento_wes/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from urllib.parse import urljoin

from . import states
from .backends.backend_types import Command
from .constants import SERVICE_ARTIFACT
from .events import get_flask_event_bus
from .models import RunLog, RunRequest, Run, RunWithDetailsAndOutput
Expand Down Expand Up @@ -95,10 +96,8 @@ def close(self):
self._conn.close()

def init(self):
c = self.cursor()

with current_app.open_resource("schema.sql") as sf:
c.executescript(sf.read().decode("utf-8"))
self.cursor().executescript(sf.read().decode("utf-8"))

self.commit()

Expand All @@ -113,10 +112,10 @@ def finish_run(
"""
Updates a run's state, sets the run log's end time, and publishes an event corresponding with a run failure
or a run success, depending on the state.
:param c: An SQLite connection cursor
:param event_bus: A bento_lib-defined event bus implementation for sending events
:param run: The run which just finished
:param state: The terminal state for the finished run
:param cursor: An SQLite connection cursor to re-use (optional)
:param logger: An optionally-provided logger object.
:return:
"""
Expand Down Expand Up @@ -224,6 +223,17 @@ def get_run_with_details(
return cls.run_with_details_and_output_from_row(c, run, stream_content)
return None

def set_run_log_name(self, run: Run, workflow_name: str):
# TODO: To avoid having multiple names, we should maybe only set this once?
self.cursor().execute("UPDATE runs SET run_log__name = ? WHERE id = ?", (workflow_name, run.run_id))
self.commit()

def set_run_log_command_and_celery_id(self, run: Run, cmd: Command, celery_id: int):
self.cursor().execute(
"UPDATE runs SET run_log__cmd = ?, run_log__celery_id = ? WHERE id = ?",
(" ".join(cmd), celery_id, run.run_id))
self.commit()

@staticmethod
def set_run_outputs(c: sqlite3.Cursor, run_id: str, outputs: dict[str, Any]):
c.execute("UPDATE runs SET outputs = ? WHERE id = ?", (json.dumps(outputs), str(run_id)))
Expand Down

0 comments on commit 35af319

Please sign in to comment.