Skip to content

Commit

Permalink
refact: change a state update to use db func
Browse files Browse the repository at this point in the history
  • Loading branch information
davidlougheed committed Jul 12, 2023
1 parent 96beb14 commit 556e865
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 8 deletions.
2 changes: 1 addition & 1 deletion bento_wes/backends/_wes_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def _update_run_state_and_commit(self, run_id: Union[uuid.UUID, str], state: str
:param run_id: The ID of the run whose state is getting updated
:param state: The value to set the run's current state to
"""
update_run_state_and_commit(self.db, self.db.cursor(), self.event_bus, run_id, state)
update_run_state_and_commit(self.db, self.db.cursor(), run_id, state, event_bus=self.event_bus)

def _finish_run_and_clean_up(self, run: dict, state: str) -> None:
"""
Expand Down
8 changes: 5 additions & 3 deletions bento_wes/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def finish_run(

# Explicitly don't commit here to sync with state update
c.execute("UPDATE runs SET run_log__end_time = ? WHERE id = ?", (end_time, run_id))
update_run_state_and_commit(db, c, event_bus, run_id, state, logger=logger)
update_run_state_and_commit(db, c, run_id, state, event_bus=event_bus, logger=logger)

if logger:
logger.info(f"Run {run_id} finished with state {state} at {end_time}")
Expand Down Expand Up @@ -225,13 +225,15 @@ def get_run_details(c: sqlite3.Cursor, run_id: uuid.UUID | str) -> tuple[None, s
def update_run_state_and_commit(
db: sqlite3.Connection,
c: sqlite3.Cursor,
event_bus: EventBus,
run_id: uuid.UUID | str,
state: str,
event_bus: EventBus | None = None,
logger: logging.Logger | None = None,
publish_event: bool = True,
):
if logger:
logger.info(f"Updating run state of {run_id} to {state}")
c.execute("UPDATE runs SET state = ? WHERE id = ?", (state, str(run_id)))
db.commit()
event_bus.publish_service_event(SERVICE_ARTIFACT, EVENT_WES_RUN_UPDATED, get_run_details(c, run_id)[0])
if event_bus and publish_event:
event_bus.publish_service_event(SERVICE_ARTIFACT, EVENT_WES_RUN_UPDATED, get_run_details(c, run_id)[0])
8 changes: 4 additions & 4 deletions bento_wes/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ def _create_run(db: sqlite3.Connection, c: sqlite3.Cursor) -> Response:

# TODO: figure out timeout
# TODO: retry policy
c.execute("UPDATE runs SET state = ? WHERE id = ?", (states.STATE_QUEUED, str(run_id)))
db.commit()

update_run_state_and_commit(db, c, run_id, states.STATE_QUEUED, logger=logger, publish_event=False)

run_workflow.delay(run_id)

Expand Down Expand Up @@ -383,7 +383,7 @@ def perform_run_cancel():
event_bus = get_flask_event_bus()

# TODO: terminate=True might be iffy
update_run_state_and_commit(db, c, event_bus, run["id"], states.STATE_CANCELING)
update_run_state_and_commit(db, c, run["id"], states.STATE_CANCELING, event_bus=event_bus)
celery.control.revoke(celery_id, terminate=True) # Remove from queue if there, terminate if running

# TODO: wait for revocation / failure and update status...
Expand All @@ -393,7 +393,7 @@ def perform_run_cancel():
if not current_app.config["BENTO_DEBUG"]:
shutil.rmtree(run_dir, ignore_errors=True)

update_run_state_and_commit(db, c, event_bus, run["id"], states.STATE_CANCELED)
update_run_state_and_commit(db, c, run["id"], states.STATE_CANCELED, event_bus=event_bus)

return current_app.response_class(status=204) # TODO: Better response

Expand Down

0 comments on commit 556e865

Please sign in to comment.