From cb01378faa0646adf8c1d9b3d6dac8d615aceeb8 Mon Sep 17 00:00:00 2001 From: John Davis Date: Fri, 12 Jan 2024 15:47:31 -0500 Subject: [PATCH 1/3] Rollback invalidated transaction --- lib/galaxy/jobs/handler.py | 6 +++++- lib/galaxy/jobs/runners/pulsar.py | 3 +++ lib/galaxy/model/base.py | 12 ++++++++++++ 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 2472f874be37..0b4819f684f6 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -37,7 +37,10 @@ ) from galaxy.jobs.mapper import JobNotReadyException from galaxy.managers.jobs import get_jobs_to_check_at_startup -from galaxy.model.base import transaction +from galaxy.model.base import ( + check_database_connection, + transaction, +) from galaxy.structured_app import MinimalManagerApp from galaxy.util import unicodify from galaxy.util.custom_logging import get_logger @@ -400,6 +403,7 @@ def __handle_waiting_jobs(self): the waiting queue. If the job has dependencies with errors, it is marked as having errors and removed from the queue. If the job belongs to an inactive user it is ignored. Otherwise, the job is dispatched. """ + check_database_connection(self.sa_session) # Pull all new jobs from the queue at once jobs_to_check = [] resubmit_jobs = [] diff --git a/lib/galaxy/jobs/runners/pulsar.py b/lib/galaxy/jobs/runners/pulsar.py index f9142ee8c59f..3925f4d8a7be 100644 --- a/lib/galaxy/jobs/runners/pulsar.py +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -50,6 +50,7 @@ AsynchronousJobState, JobState, ) +from galaxy.model.base import check_database_connection from galaxy.tool_util.deps import dependencies from galaxy.util import ( galaxy_directory, @@ -273,6 +274,7 @@ def url_to_destination(self, url): return JobDestination(runner="pulsar", params=url_to_destination_params(url)) def check_watched_item(self, job_state): + check_database_connection(self.app.model.session()) if self.use_mq: # Might still need to check pod IPs. job_wrapper = job_state.job_wrapper @@ -972,6 +974,7 @@ def __async_update(self, full_status): galaxy_job_id = None remote_job_id = None try: + check_database_connection(self.sa_session) remote_job_id = full_status["job_id"] if len(remote_job_id) == 32: # It is a UUID - assign_ids = uuid in destination params... diff --git a/lib/galaxy/model/base.py b/lib/galaxy/model/base.py index 2aade3f4ff36..4c9e3b8082f7 100644 --- a/lib/galaxy/model/base.py +++ b/lib/galaxy/model/base.py @@ -58,6 +58,18 @@ def transaction(session: Union[scoped_session, Session, "SessionlessContext"]): yield +def check_database_connection(session): + """ + In the event of a database disconnect, if there exists an active database + transaction, that transaction becomes invalidated. Accessing the database + will raise sqlalchemy.exc.PendingRollbackError. This handles this situation + by rolling back the invalidated transaction. + Ref: https://docs.sqlalchemy.org/en/14/errors.html#can-t-reconnect-until-invalid-transaction-is-rolled-back + """ + if session and session.connection().invalidated: + session.rollback() + + # TODO: Refactor this to be a proper class, not a bunch. class ModelMapping(Bunch): def __init__(self, model_modules, engine): From cf53c05fd167caf44e318a9fd87afe9e0927d3d3 Mon Sep 17 00:00:00 2001 From: John Davis Date: Sat, 13 Jan 2024 11:01:27 -0500 Subject: [PATCH 2/3] Log an error message --- lib/galaxy/model/base.py | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/galaxy/model/base.py b/lib/galaxy/model/base.py index 4c9e3b8082f7..829bc773ec6d 100644 --- a/lib/galaxy/model/base.py +++ b/lib/galaxy/model/base.py @@ -67,6 +67,7 @@ def check_database_connection(session): Ref: https://docs.sqlalchemy.org/en/14/errors.html#can-t-reconnect-until-invalid-transaction-is-rolled-back """ if session and session.connection().invalidated: + log.error("Database transaction rolled back due to invalid state.") session.rollback() From d1d8b08a20b9d3fe80b8998a35d6a4d97900d3b5 Mon Sep 17 00:00:00 2001 From: John Davis Date: Sat, 13 Jan 2024 16:17:42 -0500 Subject: [PATCH 3/3] Add one more check for invalidated transaction --- lib/galaxy/managers/base.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/galaxy/managers/base.py b/lib/galaxy/managers/base.py index f3abe35e7ceb..15503678cb9a 100644 --- a/lib/galaxy/managers/base.py +++ b/lib/galaxy/managers/base.py @@ -54,7 +54,10 @@ model, ) from galaxy.model import tool_shed_install -from galaxy.model.base import transaction +from galaxy.model.base import ( + check_database_connection, + transaction, +) from galaxy.schema import ValueFilterQueryParams from galaxy.schema.fields import DecodedDatabaseIdField from galaxy.schema.storage_cleaner import ( @@ -315,6 +318,7 @@ def _one_with_recast_errors(self, query: Query) -> Query: :raises exceptions.ObjectNotFound: if no model is found :raises exceptions.InconsistentDatabase: if more than one model is found """ + check_database_connection(self.session()) # overridden to raise serializable errors try: return query.one()