Skip to content

Commit

Permalink
add abiltity to configure error output (#668)
Browse files Browse the repository at this point in the history
* add abiltity to configure error output
* remove store_failed methods
---------

Co-authored-by: dtrai2 <[email protected]>
  • Loading branch information
ekneg54 and dtrai2 authored Oct 29, 2024
1 parent 1a42a12 commit b593be6
Show file tree
Hide file tree
Showing 71 changed files with 2,171 additions and 1,653 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
### Breaking

* remove AutoRuleCorpusTester
* removes the option to use synchronous `bulk` or `parallel_bulk` operation in favor of `parallel_bulk` in `opensearch_output`
* reimplement error handling by introducing the option to configure an error output
* if no error output is configured, failed event will be dropped

### Features

Expand All @@ -15,7 +18,11 @@
* make `imagePullPolicy` configurable for helm chart deployments
* it is now possible to use Lucene compliant Filter Expressions
* make `terminationGracePeriodSeconds` configurable in helm chart values

* adds ability to configure error output
* adds option `default_op_type` to `opensearch_output` connector to set the default operation for indexing documents (default: index)
* adds option `max_chunk_bytes` to `opensearch_output` connector to set the maximum size of the request in bytes (default: 100MB)
* adds option `error_backlog_size` to logprep configuration to configure the queue size of the error queue
* the opensearch default index is now only used for processed events, errors will be written to the error output, if configured

### Improvements

Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ log message, based on a configured geo-ip database.
Or the `Dropper` deletes fields from the log message.

As detailed overview of all processors can be found in the
[processor documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/processor.html).
[processor documentation](https://logprep.readthedocs.io/en/latest/configuration/processor.html).

To influence the behaviour of those processors, each can be configured with a set of rules.
These rules define two things.
Expand Down Expand Up @@ -147,9 +147,9 @@ kafka-topic. Addionally, you can use the Opensearch or Opensearch output connect
messages directly to Opensearch or Opensearch after processing.

The details regarding the connectors can be found in the
[input connector documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/input.html)
[input connector documentation](https://logprep.readthedocs.io/en/latest/configuration/input.html)
and
[output connector documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/output.html).
[output connector documentation](https://logprep.readthedocs.io/en/latest/configuration/output.html).

### Configuration

Expand Down Expand Up @@ -228,7 +228,7 @@ The condition of this rule would check if the field `message` exists in the log.
If it does exist then the dropper would delete this field from the log message.

Details about the rule language and how to write rules for the processors can be found in the
[rule configuration documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/rules.html).
[rule configuration documentation](https://logprep.readthedocs.io/en/latest/configuration/rules.html).

## Getting Started

Expand Down
4 changes: 2 additions & 2 deletions charts/logprep/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: "13.4.0"
version: "14.0.0"

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "13.1.2"
appVersion: "14.0.0"
13 changes: 13 additions & 0 deletions charts/logprep/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ spec:
- output-config.yaml
- logger-config.yaml
- exporter-config.yaml
{{- if .Values.error_output }}
- error-output-config.yaml
{{- end }}
env:
{{- if eq .Values.logger.level "DEBUG" }}
- name: DEBUG # to get python stacktrace on error
Expand Down Expand Up @@ -103,6 +106,11 @@ spec:
- name: output-config
mountPath: /home/logprep/output-config.yaml
subPath: output-config.yaml
{{- if .Values.error_output }}
- name: error-output-config
mountPath: /home/logprep/error-output-config.yaml
subPath: error-output-config.yaml
{{- end }}
{{- if .Values.extraMounts }}
{{- toYaml .Values.extraMounts | nindent 12 }}
{{- end }}
Expand Down Expand Up @@ -155,6 +163,11 @@ spec:
- name: output-config
configMap:
name: {{ include "logprep.fullname" . }}-output
{{- if .Values.error_output }}
- name: error-output-config
configMap:
name: {{ include "logprep.fullname" . }}-error-output
{{- end }}
{{- if .Values.artifacts }}
- name: artifacts
configMap:
Expand Down
14 changes: 14 additions & 0 deletions charts/logprep/templates/error-output-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{{- if .Values.error_output }}
---
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "logprep.fullname" . }}-error-output
labels:
{{- include "logprep.labels" . | nindent 4 }}
data:
error-output-config.yaml: |
error_output:
{{- trimSuffix "_output" .Values.error_output.type | nindent 6}}:
{{- toYaml .Values.error_output | nindent 8}}
{{- end }}
30 changes: 26 additions & 4 deletions charts/logprep/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ exporter:
argocd.argoproj.io/sync-options: SkipDryRunOnMissingResource=true

# Logprep logging configuration.
# See: https://logprep.readthedocs.io/en/latest/user_manual/configuration/index.html#configuration-file-structure
# See: https://logprep.readthedocs.io/en/latest/configuration/index.html#configuration-file-structure
# for available configuration options.
logger:
level: DEBUG
Expand All @@ -110,7 +110,7 @@ logger:
# If the type is `http_input`, an extra service will be populated and the readiness
# probe will be set to the health check of the configured http input.
#
# See https://logprep.readthedocs.io/en/latest/user_manual/configuration/input.html
# See https://logprep.readthedocs.io/en/latest/configuration/input.html
# for available configuration options.
# Note:
# For the `http_input` endpoints you have to add the endpoint `/health: plaintext` to ensure
Expand All @@ -123,10 +123,32 @@ input: {}
# `type` of the output. For example, if the type is `opensearch_output`, the
# name of the output will be `opensearch`. Keep this in mind if you specify
# additional outputs in the configurations section.
# See https://logprep.readthedocs.io/en/latest/user_manual/configuration/output.html
# See https://logprep.readthedocs.io/en/latest/configuration/output.html
# for available configuration options.
output: {}

# The logprep error output connector configuration
# Note: If this is not set, failed events will be dropped.
# Available error outputs are the same as the normal outputs.
# See https://logprep.readthedocs.io/en/latest/configuration/output.html
# Example:
#
# error_output:
# type: confluentkafka_output
# topic: errors
# flush_timeout: 300
# send_timeout: 0
# kafka_config:
# bootstrap.servers: 127.0.0.1:9092
# compression.type: none
# statistics.interval.ms: "60000"
# queue.buffering.max.messages: "10"
# queue.buffering.max.kbytes: "1024"
# queue.buffering.max.ms: "1000"
# batch.size: "100"
# request.required.acks: "-1"
error_output: {}

# Additional logprep configurations
# Note: The configuration keys 'logger', 'metrics', 'input' and 'output' will overwrite the
# corresponding keys in these configurations as they will be merged into
Expand All @@ -141,7 +163,7 @@ output: {}
# pipeline: []
# - name: https://rule-server.de
#
# See https://logprep.readthedocs.io/en/latest/user_manual/configuration/index.html#configuration-file-structure
# See https://logprep.readthedocs.io/en/latest/configuration/index.html#configuration-file-structure
# for available configuration options.
configurations:
- name: logprep-config
Expand Down
2 changes: 1 addition & 1 deletion doc/source/configuration/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Configuration
:no-index:

.. autoclass:: logprep.util.configuration.Configuration
:members: version, config_refresh_interval, process_count, timeout, logger, input, output, pipeline, metrics, profile_pipelines, print_auto_test_stack_trace
:members: version, config_refresh_interval, process_count, restart_count, timeout, logger, input, output, error_output, pipeline, metrics, profile_pipelines, print_auto_test_stack_trace, error_backlog_size
:no-index:

.. toctree::
Expand Down
20 changes: 13 additions & 7 deletions doc/source/development/connector_how_to.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Implementing a new Connector

Connectors are used to fetch or store log messages.
Input and ouput connectors work each independently, with the exception that an output connector
might call a callback function inside the input, to notify that the current batch was sucessfully
might call a callback function inside the input, to notify that the current batch was sucessfully
processed. Only then the input would start collecting new inputs.
Because of this independence, it is possible to receive messages from one system and to store them
in another, i.e. reading from Kafka and writing to OpenSearch.
Expand Down Expand Up @@ -40,7 +40,7 @@ An exception should be thrown if an error occurs on calling this function.
These exceptions must inherit from the exception classes in :py:class:`~logprep.input.input.Input`.
They should return a helpful message when calling `str(exception)`.
Exceptions requiring Logprep to restart should inherit from `FatalInputError`.
Exceptions that inherit from `WarningInputError` will be logged, but they do not require any error
Exceptions that inherit from `WarningInputError` will be logged, but they do not require any error
handling.

.. _connector_output:
Expand All @@ -62,12 +62,18 @@ They should return a helpful message when calling `str(exception)`.
Analogous to the input, exceptions that require a restart of Logprep should inherit from `FatalOutputError`.
Exceptions that inherit from `OutputWarning` will be logged, but they do not require any error handling.

:py:meth:`~logprep.output.output.Output.store_failed`
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

This method is being called if an error occurred during the processing of a log message.
The original log message, the partially processed log message and the error message are being passed to this method.
These will be stored separately from regular log messages for debugging and error handling.
.. _error_output:

Error Output
------------

Error output is setup in :py:class:`~logprep.framework.pipeline_manager.PipelineManager`. The error
output connector is instantiated and setup only once during the initialization of the pipeline manager
together with :py:class:`~logprep.framework.pipeline_manager.OutputQueueListener`.
The listener is used to listen on the populated error queue and to send the log messages to the
:code:`store` method of the error output connector.
The error queue is given to the listener and to all pipelines instantiated by the pipeline manager.


Factory
Expand Down
13 changes: 8 additions & 5 deletions doc/source/development/processor_how_to.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ process

This method is implemented in the :py:class:`~logprep.abc.processor.Processor` and is called for every log message.
To process the event it invokes the processors `apply_rules` method.
If you want to do somthing to the event after all rules have been applied, then you could overwrite this method and implement your code after calling the `super().process(event)`.
If you want to do something to the event after all rules have been applied, then you could overwrite this method and implement your code after calling the `super().process(event)`.
The log message is being passed as a dictionary and modified 'in place', meaning that modifications are being performed directly on the input event.

.. code-block:: python
Expand All @@ -112,11 +112,14 @@ The log message is being passed as a dictionary and modified 'in place', meaning
Exceptions/Error Handling
~~~~~~~~~~~~~~~~~~~~~~~~~

An exception should be thrown if an error occurs during the processing of a log message.
All exceptions are being logged and should return a helpful error message with `str(exception)`.
Exceptions derived from `ProcessorWarningError` have no impact on the operation of the processor.
An exception should not been thrown on processor level. Instead it should be added to
the :py:class:`~logprep.abc.processor.ProcessorResult` if an error occurs during the processing of a log message.
All exceptions are being logged in :py:class:`~logprep.framework.pipeline.Pipeline` and should provide a helpful
error message with `str(exception)`. You do not have to log the exception in the processor itself.
Exceptions derived from :py:class:`~logprep.processor.base.exceptions.ProcessingWarning` have no impact on the
operation of the processor.
Other exceptions stop the processing of a log message.
However, the log message will be separately stored as failed (see :ref:`connector_output`, `store_failed``).
However, the log message will be separately stored as failed (see :ref:`error_output`).


Metrics
Expand Down
2 changes: 0 additions & 2 deletions examples/compose/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: "3.9"

# The following command must be executed after each restart on linux or opensearch exits with an error
# sudo sysctl -w vm.max_map_count=262144

Expand Down
41 changes: 41 additions & 0 deletions examples/exampledata/config/file_input_pipeline.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
version: 2
process_count: 4
config_refresh_interval: 5
profile_pipelines: false
restart_count: 3
logger:
level: INFO
loggers:
uvicorn:
level: INFO
uvicorn.access:
level: INFO
uvicorn.error:
level: INFO

metrics:
enabled: true
port: 8003
uvicorn_config:
host: 0.0.0.0
access_log: true
server_header: false
date_header: false
workers: 1
ws: none
interface: asgi3
backlog: 16384
timeout_keep_alive: 65
input:
file_input:
type: file_input
logfile_path: /tmp/logfiletest
start: begin
interval: 1
watch_file: True
output:
dummy:
type: dummy_output
error_output:
console:
type: console_output
Loading

0 comments on commit b593be6

Please sign in to comment.