fix acceptance tests for prometheus metrics
ekneg54 committed Oct 24, 2023
1 parent 01fd3d8 commit 866593c
Showing 4 changed files with 52 additions and 61 deletions.
2 changes: 2 additions & 0 deletions logprep/connector/s3/output.py
@@ -63,6 +63,7 @@
)

from logprep.abc.output import Output
from logprep.metrics.metrics import Metric
from logprep.util.helper import get_dotted_field_value
from logprep.util.time import TimeParser

@@ -186,6 +187,7 @@ def _add_dates(self, prefix):
prefix = re.sub(date_format_match, formatted_date, prefix)
return prefix

@Metric.measure_time()
def _write_to_s3_resource(self, document: dict, prefix: str):
"""Writes a document into s3 bucket using given prefix.
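The `@Metric.measure_time()` decorator added above ties `_write_to_s3_resource` into logprep's metrics, so each write's duration feeds the processing-time series checked by the acceptance test below. A minimal sketch of how such a decorator can work — the attribute name `processing_time_per_event` is taken from the metric names in the test, while the rest is an assumption rather than logprep's actual implementation:

import time
from functools import wraps

def measure_time():
    """Decorator factory: record a bound method's runtime on its instance's metrics."""
    def decorator(func):
        @wraps(func)
        def wrapper(self, *args, **kwargs):
            start = time.perf_counter()
            result = func(self, *args, **kwargs)
            # Assumption: the component carries a metrics object whose
            # histogram-like attribute accepts observed durations via "+=".
            self.metrics.processing_time_per_event += time.perf_counter() - start
            return result
        return wrapper
    return decorator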
109 changes: 50 additions & 59 deletions tests/acceptance/test_full_configuration.py
@@ -2,6 +2,7 @@
import os
import re
import tempfile
import time
from pathlib import Path

import requests
@@ -23,7 +24,7 @@ def teardown_function():


def test_start_of_logprep_with_full_configuration_from_file(tmp_path):
pipeline = get_full_pipeline()
pipeline = get_full_pipeline(exclude=["normalizer"])
config = get_default_logprep_config(pipeline, with_hmac=False)
config.get("output").update({"kafka": {"type": "dummy_output", "default": False}})
config_path = str(tmp_path / "generated_config.yml")
@@ -42,7 +43,7 @@ def test_start_of_logprep_with_full_configuration_from_file(tmp_path):


def test_start_of_logprep_with_full_configuration_http():
pipeline = get_full_pipeline()
pipeline = get_full_pipeline(exclude=["normalizer"])
config = get_default_logprep_config(pipeline, with_hmac=False)
config.get("output").update({"kafka": {"type": "dummy_output", "default": False}})
endpoint = "http://localhost:32000"
@@ -121,7 +122,8 @@ def test_logprep_exposes_prometheus_metrics(tmp_path):
input_file_path.touch()
# requester is excluded because it tries to connect to non-existing server
# selective_extractor is excluded because of output mismatch (rules expect kafka as output)
pipeline = get_full_pipeline(exclude=["requester", "selective_extractor"])
# normalizer is excluded because of deprecation
pipeline = get_full_pipeline(exclude=["requester", "selective_extractor", "normalizer"])
config = get_default_logprep_config(pipeline, with_hmac=False)
config |= {
"metrics": {
@@ -160,64 +162,53 @@
assert "critical" not in output.lower(), "error message"
assert "exception" not in output.lower(), "error message"
assert "error" not in output.lower(), "error message"
if "Started exposing metrics" in output:
if "Finished building pipeline" in output:
break
response = requests.get("http://127.0.0.1:8000", timeout=0.1)
response = requests.get("http://127.0.0.1:8000", timeout=5)
response.raise_for_status()
metrics = response.text
connector_name_type_tuples = [
("fileinput", "file_input"),
("kafka", "console_output"),
("second_output", "console_output"),
expected_metrics = [
r"logprep_number_of_processed_events_total\{component=\"input\",description=\"FileInput \(fileinput\)\",name=\"fileinput\",type=\"file_input\"}",
r"logprep_number_of_processed_events_total\{component=\"rule\",description=\"id:.+\",name=\".+\",type\=\".+\"}",
r"logprep_number_of_processed_events_total\{component=\"output\",description=\".+\",name=\"kafka\",type=\"console_output\"}",
r"logprep_number_of_processed_events_total\{component=\"output\",description=\".+\",name=\"second_output\",type=\"console_output\"}",
r"logprep_number_of_warnings_total{component=\"rule\",description=\"id:.+\",name=\".+\",type=\".+\"}",
r"logprep_number_of_warnings_total{component=\"input\",description=\".+\",name=\"fileinput\",type=\"file_input\"}",
r"logprep_number_of_warnings_total{component=\"output\",description=\".+\",name=\"kafka\",type=\"console_output\"}",
r"logprep_number_of_warnings_total{component=\"output\",description=\".+\",name=\"second_output\",type=\"console_output\"}",
r"logprep_number_of_errors_total{component=\"rule\",description=\"id:.+\",name=\".+\",type=\".+\"}",
r"logprep_number_of_errors_total{component=\"input\",description=\".+\",name=\"fileinput\",type=\"file_input\"}",
r"logprep_number_of_errors_total{component=\"output\",description=\".+\",name=\"kafka\",type=\"console_output\"}",
r"logprep_number_of_errors_total{component=\"output\",description=\".+\",name=\"second_output\",type=\"console_output\"}",
r"logprep_processing_time_per_event_sum{component=\"rule\",description=\"id:.+\",name=\".+\",type=\".+\"}",
r"logprep_processing_time_per_event_count{component=\"rule\",description=\"id:.+\",name=\".+\",type=\".+\"}",
r"logprep_processing_time_per_event_bucket{component=\"rule\",description=\"id:.+\",name=\".+\",type=\".+\"}",
r"logprep_processing_time_per_event_sum{component=\"input\",description=\".+\",name=\"fileinput\",type=\"file_input\"}",
r"logprep_processing_time_per_event_count{component=\"input\",description=\".+\",name=\"fileinput\",type=\"file_input\"}",
r"logprep_processing_time_per_event_bucket{component=\"input\",description=\".+\",name=\"fileinput\",type=\"file_input\"}",
r"logprep_processing_time_per_event_sum{component=\"output\",description=\".+\",name=\"kafka\",type=\"console_output\"}",
r"logprep_processing_time_per_event_count{component=\"output\",description=\".+\",name=\"kafka\",type=\"console_output\"}",
r"logprep_processing_time_per_event_bucket{component=\"output\",description=\".+\",name=\"kafka\",type=\"console_output\"}",
r"logprep_processing_time_per_event_sum{component=\"output\",description=\".+\",name=\"second_output\",type=\"console_output\"}",
r"logprep_processing_time_per_event_count{component=\"output\",description=\".+\",name=\"second_output\",type=\"console_output\"}",
r"logprep_processing_time_per_event_bucket{component=\"output\",description=\".+\",name=\"second_output\",type=\"console_output\"}",
r"logprep_domain_resolver_total_urls_total",
r"logprep_domain_resolver_resolved_new_total",
r"logprep_domain_resolver_resolved_cached_total",
r"logprep_domain_resolver_timeouts_total",
r"logprep_pseudonymizer_pseudonymized_urls_total",
r"logprep_amides_total_cmdlines_total",
r"logprep_amides_new_results",
r"logprep_amides_cached_results",
r"logprep_amides_num_cache_entries",
r"logprep_amides_cache_load",
r"logprep_amides_mean_misuse_detection_time_sum",
r"logprep_amides_mean_misuse_detection_time_count",
r"logprep_amides_mean_misuse_detection_time_bucket",
r"logprep_amides_mean_rule_attribution_time_sum",
r"logprep_amides_mean_rule_attribution_time_count",
r"logprep_amides_mean_rule_attribution_time_bucket",
]
for name, connector_type in connector_name_type_tuples:
assert re.search(
rf"logprep_connector_number_of_processed_events.*{name}.*{connector_type}.* 1\.0",
metrics,
)
assert re.search(
rf"logprep_connector_mean_processing_time_per_event.*{name}.*{connector_type}.* \d\..*",
metrics,
)
assert re.search(
rf"logprep_connector_number_of_warnings.*{name}.*{connector_type}.* 0\.0", metrics
)
assert re.search(
rf"logprep_connector_number_of_errors.*{name}.*{connector_type}.* 0\.0", metrics
)

processor_names = [list(p.keys())[0] for p in config.get("pipeline")]
for processor_name in processor_names:
assert re.search(
rf"logprep_processor_number_of_processed_events.*{processor_name}.* 1\.0", metrics
)
assert re.search(
rf"logprep_processor_mean_processing_time_per_event.*{processor_name}.* \d\..*", metrics
)
assert re.search(rf"logprep_processor_number_of_warnings.*{processor_name}.* 0\.0", metrics)
assert re.search(rf"logprep_processor_number_of_errors.*{processor_name}.* 0\.0", metrics)
assert re.search(rf"logprep_number_of_rules.*{processor_name}.*generic.* \d\.0", metrics)
assert re.search(rf"logprep_number_of_rules.*{processor_name}.*specific.* \d\.0", metrics)
assert re.search(rf"logprep_number_of_matches.*{processor_name}.*specific.* \d\.0", metrics)
assert re.search(rf"logprep_number_of_matches.*{processor_name}.*generic.* \d\.0", metrics)
assert re.search(
rf"logprep_mean_processing_time.*{processor_name}.*specific.* \d\..*", metrics
)
assert re.search(
rf"logprep_mean_processing_time.*{processor_name}.*generic.* \d\..*", metrics
)

assert re.search("logprep_processor_total_urls.*domain_resolver.* 0\.0", metrics)
assert re.search("logprep_processor_resolved_new.*domain_resolver.* 0\.0", metrics)
assert re.search("logprep_processor_resolved_cached.*domain_resolver.* 0\.0", metrics)
assert re.search("logprep_processor_timeouts.*domain_resolver.* 0\.0", metrics)
assert re.search("logprep_processor_pseudonymized_urls.*pseudonymizer.* 0\.0", metrics)

assert re.search(
r"logprep_pipeline_mean_processing_time_per_event.*pipeline-1.* \d\..*", metrics
)
assert re.search("logprep_pipeline_number_of_processed_events.*pipeline-1.* 1\.0", metrics)
assert re.search("logprep_pipeline_sum_of_processor_warnings.*pipeline-1.* 0\.0", metrics)
assert re.search("logprep_pipeline_sum_of_processor_errors.*pipeline-1.* 0\.0", metrics)

for expected_metric in expected_metrics:
assert re.search(expected_metric, metrics), f"Metric {expected_metric} not found in metrics"
proc.kill()
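The test now waits for "Finished building pipeline" in the process output, scrapes the metrics endpoint once, and verifies each expected time series with a regex instead of asserting per-component counter values. The scrape-and-match pattern in isolation — the endpoint, timeout, and sample pattern come from the diff, while the helper name is illustrative only:

import re
import requests

def assert_metrics_exposed(url, expected_patterns):
    """Scrape a Prometheus endpoint once and assert each regex matches some series."""
    response = requests.get(url, timeout=5)
    response.raise_for_status()
    metrics = response.text
    for pattern in expected_patterns:
        assert re.search(pattern, metrics), f"Metric {pattern} not found in metrics"

# Usage, mirroring the acceptance test (requires a running logprep instance):
# assert_metrics_exposed(
#     "http://127.0.0.1:8000",
#     [r"logprep_number_of_processed_events_total\{component=\"input\".+\}"],
# )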
1 change: 0 additions & 1 deletion tests/unit/processor/normalizer/test_normalizer.py
@@ -24,7 +24,6 @@
from tests.unit.processor.base import BaseProcessorTestCase


@pytest.skip("deprecated", allow_module_level=True)
class TestNormalizer(BaseProcessorTestCase):
CONFIG = {
"type": "normalizer",
1 change: 0 additions & 1 deletion tests/unit/processor/normalizer/test_normalizer_rule.py
@@ -27,7 +27,6 @@ def fixture_specific_rule_definition():
}


@pytest.skip("deprecated", allow_module_level=True)
class TestNormalizerRule:
@pytest.mark.parametrize(
"testcase, other_rule_definition, is_equal",
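Regarding the skip in both normalizer test modules: `pytest.skip(reason, allow_module_level=True)` is pytest's mechanism for skipping an entire module at collection time, and since the call raises immediately, the decorator-style spelling shown in the diff behaves like a plain module-level call. A minimal example of the documented module-level form:

import pytest

# Skips every test in this module at collection time;
# allow_module_level=True permits calling skip() outside of a test.
pytest.skip("deprecated", allow_module_level=True)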
