diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index be1fde91a..06d1894c4 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -268,7 +268,6 @@ def process_pipeline(self) -> PipelineResult: self.logger.error(",".join((str(error) for error in result.errors))) self._store_failed_event(result.errors, result.event_received, event) return - if self._output: if self._pipeline: result_data = [res.data for res in result if res.data] diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index 2f30825a6..8a1594430 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -625,6 +625,32 @@ def test_pipeline_result_provides_event_received(self, _): assert result.event_received == {"some": "event"}, "received event is as expected" assert result.event == {"some": "event", "field": "foo"}, "processed event is as expected" + def test_process_event_can_be_bypassed_with_no_pipeline(self, _): + self.pipeline._setup() + input_config = { + "testinput": { + "type": "http_input", + "uvicorn_config": { + "host": "127.0.0.1", + "port": 9000, + "ssl_certfile": "tests/testdata/acceptance/http_input/cert.crt", + "ssl_keyfile": "tests/testdata/acceptance/http_input/cert.key", + }, + "endpoints": {"/json": "json", "/jsonl": "jsonl", "/plaintext": "plaintext"}, + } + } + self.pipeline._input = original_create(input_config) + self.pipeline._input.pipeline_index = 1 + self.pipeline._input.messages = multiprocessing.Queue(-1) + self.pipeline._input.setup() + self.pipeline._input.messages.put({"message": "test message"}) + self.pipeline._pipeline = None + with mock.patch("logprep.framework.pipeline.Pipeline.process_event") as mock_process_event: + mock_process_event.return_value = None + result = self.pipeline.process_pipeline() + mock_process_event.assert_not_called() + assert isinstance(result, type(None)) + class TestPipelineWithActualInput: def setup_method(self):