Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add abiltity to configure error output #668

Merged
merged 91 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
91 commits
Select commit Hold shift + click to select a range
0b62cff
add abiltity to configure error output
ekneg54 Sep 16, 2024
43dd315
remove store_failed and error_topic
ekneg54 Sep 16, 2024
d4504be
add error queue in pipeline_manager
ekneg54 Sep 16, 2024
28019da
refactor pipeline.py
ekneg54 Sep 18, 2024
1f5317c
add tests for ComponentQueueListener
ekneg54 Sep 19, 2024
0f177fa
add wait for erorr output health
ekneg54 Sep 20, 2024
102d246
remove double log message
ekneg54 Sep 21, 2024
638f256
add test for infinite restart
ekneg54 Sep 23, 2024
915bf6e
adjust examples
ekneg54 Sep 23, 2024
ae0574e
add more result documentation
ekneg54 Sep 23, 2024
fba1f16
fix some tests
ekneg54 Sep 24, 2024
607eef3
move serialisation to enqueue_error
ekneg54 Sep 24, 2024
bc10c7d
add tests for enqueue_error
ekneg54 Sep 24, 2024
a61185d
add developer documentation
ekneg54 Sep 28, 2024
210e013
Refactor Kafka container command in docker-compose.yml
ekneg54 Sep 28, 2024
270b162
Rewrite acceptance tests
ekneg54 Sep 28, 2024
d2d9af3
make hmac not return non_critical_error_message
ekneg54 Sep 28, 2024
de0047f
add error_output_queue
ekneg54 Sep 28, 2024
753b3a4
change order of except blocks in pipeline
ekneg54 Sep 30, 2024
d670bee
change CriticalOutputError message and signature
ekneg54 Sep 30, 2024
48a5ec9
add handling of critical output errors during bulk indexing
ekneg54 Sep 30, 2024
e61f2a1
add event to critical output error raised during opensearch bulk oper…
ekneg54 Sep 30, 2024
4088ad7
add matching errors in enqueue_error
ekneg54 Sep 30, 2024
0ba27e5
add matching errors in enqueue_error
ekneg54 Sep 30, 2024
c5a0c2e
refactor opensearch output connector
ekneg54 Sep 30, 2024
993db77
complete item handling in enqueue_error
ekneg54 Oct 1, 2024
d4b2665
add tests for clearing backlogs
ekneg54 Oct 1, 2024
02e6e4c
fix componentqueuelistener tests
ekneg54 Oct 1, 2024
a9b8e99
ensure raw_input is copied
ekneg54 Oct 1, 2024
915a34e
remove version from compose file due to deprecation warning
ekneg54 Oct 1, 2024
098c9c0
add debugging and additional tests
ekneg54 Oct 1, 2024
6f541cf
fix processor tests in cause of changing processingcrititcalerror
ekneg54 Oct 1, 2024
5061aaa
Working error output start bahavior
ekneg54 Oct 1, 2024
28405a2
fix pipeline_manager tests
ekneg54 Oct 1, 2024
2d5bf03
remove connection between input and output
ekneg54 Oct 1, 2024
ac5c9ca
add tests for shutdown behavior
ekneg54 Oct 1, 2024
c8c7325
update changelog
ekneg54 Oct 1, 2024
85456fc
fix deadlock in pipeline_manager tests
ekneg54 Oct 1, 2024
ccaf128
fix test for kafka output connector
ekneg54 Oct 1, 2024
cbbe7df
fix test for injecting sentinal
ekneg54 Oct 1, 2024
aec7bc4
fix run_logprep tests
ekneg54 Oct 1, 2024
0fc5751
fix linting for Value
ekneg54 Oct 1, 2024
33814c1
remove output from revoke callback
ekneg54 Oct 1, 2024
f17571e
add queuelistener shutdown behavior
ekneg54 Oct 2, 2024
f40bbc9
add tests for shutdown behavior to pipeline_manager
ekneg54 Oct 2, 2024
baadb39
remove opensearch retry tests
ekneg54 Oct 2, 2024
1e1e57f
remove calling batch_finished_callback in store method
ekneg54 Oct 2, 2024
7634333
add opensearch options
ekneg54 Oct 2, 2024
c9feb3f
readd max_retries option
ekneg54 Oct 3, 2024
03e2069
add option to configure error backlog size
ekneg54 Oct 3, 2024
31aa074
add unknown error handling to opensearch output
ekneg54 Oct 3, 2024
0665cfb
fix Changelog
ekneg54 Oct 3, 2024
6e8efec
bump falcon dependency
ekneg54 Oct 3, 2024
26fab9e
make consumer instance id constant for a pipeline process
ekneg54 Oct 9, 2024
4661308
add pid to consumerinstanceid
ekneg54 Oct 9, 2024
81ababa
fix tests for http input and confluent kafka outptu
ekneg54 Oct 9, 2024
0f183a1
fix failed event metric
ekneg54 Oct 9, 2024
2669fee
rewrite failed events metric counting
ekneg54 Oct 10, 2024
2b0465f
add error_output config options to helm chart
ekneg54 Oct 10, 2024
fc81731
add error queue size metric
ekneg54 Oct 10, 2024
04a2373
better opensearch error messages
ekneg54 Oct 14, 2024
fd9ff13
make implementation for queuelistener configurable
ekneg54 Oct 14, 2024
0612bc0
readd event to error event
ekneg54 Oct 15, 2024
01d4651
remove chuck_size from bulk call
ekneg54 Oct 15, 2024
1f5e84d
add error health to healthchecks
ekneg54 Oct 21, 2024
1e150eb
fix typing
ekneg54 Oct 21, 2024
a4a1b23
Refactor test names and improve error logging
dtrai2 Oct 24, 2024
d42b3f0
fix black
dtrai2 Oct 24, 2024
6acb311
refactor and clean up OutputQueueListener
dtrai2 Oct 24, 2024
3ea7916
rename get_component_instance to get_output_instance
dtrai2 Oct 24, 2024
74ff842
remove mathc cases for critical output error handling
dtrai2 Oct 24, 2024
ae1a130
remove unnecessary loop in test
dtrai2 Oct 24, 2024
342156b
remove unused`self.rule = rule` assignment
dtrai2 Oct 24, 2024
d79ca2f
update configuration doc with new parameters
dtrai2 Oct 24, 2024
872bfef
remove error_index key from opensearch_config dict in charts/test_out…
dtrai2 Oct 24, 2024
6e15030
add timeout to error output test
dtrai2 Oct 25, 2024
68d7850
extend error output config test
dtrai2 Oct 25, 2024
af71620
add assertion to verify event _index in message backlog event
dtrai2 Oct 25, 2024
0e945c0
add *args to LogprepException
dtrai2 Oct 25, 2024
8504424
add error handling acceptance test
dtrai2 Oct 25, 2024
59508ed
prevent writing 1 to error output
dtrai2 Oct 25, 2024
8fda657
update error output check in acceptance test
dtrai2 Oct 25, 2024
de06bd6
remove redundant error log statement as it can't be reached
dtrai2 Oct 28, 2024
355fcca
add tests pipeline manager
dtrai2 Oct 28, 2024
d69495f
add test for reading out error from opensearch bulk call
dtrai2 Oct 28, 2024
183fe1a
simplify test for ci pipeline
dtrai2 Oct 28, 2024
29c0611
change loglevel for bypassing rule tree message
dtrai2 Oct 28, 2024
d210506
remove stuck test
dtrai2 Oct 28, 2024
0070857
Refactor ConfluentKafkaInput to store offsets for last message
ekneg54 Oct 28, 2024
99b393b
fix last valid record handling in confluent kafka input tests
dtrai2 Oct 28, 2024
4db7e73
add test to check if error output healthcheck is added to exporter
dtrai2 Oct 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
### Breaking

