Skip to content

Commit

Permalink
Merge pull request galaxyproject#17280 from jdavcs/23.2_pendingrollba…
Browse files Browse the repository at this point in the history
…ckerror

[23.2] Rollback invalidated transaction
  • Loading branch information
natefoo authored Jan 13, 2024
2 parents fca7e3a + d1d8b08 commit 8ae1db1
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 2 deletions.
6 changes: 5 additions & 1 deletion lib/galaxy/jobs/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@
)
from galaxy.jobs.mapper import JobNotReadyException
from galaxy.managers.jobs import get_jobs_to_check_at_startup
from galaxy.model.base import transaction
from galaxy.model.base import (
check_database_connection,
transaction,
)
from galaxy.structured_app import MinimalManagerApp
from galaxy.util import unicodify
from galaxy.util.custom_logging import get_logger
Expand Down Expand Up @@ -400,6 +403,7 @@ def __handle_waiting_jobs(self):
the waiting queue. If the job has dependencies with errors, it is marked as having errors and removed from the
queue. If the job belongs to an inactive user it is ignored. Otherwise, the job is dispatched.
"""
check_database_connection(self.sa_session)
# Pull all new jobs from the queue at once
jobs_to_check = []
resubmit_jobs = []
Expand Down
3 changes: 3 additions & 0 deletions lib/galaxy/jobs/runners/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
AsynchronousJobState,
JobState,
)
from galaxy.model.base import check_database_connection
from galaxy.tool_util.deps import dependencies
from galaxy.util import (
galaxy_directory,
Expand Down Expand Up @@ -273,6 +274,7 @@ def url_to_destination(self, url):
return JobDestination(runner="pulsar", params=url_to_destination_params(url))

def check_watched_item(self, job_state):
check_database_connection(self.app.model.session())
if self.use_mq:
# Might still need to check pod IPs.
job_wrapper = job_state.job_wrapper
Expand Down Expand Up @@ -972,6 +974,7 @@ def __async_update(self, full_status):
galaxy_job_id = None
remote_job_id = None
try:
check_database_connection(self.sa_session)
remote_job_id = full_status["job_id"]
if len(remote_job_id) == 32:
# It is a UUID - assign_ids = uuid in destination params...
Expand Down
6 changes: 5 additions & 1 deletion lib/galaxy/managers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@
model,
)
from galaxy.model import tool_shed_install
from galaxy.model.base import transaction
from galaxy.model.base import (
check_database_connection,
transaction,
)
from galaxy.schema import ValueFilterQueryParams
from galaxy.schema.fields import DecodedDatabaseIdField
from galaxy.schema.storage_cleaner import (
Expand Down Expand Up @@ -315,6 +318,7 @@ def _one_with_recast_errors(self, query: Query) -> Query:
:raises exceptions.ObjectNotFound: if no model is found
:raises exceptions.InconsistentDatabase: if more than one model is found
"""
check_database_connection(self.session())
# overridden to raise serializable errors
try:
return query.one()
Expand Down
13 changes: 13 additions & 0 deletions lib/galaxy/model/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ def transaction(session: Union[scoped_session, Session, "SessionlessContext"]):
yield


def check_database_connection(session):
"""
In the event of a database disconnect, if there exists an active database
transaction, that transaction becomes invalidated. Accessing the database
will raise sqlalchemy.exc.PendingRollbackError. This handles this situation
by rolling back the invalidated transaction.
Ref: https://docs.sqlalchemy.org/en/14/errors.html#can-t-reconnect-until-invalid-transaction-is-rolled-back
"""
if session and session.connection().invalidated:
log.error("Database transaction rolled back due to invalid state.")
session.rollback()


# TODO: Refactor this to be a proper class, not a bunch.
class ModelMapping(Bunch):
def __init__(self, model_modules, engine):
Expand Down

0 comments on commit 8ae1db1

Please sign in to comment.