Skip to content

Commit

Permalink
more debug prints
Browse files Browse the repository at this point in the history
  • Loading branch information
dhirving committed Aug 29, 2024
1 parent eb62362 commit 823aa1b
Showing 1 changed file with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,23 @@
def query_execute(
request: QueryExecuteRequestModel, factory: Factory = Depends(factory_dependency)
) -> StreamingResponse:
print("top")
# Managing the lifetime of the query context object is a little tricky. We
# need to enter the context here, so that we can immediately deal with any
# exceptions raised by query set-up. We eventually transfer control to an
# iterator consumed by FastAPI's StreamingResponse handler, which will
# start iterating after this function returns. So we use this ExitStack
# instance to hand over the context manager to the iterator.
with ExitStack() as exit_stack:
print("before context")
ctx = exit_stack.enter_context(_get_query_context(factory, request.query))
spec = request.result_spec.to_result_spec(ctx.driver.universe)

# We write the response incrementally, one page at a time, as
# newline-separated chunks of JSON. This allows clients to start
# reading results earlier and prevents the server from exhausting
# all its memory buffering rows from large queries.
print("before stream")
output_generator = _stream_query_pages(
# Transfer control of the context manager to
# _stream_query_pages.
Expand All @@ -96,7 +99,9 @@ def query_execute(
def _retrieve_query_pages(ctx: _QueryContext, spec: ResultSpec) -> Iterator[QueryExecuteResultData]:
    """Execute the database query and return pages of results."""
try:
print("before execute")
pages = ctx.driver.execute(spec, ctx.tree)
print("page1")
for page in pages:
yield convert_query_page(spec, page)
except ButlerUserError as e:
Expand Down Expand Up @@ -133,6 +138,7 @@ async def _stream_query_pages(
# Ensure that the database connection is cleaned up by taking control of
# exit_stack.
async with contextmanager_in_threadpool(exit_stack):
print("in context")
iterator = iterate_in_threadpool(_retrieve_query_pages(ctx, spec))
done = False
while not done:
Expand All @@ -154,16 +160,19 @@ async def _fetch_next_with_keepalives(
return a value. Yields `None` if there is nothing left to read from the
iterator.
"""
print("fetch_next")
try:
future = asyncio.ensure_future(anext(iterator, None))
ready = False
while not ready:
(finished_task, pending_task) = await asyncio.wait([future], timeout=15)
if pending_task:
print("timeout")

Check warning on line 170 in python/lsst/daf/butler/remote_butler/server/handlers/_external_query.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/remote_butler/server/handlers/_external_query.py#L170

Added line #L170 was not covered by tests
# Hit the timeout, send a keep-alive and keep waiting.
yield QueryKeepAliveModel()

Check warning on line 172 in python/lsst/daf/butler/remote_butler/server/handlers/_external_query.py

View check run for this annotation

Codecov / codecov/patch

python/lsst/daf/butler/remote_butler/server/handlers/_external_query.py#L172

Added line #L172 was not covered by tests
else:
# The next value from the iterator is ready to read.
print("done")
ready = True
finally:
# Even if we get cancelled above, we need to wait for this iteration to
Expand Down

0 comments on commit 823aa1b

Please sign in to comment.