diff --git a/connectors/es/sink.py b/connectors/es/sink.py
index 8a3b8590d..4fa0051e1 100644
--- a/connectors/es/sink.py
+++ b/connectors/es/sink.py
@@ -598,8 +598,18 @@ async def get_docs(self, generator, skip_unchanged_documents=False):
                         "doc": doc,
                     }
                 )
+                # Raise on every loop iteration so we don't miss the moment
+                # when too many download errors have happened
+                lazy_downloads.raise_any_exception()
 
                 await asyncio.sleep(0)
+
+            # Wait for the remaining downloads, raising if any of them failed
+            await lazy_downloads.join(raise_on_error=True)
+        except Exception as ex:
+            self._logger.error(f"Extractor failed with an error: {ex}")
+            lazy_downloads.cancel()
+            raise
         finally:
             # wait for all downloads to be finished
             await lazy_downloads.join()
diff --git a/tests/test_sink.py b/tests/test_sink.py
index dc5dda00f..6c35cba5d 100644
--- a/tests/test_sink.py
+++ b/tests/test_sink.py
@@ -60,6 +60,7 @@
 
 DOC_TWO = {"_id": 2, "_timestamp": TIMESTAMP}
 DOC_THREE = {"_id": 3, "_timestamp": TIMESTAMP}
+DOC_FOUR = {"_id": 4, "_timestamp": TIMESTAMP}
 
 BULK_ACTION_ERROR = "some error"
 
@@ -433,6 +434,14 @@ async def lazy_download(**kwargs):
     return lazy_download
 
 
+def crashing_lazy_download_fake():
+    async def lazy_download(**kwargs):
+        msg = "Could not download"
+        raise Exception(msg)
+
+    return lazy_download
+
+
 def queue_called_with_operations(queue, operations):
     expected_calls = [call(operation) for operation in operations]
     actual_calls = queue.put.call_args_list
@@ -1282,6 +1291,34 @@ async def test_extractor_put_doc():
     queue.put.assert_awaited_once_with(doc)
 
 
+@pytest.mark.asyncio
+@mock.patch(
+    "connectors.es.management_client.ESManagementClient.yield_existing_documents_metadata"
+)
+@mock.patch("connectors.utils.ConcurrentTasks.cancel")
+async def test_extractor_get_docs_when_downloads_fail(
+    concurrent_tasks_cancel, yield_existing_documents_metadata
+):
+    queue = await queue_mock()
+
+    yield_existing_documents_metadata.return_value = AsyncIterator([])
+
+    docs_from_source = [
+        (DOC_ONE, crashing_lazy_download_fake(), "index"),
+        (DOC_TWO, crashing_lazy_download_fake(), "index"),
+        (DOC_THREE, crashing_lazy_download_fake(), "index"),
+        (DOC_FOUR, crashing_lazy_download_fake(), "index"),
+    ]
+    # Deep copying docs is needed because get_docs mutates the document ids,
+    # which would have side effects on other tests
+    doc_generator = AsyncIterator([deepcopy(doc) for doc in docs_from_source])
+
+    extractor = await setup_extractor(queue, content_extraction_enabled=True)
+
+    await extractor.run(doc_generator, JobType.FULL)
+    concurrent_tasks_cancel.assert_called_once()
+
+
 @pytest.mark.asyncio
 async def test_force_canceled_extractor_put_doc():
     doc = {"id": 123}
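
For context, the error handling added to get_docs relies on three behaviors of the ConcurrentTasks pool held in lazy_downloads: raise_any_exception() immediately re-raises the first failure recorded by any download task, join(raise_on_error=True) waits for the remaining tasks and then re-raises such a failure, and cancel() aborts whatever is still in flight. The sketch below only illustrates that contract; it is not the real connectors.utils.ConcurrentTasks implementation, and the TaskPoolSketch class and its internals are hypothetical.

# A minimal sketch of the task-pool contract the change above relies on.
# NOT the real connectors.utils.ConcurrentTasks implementation; the class
# and its internals are hypothetical, shown only to make the
# raise_any_exception / join(raise_on_error=True) / cancel flow concrete.
import asyncio


class TaskPoolSketch:
    def __init__(self):
        self._tasks = set()
        self._first_error = None

    def put(self, coroutine):
        # Schedule a download and record its outcome when it finishes.
        task = asyncio.ensure_future(coroutine)
        task.add_done_callback(self._on_done)
        self._tasks.add(task)

    def _on_done(self, task):
        self._tasks.discard(task)
        if not task.cancelled() and task.exception() and self._first_error is None:
            self._first_error = task.exception()

    def raise_any_exception(self):
        # Called on every producer-loop iteration so a failed download
        # aborts extraction as soon as possible instead of at the very end.
        if self._first_error is not None:
            raise self._first_error

    async def join(self, raise_on_error=False):
        # Wait for everything still running; optionally surface a failure.
        if self._tasks:
            await asyncio.wait(set(self._tasks))
        if raise_on_error:
            self.raise_any_exception()

    def cancel(self):
        # Stop all in-flight downloads, e.g. after the extractor fails.
        for task in set(self._tasks):
            task.cancel()

Under that contract, the try/except/finally in get_docs guarantees that a failed download both cancels the remaining downloads and still waits for them (the unconditional join in the finally block) before the extractor re-raises.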