Cancel and fail downloads if exception happened #2710

Merged
10 changes: 10 additions & 0 deletions connectors/es/sink.py
@@ -598,8 +598,18 @@ async def get_docs(self, generator, skip_unchanged_documents=False):
"doc": doc,
}
)
# We try raising every loop to not miss a moment when
# too many errors happened when downloading
lazy_downloads.raise_any_exception()

await asyncio.sleep(0)

# Sit and wait until an error happens
await lazy_downloads.join(raise_on_error=True)
except Exception as ex:
self._logger.error(f"Extractor failed with an error: {ex}")
lazy_downloads.cancel()
raise
finally:
# wait for all downloads to be finished
await lazy_downloads.join()
Comment on lines +609 to 615
Contributor

I couldn't find a clear answer on this; is the finally block still called if an error is raised in the caught exception block?

Member Author

Yup, see this for example:

[screenshot: a small example showing that the finally block still runs after the except block re-raises]
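
For readers without the screenshot, a minimal sketch (our own example, not code from this PR) of the semantics being confirmed - the finally block runs even when the except block re-raises:

def demo():
    try:
        raise ValueError("boom")
    except ValueError:
        print("except: cleaning up, then re-raising")
        raise
    finally:
        print("finally: runs even after the re-raise")

try:
    demo()
except ValueError:
    print("caller: saw the re-raised error")

# prints:
# except: cleaning up, then re-raising
# finally: runs even after the re-raise
# caller: saw the re-raised error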

Contributor

In that case, the finally block still awaits all of the downloads even if there's an error. But I guess lazy_downloads.cancel() is wiping everything, so that's okay?
(Asking because the PR title is "Cancel and fail downloads if exception happened", but it looks like it will still eventually await the downloads.)

Member Author (@artem-shelkovnikov, Jul 17, 2024)

Yeah, it's intended:

Cancellation does not mean that the task is immediately dead - we need to make sure that the task has an opportunity to cancel gracefully.

For example, if the cancelled task catches asyncio.CancelledError and does something, then it will still need to be awaited.

We use similar logic here:

for task in pending:
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        logger.error("Service did not handle cancellation gracefully.")

task.cancel() injects a throw of asyncio.CancelledError wherever it can (pretty much at any await or async with statement). After that, the code can catch it or not. The except part here checks whether the task re-raised asyncio.CancelledError because it could not handle it - either there was no except asyncio.CancelledError inside the task, or the error was re-raised.

Hope the description makes sense; if not, I can prepare an illustrative example of this :)
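
An illustrative sketch along those lines (our own minimal example, not code from this repository) - the cancelled task gets a chance to clean up, and the caller still has to await it:

import asyncio

async def graceful_worker():
    try:
        await asyncio.sleep(3600)  # cancel() injects CancelledError at this await
    except asyncio.CancelledError:
        print("worker: cleaning up after cancellation")
        raise  # re-raise so the awaiting caller sees CancelledError

async def main():
    task = asyncio.create_task(graceful_worker())
    await asyncio.sleep(0)  # let the worker start and suspend at its await
    task.cancel()
    try:
        await task  # gives the worker the opportunity to run its cleanup
    except asyncio.CancelledError:
        print("caller: task re-raised CancelledError")

asyncio.run(main())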

Contributor

That's really helpful, thanks!

37 changes: 37 additions & 0 deletions tests/test_sink.py
@@ -60,6 +60,7 @@

DOC_TWO = {"_id": 2, "_timestamp": TIMESTAMP}
DOC_THREE = {"_id": 3, "_timestamp": TIMESTAMP}
DOC_FOUR = {"_id": 4, "_timestamp": TIMESTAMP}

BULK_ACTION_ERROR = "some error"

@@ -433,6 +434,14 @@ async def lazy_download(**kwargs):
    return lazy_download


def crashing_lazy_download_fake():
    async def lazy_download(**kwargs):
        msg = "Could not download"
        raise Exception(msg)

    return lazy_download


def queue_called_with_operations(queue, operations):
    expected_calls = [call(operation) for operation in operations]
    actual_calls = queue.put.call_args_list
@@ -1282,6 +1291,34 @@ async def test_extractor_put_doc():
    queue.put.assert_awaited_once_with(doc)


@pytest.mark.asyncio
@mock.patch(
    "connectors.es.management_client.ESManagementClient.yield_existing_documents_metadata"
)
@mock.patch("connectors.utils.ConcurrentTasks.cancel")
async def test_extractor_get_docs_when_downloads_fail(
    concurrent_tasks_cancel, yield_existing_documents_metadata
):
    # stacked mock.patch decorators inject mocks bottom-up, so the decorator
    # closest to the function provides the first argument
    queue = await queue_mock()

    yield_existing_documents_metadata.return_value = AsyncIterator([])

    docs_from_source = [
        (DOC_ONE, crashing_lazy_download_fake(), "index"),
        (DOC_TWO, crashing_lazy_download_fake(), "index"),
        (DOC_THREE, crashing_lazy_download_fake(), "index"),
        (DOC_FOUR, crashing_lazy_download_fake(), "index"),
    ]
    # deep copies are needed because get_docs mutates the document ids, which
    # would have side effects on other test instances
    doc_generator = AsyncIterator([deepcopy(doc) for doc in docs_from_source])

    extractor = await setup_extractor(queue, content_extraction_enabled=True)

    await extractor.run(doc_generator, JobType.FULL)
    concurrent_tasks_cancel.assert_called_once()


@pytest.mark.asyncio
async def test_force_canceled_extractor_put_doc():
doc = {"id": 123}