From a4a1b233d4ab0eeaa0936f1919e014ddc6a03d0f Mon Sep 17 00:00:00 2001 From: dtrai2 Date: Thu, 24 Oct 2024 10:59:18 +0200 Subject: [PATCH] Refactor test names and improve error logging - Renamed multiple test functions for clarity and consistency. - Updated logging messages to provide better context for errors. - Improved documentation links and descriptions in YAML and Markdown files. - Fix method signatures --- CHANGELOG.md | 2 +- README.md | 8 +++--- charts/logprep/values.yaml | 12 ++++----- logprep/abc/component.py | 2 +- logprep/abc/input.py | 7 ++--- logprep/abc/output.py | 2 -- logprep/connector/opensearch/output.py | 8 +++--- logprep/framework/pipeline.py | 8 +++--- logprep/framework/pipeline_manager.py | 10 +++---- tests/acceptance/test_error_output.py | 1 - tests/unit/charts/test_error_output_config.py | 4 +-- .../unit/connector/test_opensearch_output.py | 26 +------------------ tests/unit/connector/test_real_kafka.py | 4 +-- .../exceptions/test_processing_exceptions.py | 2 -- tests/unit/framework/test_pipeline.py | 5 ++-- tests/unit/framework/test_pipeline_manager.py | 6 ++--- tests/unit/util/test_configuration.py | 1 - 17 files changed, 36 insertions(+), 72 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8fac8e2a2..6e8642121 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,7 +21,7 @@ * adds option `default_op_type` to `opensearch_output` connector to set the default operation for indexing documents (default: index) * adds option `max_chunk_bytes` to `opensearch_output` connector to set the maximum size of the request in bytes (default: 100MB) * adds option `error_backlog_size` to logprep configuration to configure the queue size of the error queue - +* the opensearch default index is now only used for processed events, errors will be written to the error output, if configured ### Improvements diff --git a/README.md b/README.md index 2511c545b..10b4fd2ed 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,7 @@ log message, based on a configured geo-ip database. Or the `Dropper` deletes fields from the log message. As detailed overview of all processors can be found in the -[processor documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/processor.html). +[processor documentation](https://logprep.readthedocs.io/en/latest/configuration/processor.html). To influence the behaviour of those processors, each can be configured with a set of rules. These rules define two things. @@ -147,9 +147,9 @@ kafka-topic. Addionally, you can use the Opensearch or Opensearch output connect messages directly to Opensearch or Opensearch after processing. The details regarding the connectors can be found in the -[input connector documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/input.html) +[input connector documentation](https://logprep.readthedocs.io/en/latest/configuration/input.html) and -[output connector documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/output.html). +[output connector documentation](https://logprep.readthedocs.io/en/latest/configuration/output.html). ### Configuration @@ -228,7 +228,7 @@ The condition of this rule would check if the field `message` exists in the log. If it does exist then the dropper would delete this field from the log message. Details about the rule language and how to write rules for the processors can be found in the -[rule configuration documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/rules.html). +[rule configuration documentation](https://logprep.readthedocs.io/en/latest/configuration/rules.html). ## Getting Started diff --git a/charts/logprep/values.yaml b/charts/logprep/values.yaml index 446a7a55c..11a658444 100644 --- a/charts/logprep/values.yaml +++ b/charts/logprep/values.yaml @@ -98,7 +98,7 @@ exporter: argocd.argoproj.io/sync-options: SkipDryRunOnMissingResource=true # Logprep logging configuration. -# See: https://logprep.readthedocs.io/en/latest/user_manual/configuration/index.html#configuration-file-structure +# See: https://logprep.readthedocs.io/en/latest/configuration/index.html#configuration-file-structure # for available configuration options. logger: level: DEBUG @@ -110,7 +110,7 @@ logger: # If the type is `http_input`, an extra service will be populated and the readiness # probe will be set to the health check of the configured http input. # -# See https://logprep.readthedocs.io/en/latest/user_manual/configuration/input.html +# See https://logprep.readthedocs.io/en/latest/configuration/input.html # for available configuration options. # Note: # For the `http_input` endpoints you have to add the endpoint `/health: plaintext` to ensure @@ -123,14 +123,14 @@ input: {} # `type` of the output. For example, if the type is `opensearch_output`, the # name of the output will be `opensearch`. Keep this in mind if you specify # additional outputs in the configurations section. -# See https://logprep.readthedocs.io/en/latest/user_manual/configuration/output.html +# See https://logprep.readthedocs.io/en/latest/configuration/output.html # for available configuration options. output: {} # The logprep error output connector configuration -# Note: If this is not set, error events will be dropped. +# Note: If this is not set, failed events will be dropped. # Available error outputs are the same as the normal outputs. -# See https://logprep.readthedocs.io/en/latest/user_manual/configuration/output.html +# See https://logprep.readthedocs.io/en/latest/configuration/output.html # Example: # # error_output: @@ -163,7 +163,7 @@ error_output: {} # pipeline: [] # - name: https://rule-server.de # -# See https://logprep.readthedocs.io/en/latest/user_manual/configuration/index.html#configuration-file-structure +# See https://logprep.readthedocs.io/en/latest/configuration/index.html#configuration-file-structure # for available configuration options. configurations: - name: logprep-config diff --git a/logprep/abc/component.py b/logprep/abc/component.py index a04b7e641..c988458ce 100644 --- a/logprep/abc/component.py +++ b/logprep/abc/component.py @@ -14,7 +14,7 @@ from attrs import asdict from schedule import Scheduler -from logprep.metrics.metrics import CounterMetric, Metric +from logprep.metrics.metrics import Metric from logprep.util.defaults import DEFAULT_HEALTH_TIMEOUT, EXITCODES from logprep.util.helper import camel_to_snake diff --git a/logprep/abc/input.py b/logprep/abc/input.py index 8237ccb96..b63b6d820 100644 --- a/logprep/abc/input.py +++ b/logprep/abc/input.py @@ -361,7 +361,6 @@ def _add_hmac_to(self, event_dict, raw_event) -> Tuple[dict, str]: """ hmac_options = self._config.preprocessing.get("hmac", {}) hmac_target_field_name = hmac_options.get("target") - non_critical_error_msg = None if raw_event is None: raw_event = self._encoder.encode(event_dict) @@ -372,10 +371,8 @@ def _add_hmac_to(self, event_dict, raw_event) -> Tuple[dict, str]: received_orig_message = get_dotted_field_value(event_dict, hmac_target_field_name) if received_orig_message is None: - non_critical_error_msg = ( - f"Couldn't find the hmac target " f"field '{hmac_target_field_name}'" - ) - raise CriticalInputError(self, non_critical_error_msg, raw_event) + error_message = f"Couldn't find the hmac target field '{hmac_target_field_name}'" + raise CriticalInputError(self, error_message, raw_event) if isinstance(received_orig_message, str): received_orig_message = received_orig_message.encode("utf-8") hmac = HMAC( diff --git a/logprep/abc/output.py b/logprep/abc/output.py index ab5994c20..b4bdba23a 100644 --- a/logprep/abc/output.py +++ b/logprep/abc/output.py @@ -55,8 +55,6 @@ class Config(Connector.Config): But this output can be called as output for extra_data. """ - __slots__ = {"input_connector"} - @property def default(self): """returns the default parameter""" diff --git a/logprep/connector/opensearch/output.py b/logprep/connector/opensearch/output.py index e66ca380d..4dcf9c2f8 100644 --- a/logprep/connector/opensearch/output.py +++ b/logprep/connector/opensearch/output.py @@ -274,18 +274,16 @@ def _write_backlog(self): raise error from error except Exception as error: # pylint: disable=broad-except logger.error("Failed to index documents: %s", error) - self.metrics.number_of_errors += 1 raise CriticalOutputError(self, str(error), self._message_backlog) from error finally: self._message_backlog.clear() self._failed.clear() - def _bulk(self, client, actions, *args, **kwargs) -> Optional[List[dict]]: + def _bulk(self, client, actions): """Bulk index documents into Opensearch. - uses the parallel_bulk function from the opensearchpy library. - all args are passed to :code:`streaming_bulk` function. + Uses the parallel_bulk function from the opensearchpy library. """ - kwargs |= { + kwargs = { "max_chunk_bytes": self._config.max_chunk_bytes, "chunk_size": self._config.chunk_size, "queue_size": self._config.queue_size, diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 116c7445c..303a4faa9 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -274,7 +274,7 @@ def run(self) -> None: # pylint: disable=method-hidden self._shut_down() @_handle_pipeline_error - def process_pipeline(self) -> PipelineResult: + def process_pipeline(self) -> PipelineResult | None: """Retrieve next event, process event with full pipeline and store or return results""" Component.run_pending_tasks() event = self._input.get_next(self._timeout) @@ -345,7 +345,7 @@ def stop(self) -> None: with self._continue_iterating.get_lock(): self._continue_iterating.value = False - def get_health_functions(self) -> Tuple[bool]: + def get_health_functions(self) -> Tuple: """Return health function of components""" output_health_functions = [] if self._output: @@ -400,11 +400,11 @@ def enqueue_error( self.error_queue.put(event, timeout=0.1) self.logger.debug("Enqueued error item") except Exception as error: # pylint: disable=broad-except - self.logger.error((f"Couldn't enqueue error item due to: {error} | Item: '{event}'")) + self.logger.error(f"[Error Event] Couldn't enqueue error item due to: {error} | Item: '{event}'") if self._input: self._input.batch_finished_callback() - def _get_output_error_event(self, item: CriticalOutputError) -> dict: + def _get_output_error_event(self, item: CriticalOutputError) -> dict | list: match item: case CriticalOutputError([{"errors": _, "event": _}, *_]): event = [ diff --git a/logprep/framework/pipeline_manager.py b/logprep/framework/pipeline_manager.py index ad2f2da58..3adb9d2f5 100644 --- a/logprep/framework/pipeline_manager.py +++ b/logprep/framework/pipeline_manager.py @@ -68,10 +68,10 @@ class ComponentQueueListener: """The queue to listen to.""" target: str = field(validator=validators.instance_of(str)) - """The method name to call with the items from the queue.""" + """The method name of the component which will be used to handle the items from the queue.""" config: dict = field(validator=validators.instance_of(dict)) - """The configuration for the listener component.""" + """The configuration of the component used in this listener instance.""" sentinel: Any = field(default=None) """The sentinel object to stop the process. This has to implement identity comparison.""" @@ -122,7 +122,7 @@ def _listen(self): try: target(item) except Exception as error: # pylint: disable=broad-except - logger.error("Error processing item: %s due to %s", item, error) + logger.error(f"[Error Event] Couldn't enqueue error item due to: {error} | Item: '{item}'") self._drain_queue(target) component.shut_down() @@ -134,7 +134,7 @@ def _drain_queue(self, target): try: target(item) except Exception as error: # pylint: disable=broad-except - logger.error("Error processing item: %s due to %s", item, error) + logger.error(f"[Error Event] Couldn't enqueue error item due to: {error} | Item: '{item}'") self.queue.close() # close queue after draining to prevent message loss def stop(self): @@ -211,7 +211,7 @@ def _setup_error_queue(self): ) self._error_listener.start() # wait for the error listener to be ready before starting the pipelines - if self.error_queue.get(block=True) is None: + if self.error_queue.get(block=True) is self._error_listener.sentinel: self.stop() sys.exit(EXITCODES.ERROR_OUTPUT_NOT_REACHABLE.value) diff --git a/tests/acceptance/test_error_output.py b/tests/acceptance/test_error_output.py index 58c12ba39..01d89ff5b 100644 --- a/tests/acceptance/test_error_output.py +++ b/tests/acceptance/test_error_output.py @@ -3,7 +3,6 @@ # pylint: disable=missing-docstring import json -import re import tempfile import uuid from logging import DEBUG, basicConfig, getLogger diff --git a/tests/unit/charts/test_error_output_config.py b/tests/unit/charts/test_error_output_config.py index f58bc3a9d..b2d547210 100644 --- a/tests/unit/charts/test_error_output_config.py +++ b/tests/unit/charts/test_error_output_config.py @@ -41,7 +41,7 @@ def test_error_output_config_file_is_set(self): error_output_config = error_output_config[0] assert error_output_config["data"]["error-output-config.yaml"] == expected_output_config - def test_deployment_mounts_output_config(self): + def test_deployment_mounts_error_output_config(self): self.manifests = self.render_chart("logprep", {"error_output": kafka_output_config}) deployment = self.manifests.by_query("kind: Deployment")[0] volume_mounts = deployment["spec.template.spec.containers"][0]["volumeMounts"] @@ -64,7 +64,7 @@ def test_error_output_config_volume_is_populated(self): assert volume assert volume["configMap"]["name"] == error_output_config_name - def test_error_output_config_is_used_to_start_logprep(self): + def test_error_output_config_is_used_in_start_command_of_logprep(self): self.manifests = self.render_chart("logprep", {"error_output": kafka_output_config}) container = self.deployment["spec.template.spec.containers"][0] volume_mounts = container["volumeMounts"] diff --git a/tests/unit/connector/test_opensearch_output.py b/tests/unit/connector/test_opensearch_output.py index 6a336465f..9b65a2506 100644 --- a/tests/unit/connector/test_opensearch_output.py +++ b/tests/unit/connector/test_opensearch_output.py @@ -94,30 +94,6 @@ def test_message_backlog_is_cleared_after_it_was_written(self): self.object.store({"event": "test_event"}) assert len(self.object._message_backlog) == 0 - @pytest.mark.skip(reason="This test is only for local debugging") - def test_opensearch_parallel_bulk(self): - config = { - "type": "opensearch_output", - "hosts": ["localhost:9200"], - "default_index": "default_index", - "error_index": "error_index", - "message_backlog_size": 1, - "timeout": 5000, - } - output: OpensearchOutput = Factory.create({"opensearch_output": config}) - uuid_str = str(uuid.uuid4()) - result = output._search_context.search( - index="defaultindex", body={"query": {"match": {"foo": uuid_str}}} - ) - len_before = len(result["hits"]["hits"]) - output._message_backlog = [{"foo": uuid_str, "_index": "defaultindex"}] - output._write_backlog() - time.sleep(1) - result = output._search_context.search( - index="defaultindex", body={"query": {"match": {"foo": uuid_str}}} - ) - assert len(result["hits"]["hits"]) > len_before - @mock.patch( "logprep.connector.opensearch.output.OpensearchOutput._search_context", new=mock.MagicMock(), @@ -172,7 +148,7 @@ def test_write_backlog_clears_message_backlog_on_failure(self): assert len(self.object._message_backlog) == 0, "Message backlog should be cleared" def test_write_backlog_clears_failed_on_success(self): - self.object._message_backlog = [{"some": "event"}] + self.object._failed = [{"some": "event"}] self.object._write_backlog() assert len(self.object._failed) == 0, "temporary failed backlog should be cleared" diff --git a/tests/unit/connector/test_real_kafka.py b/tests/unit/connector/test_real_kafka.py index be187af6b..df0aa208e 100644 --- a/tests/unit/connector/test_real_kafka.py +++ b/tests/unit/connector/test_real_kafka.py @@ -30,7 +30,7 @@ def setup_module(): @pytest.mark.skipif(in_ci, reason="requires kafka") class TestKafkaConnection: def get_topic_partition_size(self, topic_partition: TopicPartition) -> int: - time.sleep(1) # nosemgrep + time.sleep(1) consumer = Consumer(kafka_config | {"group.id": str(uuid.uuid4())}) lowwater, highwater = consumer.get_watermark_offsets(topic_partition) consumer.close() @@ -38,7 +38,7 @@ def get_topic_partition_size(self, topic_partition: TopicPartition) -> int: def wait_for_topic_creation(self): while self.topic_name not in self.admin.list_topics().topics: - time.sleep(2) # nosemgrep + time.sleep(2) def setup_method(self): self.admin = AdminClient(kafka_config) diff --git a/tests/unit/exceptions/test_processing_exceptions.py b/tests/unit/exceptions/test_processing_exceptions.py index fe42b648c..52c8534b6 100644 --- a/tests/unit/exceptions/test_processing_exceptions.py +++ b/tests/unit/exceptions/test_processing_exceptions.py @@ -3,8 +3,6 @@ # pylint: disable=protected-access # pylint: disable=line-too-long -from unittest import mock - from logprep.processor.base.exceptions import ( FieldExistsWarning, ProcessingCriticalError, diff --git a/tests/unit/framework/test_pipeline.py b/tests/unit/framework/test_pipeline.py index 0c2850e9b..c6cf5deca 100644 --- a/tests/unit/framework/test_pipeline.py +++ b/tests/unit/framework/test_pipeline.py @@ -155,7 +155,7 @@ def delete_event(event) -> ProcessorResult: last_processor.process.assert_not_called() assert not event, "event was deleted" - def test_not_empty_documents_are_stored_in_the_output(self, _): + def test_documents_are_stored_in_the_output(self, _): self.pipeline._setup() self.pipeline._input.get_next.return_value = {"message": "test"} self.pipeline._store_event = mock.MagicMock() @@ -811,7 +811,6 @@ def test_pipeline_hmac_error_was_send_to_error_queue(self): b'{"applyrule":"yes"}', ) pipeline.enqueue_error.assert_called_with(expected_error) - # assert result.event["hmac"]["hmac"] == "error" def test_pipeline_run_raises_assertion_when_run_without_input(self): self.config.input = {} @@ -947,7 +946,7 @@ def test_sets_attributes(self, parameters, error, message): assert pipeline_result.pipeline == parameters["pipeline"] assert isinstance(pipeline_result.results[0], ProcessorResult) - def test_pipeline_result_produces_results(self, mock_processor): + def test_pipeline_result_instantiation_produces_results(self, mock_processor): pipeline_result = PipelineResult( event={"some": "event"}, pipeline=[ diff --git a/tests/unit/framework/test_pipeline_manager.py b/tests/unit/framework/test_pipeline_manager.py index 154fe90eb..767701717 100644 --- a/tests/unit/framework/test_pipeline_manager.py +++ b/tests/unit/framework/test_pipeline_manager.py @@ -21,7 +21,7 @@ ) from logprep.metrics.exporter import PrometheusExporter from logprep.util.configuration import Configuration, MetricsConfig -from logprep.util.defaults import DEFAULT_LOG_CONFIG +from logprep.util.defaults import DEFAULT_LOG_CONFIG, EXITCODES from logprep.util.logging import logqueue from tests.testdata.metadata import path_to_config @@ -307,7 +307,7 @@ def test_setup_error_queue_raises_system_exit_if_error_listener_fails(self): with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"): with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue.get") as mock_get: mock_get.return_value = None - with pytest.raises(SystemExit, match="4"): + with pytest.raises(SystemExit, match=EXITCODES.ERROR_OUTPUT_NOT_REACHABLE.value): PipelineManager(self.config) def test_stop_calls_stop_on_error_listener(self): @@ -564,7 +564,7 @@ def test_listen_setups_component(self): listener._listen() mock_setup.assert_called() - def testget_component_instance_raises_if_setup_not_successful(self): + def test_get_component_instance_raises_if_setup_not_successful(self): target = "store" output_config = {"random_name": {"type": "dummy_output"}} queue = ThrottlingQueue(multiprocessing.get_context(), 100) diff --git a/tests/unit/util/test_configuration.py b/tests/unit/util/test_configuration.py index e60807736..35cb2dca1 100644 --- a/tests/unit/util/test_configuration.py +++ b/tests/unit/util/test_configuration.py @@ -340,7 +340,6 @@ def test_verify_passes_for_valid_configuration(self): ), ("verifies input config", {"input": {"random_name": {"type": "UNKNOWN"}}}, 1), ("verifies output config", {"output": {"kafka_output": {"type": "UNKNOWN"}}}, 1), - ("verifies error_output config", {"output": {"kafka_output": {"type": "UNKNOWN"}}}, 1), ( "multiple outputs but one config failure", {