diff --git a/examples/exampledata/config/http_pipeline.yml b/examples/exampledata/config/http_pipeline.yml index 5a63faca4..909b28499 100644 --- a/examples/exampledata/config/http_pipeline.yml +++ b/examples/exampledata/config/http_pipeline.yml @@ -60,6 +60,6 @@ output: statistics.interval.ms: "60000" queue.buffering.max.messages: "100000000" queue.buffering.max.kbytes: "1048576" - queue.buffering.max.ms: "5000" + queue.buffering.max.ms: "10000" batch.size: "1000000" request.required.acks: "-1" diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 5e5ffd751..1d7a35ea6 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -259,17 +259,19 @@ def process_pipeline(self) -> PipelineResult: event = self._get_event() if not event: return None, None - result: PipelineResult = self.process_event(event) - if result.warnings: - self.logger.warning(",".join((str(warning) for warning in result.warnings))) - if result.errors: - self.logger.error(",".join((str(error) for error in result.errors))) - self._store_failed_event(result.errors, result.event_received, event) - return + if self._pipeline: + result: PipelineResult = self.process_event(event) + if result.warnings: + self.logger.warning(",".join((str(warning) for warning in result.warnings))) + if result.errors: + self.logger.error(",".join((str(error) for error in result.errors))) + self._store_failed_event(result.errors, result.event_received, event) + return if self._output: - result_data = [res.data for res in result if res.data] - if result_data: - self._store_extra_data(itertools.chain(*result_data)) + if self._pipeline: + result_data = [res.data for res in result if res.data] + if result_data: + self._store_extra_data(itertools.chain(*result_data)) if event: self._store_event(event) return result