From cb01378faa0646adf8c1d9b3d6dac8d615aceeb8 Mon Sep 17 00:00:00 2001 From: John Davis Date: Fri, 12 Jan 2024 15:47:31 -0500 Subject: [PATCH] Rollback invalidated transaction --- lib/galaxy/jobs/handler.py | 6 +++++- lib/galaxy/jobs/runners/pulsar.py | 3 +++ lib/galaxy/model/base.py | 12 ++++++++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 2472f874be37..0b4819f684f6 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -37,7 +37,10 @@ ) from galaxy.jobs.mapper import JobNotReadyException from galaxy.managers.jobs import get_jobs_to_check_at_startup -from galaxy.model.base import transaction +from galaxy.model.base import ( + check_database_connection, + transaction, +) from galaxy.structured_app import MinimalManagerApp from galaxy.util import unicodify from galaxy.util.custom_logging import get_logger @@ -400,6 +403,7 @@ def __handle_waiting_jobs(self): the waiting queue. If the job has dependencies with errors, it is marked as having errors and removed from the queue. If the job belongs to an inactive user it is ignored. Otherwise, the job is dispatched. """ + check_database_connection(self.sa_session) # Pull all new jobs from the queue at once jobs_to_check = [] resubmit_jobs = [] diff --git a/lib/galaxy/jobs/runners/pulsar.py b/lib/galaxy/jobs/runners/pulsar.py index f9142ee8c59f..3925f4d8a7be 100644 --- a/lib/galaxy/jobs/runners/pulsar.py +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -50,6 +50,7 @@ AsynchronousJobState, JobState, ) +from galaxy.model.base import check_database_connection from galaxy.tool_util.deps import dependencies from galaxy.util import ( galaxy_directory, @@ -273,6 +274,7 @@ def url_to_destination(self, url): return JobDestination(runner="pulsar", params=url_to_destination_params(url)) def check_watched_item(self, job_state): + check_database_connection(self.app.model.session()) if self.use_mq: # Might still need to check pod IPs. job_wrapper = job_state.job_wrapper @@ -972,6 +974,7 @@ def __async_update(self, full_status): galaxy_job_id = None remote_job_id = None try: + check_database_connection(self.sa_session) remote_job_id = full_status["job_id"] if len(remote_job_id) == 32: # It is a UUID - assign_ids = uuid in destination params... diff --git a/lib/galaxy/model/base.py b/lib/galaxy/model/base.py index 2aade3f4ff36..4c9e3b8082f7 100644 --- a/lib/galaxy/model/base.py +++ b/lib/galaxy/model/base.py @@ -58,6 +58,18 @@ def transaction(session: Union[scoped_session, Session, "SessionlessContext"]): yield +def check_database_connection(session): + """ + In the event of a database disconnect, if there exists an active database + transaction, that transaction becomes invalidated. Accessing the database + will raise sqlalchemy.exc.PendingRollbackError. This handles this situation + by rolling back the invalidated transaction. + Ref: https://docs.sqlalchemy.org/en/14/errors.html#can-t-reconnect-until-invalid-transaction-is-rolled-back + """ + if session and session.connection().invalidated: + session.rollback() + + # TODO: Refactor this to be a proper class, not a bunch. class ModelMapping(Bunch): def __init__(self, model_modules, engine):