Commit

WIP

ekneg54 committed Oct 16, 2023
1 parent a68bd66 commit 35afb4d

Showing 13 changed files with 204 additions and 298 deletions.
23 changes: 0 additions & 23 deletions logprep/abc/component.py
@@ -28,30 +28,7 @@ class Metrics:

_labels: dict

number_of_processed_events: CounterMetric = field(
factory=lambda: CounterMetric(
description="Number of events that were processed",
name="number_of_processed_events",
)
)
"""Number of events that were processed"""

number_of_failed_events: CounterMetric = field(
factory=lambda: CounterMetric(
description="Number of events that were send to error output",
name="number_of_failed_events",
)
)
"""Number of events that were send to error output"""

_processing_time_per_event_target: Callable = field(default=None)
processing_time_per_event: HistogramMetric = field(
factory=lambda: HistogramMetric(
description="Time in seconds that it took to process an event",
name="processing_time_per_event",
)
)
"""Time in seconds that it took to process an event"""

def __attrs_post_init__(self):
for attribute in asdict(self):
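The counter and histogram fields defined here leave the shared `Component.Metrics` base class; they reappear on the subclasses below. A minimal sketch of the attrs pattern being moved, assuming `logprep` is importable; `factory=` matters because a plain `default=` would be evaluated once and shared by every instance:

```python
from attrs import define, field

from logprep.metrics.metrics import CounterMetric


@define(kw_only=True)
class ExampleMetrics:
    """Hypothetical container, not part of this commit."""

    number_of_processed_events: CounterMetric = field(
        # factory= builds a fresh CounterMetric per instance; default=
        # would hand the same metric object to all instances.
        factory=lambda: CounterMetric(
            description="Number of events that were processed",
            name="number_of_processed_events",
        )
    )
```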
46 changes: 28 additions & 18 deletions logprep/abc/connector.py
@@ -1,36 +1,46 @@
""" abstract module for connectors"""
from attr import define
from attrs import define, field

from logprep.abc.component import Component
from logprep.metrics.metrics import calculate_new_average
from logprep.metrics.metrics import CounterMetric, HistogramMetric


class Connector(Component):
"""Abstract Connector Class to define the Interface"""

@define(kw_only=True)
class ConnectorMetrics(Component.Metrics):
class Metrics(Component.Metrics):
"""Tracks statistics about this connector"""

mean_processing_time_per_event: float = 0.0
"""Mean processing time for one event"""
_mean_processing_time_sample_counter: int = 0
"""Helper to calculate mean processing time"""
number_of_processed_events: CounterMetric = field(
factory=lambda: CounterMetric(
description="Number of events that were processed",
name="number_of_processed_events",
)
)
"""Number of events that were processed"""

number_of_failed_events: CounterMetric = field(
factory=lambda: CounterMetric(
description="Number of events that were send to error output",
name="number_of_failed_events",
)
)
"""Number of events that were send to error output"""

processing_time_per_event: HistogramMetric = field(
factory=lambda: HistogramMetric(
description="Time in seconds that it took to process an event",
name="processing_time_per_event",
)
)
"""Time in seconds that it took to process an event"""

number_of_warnings: int = 0
"""Number of warnings that occurred while processing events"""
number_of_errors: int = 0
"""Number of errors that occurred while processing events"""

def update_mean_processing_time_per_event(self, new_sample):
"""Updates the mean processing time per event"""
new_avg, new_sample_counter = calculate_new_average(
self.mean_processing_time_per_event,
new_sample,
self._mean_processing_time_sample_counter,
)
self.mean_processing_time_per_event = new_avg
self._mean_processing_time_sample_counter = new_sample_counter

__slots__ = ["metrics"]

metrics: ConnectorMetrics
metrics: Metrics
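With `update_mean_processing_time_per_event` and its sample counter gone, timing data flows through the histogram's `+=` overload instead (see `HistogramMetric.__add__` in `logprep/metrics/metrics.py` below). A hedged sketch of how a caller would feed these metrics; the wiring is illustrative, not the connector's actual call path:

```python
import time


def track_event(connector, event: dict):
    begin = time.time()
    ...  # hand the event to the connector's real work here
    connector.metrics.number_of_processed_events += 1  # Counter.inc(1)
    duration = time.time() - begin
    connector.metrics.processing_time_per_event += duration  # Histogram.observe(duration)
```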
36 changes: 28 additions & 8 deletions logprep/abc/processor.py
@@ -2,14 +2,15 @@
import time
from abc import abstractmethod
from functools import partial, reduce
from logging import Logger
from logging import DEBUG, Logger
from pathlib import Path
from typing import TYPE_CHECKING, List, Optional

from attr import define, field, validators

from logprep.abc.component import Component
from logprep.framework.rule_tree.rule_tree import RuleTree, RuleTreeType
from logprep.metrics.metrics import HistogramMetric
from logprep.processor.base.exceptions import (
FieldExistsWarning,
ProcessingCriticalError,
@@ -73,9 +74,17 @@ class Config(Component.Config):
class Metrics(Component.Metrics):
"""Tracks statistics about this processor"""

mean_processing_time_per_event: float = 0.0
"""Mean processing time for one event"""
_mean_processing_time_sample_counter: int = 0
number_of_processed_events = field(default=None)
number_of_failed_events = field(default=None)

processing_time_per_event: HistogramMetric = field(
factory=lambda: HistogramMetric(
description="Time in seconds that it took to process an event",
name="processing_time_per_event",
)
)
"""Time in seconds that it took to process an event"""

number_of_warnings: int = 0
"""Number of warnings that occurred while processing events"""
number_of_errors: int = 0
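Overriding `number_of_processed_events` and `number_of_failed_events` with `field(default=None)` replaces the attrs field definitions inherited from `Component.Metrics`, switching those counters off for this subclass. A self-contained sketch of the override mechanism:

```python
from typing import Optional

from attrs import define, field


@define(kw_only=True)
class Base:
    counter: Optional[int] = field(default=0)


@define(kw_only=True)
class Child(Base):
    # Redefining the attribute replaces the inherited field,
    # so Child instances carry no active counter.
    counter: Optional[int] = field(default=None)


assert Base().counter == 0
assert Child().counter is None
```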
@@ -151,7 +160,6 @@ def metric_labels(self) -> dict:
"""Return the metric labels for this component."""
return super().metric_labels | {"component": "processor", "type": self._config.type}

@TimeMeasurement.measure_time()
def process(self, event: dict):
"""Process a log event by calling the implemented `process` method of the
strategy object set in `_strategy` attribute.
@@ -162,7 +170,6 @@ def process(self, event: dict):
A dictionary representing a log event.
"""
self.metrics.number_of_processed_events += 1
self._logger.debug(f"{self.describe()} processing event {event}")
self._process_rule_tree(event, self._specific_tree)
self._process_rule_tree(event, self._generic_tree)
@@ -172,7 +179,7 @@ def _process_rule_tree(self, event: dict, tree: "RuleTree"):
applied_rules = set()

@TimeMeasurement.measure_time("Rule processing")
def _process_rule(event, rule):
def _process_rule(_, event, rule):
begin = time.time()
self._apply_rules_wrapper(event, rule)
processing_time = time.time() - begin
@@ -184,7 +191,7 @@ def _process_rule(event, rule):
if self._config.apply_multiple_times:
matching_rules = tree.get_matching_rules(event)
while matching_rules:
reduce(_process_rule, (event, *matching_rules))
reduce(_process_rule, (None, event, *matching_rules))
matching_rules = set(tree.get_matching_rules(event)).difference(applied_rules)
else:
reduce(_process_rule, (event, *tree.get_matching_rules(event)))
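The `reduce` calls fold the matching rules over the event: `reduce(_process_rule, (event, *rules))` is equivalent to `reduce(_process_rule, rules, event)`, with the event threaded through as the accumulator, and the new three-argument variant additionally seeds a leading `None` for the decorated signature. A generic, self-contained sketch of the underlying fold (names are illustrative):

```python
from functools import reduce


def apply_rule(event: dict, rule) -> dict:
    rule(event)  # stand-in for _apply_rules_wrapper(event, rule)
    return event  # returning the event keeps it as the accumulator


rules = [lambda e: e.update(source="kafka"), lambda e: e.update(parsed=True)]
event: dict = {"message": "raw log line"}
reduce(apply_rule, rules, event)
assert event == {"message": "raw log line", "source": "kafka", "parsed": True}
```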
@@ -262,6 +269,11 @@ def load_rules(self, specific_rules_targets: List[str], generic_rules_targets: L
rules = self.rule_class.create_rules_from_target(generic_rules_target)
for rule in rules:
self._generic_tree.add_rule(rule, self._logger)
if self._logger.isEnabledFor(DEBUG): # pragma: no cover
number_specific_rules = self._specific_tree.number_of_rules
self._logger.debug(f"{self.describe()} loaded {number_specific_rules} specific rules")
number_generic_rules = self._generic_tree.number_of_rules
self._logger.debug(f"{self.describe()} loaded {number_generic_rules} generic rules")
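The `isEnabledFor(DEBUG)` guard means the rule counts and f-strings are only built when debug logging is active; a bare `logger.debug(f"...")` would evaluate its argument eagerly on every call. The same idiom in isolation:

```python
import logging

logger = logging.getLogger("example")


def report(rules: list):
    if logger.isEnabledFor(logging.DEBUG):
        # Formatting (and any counting) happens only at DEBUG level.
        logger.debug(f"loaded {len(rules)} rules")
```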

@staticmethod
def _field_exists(event: dict, dotted_field: str) -> bool:
@@ -292,6 +304,14 @@ def _has_missing_values(self, event, rule, source_field_dict):
dict(filter(lambda x: x[1] in [None, ""], source_field_dict.items())).keys()
)
if missing_fields:
if rule.ignore_missing_fields:
    return True
error = BaseException(f"{self.name}: no value for fields: {missing_fields}")
2 changes: 1 addition & 1 deletion logprep/connector/confluent_kafka/input.py
@@ -77,7 +77,7 @@ class ConfluentKafkaInput(Input):
"""A kafka input connector."""

@define(kw_only=True, slots=False)
class ConnectorMetrics(Input.ConnectorMetrics):
class Metrics(Input.Metrics):
"""Metrics for ConfluentKafkaInput"""

_prefix = "logprep_connector_input_kafka_"
2 changes: 1 addition & 1 deletion logprep/connector/confluent_kafka/output.py
@@ -52,7 +52,7 @@ class ConfluentKafkaOutput(Output):
"""A kafka connector that serves as output connector."""

@define(kw_only=True, slots=False)
class Metrics(Output.ConnectorMetrics):
class Metrics(Output.Metrics):
"""Metrics for ConfluentKafkaOutput"""

_prefix = "logprep_connector_output_kafka_"
18 changes: 13 additions & 5 deletions logprep/framework/rule_tree/rule_tree.py
@@ -9,6 +9,7 @@
from logprep.abc.component import Component
from logprep.framework.rule_tree.node import Node
from logprep.framework.rule_tree.rule_parser import RuleParser
from logprep.metrics.metrics import HistogramMetric
from logprep.util import getter

if TYPE_CHECKING:
@@ -34,6 +35,13 @@ class Metrics(Component.Metrics):

number_of_processed_events = field(default=None)
number_of_failed_events = field(default=None)
processing_time_per_event: HistogramMetric = field(
factory=lambda: HistogramMetric(
description="Time in seconds that it took to process an event",
name="processing_time_per_event",
)
)
"""Time in seconds that it took to process an event"""

__slots__ = (
"rule_parser",
@@ -45,7 +53,6 @@ class Metrics(Component.Metrics):
"_processor_type",
"_processor_name",
"_root",
"_number_of_rules"
)

rule_parser: Optional[RuleParser]
@@ -57,7 +64,6 @@ class Metrics(Component.Metrics):
_processor_config: "Processor.Config"
_processor_type: str
_root: Node
_number_of_rules: int

def __init__(
self,
@@ -88,7 +94,6 @@ def __init__(
self._processor_type = processor_config.type if processor_name is not None else ""
self._setup()
self.metrics = self.Metrics(labels=self.metric_labels)
self._number_of_rules = 0

if root:
self._root = root
@@ -105,6 +110,10 @@ def metric_labels(self) -> dict:
"processor_type": self._processor_type,
}

@property
def number_of_rules(self) -> int:
return len(self._rule_mapping)

def _setup(self):
"""Basic setup of rule tree.
@@ -148,12 +157,11 @@ def add_rule(self, rule: "Rule", logger: Logger = None):
f"\nIgnore and continue with next rule."
)
return
self._number_of_rules += 1
for rule_segment in parsed_rule:
end_node = self._add_parsed_rule(rule_segment)
if rule not in end_node.matching_rules:
end_node.matching_rules.append(rule)
self._rule_mapping[rule] = self._number_of_rules - 1
self._rule_mapping[rule] = self.number_of_rules

def _add_parsed_rule(self, parsed_rule: list):
"""Add parsed rule to rule tree.
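Dropping the `_number_of_rules` slot removes a second source of truth: the count is now derived from `_rule_mapping`, and because `len()` is taken before the new key is inserted, the stored value doubles as the rule's zero-based index. A stripped-down sketch of the bookkeeping:

```python
class RuleTreeSketch:
    """Hypothetical stand-in for RuleTree's rule bookkeeping."""

    def __init__(self):
        self._rule_mapping: dict = {}

    @property
    def number_of_rules(self) -> int:
        return len(self._rule_mapping)

    def add_rule(self, rule: str):
        # len() before insertion == index of the rule being added
        self._rule_mapping[rule] = self.number_of_rules


tree = RuleTreeSketch()
tree.add_rule("first rule")
tree.add_rule("second rule")
assert tree._rule_mapping == {"first rule": 0, "second rule": 1}
assert tree.number_of_rules == 2
```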
67 changes: 30 additions & 37 deletions logprep/metrics/metrics.py
@@ -3,9 +3,7 @@
from typing import Callable

from attr import asdict, define, field, validators
from prometheus_client import Counter, Histogram

LOGPREP_REGISTRY = None # to inject a custom registry
from prometheus_client import CollectorRegistry, Counter, Histogram


def is_public(attribute, _):
@@ -51,62 +49,57 @@ class Metric(ABC):
],
default={},
)
tracker: object = field(default=None)
trackers: dict = {}
target: Callable = field(default=None)
_registry: CollectorRegistry = field(default=None)
_prefix: str = "logprep_"

@property
def fullname(self):
return f"{self._prefix}{self.name}"

def init_tracker(self):
tracker = None
if isinstance(self, CounterMetric):
tracker = Counter(
name=f"{self._prefix}{self.name}",
documentation=self.description,
labelnames=self.labels.keys(),
registry=LOGPREP_REGISTRY,
)
if isinstance(self, HistogramMetric):
tracker = Histogram(
name=f"{self._prefix}{self.name}",
documentation=self.description,
labelnames=self.labels.keys(),
buckets=(0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, 1),
registry=LOGPREP_REGISTRY,
)
tracker.labels(**self.labels)
self.tracker = tracker
try:
if isinstance(self, CounterMetric):
tracker = Counter(
name=self.fullname,
documentation=self.description,
labelnames=self.labels.keys(),
registry=self._registry,
)
if isinstance(self, HistogramMetric):
tracker = Histogram(
name=self.fullname,
documentation=self.description,
labelnames=self.labels.keys(),
buckets=(0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, 1),
registry=self._registry,
)
tracker.labels(**self.labels)

self.trackers.update({self.fullname: tracker})
except ValueError:
self.trackers.get(self.fullname).labels(**self.labels)

@abstractmethod
def __add__(self, other):
"""Add"""

@abstractmethod
def __eq__(self, __value: object) -> bool:
"""Equal"""


@define(kw_only=True)
class CounterMetric(Metric):
@property
def _value(self) -> int:
return int(self.tracker.collect()[-1].samples[0].value)

def __add__(self, other):
self.tracker.labels(**self.labels).inc(other)
self.trackers.get(self.fullname).labels(**self.labels).inc(other)
return self

def __eq__(self, __value: object) -> bool:
return self._value == int(__value)


@define(kw_only=True)
class HistogramMetric(Metric):
def __add__(self, other):
self.tracker.labels(**self.labels).observe(other)
self.trackers.get(self.fullname).labels(**self.labels).observe(other)
return self

def __eq__(self, __value: object) -> bool:
raise NotImplementedError
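With both `__add__` overloads resolving the tracker via `fullname`, `+=` becomes `Counter.inc()` on a `CounterMetric` and `Histogram.observe()` on a `HistogramMetric`. A hedged usage sketch against this WIP state of the module; the label shape is an assumption:

```python
from logprep.metrics.metrics import CounterMetric, HistogramMetric

failed = CounterMetric(
    name="number_of_failed_events",
    description="Number of events that were sent to error output",
    labels={"component": "demo"},  # assumed label shape
)
failed.init_tracker()
failed += 1  # -> Counter.inc(1)

timing = HistogramMetric(
    name="processing_time_per_event",
    description="Time in seconds that it took to process an event",
    labels={"component": "demo"},
)
timing.init_tracker()
timing += 0.0042  # -> Histogram.observe(0.0042)
```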


def calculate_new_average(current_average, next_sample, sample_counter):
"""Calculate a new average by combining a new sample with a sample counter"""