Skip to content

Commit

Permalink
Rollback invalidated transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
jdavcs committed Jan 12, 2024
1 parent fca7e3a commit cb01378
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 1 deletion.
6 changes: 5 additions & 1 deletion lib/galaxy/jobs/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
)
from galaxy.jobs.mapper import JobNotReadyException
from galaxy.managers.jobs import get_jobs_to_check_at_startup
from galaxy.model.base import transaction
from galaxy.model.base import (
check_database_connection,
transaction,
)
from galaxy.structured_app import MinimalManagerApp
from galaxy.util import unicodify
from galaxy.util.custom_logging import get_logger
Expand Down Expand Up @@ -400,6 +403,7 @@ def __handle_waiting_jobs(self):
the waiting queue. If the job has dependencies with errors, it is marked as having errors and removed from the
queue. If the job belongs to an inactive user it is ignored. Otherwise, the job is dispatched.
"""
check_database_connection(self.sa_session)
# Pull all new jobs from the queue at once
jobs_to_check = []
resubmit_jobs = []
Expand Down
3 changes: 3 additions & 0 deletions lib/galaxy/jobs/runners/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
AsynchronousJobState,
JobState,
)
from galaxy.model.base import check_database_connection
from galaxy.tool_util.deps import dependencies
from galaxy.util import (
galaxy_directory,
Expand Down Expand Up @@ -273,6 +274,7 @@ def url_to_destination(self, url):
return JobDestination(runner="pulsar", params=url_to_destination_params(url))

def check_watched_item(self, job_state):
check_database_connection(self.app.model.session())
if self.use_mq:
# Might still need to check pod IPs.
job_wrapper = job_state.job_wrapper
Expand Down Expand Up @@ -972,6 +974,7 @@ def __async_update(self, full_status):
galaxy_job_id = None
remote_job_id = None
try:
check_database_connection(self.sa_session)
remote_job_id = full_status["job_id"]
if len(remote_job_id) == 32:
# It is a UUID - assign_ids = uuid in destination params...
Expand Down
12 changes: 12 additions & 0 deletions lib/galaxy/model/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ def transaction(session: Union[scoped_session, Session, "SessionlessContext"]):
yield


def check_database_connection(session):
"""
In the event of a database disconnect, if there exists an active database
transaction, that transaction becomes invalidated. Accessing the database
will raise sqlalchemy.exc.PendingRollbackError. This handles this situation
by rolling back the invalidated transaction.
Ref: https://docs.sqlalchemy.org/en/14/errors.html#can-t-reconnect-until-invalid-transaction-is-rolled-back
"""
if session and session.connection().invalidated:
session.rollback()


# TODO: Refactor this to be a proper class, not a bunch.
class ModelMapping(Bunch):
def __init__(self, model_modules, engine):
Expand Down

0 comments on commit cb01378

Please sign in to comment.