Refactor test names and improve error logging
- Renamed multiple test functions for clarity and consistency.
- Updated logging messages to provide better context for errors.
- Improved documentation links and descriptions in YAML and Markdown files.
- Fixed method signatures and return type annotations.
dtrai2 committed Oct 24, 2024
1 parent 1e150eb commit a4a1b23
Showing 17 changed files with 36 additions and 72 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -21,7 +21,7 @@
* adds option `default_op_type` to `opensearch_output` connector to set the default operation for indexing documents (default: index)
* adds option `max_chunk_bytes` to `opensearch_output` connector to set the maximum size of the request in bytes (default: 100MB)
* adds option `error_backlog_size` to the logprep configuration to set the size of the error queue

* the opensearch default index is now only used for processed events; errors will be written to the error output, if configured

### Improvements

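A minimal sketch of how the three new options from this changelog could sit together in one configuration. The values are illustrative, not shipped defaults; the connector fields mirror the test config visible further down in this diff:

```yaml
# Hypothetical logprep configuration excerpt combining the new options.
error_backlog_size: 15000            # size of the error queue
output:
  opensearch:
    type: opensearch_output
    hosts:
      - localhost:9200
    default_index: default_index
    message_backlog_size: 1
    default_op_type: create           # default operation is "index"
    max_chunk_bytes: 104857600        # 100 MB, the documented default
```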
8 changes: 4 additions & 4 deletions README.md
@@ -84,7 +84,7 @@ log message, based on a configured geo-ip database.
Or the `Dropper` deletes fields from the log message.

A detailed overview of all processors can be found in the
[processor documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/processor.html).
[processor documentation](https://logprep.readthedocs.io/en/latest/configuration/processor.html).

To influence the behaviour of those processors, each can be configured with a set of rules.
These rules define two things.
@@ -147,9 +147,9 @@ kafka-topic. Additionally, you can use the Opensearch output connector to write
messages directly to Opensearch after processing.

The details regarding the connectors can be found in the
[input connector documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/input.html)
[input connector documentation](https://logprep.readthedocs.io/en/latest/configuration/input.html)
and
[output connector documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/output.html).
[output connector documentation](https://logprep.readthedocs.io/en/latest/configuration/output.html).

### Configuration

@@ -228,7 +228,7 @@ The condition of this rule would check if the field `message` exists in the log.
If it does exist, the dropper deletes this field from the log message.

Details about the rule language and how to write rules for the processors can be found in the
[rule configuration documentation](https://logprep.readthedocs.io/en/latest/user_manual/configuration/rules.html).
[rule configuration documentation](https://logprep.readthedocs.io/en/latest/configuration/rules.html).

## Getting Started

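A sketch of what the dropper rule described above could look like, assuming the filter/processor rule layout from the linked rule documentation (exact keys may differ between versions):

```yaml
# Hypothetical dropper rule: if the field `message` exists, delete it.
filter: message
dropper:
  drop:
    - message
description: deletes the message field from the log event
```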
12 changes: 6 additions & 6 deletions charts/logprep/values.yaml
@@ -98,7 +98,7 @@ exporter:
argocd.argoproj.io/sync-options: SkipDryRunOnMissingResource=true

# Logprep logging configuration.
# See: https://logprep.readthedocs.io/en/latest/user_manual/configuration/index.html#configuration-file-structure
# See: https://logprep.readthedocs.io/en/latest/configuration/index.html#configuration-file-structure
# for available configuration options.
logger:
level: DEBUG
@@ -110,7 +110,7 @@ logger:
# If the type is `http_input`, an extra service will be populated and the readiness
# probe will be set to the health check of the configured http input.
#
# See https://logprep.readthedocs.io/en/latest/user_manual/configuration/input.html
# See https://logprep.readthedocs.io/en/latest/configuration/input.html
# for available configuration options.
# Note:
# For the `http_input` endpoints you have to add the endpoint `/health: plaintext` to ensure
@@ -123,14 +123,14 @@ input: {}
# `type` of the output. For example, if the type is `opensearch_output`, the
# name of the output will be `opensearch`. Keep this in mind if you specify
# additional outputs in the configurations section.
# See https://logprep.readthedocs.io/en/latest/user_manual/configuration/output.html
# See https://logprep.readthedocs.io/en/latest/configuration/output.html
# for available configuration options.
output: {}

# The logprep error output connector configuration
# Note: If this is not set, error events will be dropped.
# Note: If this is not set, failed events will be dropped.
# Available error outputs are the same as the normal outputs.
# See https://logprep.readthedocs.io/en/latest/user_manual/configuration/output.html
# See https://logprep.readthedocs.io/en/latest/configuration/output.html
# Example:
#
# error_output:
@@ -163,7 +163,7 @@ error_output: {}
# pipeline: []
# - name: https://rule-server.de
#
# See https://logprep.readthedocs.io/en/latest/user_manual/configuration/index.html#configuration-file-structure
# See https://logprep.readthedocs.io/en/latest/configuration/index.html#configuration-file-structure
# for available configuration options.
configurations:
- name: logprep-config
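The truncated `# Example:` comment in the values above hints at the expected shape of this block. A sketch of an error output that sends failed events to Kafka — the output type and its options are assumptions based on the linked output documentation, and all values are illustrative:

```yaml
# Illustrative only; any output type from the documentation works here.
error_output:
  kafka:
    type: confluentkafka_output
    topic: errors
    kafka_config:
      bootstrap.servers: 127.0.0.1:9092
```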
2 changes: 1 addition & 1 deletion logprep/abc/component.py
@@ -14,7 +14,7 @@
from attrs import asdict
from schedule import Scheduler

from logprep.metrics.metrics import CounterMetric, Metric
from logprep.metrics.metrics import Metric
from logprep.util.defaults import DEFAULT_HEALTH_TIMEOUT, EXITCODES
from logprep.util.helper import camel_to_snake

7 changes: 2 additions & 5 deletions logprep/abc/input.py
@@ -361,7 +361,6 @@ def _add_hmac_to(self, event_dict, raw_event) -> Tuple[dict, str]:
"""
hmac_options = self._config.preprocessing.get("hmac", {})
hmac_target_field_name = hmac_options.get("target")
non_critical_error_msg = None

if raw_event is None:
raw_event = self._encoder.encode(event_dict)
@@ -372,10 +371,8 @@ def _add_hmac_to(self, event_dict, raw_event) -> Tuple[dict, str]:
received_orig_message = get_dotted_field_value(event_dict, hmac_target_field_name)

if received_orig_message is None:
non_critical_error_msg = (
f"Couldn't find the hmac target " f"field '{hmac_target_field_name}'"
)
raise CriticalInputError(self, non_critical_error_msg, raw_event)
error_message = f"Couldn't find the hmac target field '{hmac_target_field_name}'"
raise CriticalInputError(self, error_message, raw_event)
if isinstance(received_orig_message, str):
received_orig_message = received_orig_message.encode("utf-8")
hmac = HMAC(
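The refactor above drops the `non_critical_error_msg` indirection and raises `CriticalInputError` directly when the target field is missing. For context, a standalone sketch of the HMAC step the method performs on the raw event; the helper name, key handling, and SHA-256 digest choice are assumptions here, not logprep's exact wiring:

```python
import hashlib
from hmac import HMAC

def compute_hmac(raw_event: bytes, key: str) -> str:
    """Return the hex digest of an HMAC computed over the raw event bytes."""
    return HMAC(key=key.encode("utf-8"), msg=raw_event, digestmod=hashlib.sha256).hexdigest()

# A missing hmac target field is a critical input error; anything found is hashed.
print(compute_hmac(b'{"message": "test"}', "secret"))  # 64 hex chars for SHA-256
```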
2 changes: 0 additions & 2 deletions logprep/abc/output.py
@@ -55,8 +55,6 @@ class Config(Connector.Config):
But this output can be called as output for extra_data.
"""

__slots__ = {"input_connector"}

@property
def default(self):
"""returns the default parameter"""
8 changes: 3 additions & 5 deletions logprep/connector/opensearch/output.py
@@ -274,18 +274,16 @@ def _write_backlog(self):
raise error from error
except Exception as error: # pylint: disable=broad-except
logger.error("Failed to index documents: %s", error)
self.metrics.number_of_errors += 1
raise CriticalOutputError(self, str(error), self._message_backlog) from error
finally:
self._message_backlog.clear()
self._failed.clear()

def _bulk(self, client, actions, *args, **kwargs) -> Optional[List[dict]]:
def _bulk(self, client, actions):
"""Bulk index documents into Opensearch.
uses the parallel_bulk function from the opensearchpy library.
all args are passed to :code:`streaming_bulk` function.
Uses the parallel_bulk function from the opensearchpy library.
"""
kwargs |= {
kwargs = {
"max_chunk_bytes": self._config.max_chunk_bytes,
"chunk_size": self._config.chunk_size,
"queue_size": self._config.queue_size,
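`_bulk` no longer accepts arbitrary `*args, **kwargs`; it builds the keyword set itself from the connector config. A standalone sketch of how `parallel_bulk` consumes such a configuration — host and action values are illustrative, and note that the returned generator must be exhausted for any indexing to happen:

```python
from opensearchpy import OpenSearch, helpers

client = OpenSearch(hosts=["localhost:9200"])  # illustrative host
actions = [{"_index": "processed", "message": "test"}]

kwargs = {
    "max_chunk_bytes": 100 * 1024 * 1024,  # mirrors the connector's config options
    "chunk_size": 500,
    "queue_size": 4,
    "raise_on_error": True,
}
# parallel_bulk is lazy: iterating the (ok, item) tuples drives the bulk requests.
for ok, item in helpers.parallel_bulk(client, actions, **kwargs):
    if not ok:
        print("failed to index:", item)
```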
8 changes: 4 additions & 4 deletions logprep/framework/pipeline.py
@@ -274,7 +274,7 @@ def run(self) -> None: # pylint: disable=method-hidden
self._shut_down()

@_handle_pipeline_error
def process_pipeline(self) -> PipelineResult:
def process_pipeline(self) -> PipelineResult | None:
"""Retrieve next event, process event with full pipeline and store or return results"""
Component.run_pending_tasks()
event = self._input.get_next(self._timeout)
@@ -345,7 +345,7 @@ def stop(self) -> None:
with self._continue_iterating.get_lock():
self._continue_iterating.value = False

def get_health_functions(self) -> Tuple[bool]:
def get_health_functions(self) -> Tuple:
"""Return health function of components"""
output_health_functions = []
if self._output:
@@ -400,11 +400,11 @@ def enqueue_error(
self.error_queue.put(event, timeout=0.1)
self.logger.debug("Enqueued error item")
except Exception as error: # pylint: disable=broad-except
self.logger.error((f"Couldn't enqueue error item due to: {error} | Item: '{event}'"))
self.logger.error(f"[Error Event] Couldn't enqueue error item due to: {error} | Item: '{event}'")
if self._input:
self._input.batch_finished_callback()

def _get_output_error_event(self, item: CriticalOutputError) -> dict:
def _get_output_error_event(self, item: CriticalOutputError) -> dict | list:
match item:
case CriticalOutputError([{"errors": _, "event": _}, *_]):
event = [
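The `match` statement in `_get_output_error_event` uses a class pattern with a positional sub-pattern, which only works because the exception defines `__match_args__`. A minimal stand-in illustrating the shape — this is not logprep's actual `CriticalOutputError`:

```python
class CriticalOutputError(Exception):
    """Stand-in exception; __match_args__ enables the positional class pattern."""
    __match_args__ = ("raw_input",)

    def __init__(self, raw_input):
        super().__init__("critical output error")
        self.raw_input = raw_input

def get_output_error_event(item) -> dict | list:
    match item:
        case CriticalOutputError([{"errors": _, "event": _}, *_]):
            # a list of per-event error records -> return the list of events
            return [entry["event"] for entry in item.raw_input]
        case CriticalOutputError(raw_input):
            return {"event": raw_input}

error = CriticalOutputError([{"errors": "mapping error", "event": {"a": 1}}])
print(get_output_error_event(error))  # [{'a': 1}]
```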
10 changes: 5 additions & 5 deletions logprep/framework/pipeline_manager.py
@@ -68,10 +68,10 @@ class ComponentQueueListener:
"""The queue to listen to."""

target: str = field(validator=validators.instance_of(str))
"""The method name to call with the items from the queue."""
"""The method name of the component which will be used to handle the items from the queue."""

config: dict = field(validator=validators.instance_of(dict))
"""The configuration for the listener component."""
"""The configuration of the component used in this listener instance."""

sentinel: Any = field(default=None)
"""The sentinel object to stop the process. This has to implement identity comparison."""
@@ -122,7 +122,7 @@ def _listen(self):
try:
target(item)
except Exception as error: # pylint: disable=broad-except
logger.error("Error processing item: %s due to %s", item, error)
logger.error(f"[Error Event] Couldn't enqueue error item due to: {error} | Item: '{item}'")
self._drain_queue(target)
component.shut_down()

@@ -134,7 +134,7 @@ def _drain_queue(self, target):
try:
target(item)
except Exception as error: # pylint: disable=broad-except
logger.error("Error processing item: %s due to %s", item, error)
logger.error(f"[Error Event] Couldn't enqueue error item due to: {error} | Item: '{item}'")
self.queue.close() # close queue after draining to prevent message loss

def stop(self):
@@ -211,7 +211,7 @@ def _setup_error_queue(self):
)
self._error_listener.start()
# wait for the error listener to be ready before starting the pipelines
if self.error_queue.get(block=True) is None:
if self.error_queue.get(block=True) is self._error_listener.sentinel:
self.stop()
sys.exit(EXITCODES.ERROR_OUTPUT_NOT_REACHABLE.value)

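A compact sketch of the sentinel pattern the listener relies on; the comparison must be by identity (`is`), which is why `_setup_error_queue` now checks against `self._error_listener.sentinel` instead of a literal `None`. Names and process wiring here are illustrative, not logprep's exact code:

```python
import multiprocessing

SENTINEL = None  # any object with stable identity works

def listen(queue, target):
    """Consume items until the sentinel arrives, logging per-item failures."""
    while True:
        item = queue.get()
        if item is SENTINEL:  # identity comparison, never equality
            break
        try:
            target(item)
        except Exception as error:  # keep draining despite bad items
            print(f"[Error Event] Couldn't handle item due to: {error} | Item: '{item}'")

queue = multiprocessing.Queue()
queue.put({"event": "failed event"})
queue.put(SENTINEL)  # signals shutdown
listen(queue, print)
```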
1 change: 0 additions & 1 deletion tests/acceptance/test_error_output.py
@@ -3,7 +3,6 @@
# pylint: disable=missing-docstring

import json
import re
import tempfile
import uuid
from logging import DEBUG, basicConfig, getLogger
4 changes: 2 additions & 2 deletions tests/unit/charts/test_error_output_config.py
@@ -41,7 +41,7 @@ def test_error_output_config_file_is_set(self):
error_output_config = error_output_config[0]
assert error_output_config["data"]["error-output-config.yaml"] == expected_output_config

def test_deployment_mounts_output_config(self):
def test_deployment_mounts_error_output_config(self):
self.manifests = self.render_chart("logprep", {"error_output": kafka_output_config})
deployment = self.manifests.by_query("kind: Deployment")[0]
volume_mounts = deployment["spec.template.spec.containers"][0]["volumeMounts"]
@@ -64,7 +64,7 @@ def test_error_output_config_volume_is_populated(self):
assert volume
assert volume["configMap"]["name"] == error_output_config_name

def test_error_output_config_is_used_to_start_logprep(self):
def test_error_output_config_is_used_in_start_command_of_logprep(self):
self.manifests = self.render_chart("logprep", {"error_output": kafka_output_config})
container = self.deployment["spec.template.spec.containers"][0]
volume_mounts = container["volumeMounts"]
26 changes: 1 addition & 25 deletions tests/unit/connector/test_opensearch_output.py
@@ -94,30 +94,6 @@ def test_message_backlog_is_cleared_after_it_was_written(self):
self.object.store({"event": "test_event"})
assert len(self.object._message_backlog) == 0

@pytest.mark.skip(reason="This test is only for local debugging")
def test_opensearch_parallel_bulk(self):
config = {
"type": "opensearch_output",
"hosts": ["localhost:9200"],
"default_index": "default_index",
"error_index": "error_index",
"message_backlog_size": 1,
"timeout": 5000,
}
output: OpensearchOutput = Factory.create({"opensearch_output": config})
uuid_str = str(uuid.uuid4())
result = output._search_context.search(
index="defaultindex", body={"query": {"match": {"foo": uuid_str}}}
)
len_before = len(result["hits"]["hits"])
output._message_backlog = [{"foo": uuid_str, "_index": "defaultindex"}]
output._write_backlog()
time.sleep(1)
result = output._search_context.search(
index="defaultindex", body={"query": {"match": {"foo": uuid_str}}}
)
assert len(result["hits"]["hits"]) > len_before

@mock.patch(
"logprep.connector.opensearch.output.OpensearchOutput._search_context",
new=mock.MagicMock(),
@@ -172,7 +148,7 @@ def test_write_backlog_clears_message_backlog_on_failure(self):
assert len(self.object._message_backlog) == 0, "Message backlog should be cleared"

def test_write_backlog_clears_failed_on_success(self):
self.object._message_backlog = [{"some": "event"}]
self.object._failed = [{"some": "event"}]
self.object._write_backlog()
assert len(self.object._failed) == 0, "temporary failed backlog should be cleared"

Expand Down
4 changes: 2 additions & 2 deletions tests/unit/connector/test_real_kafka.py
@@ -30,15 +30,15 @@ def setup_module():
@pytest.mark.skipif(in_ci, reason="requires kafka")
class TestKafkaConnection:
def get_topic_partition_size(self, topic_partition: TopicPartition) -> int:
time.sleep(1) # nosemgrep
time.sleep(1)
consumer = Consumer(kafka_config | {"group.id": str(uuid.uuid4())})
lowwater, highwater = consumer.get_watermark_offsets(topic_partition)
consumer.close()
return highwater - lowwater

def wait_for_topic_creation(self):
while self.topic_name not in self.admin.list_topics().topics:
time.sleep(2) # nosemgrep
time.sleep(2)

def setup_method(self):
self.admin = AdminClient(kafka_config)
2 changes: 0 additions & 2 deletions tests/unit/exceptions/test_processing_exceptions.py
@@ -3,8 +3,6 @@
# pylint: disable=protected-access
# pylint: disable=line-too-long

from unittest import mock

from logprep.processor.base.exceptions import (
FieldExistsWarning,
ProcessingCriticalError,
5 changes: 2 additions & 3 deletions tests/unit/framework/test_pipeline.py
@@ -155,7 +155,7 @@ def delete_event(event) -> ProcessorResult:
last_processor.process.assert_not_called()
assert not event, "event was deleted"

def test_not_empty_documents_are_stored_in_the_output(self, _):
def test_documents_are_stored_in_the_output(self, _):
self.pipeline._setup()
self.pipeline._input.get_next.return_value = {"message": "test"}
self.pipeline._store_event = mock.MagicMock()
@@ -811,7 +811,6 @@ def test_pipeline_hmac_error_was_send_to_error_queue(self):
b'{"applyrule":"yes"}',
)
pipeline.enqueue_error.assert_called_with(expected_error)
# assert result.event["hmac"]["hmac"] == "error"

def test_pipeline_run_raises_assertion_when_run_without_input(self):
self.config.input = {}
@@ -947,7 +946,7 @@ def test_sets_attributes(self, parameters, error, message):
assert pipeline_result.pipeline == parameters["pipeline"]
assert isinstance(pipeline_result.results[0], ProcessorResult)

def test_pipeline_result_produces_results(self, mock_processor):
def test_pipeline_result_instantiation_produces_results(self, mock_processor):
pipeline_result = PipelineResult(
event={"some": "event"},
pipeline=[
6 changes: 3 additions & 3 deletions tests/unit/framework/test_pipeline_manager.py
@@ -21,7 +21,7 @@
)
from logprep.metrics.exporter import PrometheusExporter
from logprep.util.configuration import Configuration, MetricsConfig
from logprep.util.defaults import DEFAULT_LOG_CONFIG
from logprep.util.defaults import DEFAULT_LOG_CONFIG, EXITCODES
from logprep.util.logging import logqueue
from tests.testdata.metadata import path_to_config

@@ -307,7 +307,7 @@ def test_setup_error_queue_raises_system_exit_if_error_listener_fails(self):
with mock.patch("logprep.framework.pipeline_manager.ComponentQueueListener"):
with mock.patch("logprep.framework.pipeline_manager.ThrottlingQueue.get") as mock_get:
mock_get.return_value = None
with pytest.raises(SystemExit, match="4"):
with pytest.raises(SystemExit, match=str(EXITCODES.ERROR_OUTPUT_NOT_REACHABLE.value)):
PipelineManager(self.config)

def test_stop_calls_stop_on_error_listener(self):
@@ -564,7 +564,7 @@ def test_listen_setups_component(self):
listener._listen()
mock_setup.assert_called()

def testget_component_instance_raises_if_setup_not_successful(self):
def test_get_component_instance_raises_if_setup_not_successful(self):
target = "store"
output_config = {"random_name": {"type": "dummy_output"}}
queue = ThrottlingQueue(multiprocessing.get_context(), 100)
1 change: 0 additions & 1 deletion tests/unit/util/test_configuration.py
@@ -340,7 +340,6 @@ def test_verify_passes_for_valid_configuration(self):
),
("verifies input config", {"input": {"random_name": {"type": "UNKNOWN"}}}, 1),
("verifies output config", {"output": {"kafka_output": {"type": "UNKNOWN"}}}, 1),
("verifies error_output config", {"output": {"kafka_output": {"type": "UNKNOWN"}}}, 1),
(
"multiple outputs but one config failure",
{
