Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add abiltity to configure error output #668

Merged
merged 91 commits into from
Oct 29, 2024
Merged

Conversation

ekneg54
Copy link
Collaborator

@ekneg54 ekneg54 commented Sep 16, 2024

This changes the error handling implementation of logprep.
This PR aims at two main goals.

  1. no error should permit further processing

This is handled by not raising FatalOutputError or FatalInputError but instead raise a CriticalInputError or a CriticalOutputError.
No error is handled in the pipeline process anymore all error causing events are written to error output.
To achieve this, I had to change the batch_finished_callback mechanic. As now every event gotten from input via get_next is committed to kafka utilizing the batch_finished_callback mechanic in the pipeline.py. no connections between intput and output connectors anymore.

  1. no event should get lost

to make it simple. This PR has to ensure, that every event goes into output, error output or gets logged to console as last resort.

Every error raising event will be serialized together with its raising error to an error event and is put into a multiprocessing.Queue (ThrottlingQueue). In the main Thread these events were handled in a configured error output connector which indeed can be any output connector implemented in logprep.

To achieve theses goals I had to reimplement the opensearch output connector to simplify things a lot.

Please have a look on my changes and lets discuss. Feel free to give feedback and to ask your questions.
It is a very big PR. Sorry for that but the cut was a fundamental one.

@ekneg54 ekneg54 added the enhancement New feature or request label Sep 16, 2024
@ekneg54 ekneg54 self-assigned this Sep 16, 2024
@ekneg54 ekneg54 force-pushed the dev-implement-error-output branch 3 times, most recently from ae1d775 to 757ef5a Compare September 20, 2024 12:42
@ekneg54 ekneg54 force-pushed the dev-implement-error-output branch from 68d2cdb to 69446fc Compare September 26, 2024 17:33
This was referenced Sep 26, 2024
@ekneg54 ekneg54 force-pushed the dev-implement-error-output branch 2 times, most recently from 1ff01f5 to 2f1aab3 Compare October 2, 2024 12:36
@ekneg54 ekneg54 marked this pull request as ready for review October 2, 2024 13:27
@ekneg54 ekneg54 requested review from ppcad and clumsy9 October 2, 2024 13:40
@ekneg54 ekneg54 force-pushed the dev-implement-error-output branch 5 times, most recently from 1238998 to 895c4cd Compare October 7, 2024 11:06
@ekneg54
Copy link
Collaborator Author

ekneg54 commented Oct 8, 2024

additional work:

  • add error output to health checks
  • add metric for error output queue
  • add option in helm chart to configure error_output
  • inspect behavior on high error count situations -> it looks that the error_output is dying and not restartet
  • what happens if the error_queue is full?
    if the error queue is full, the put in pipeline is blocking further processing, but before this happens, the processing is throttled by the throttling queue
  • fix failed events metric
  • is putting an event to the error queue an error? (versus failed event )
    yes, because the counts can be different because of batched outputs causing many failed events with one critcal_output_error
  • kubernetes kills pod before error queue is empty
    grace period was increased to 300s in helm charts. this carries the shutdown behavior in connection with smooth error_backlog_size config options of about 1000. this works in our setup.
  • kafka input is only processing half the topics with 1 process (perhaps a client is not closed but dereferenced during setup)
  • inspect performance regression
  • inspect memeory leak

@ekneg54 ekneg54 marked this pull request as draft October 8, 2024 08:40
ekneg54 added 10 commits October 9, 2024 11:40
add a componentqueuelistener to handle errors from queue into output connector
fix pipeline_manager tests
add tests for componentqueuelistener

add more tests
remove double property

WIP
bump test coverage for pipeline_manager to 100 percent

fix most acceptance tests by adding error_output

add more tests

start fixing pipeline.py tests
add basic tests for pipeline_result

add tests for pipeline_result
Copy link
Collaborator

@dtrai2 dtrai2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some more considerations:

  • please check the rst configuration references for the documentation, currently the error_output is missing
  • tests/unit/charts/test_output_config.py has a error_index inside the opensearch configuration, which should be removed
  • consider if closing the component queue listener queue before draining (prevents adding new events from other processes)
  • please check the test coverage

CHANGELOG.md Outdated Show resolved Hide resolved
charts/logprep/values.yaml Outdated Show resolved Hide resolved
charts/logprep/values.yaml Outdated Show resolved Hide resolved
examples/exampledata/config/pipeline.yml Outdated Show resolved Hide resolved
logprep/abc/component.py Outdated Show resolved Hide resolved
tests/unit/framework/test_pipeline_manager.py Outdated Show resolved Hide resolved
tests/unit/framework/test_pipeline_manager.py Outdated Show resolved Hide resolved
tests/unit/framework/test_pipeline_manager.py Outdated Show resolved Hide resolved
tests/unit/util/test_configuration.py Outdated Show resolved Hide resolved
logprep/processor/base/exceptions.py Outdated Show resolved Hide resolved
dtrai2 and others added 25 commits October 24, 2024 10:59
- Renamed multiple test functions for clarity and consistency.
- Updated logging messages to provide better context for errors.
- Improved documentation links and descriptions in YAML and Markdown files.
- Fix method signatures
- Consolidate OutputQueueListener to use multiprocessing exclusively.
- Remove threading implementation and related configurations.
- Update tests and documentation to reflect these changes.
- Updated method name for improved clarity and consistency.
- Adjusted related tests and function calls accordingly.
- Enhanced documentation within the new method.
- Removed redundant case clauses for `CriticalOutputError` handling.
- Updated unit tests to cover new error handling logic.
- Ensure test does not hang if error output file is not created.
- Timeout set to 10 seconds to prevent indefinite waiting periods.
- Ensure volume mounts do not include error-output-config
- Check command string does not reference error-output-config.yaml
- InvalidConfigurationError receives an unspecified amount of arguments that couldn't be successfully forwarded to the LogprepException
- Added a new test to ensure logging of errors when error output itself encounters an error.
- ignore the 1 that is added to the error_queque for process synchronization reasons
- adjust `wait_for_output` to exclude specific forbidden outputs
- add comment to clarify purpose of exclusion
- and as it can't be reached it also can't be tested
- Introduced tests for `listen` and `drain_queue` methods.
- Verified logging of unexpected exceptions during queue processing.
- Ensured specific items and sentinel values are ignored during queue operations.
- Increases test coverage
- test is working locally but runs forever in ci pipeline
This commit refactors the ConfluentKafkaInput class to store offsets for the last message referenced by `_last_valid_records`. Previously, offsets were stored for each kafka partition in `_last_valid_records`, but now only the last valid record is stored. This change improves the efficiency of offset storage and reduces memory usage.

Code changes:
- Modified `ConfluentKafkaInput` class in `logprep/connector/confluent_kafka/input.py`
- Removed `_last_valid_records` dictionary and replaced it with `_last_valid_record` variable
- Updated `batch_finished_callback` method to store offsets for the last valid record
@ekneg54 ekneg54 marked this pull request as ready for review October 29, 2024 13:17
@ekneg54 ekneg54 merged commit b593be6 into main Oct 29, 2024
13 checks passed
@ekneg54 ekneg54 deleted the dev-implement-error-output branch October 29, 2024 13:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants