Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Oct 13, 2023
1 parent a94cb66 commit d8f03fa
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 24 deletions.
2 changes: 2 additions & 0 deletions logprep/abc/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Metrics:
)
"""Number of events that were sent to error output"""

_processing_time_per_event_target: Callable = field(default=None)
processing_time_per_event: HistogramMetric = field(
factory=lambda: HistogramMetric(
description="Time in seconds that it took to process an event",
Expand All @@ -57,6 +58,7 @@ def __attrs_post_init__(self):
attribute = getattr(self, attribute)
if isinstance(attribute, Metric):
attribute.labels = self._labels
attribute.target = self._processing_time_per_event_target
attribute.tracker = attribute.init_tracker()

# __dict__ is added to support functools.cached_property
Expand Down
4 changes: 3 additions & 1 deletion logprep/abc/processor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Abstract module for processors"""
import time
from abc import abstractmethod
from functools import reduce
from functools import partial, reduce
from logging import Logger
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional
Expand Down Expand Up @@ -166,9 +166,11 @@ def process(self, event: dict):
self._process_rule_tree(event, self._specific_tree)
self._process_rule_tree(event, self._generic_tree)

@TimeMeasurement.measure_time("RuleTree processing")
def _process_rule_tree(self, event: dict, tree: "RuleTree"):
applied_rules = set()

@TimeMeasurement.measure_time("Rule processing")
def _process_rule(event, rule):
begin = time.time()
self._apply_rules_wrapper(event, rule)
Expand Down
37 changes: 20 additions & 17 deletions logprep/framework/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,6 @@ class Pipeline:
class Metrics(Component.Metrics):
"""Tracks statistics about a pipeline"""

input: Connector.ConnectorMetrics
"""Input metrics"""
output: List[Connector.ConnectorMetrics]
"""Output metrics"""
kafka_offset: int = 0
"""The current offset of the kafka input reader"""
mean_processing_time_per_event: float = 0.0
"""Mean processing time for one event"""
Expand Down Expand Up @@ -193,6 +188,9 @@ def __init__(
self.pipeline_index = pipeline_index
self._encoder = msgspec.msgpack.Encoder()
self._decoder = msgspec.msgpack.Decoder()
self.metrics = self.Metrics(
labels=self.metric_labels,
)

@cached_property
def _process_name(self) -> str:
Expand All @@ -210,17 +208,6 @@ def metric_labels(self) -> dict:
"""Return the metric labels for this component."""
return {"component": "pipeline"}

@cached_property
def metrics(self) -> Metrics:
"""The pipeline metrics object"""
if self._prometheus_exporter is None:
return None
return self.Metrics(
input=self._input.metrics,
output=[self._output.get(output).metrics for output in self._output],
labels=self.metric_labels,
)

@cached_property
def _pipeline(self) -> tuple:
self.logger.debug(f"Building '{self._process_name}'")
Expand Down Expand Up @@ -349,9 +336,25 @@ def _get_event(self) -> dict:
pass
return event

@TimeMeasurement.measure_time("pipeline")
@TimeMeasurement.measure_time(name="process_event")
def process_event(self, event: dict):
"""process all processors for one event"""

"""
ToDos:
- TimeTracking
- Processor Specific Metrics (Pseudonymizer, Amides, DomainResolver)
- Fix Pseudonymizer str has no match
- count number warnings/errors separately or delete them from all metrics?
- Tests
- delete metric exposer
- delete SharedCounter (Events in last 5 min: n)
- create Grafana Dashboards
- add pipelinemanager metrics (pipeline restarts)
- clean up PrometheusExporter ("remove stale metric files" still needed?)
-
"""

event_received = self._encoder.encode(event)
extra_outputs = []
for processor in self._pipeline:
Expand Down
5 changes: 4 additions & 1 deletion logprep/metrics/metrics.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""This module tracks, calculates, exposes and resets logprep metrics"""
from abc import ABC, abstractmethod
from typing import Callable

from attr import asdict, define, field, validators
from prometheus_client import Counter, Histogram
Expand Down Expand Up @@ -49,9 +50,11 @@ class Metric(ABC):
default={},
)
tracker: object = field(default=None)
target: Callable = field(default=None)
_prefix: str = "logprep_"

def init_tracker(self):
tracker = None
if isinstance(self, CounterMetric):
tracker = Counter(
name=f"{self._prefix}{self.name}",
Expand All @@ -64,7 +67,7 @@ def init_tracker(self):
name=f"{self._prefix}{self.name}",
documentation=self.description,
labelnames=self.labels.keys(),
buckets=(0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, 1, float("inf")),
buckets=(0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, 1),
registry=None,
)
tracker.labels(**self.labels)
Expand Down
3 changes: 2 additions & 1 deletion logprep/run_logprep.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ def _run_logprep(arguments, logger: logging.Logger):
logger.critical(f"A critical error occurred: {error}")
if runner:
runner.stop()
sys.exit(1)
raise error
# sys.exit(1)
# pylint: enable=broad-except


Expand Down
15 changes: 11 additions & 4 deletions logprep/util/time_measurement.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
"""This module is used to measure the execution time of functions and add the results to events."""

import logging
from socket import gethostname
from time import time
from typing import TYPE_CHECKING

from logprep.util.helper import camel_to_snake

if TYPE_CHECKING:
from logprep.processor.base.rule import Rule


class TimeMeasurement:
"""Measures the execution time of functions and adds the results to events via a decorator."""
Expand All @@ -29,15 +34,17 @@ def inner(*args, **kwargs): # nosemgrep
if TimeMeasurement.TIME_MEASUREMENT_ENABLED:
caller = args[0]
first_argument = args[1]
second_argument = args[2] if len(args) > 2 else None
begin = time()
result = func(*args, **kwargs)
end = time()

processing_time = end - begin

if hasattr(caller, "metrics"):
if hasattr(caller.metrics, "update_mean_processing_time_per_event"):
caller.metrics.update_mean_processing_time_per_event(processing_time)
if name in ("Rule processing",):
caller = first_argument
if name in ("RuleTree processing",):
caller = second_argument
caller.metrics.processing_time_per_event += processing_time

if TimeMeasurement.APPEND_TO_EVENT and isinstance(first_argument, dict):
add_processing_times_to_event(first_argument, processing_time, caller, name)
Expand Down

0 comments on commit d8f03fa

Please sign in to comment.