diff --git a/bento_wes/backends/_wes_backend.py b/bento_wes/backends/_wes_backend.py index f3cb0a7..26859c3 100644 --- a/bento_wes/backends/_wes_backend.py +++ b/bento_wes/backends/_wes_backend.py @@ -354,11 +354,7 @@ def _initialize_run_and_get_command( self.log_error("Could not find workflow name in workflow file") return self._finish_run_and_clean_up(run, states.STATE_SYSTEM_ERROR) - c = self.db.cursor() - - # TODO: To avoid having multiple names, we should maybe only set this once? - c.execute("UPDATE runs SET run_log__name = ? WHERE id = ?", (workflow_name, run.run_id)) - self.db.commit() + self.db.set_run_log_name(run, workflow_name) # -- Store input for the workflow in a file in the temporary folder -------------------------------------------- with open(self._params_path(run), "w") as pf: @@ -368,10 +364,7 @@ def _initialize_run_and_get_command( cmd = self._get_command(self.workflow_path(run), self._params_path(run), self.run_dir(run)) # -- Update run log with command and Celery ID ----------------------------------------------------------------- - c.execute( - "UPDATE runs SET run_log__cmd = ?, run_log__celery_id = ? WHERE id = ?", - (" ".join(cmd), celery_id, run.run_id)) - self.db.commit() + self.db.set_run_log_command_and_celery_id(run, cmd, celery_id) return cmd, workflow_params_with_secrets diff --git a/bento_wes/db.py b/bento_wes/db.py index f4578e1..69493b7 100644 --- a/bento_wes/db.py +++ b/bento_wes/db.py @@ -11,6 +11,7 @@ from urllib.parse import urljoin from . import states +from .backends.backend_types import Command from .constants import SERVICE_ARTIFACT from .events import get_flask_event_bus from .models import RunLog, RunRequest, Run, RunWithDetailsAndOutput @@ -95,10 +96,8 @@ def close(self): self._conn.close() def init(self): - c = self.cursor() - with current_app.open_resource("schema.sql") as sf: - c.executescript(sf.read().decode("utf-8")) + self.cursor().executescript(sf.read().decode("utf-8")) self.commit() @@ -113,10 +112,10 @@ def finish_run( """ Updates a run's state, sets the run log's end time, and publishes an event corresponding with a run failure or a run success, depending on the state. - :param c: An SQLite connection cursor :param event_bus: A bento_lib-defined event bus implementation for sending events :param run: The run which just finished :param state: The terminal state for the finished run + :param cursor: An SQLite connection cursor to re-use (optional) :param logger: An optionally-provided logger object. :return: """ @@ -224,6 +223,17 @@ def get_run_with_details( return cls.run_with_details_and_output_from_row(c, run, stream_content) return None + def set_run_log_name(self, run: Run, workflow_name: str): + # TODO: To avoid having multiple names, we should maybe only set this once? + self.cursor().execute("UPDATE runs SET run_log__name = ? WHERE id = ?", (workflow_name, run.run_id)) + self.commit() + + def set_run_log_command_and_celery_id(self, run: Run, cmd: Command, celery_id: int): + self.cursor().execute( + "UPDATE runs SET run_log__cmd = ?, run_log__celery_id = ? WHERE id = ?", + (" ".join(cmd), celery_id, run.run_id)) + self.commit() + @staticmethod def set_run_outputs(c: sqlite3.Cursor, run_id: str, outputs: dict[str, Any]): c.execute("UPDATE runs SET outputs = ? WHERE id = ?", (json.dumps(outputs), str(run_id)))