diff --git a/lib/galaxy/jobs/handler.py b/lib/galaxy/jobs/handler.py index 2472f874be37..0b4819f684f6 100644 --- a/lib/galaxy/jobs/handler.py +++ b/lib/galaxy/jobs/handler.py @@ -37,7 +37,10 @@ ) from galaxy.jobs.mapper import JobNotReadyException from galaxy.managers.jobs import get_jobs_to_check_at_startup -from galaxy.model.base import transaction +from galaxy.model.base import ( + check_database_connection, + transaction, +) from galaxy.structured_app import MinimalManagerApp from galaxy.util import unicodify from galaxy.util.custom_logging import get_logger @@ -400,6 +403,7 @@ def __handle_waiting_jobs(self): the waiting queue. If the job has dependencies with errors, it is marked as having errors and removed from the queue. If the job belongs to an inactive user it is ignored. Otherwise, the job is dispatched. """ + check_database_connection(self.sa_session) # Pull all new jobs from the queue at once jobs_to_check = [] resubmit_jobs = [] diff --git a/lib/galaxy/jobs/runners/pulsar.py b/lib/galaxy/jobs/runners/pulsar.py index f9142ee8c59f..3925f4d8a7be 100644 --- a/lib/galaxy/jobs/runners/pulsar.py +++ b/lib/galaxy/jobs/runners/pulsar.py @@ -50,6 +50,7 @@ AsynchronousJobState, JobState, ) +from galaxy.model.base import check_database_connection from galaxy.tool_util.deps import dependencies from galaxy.util import ( galaxy_directory, @@ -273,6 +274,7 @@ def url_to_destination(self, url): return JobDestination(runner="pulsar", params=url_to_destination_params(url)) def check_watched_item(self, job_state): + check_database_connection(self.app.model.session()) if self.use_mq: # Might still need to check pod IPs. job_wrapper = job_state.job_wrapper @@ -972,6 +974,7 @@ def __async_update(self, full_status): galaxy_job_id = None remote_job_id = None try: + check_database_connection(self.sa_session) remote_job_id = full_status["job_id"] if len(remote_job_id) == 32: # It is a UUID - assign_ids = uuid in destination params... diff --git a/lib/galaxy/managers/base.py b/lib/galaxy/managers/base.py index f3abe35e7ceb..15503678cb9a 100644 --- a/lib/galaxy/managers/base.py +++ b/lib/galaxy/managers/base.py @@ -54,7 +54,10 @@ model, ) from galaxy.model import tool_shed_install -from galaxy.model.base import transaction +from galaxy.model.base import ( + check_database_connection, + transaction, +) from galaxy.schema import ValueFilterQueryParams from galaxy.schema.fields import DecodedDatabaseIdField from galaxy.schema.storage_cleaner import ( @@ -315,6 +318,7 @@ def _one_with_recast_errors(self, query: Query) -> Query: :raises exceptions.ObjectNotFound: if no model is found :raises exceptions.InconsistentDatabase: if more than one model is found """ + check_database_connection(self.session()) # overridden to raise serializable errors try: return query.one() diff --git a/lib/galaxy/model/base.py b/lib/galaxy/model/base.py index 2aade3f4ff36..829bc773ec6d 100644 --- a/lib/galaxy/model/base.py +++ b/lib/galaxy/model/base.py @@ -58,6 +58,19 @@ def transaction(session: Union[scoped_session, Session, "SessionlessContext"]): yield +def check_database_connection(session): + """ + In the event of a database disconnect, if there exists an active database + transaction, that transaction becomes invalidated. Accessing the database + will raise sqlalchemy.exc.PendingRollbackError. This handles this situation + by rolling back the invalidated transaction. + Ref: https://docs.sqlalchemy.org/en/14/errors.html#can-t-reconnect-until-invalid-transaction-is-rolled-back + """ + if session and session.connection().invalidated: + log.error("Database transaction rolled back due to invalid state.") + session.rollback() + + # TODO: Refactor this to be a proper class, not a bunch. class ModelMapping(Bunch): def __init__(self, model_modules, engine):