* remove AutoRuleCorpusTester
* removes the option to use synchronous `bulk` or `parallel_bulk` operation in favor of `parallel_bulk` in `opensearch_output`
* reimplement error handling by introducing the option to configure an error output
* if no error output is configured, failed event will be dropped

### Features

Expand All @@ -14,7 +17,11 @@
* initially run health checks on setup for every configured component
* make `imagePullPolicy` configurable for helm chart deployments
* make `terminationGracePeriodSeconds` configurable in helm chart values

* adds ability to configure error output
* adds option `default_op_type` to `opensearch_output` connector to set the default operation for indexing documents (default: index)
* adds option `max_chunk_bytes` to `opensearch_output` connector to set the maximum size of the request in bytes (default: 100MB)
* adds option `error_backlog_size` to logprep configuration to configure the queue size of the error queue
* the opensearch default index is now only used for processed events, errors will be written to the error output, if configured

### Improvements

Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ log message, based on a configured geo-ip database.
Or the `Dropper` deletes fields from the log message.

As detailed overview of all processors can be found in the
[processor documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/processor.html).
[processor documentation](https://logprep.readthedocs.io/en/latest/configuration/processor.html).

To influence the behaviour of those processors, each can be configured with a set of rules.
These rules define two things.
Expand Down Expand Up @@ -147,9 +147,9 @@ kafka-topic. Addionally, you can use the Opensearch or Opensearch output connect
messages directly to Opensearch or Opensearch after processing.

The details regarding the connectors can be found in the
[input connector documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/input.html)
[input connector documentation](https://logprep.readthedocs.io/en/latest/configuration/input.html)
and
[output connector documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/output.html).
[output connector documentation](https://logprep.readthedocs.io/en/latest/configuration/output.html).

### Configuration

Expand Down Expand Up @@ -228,7 +228,7 @@ The condition of this rule would check if the field `message` exists in the log.
If it does exist then the dropper would delete this field from the log message.

Details about the rule language and how to write rules for the processors can be found in the
[rule configuration documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/rules.html).
[rule configuration documentation](https://logprep.readthedocs.io/en/latest/configuration/rules.html).

## Getting Started

Expand Down
4 changes: 2 additions & 2 deletions charts/logprep/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: "13.4.0"
version: "14.0.0"

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "13.1.2"
appVersion: "14.0.0"
13 changes: 13 additions & 0 deletions charts/logprep/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ spec:
- output-config.yaml
- logger-config.yaml
- exporter-config.yaml
{{- if .Values.error_output }}
- error-output-config.yaml
{{- end }}
env:
{{- if eq .Values.logger.level "DEBUG" }}
- name: DEBUG # to get python stacktrace on error
Expand Down Expand Up @@ -103,6 +106,11 @@ spec:
- name: output-config
mountPath: /home/logprep/output-config.yaml
subPath: output-config.yaml
{{- if .Values.error_output }}
- name: error-output-config
mountPath: /home/logprep/error-output-config.yaml
subPath: error-output-config.yaml
{{- end }}
{{- if .Values.extraMounts }}
{{- toYaml .Values.extraMounts | nindent 12 }}
{{- end }}
Expand Down Expand Up @@ -155,6 +163,11 @@ spec:
- name: output-config
configMap:
name: {{ include "logprep.fullname" . }}-output
{{- if .Values.error_output }}
- name: error-output-config
configMap:
name: {{ include "logprep.fullname" . }}-error-output
{{- end }}
{{- if .Values.artifacts }}
- name: artifacts
configMap:
Expand Down
14 changes: 14 additions & 0 deletions charts/logprep/templates/error-output-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{{- if .Values.error_output }}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "logprep.fullname" . }}-error-output
labels:
{{- include "logprep.labels" . | nindent 4 }}
data:
error-output-config.yaml: |
error_output:
{{- trimSuffix "_output" .Values.error_output.type | nindent 6}}:
{{- toYaml .Values.error_output | nindent 8}}
{{- end }}
30 changes: 26 additions & 4 deletions charts/logprep/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ exporter:
argocd.argoproj.io/sync-options: SkipDryRunOnMissingResource=true

# Logprep logging configuration.
# See: https://logprep.readthedocs.io/en/latest/user_manual/configuration/index.html#configuration-file-structure
# See: https://logprep.readthedocs.io/en/latest/configuration/index.html#configuration-file-structure
# for available configuration options.
logger:
level: DEBUG
Expand All @@ -110,7 +110,7 @@ logger:
# If the type is `http_input`, an extra service will be populated and the readiness
# probe will be set to the health check of the configured http input.
#
# See https://logprep.readthedocs.io/en/latest/user_manual/configuration/input.html
# See https://logprep.readthedocs.io/en/latest/configuration/input.html
# for available configuration options.
# Note:
# For the `http_input` endpoints you have to add the endpoint `/health: plaintext` to ensure
Expand All @@ -123,10 +123,32 @@ input: {}
# `type` of the output. For example, if the type is `opensearch_output`, the
# name of the output will be `opensearch`. Keep this in mind if you specify
# additional outputs in the configurations section.
# See https://logprep.readthedocs.io/en/latest/user_manual/configuration/output.html
# See https://logprep.readthedocs.io/en/latest/configuration/output.html
# for available configuration options.
output: {}

# The logprep error output connector configuration
# Note: If this is not set, failed events will be dropped.
# Available error outputs are the same as the normal outputs.
# See https://logprep.readthedocs.io/en/latest/configuration/output.html
# Example:
#
# error_output:
# type: confluentkafka_output
# topic: errors
# flush_timeout: 300
# send_timeout: 0
# kafka_config:
# bootstrap.servers: 127.0.0.1:9092
# compression.type: none
# statistics.interval.ms: "60000"
# queue.buffering.max.messages: "10"
# queue.buffering.max.kbytes: "1024"
# queue.buffering.max.ms: "1000"
# batch.size: "100"
# request.required.acks: "-1"
error_output: {}

# Additional logprep configurations
# Note: The configuration keys 'logger', 'metrics', 'input' and 'output' will overwrite the
# corresponding keys in these configurations as they will be merged into
Expand All @@ -141,7 +163,7 @@ output: {}
# pipeline: []
# - name: https://rule-server.de
#
# See https://logprep.readthedocs.io/en/latest/user_manual/configuration/index.html#configuration-file-structure
# See https://logprep.readthedocs.io/en/latest/configuration/index.html#configuration-file-structure
# for available configuration options.
configurations:
- name: logprep-config
Expand Down
2 changes: 1 addition & 1 deletion doc/source/configuration/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Configuration
:no-index:

.. autoclass:: logprep.util.configuration.Configuration
:members: version, config_refresh_interval, process_count, timeout, logger, input, output, pipeline, metrics, profile_pipelines, print_auto_test_stack_trace
:members: version, config_refresh_interval, process_count, restart_count, timeout, logger, input, output, error_output, pipeline, metrics, profile_pipelines, print_auto_test_stack_trace, error_backlog_size
:no-index:

.. toctree::
Expand Down
20 changes: 13 additions & 7 deletions doc/source/development/connector_how_to.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Implementing a new Connector

Connectors are used to fetch or store log messages.
Input and ouput connectors work each independently, with the exception that an output connector
might call a callback function inside the input, to notify that the current batch was sucessfully
might call a callback function inside the input, to notify that the current batch was sucessfully
processed. Only then the input would start collecting new inputs.
Because of this independence, it is possible to receive messages from one system and to store them
in another, i.e. reading from Kafka and writing to OpenSearch.
Expand Down Expand Up @@ -40,7 +40,7 @@ An exception should be thrown if an error occurs on calling this function.
These exceptions must inherit from the exception classes in :py:class:`~logprep.input.input.Input`.
They should return a helpful message when calling `str(exception)`.
Exceptions requiring Logprep to restart should inherit from `FatalInputError`.
Exceptions that inherit from `WarningInputError` will be logged, but they do not require any error
Exceptions that inherit from `WarningInputError` will be logged, but they do not require any error
handling.

.. _connector_output:
Expand All @@ -62,12 +62,18 @@ They should return a helpful message when calling `str(exception)`.
Analogous to the input, exceptions that require a restart of Logprep should inherit from `FatalOutputError`.
Exceptions that inherit from `OutputWarning` will be logged, but they do not require any error handling.

:py:meth:`~logprep.output.output.Output.store_failed`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This method is being called if an error occurred during the processing of a log message.
The original log message, the partially processed log message and the error message are being passed to this method.
These will be stored separately from regular log messages for debugging and error handling.
.. _error_output:

Error Output
------------

Error output is setup in :py:class:`~logprep.framework.pipeline_manager.PipelineManager`. The error
output connector is instantiated and setup only once during the initialization of the pipeline manager
together with :py:class:`~logprep.framework.pipeline_manager.OutputQueueListener`.
The listener is used to listen on the populated error queue and to send the log messages to the
:code:`store` method of the error output connector.
The error queue is given to the listener and to all pipelines instantiated by the pipeline manager.


Factory
Expand Down
13 changes: 8 additions & 5 deletions doc/source/development/processor_how_to.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ process

This method is implemented in the :py:class:`~logprep.abc.processor.Processor` and is called for every log message.
To process the event it invokes the processors `apply_rules` method.
If you want to do somthing to the event after all rules have been applied, then you could overwrite this method and implement your code after calling the `super().process(event)`.
If you want to do something to the event after all rules have been applied, then you could overwrite this method and implement your code after calling the `super().process(event)`.
The log message is being passed as a dictionary and modified 'in place', meaning that modifications are being performed directly on the input event.

.. code-block:: python
Expand All @@ -112,11 +112,14 @@ The log message is being passed as a dictionary and modified 'in place', meaning
Exceptions/Error Handling
~~~~~~~~~~~~~~~~~~~~~~~~~

An exception should be thrown if an error occurs during the processing of a log message.
All exceptions are being logged and should return a helpful error message with `str(exception)`.
Exceptions derived from `ProcessorWarningError` have no impact on the operation of the processor.
An exception should not been thrown on processor level. Instead it should be added to
the :py:class:`~logprep.abc.processor.ProcessorResult` if an error occurs during the processing of a log message.
All exceptions are being logged in :py:class:`~logprep.framework.pipeline.Pipeline` and should provide a helpful
error message with `str(exception)`. You do not have to log the exception in the processor itself.
Exceptions derived from :py:class:`~logprep.processor.base.exceptions.ProcessingWarning` have no impact on the
operation of the processor.
Other exceptions stop the processing of a log message.
However, the log message will be separately stored as failed (see :ref:`connector_output`, `store_failed``).
However, the log message will be separately stored as failed (see :ref:`error_output`).


Metrics
Expand Down
2 changes: 0 additions & 2 deletions examples/compose/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: "3.9"

# The following command must be executed after each restart on linux or opensearch exits with an error
# sudo sysctl -w vm.max_map_count=262144

Expand Down
41 changes: 41 additions & 0 deletions examples/exampledata/config/file_input_pipeline.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
version: 2
process_count: 4
config_refresh_interval: 5
profile_pipelines: false
restart_count: 3
logger:
level: INFO
loggers:
uvicorn:
level: INFO
uvicorn.access:
level: INFO
uvicorn.error:
level: INFO

metrics:
enabled: true
port: 8003
uvicorn_config:
host: 0.0.0.0
access_log: true
server_header: false
date_header: false
workers: 1
ws: none
interface: asgi3
backlog: 16384
timeout_keep_alive: 65
input:
file_input:
type: file_input
logfile_path: /tmp/logfiletest
start: begin
interval: 1
watch_file: True
output:
dummy:
type: dummy_output
error_output:
console:
type: console_output
Loading
Loading