Skip to content

Commit

Permalink
better query cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
dhirving committed Aug 30, 2024
1 parent 7efb63b commit 0f2afe9
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 8 deletions.
4 changes: 4 additions & 0 deletions python/lsst/daf/butler/registry/databases/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,10 @@ def apply_any_aggregate(self, column: sqlalchemy.ColumnElement[Any]) -> sqlalche
# would become a String column in the output.
return sqlalchemy.cast(sqlalchemy.func.any_value(column), column.type)

def _cancel_running_query(self, connection: sqlalchemy.engine.interfaces.DBAPIConnection) -> None:
    """Ask the database driver to cancel the query running on
    ``connection``.

    Parameters
    ----------
    connection : `sqlalchemy.engine.interfaces.DBAPIConnection`
        Raw DBAPI connection whose in-progress query should be cancelled.
    """
    # ``cancel()`` is an extension method specific to psycopg2 rather than
    # part of the generic DBAPI connection type, hence the type-ignore.
    connection.cancel()  # type: ignore

Check warning on line 401 in python/lsst/daf/butler/registry/databases/postgresql.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/registry/databases/postgresql.py#L401

Added line #L401 was not covered by tests


class _RangeTimespanType(sqlalchemy.TypeDecorator):
"""A single-column `Timespan` representation usable only with
Expand Down
4 changes: 4 additions & 0 deletions python/lsst/daf/butler/registry/databases/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,10 @@ def apply_any_aggregate(self, column: sqlalchemy.ColumnElement[Any]) -> sqlalche
# arbitrary value picked if there is more than one.
return column

def _cancel_running_query(self, connection: sqlalchemy.engine.interfaces.DBAPIConnection) -> None:
    """Ask the database driver to interrupt the query running on
    ``connection``.

    Parameters
    ----------
    connection : `sqlalchemy.engine.interfaces.DBAPIConnection`
        Raw DBAPI connection whose in-progress query should be interrupted.
    """
    # ``interrupt()`` is an extension method specific to pysqlite rather
    # than part of the generic DBAPI connection type, hence the type-ignore.
    connection.interrupt()  # type: ignore

Check warning on line 417 in python/lsst/daf/butler/registry/databases/sqlite.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/registry/databases/sqlite.py#L417

Added line #L417 was not covered by tests

filename: str | None
"""Name of the file this database is connected to (`str` or `None`).
Expand Down
24 changes: 24 additions & 0 deletions python/lsst/daf/butler/registry/interfaces/_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -1995,6 +1995,30 @@ def apply_any_aggregate(self, column: sqlalchemy.ColumnElement[Any]) -> sqlalche
"""
raise NotImplementedError()

def cancel_running_query(self) -> None:
    """Make a best-effort attempt to cancel the query currently in progress
    on this database connection.

    Notes
    -----
    Does nothing if no query is active.  This may be called from a thread
    other than the one that is executing the query.  The driver-level
    cancellation functions this relies on are generally not guaranteed to
    succeed.
    """
    session = self._session_connection
    if session is None:
        # No session connection is open, so there is nothing to cancel.
        return
    dbapi_connection = session.connection.dbapi_connection
    if dbapi_connection is None:
        # The session has no underlying DBAPI connection to act on.
        return
    # Hand the raw DBAPI connection to the driver-specific implementation.
    self._cancel_running_query(dbapi_connection)

Check warning on line 2013 in python/lsst/daf/butler/registry/interfaces/_database.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/registry/interfaces/_database.py#L2013

Added line #L2013 was not covered by tests

@abstractmethod
def _cancel_running_query(self, connection: sqlalchemy.engine.interfaces.DBAPIConnection) -> None:
    """Perform the driver-specific work behind ``cancel_running_query``.

    Parameters
    ----------
    connection : `sqlalchemy.engine.interfaces.DBAPIConnection`
        Raw DBAPI connection whose in-progress query should be cancelled.

    Raises
    ------
    NotImplementedError
        Always raised by this base implementation.
    """
    raise NotImplementedError()

origin: int
"""An integer ID that should be used as the default for any datasets,
quanta, or other entities that use a (autoincrement, origin) compound
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@
)

from ...._exceptions import ButlerUserError
from ....queries.driver import QueryDriver, QueryTree, ResultSpec
from ....direct_query_driver import DirectQueryDriver
from ....queries.driver import QueryTree, ResultSpec
from ..._errors import serialize_butler_user_error
from .._dependencies import factory_dependency
from .._factory import Factory
Expand Down Expand Up @@ -98,25 +99,45 @@ async def _stream_query_pages(request: QueryExecuteRequestModel, factory: Factor
async with asyncio.TaskGroup() as tg:
# Run a background task to read from the DB and insert the result pages
# into a queue.
tg.create_task(_enqueue_query_pages(queue, request, factory))
tg.create_task(_execute_query(queue, request, factory))
# Read the result pages from the queue and send them to the client,
# inserting a keep-alive message every 15 seconds if we are waiting a
# long time for the database.
async for message in _dequeue_query_pages_with_keepalive(queue):
yield message.model_dump_json() + "\n"

print("closed")

async def _enqueue_query_pages(

async def _execute_query(
    queue: asyncio.Queue[QueryExecuteResultData | None], request: QueryExecuteRequestModel, factory: Factory
) -> None:
    """Set up a QueryDriver for the request and copy the query's result
    pages into ``queue``.  `None` is sent to the queue once there is no more
    data to read.

    If this coroutine is cancelled while the query is in flight, the
    running database query is cancelled before the cancellation propagates.
    """
    query_context = _get_query_context(factory, request.query)
    async with contextmanager_in_threadpool(query_context) as ctx, asyncio.TaskGroup() as group:
        # The real work runs in its own task so that cancellation can be
        # handled explicitly here.  The database calls made by
        # _retrieve_query_pages are synchronous calls in another thread and
        # can block for a very long time waiting on the DB, so the `await`
        # below cannot be cancelled until the sync call returns.  Forcibly
        # cancelling the database query is what makes that sync call abort.
        worker = group.create_task(_enqueue_query_pages(queue, request, ctx))
        try:
            await asyncio.wait_for(worker, None)
        except asyncio.CancelledError:
            ctx.driver.db.cancel_running_query()
            raise

Check warning on line 131 in python/lsst/daf/butler/remote_butler/server/handlers/_external_query.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/remote_butler/server/handlers/_external_query.py#L129-L131

Added lines #L129 - L131 were not covered by tests


async def _enqueue_query_pages(
queue: asyncio.Queue[QueryExecuteResultData | None], request: QueryExecuteRequestModel, ctx: _QueryContext
) -> None:
try:
async with contextmanager_in_threadpool(_get_query_context(factory, request.query)) as ctx:
spec = request.result_spec.to_result_spec(ctx.driver.universe)
async for page in iterate_in_threadpool(_retrieve_query_pages(ctx, spec)):
await queue.put(page)
spec = request.result_spec.to_result_spec(ctx.driver.universe)
async for page in iterate_in_threadpool(_retrieve_query_pages(ctx, spec)):
await queue.put(page)
except ButlerUserError as e:
# If a user-facing error occurs, serialize it and send it to the
# client.
Expand Down Expand Up @@ -219,5 +240,5 @@ def _get_query_context(factory: Factory, query: QueryInputs) -> Iterator[_QueryC


class _QueryContext(NamedTuple):
driver: QueryDriver
driver: DirectQueryDriver
tree: QueryTree

0 comments on commit 0f2afe9

Please sign in to comment.