diff --git a/tests/acceptance/test_error_output.py b/tests/acceptance/test_error_output.py index d7ac88d27..6ea357e72 100644 --- a/tests/acceptance/test_error_output.py +++ b/tests/acceptance/test_error_output.py @@ -59,7 +59,9 @@ def teardown_function(): stop_logprep() -def test_error_output_for_missing_hmac_target_field(tmp_path, config: Configuration): +def test_error_output_for_critical_input_error_with_missing_hmac_target_field( + tmp_path, config: Configuration +): input_path = Path(config.input["jsonl_input"]["documents_path"]) error_output_path = Path(config.error_output["jsonl"]["output_file"]) content = str(uuid.uuid4()) @@ -79,3 +81,23 @@ def test_error_output_for_missing_hmac_target_field(tmp_path, config: Configurat assert False, "Timeout reached" error_content = error_output_path.read_text(encoding="utf8") assert content in error_content + + +def test_error_output_errors_are_logged_if_error_output_has_an_error( + tmp_path, config: Configuration +): + config.input = { + "dummy": {"type": "dummy_input", "documents": [{"something": "yeah"}, "Exception"]} + } + config.error_output = {"dummy": {"type": "dummy_output", "exceptions": ["Exception"]}} + config.error_backlog_size = 1 + config.output.update({"kafka": {"type": "dummy_output", "default": False}}) + config_path = tmp_path / "generated_config.yml" + config_path.write_text(config.as_yaml(), encoding="utf-8") + proc = start_logprep(config_path) + wait_for_output( + proc, + ".*\[Error Event\] Couldn't enqueue error item due to:.*", + test_timeout=30, + forbidden_outputs=[], + ) diff --git a/tests/acceptance/util.py b/tests/acceptance/util.py index bbb20bca7..04de101df 100644 --- a/tests/acceptance/util.py +++ b/tests/acceptance/util.py @@ -261,18 +261,15 @@ def start_logprep(config_path: str, env: dict = None) -> subprocess.Popen: ) -def wait_for_output(proc, expected_output, test_timeout=10): +def wait_for_output(proc, expected_output, test_timeout=10, forbidden_outputs=None): + if forbidden_outputs is None: + forbidden_outputs = ["Invalid", "Exception", "critical", "Error", "ERROR"] + @timeout(test_timeout) def wait_for_output_inner( proc, expected_output, - forbidden_outputs=[ - "Invalid", - "Exception", - "critical", - "Error", - "ERROR", - ], + forbidden_outputs, ): output = proc.stdout.readline() while 1: @@ -282,8 +279,8 @@ def wait_for_output_inner( assert not re.search(forbidden_output, output.decode("utf8")), output output = proc.stdout.readline() - wait_for_output_inner(proc, expected_output) - time.sleep(0.1) # nosemgrep + wait_for_output_inner(proc, expected_output, forbidden_outputs) + time.sleep(0.1) def stop_logprep(proc=None): diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index 044ece235..d44d08dd3 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -3,7 +3,6 @@ # pylint: disable=attribute-defined-outside-init # pylint: disable=unnecessary-lambda-assignment import multiprocessing -import random from copy import deepcopy from logging import Logger from logging.config import dictConfig