Skip to content

Commit

Permalink
bypass process_event if there is not pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Aug 26, 2024
1 parent 27dc305 commit c76868c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
2 changes: 1 addition & 1 deletion examples/exampledata/config/http_pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,6 @@ output:
statistics.interval.ms: "60000"
queue.buffering.max.messages: "100000000"
queue.buffering.max.kbytes: "1048576"
queue.buffering.max.ms: "5000"
queue.buffering.max.ms: "10000"
batch.size: "1000000"
request.required.acks: "-1"
22 changes: 12 additions & 10 deletions logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,17 +259,19 @@ def process_pipeline(self) -> PipelineResult:
event = self._get_event()
if not event:
return None, None
result: PipelineResult = self.process_event(event)
if result.warnings:
self.logger.warning(",".join((str(warning) for warning in result.warnings)))
if result.errors:
self.logger.error(",".join((str(error) for error in result.errors)))
self._store_failed_event(result.errors, result.event_received, event)
return
if self._pipeline:
result: PipelineResult = self.process_event(event)
if result.warnings:
self.logger.warning(",".join((str(warning) for warning in result.warnings)))
if result.errors:
self.logger.error(",".join((str(error) for error in result.errors)))
self._store_failed_event(result.errors, result.event_received, event)
return
if self._output:
result_data = [res.data for res in result if res.data]
if result_data:
self._store_extra_data(itertools.chain(*result_data))
if self._pipeline:
result_data = [res.data for res in result if res.data]
if result_data:
self._store_extra_data(itertools.chain(*result_data))
if event:
self._store_event(event)
return result
Expand Down

0 comments on commit c76868c

Please sign in to comment.