
Commit

WIP
ekneg54 committed Oct 12, 2023
1 parent 30eb79d commit cb2338b
Showing 13 changed files with 81 additions and 54 deletions.
4 changes: 2 additions & 2 deletions logprep/abc/component.py
@@ -32,7 +32,6 @@ class Metrics:
number_of_processed_events: Metric = Metric(
type=MetricType.COUNTER,
description="",
labels={"pipeline": "1"},
name=f"{_prefix}number_of_processed_events",
)
"""Number of events that were processed by the processor"""
@@ -41,7 +40,8 @@ def __attrs_post_init__(self):
for attribute in asdict(self):
attribute = getattr(self, attribute)
if isinstance(attribute, Metric):
attribute.labels |= self._labels
attribute.labels = self._labels
attribute.labels |= {"component": type(self).__module__.split(".")[-1]}
attribute.tracker = attribute.type(
name=attribute.name,
documentation=attribute.description,
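Note on the component.py change above: the hard-coded per-metric "pipeline" label is gone, and __attrs_post_init__ now overwrites each metric's labels with the component-wide _labels dict before merging in a "component" label derived from the module name. Below is a minimal, self-contained sketch of that merge pattern, assuming simplified stand-in classes rather than the real logprep definitions (the Prometheus tracker setup is omitted):

from attr import asdict, define, field


@define(kw_only=True)
class Metric:
    # simplified stand-in for logprep's Metric attrs class
    name: str
    labels: dict = field(factory=dict)


@define(kw_only=True)
class Metrics:
    # mirrors the label merge from Component.Metrics.__attrs_post_init__
    _labels: dict
    number_of_processed_events: Metric = Metric(name="number_of_processed_events")

    def __attrs_post_init__(self):
        for attribute_name in asdict(self):
            attribute = getattr(self, attribute_name)
            if isinstance(attribute, Metric):
                # shared labels replace any per-metric defaults ...
                attribute.labels = self._labels
                # ... then a "component" label is derived from the defining module's name
                attribute.labels |= {"component": type(self).__module__.split(".")[-1]}


metrics = Metrics(labels={"pipeline": "pipeline-1"})
print(metrics.number_of_processed_events.labels)
# e.g. {'pipeline': 'pipeline-1', 'component': '__main__'} when run as a script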
2 changes: 1 addition & 1 deletion logprep/abc/input.py
@@ -290,7 +290,7 @@ def get_next(self, timeout: float) -> Tuple[Optional[dict], Optional[str]]:
self._add_arrival_timedelta_information_to_event(event)
if self._add_env_enrichment:
self._add_env_enrichment_to_event(event)
self.metrics.number_of_processed_events += 1
# self.metrics.number_of_processed_events += 1
return event, non_critical_error_msg

def batch_finished_callback(self):
33 changes: 17 additions & 16 deletions logprep/abc/processor.py
@@ -10,7 +10,7 @@
from attr import define, field, validators

from logprep.abc.component import Component
from logprep.framework.rule_tree.rule_tree import RuleTree
from logprep.framework.rule_tree.rule_tree import RuleTree, RuleTreeType
from logprep.processor.base.exceptions import (
FieldExistsWarning,
ProcessingCriticalError,
@@ -92,7 +92,6 @@ class Metrics(Component.Metrics):
"rule_class",
"has_custom_tests",
"metrics",
"metric_labels",
"_event",
"_specific_tree",
"_generic_tree",
@@ -101,20 +100,22 @@
rule_class: "Rule"
has_custom_tests: bool
metrics: "Processor.Metrics"
metric_labels: dict
_event: dict
_specific_tree: RuleTree
_generic_tree: RuleTree
_strategy = None

def __init__(self, name: str, configuration: "Processor.Config", logger: Logger):
super().__init__(name, configuration, logger)
self.metric_labels, specific_tree_labels, generic_tree_labels = self._create_metric_labels()
self._specific_tree = RuleTree(
config_path=self._config.tree_config, metric_labels=specific_tree_labels
processor_name=self.name,
processor_config=self._config,
rule_tree_type=RuleTreeType.SPECIFIC,
)
self._generic_tree = RuleTree(
config_path=self._config.tree_config, metric_labels=generic_tree_labels
processor_name=self.name,
processor_config=self._config,
rule_tree_type=RuleTreeType.GENERIC,
)
self.load_rules(
generic_rules_targets=self._config.generic_rules,
@@ -127,16 +128,6 @@ def __init__(self, name: str, configuration: "Processor.Config", logger: Logger)
)
self.has_custom_tests = False

def _create_metric_labels(self):
"""Reads out the metrics from the configuration and sets up labels for the rule trees"""
metric_labels = self._config.metric_labels
metric_labels.update({"processor": self.name})
specif_tree_labels = copy.deepcopy(metric_labels)
specif_tree_labels.update({"rule_tree": "specific"})
generic_tree_labels = copy.deepcopy(metric_labels)
generic_tree_labels.update({"rule_tree": "generic"})
return metric_labels, specif_tree_labels, generic_tree_labels

@property
def _specific_rules(self):
"""Returns all specific rules
@@ -167,6 +158,16 @@ def rules(self):
"""
return [*self._generic_rules, *self._specific_rules]

@property
def metric_labels(self) -> dict:
"""Returns the metric labels
Returns
-------
metric_labels: dict
"""
return {"type": self._config.type, "name": self.name}

@TimeMeasurement.measure_time()
def process(self, event: dict):
"""Process a log event by calling the implemented `process` method of the
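For context on the processor.py diff above: the processor no longer precomputes and stores label dictionaries; each rule tree now receives the processor's name and configuration plus a RuleTreeType, and metric_labels becomes a read-only property. A rough standalone illustration of that shape, assuming simplified stand-ins (Config, RuleTree, and MiniProcessor below are not the real logprep classes):

from enum import Enum


class RuleTreeType(Enum):
    SPECIFIC = 1
    GENERIC = 2


class RuleTree:
    # stand-in: the real RuleTree derives its own labels from these arguments
    def __init__(self, processor_name=None, processor_config=None, rule_tree_type=None):
        self._processor_name = processor_name
        self._processor_config = processor_config
        self.rule_tree_type = rule_tree_type


class Config:
    # stand-in for Processor.Config with only the field used here
    def __init__(self, type_: str):
        self.type = type_


class MiniProcessor:
    def __init__(self, name: str, config: Config):
        self.name = name
        self._config = config
        # each tree is told which processor owns it and which kind of tree it is
        self._specific_tree = RuleTree(
            processor_name=self.name,
            processor_config=self._config,
            rule_tree_type=RuleTreeType.SPECIFIC,
        )
        self._generic_tree = RuleTree(
            processor_name=self.name,
            processor_config=self._config,
            rule_tree_type=RuleTreeType.GENERIC,
        )

    @property
    def metric_labels(self) -> dict:
        # computed on access instead of stored in the removed metric_labels slot
        return {"type": self._config.type, "name": self.name}


processor = MiniProcessor("my_dissector", Config("dissector"))
print(processor.metric_labels)  # {'type': 'dissector', 'name': 'my_dissector'}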
2 changes: 1 addition & 1 deletion logprep/connector/confluent_kafka/output.py
@@ -158,7 +158,7 @@ def store(self, document: dict) -> Optional[bool]:
configured input
"""
self.store_custom(document, self._config.topic)
self.metrics.number_of_processed_events += 1
# self.metrics.number_of_processed_events += 1
if self.input_connector:
self.input_connector.batch_finished_callback()

2 changes: 1 addition & 1 deletion logprep/connector/console/output.py
@@ -25,7 +25,7 @@ class ConsoleOutput(Output):

def store(self, document: dict):
pprint(document)
self.metrics.number_of_processed_events += 1
# self.metrics.number_of_processed_events += 1
if self.input_connector:
self.input_connector.batch_finished_callback()

4 changes: 2 additions & 2 deletions logprep/connector/dummy/output.py
@@ -19,7 +19,7 @@
from logging import Logger
from typing import List

from attr import field, define
from attr import define, field
from attrs import validators

from logprep.abc.output import Output
@@ -85,7 +85,7 @@ def store(self, document: dict):
if exception is not None:
raise Exception(exception)
self.events.append(document)
self.metrics.number_of_processed_events += 1
# self.metrics.number_of_processed_events += 1
if self.input_connector:
self.input_connector.batch_finished_callback()

6 changes: 3 additions & 3 deletions logprep/connector/elasticsearch/output.py
@@ -35,7 +35,7 @@
import ssl
from functools import cached_property
from logging import Logger
from typing import List, Optional, Tuple, Union, Pattern
from typing import List, Optional, Pattern, Tuple, Union

import elasticsearch as search
from attr import define, field
@@ -210,7 +210,7 @@ def store(self, document: dict):
document = self._build_failed_index_document(document, "Missing index in document")

self._add_dates(document)
self.metrics.number_of_processed_events += 1
# self.metrics.number_of_processed_events += 1
self._write_to_search_context(document)

def store_custom(self, document: dict, target: str):
@@ -230,7 +230,7 @@ def store_custom(self, document: dict, target: str):
"""
document["_index"] = target
self._add_dates(document)
self.metrics.number_of_processed_events += 1
# self.metrics.number_of_processed_events += 1
self._write_to_search_context(document)

def store_failed(self, error_message: str, document_received: dict, document_processed: dict):
2 changes: 1 addition & 1 deletion logprep/connector/jsonl/output.py
@@ -79,7 +79,7 @@ def _write_json(filepath: str, line: dict):
def store(self, document: dict):
self.events.append(document)
JsonlOutput._write_json(self._config.output_file, document)
self.metrics.number_of_processed_events += 1
# self.metrics.number_of_processed_events += 1
if self.input_connector:
self.input_connector.batch_finished_callback()

4 changes: 2 additions & 2 deletions logprep/connector/s3/output.py
@@ -248,7 +248,7 @@ def store(self, document: dict):
document : dict
Document to store.
"""
self.metrics.number_of_processed_events += 1
# self.metrics.number_of_processed_events += 1

prefix_value = get_dotted_field_value(document, self._config.prefix_field)
if prefix_value is None:
@@ -284,7 +284,7 @@ def store_custom(self, document: dict, target: str):
Prefix for the document.
"""
self.metrics.number_of_processed_events += 1
# self.metrics.number_of_processed_events += 1
self._write_to_s3_resource(document, target)

def store_failed(self, error_message: str, document_received: dict, document_processed: dict):
15 changes: 3 additions & 12 deletions logprep/framework/pipeline.py
@@ -210,9 +210,9 @@ def _event_version_information(self) -> dict:
"configuration": self._logprep_config.get("version", "unset"),
}

@cached_property
@property
def _metric_labels(self) -> dict:
return {"pipeline": f"pipeline-{self.pipeline_index}"}
return {"pipeline": self._process_name}

@cached_property
def metrics(self) -> Metrics:
@@ -240,7 +240,6 @@ def _output(self) -> dict[str, Output]:
output_names = list(output_configs.keys())
outputs = {}
for output_name in output_names:
output_configs[output_name]["metric_labels"] = self._metric_labels
output_config = output_configs.get(output_name)
outputs |= {output_name: Factory.create({output_name: output_config}, self.logger)}
return outputs
@@ -250,11 +249,6 @@ def _input(self) -> Input:
input_connector_config = self._logprep_config.get("input")
if input_connector_config is None:
return None
connector_name = list(input_connector_config.keys())[0]
input_connector_config[connector_name]["metric_labels"] = self._metric_labels
input_connector_config[connector_name].update(
{"version_information": self._event_version_information}
)
return Factory.create(input_connector_config, self.logger)

@_handle_pipeline_error
@@ -281,8 +275,6 @@ def _setup(self):
self.logger.info("Finished building pipeline")

def _create_processor(self, entry: dict) -> "Processor":
processor_name = list(entry.keys())[0]
entry[processor_name]["metric_labels"] = self._metric_labels
processor = Factory.create(entry, self.logger)
processor.setup()
self.logger.debug(f"Created '{processor}' processor")
@@ -383,8 +375,7 @@ def process_event(self, event: dict):
break
if self._processing_counter:
self._processing_counter.increment()
if self.metrics:
self.metrics.number_of_processed_events += 1
self.metrics.number_of_processed_events += 1
return extra_outputs

def _store_extra_data(self, extra_data: List[tuple]) -> None:
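With the connector-level increments commented out in the diffs above, event counting appears to be consolidated in Pipeline.process_event, and the pipeline label is now derived from the process name rather than the pipeline index. A rough sketch of that idea using prometheus_client, which logprep's metric trackers appear to build on (the counter name, the use of the worker process name as the "pipeline" label value, and the process_event stub are illustrative assumptions, not logprep's actual wiring):

from multiprocessing import current_process

from prometheus_client import Counter

# one counter, labelled per pipeline; incremented once per event in the pipeline loop
events_processed = Counter(
    "example_number_of_processed_events",
    "Number of events processed by the pipeline",
    labelnames=["pipeline"],
)


def process_event(event: dict) -> dict:
    # ... processors would run here ...
    events_processed.labels(pipeline=current_process().name).inc()
    return event


process_event({"message": "example"})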
54 changes: 43 additions & 11 deletions logprep/framework/rule_tree/rule_tree.py
@@ -1,5 +1,6 @@
"""This module contains the rule tree functionality."""

from enum import Enum
from logging import Logger
from typing import TYPE_CHECKING, List, Optional

@@ -12,9 +13,19 @@
from logprep.util import getter

if TYPE_CHECKING:
from logprep.abc.processor import Processor
from logprep.processor.base.rule import Rule


class RuleTreeType(Enum):
"""Types of rule trees."""

SPECIFIC = 1
"""Specific rule tree that is used to match specific rules."""
GENERIC = 2
"""Generic rule tree that is used to match generic rules."""


class RuleTree:
"""Represent a set of rules using a rule tree model."""

@@ -49,19 +60,29 @@ def mean_processing_time(self):
"rule_parser",
"metrics",
"priority_dict",
"rule_tree_type",
"_rule_mapping",
"_config_path",
"_processor_config",
"_processor_name",
"_root",
)

rule_parser: Optional[RuleParser]
metrics: RuleTreeMetrics
priority_dict: dict
rule_tree_type: RuleTreeType
_rule_mapping: dict
_config_path: str
_processor_name: str
_processor_config: "Processor.Config"
_root: Node

def __init__(self, root: Node = None, config_path: str = None, metric_labels: dict = None):
def __init__(
self,
root: Node = None,
processor_name: str = None,
processor_config: "Processor.Config" = None,
rule_tree_type: RuleTreeType = None,
):
"""Rule tree initialization function.
Initializes a new rule tree with a given root node and a path to the tree's optional config
@@ -72,23 +93,32 @@ def __init__(self, root: Node = None, config_path: str = None, metric_labels: di
----------
root: Node, optional
Node that should be used as the new rule tree's root node.
config_path: str, optional
Path to the optional configuration file that contains the new rule tree's configuration.
processor_config: Processor.Config, optional
Configuration of the processor that uses the rule tree.
"""
self.rule_parser = None
self._rule_mapping = {}
self._config_path = config_path
self._processor_config = processor_config
self._processor_name = processor_name
self.rule_tree_type = rule_tree_type
self._setup()
if not metric_labels:
metric_labels = {"component": "rule_tree"}
self.metrics = self.RuleTreeMetrics(labels=metric_labels)
self.metrics = self.RuleTreeMetrics(labels=self.metric_labels)

if root:
self._root = root
else:
self._root = Node(None)

@property
def metric_labels(self) -> dict:
"""Return metric labels."""
return {
"rule_tree": self.rule_tree_type.name.lower(),
"processor": self._processor_name,
"processor_type": self._processor_config.type,
}

def _setup(self):
"""Basic setup of rule tree.
@@ -98,8 +128,10 @@ def _setup(self):
self.priority_dict = {}
tag_map = {}

if self._config_path:
config_data = getter.GetterFactory.from_string(self._config_path).get_json()
if self._processor_config.tree_config:
config_data = getter.GetterFactory.from_string(
self._processor_config.tree_config
).get_json()
self.priority_dict = config_data["priority_dict"]
tag_map = config_data["tag_map"]
self.rule_parser = RuleParser(tag_map)
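For illustration of the rule_tree.py change above, the new RuleTreeType enum and the label set produced by the metric_labels property can be exercised standalone. The helper function below simply restates the property as a free function and is not part of logprep:

from enum import Enum


class RuleTreeType(Enum):
    # mirrors the new enum: one member per kind of rule tree
    SPECIFIC = 1
    GENERIC = 2


def rule_tree_labels(tree_type: RuleTreeType, processor_name: str, processor_type: str) -> dict:
    # same label set as RuleTree.metric_labels, derived from the owning processor
    return {
        "rule_tree": tree_type.name.lower(),
        "processor": processor_name,
        "processor_type": processor_type,
    }


print(rule_tree_labels(RuleTreeType.SPECIFIC, "my_dissector", "dissector"))
# {'rule_tree': 'specific', 'processor': 'my_dissector', 'processor_type': 'dissector'}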
5 changes: 4 additions & 1 deletion logprep/metrics/metrics.py
@@ -1,4 +1,5 @@
"""This module tracks, calculates, exposes and resets logprep metrics"""
import logging
from enum import Enum

from attr import asdict, define, field, validators
@@ -61,12 +62,14 @@ class Metric:
key_validator=validators.instance_of(str),
value_validator=validators.instance_of(str),
),
]
],
default={},
)
tracker: object = field(default=None)

def __add__(self, other):
self.tracker.labels(**self.labels).inc(other)
logging.getLogger(__name__).info("Incremented metric %s by %s", self.name, self.labels)
return self


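The metrics.py diff above gives labels an empty-dict default and logs each increment, while "+=" keeps forwarding to the underlying Prometheus tracker. A simplified, self-contained stand-in for that operator pattern (logprep's real Metric is an attrs class whose tracker is created in the component's __attrs_post_init__; the Counter here is only an example tracker, and the class below is not the real implementation):

import logging

from prometheus_client import Counter


class Metric:
    # simplified stand-in: "metric += n" forwards to the labelled Prometheus tracker
    def __init__(self, name: str, labels: dict = None):
        self.name = name
        self.labels = labels or {}
        self.tracker = Counter(name, "example counter", labelnames=list(self.labels))

    def __add__(self, other):
        self.tracker.labels(**self.labels).inc(other)
        logging.getLogger(__name__).info("Incremented metric %s by %s", self.name, other)
        return self


events = Metric("example_processed_events", labels={"component": "input"})
events += 1  # calls __add__, which increments the labelled counter by 1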
2 changes: 1 addition & 1 deletion quickstart/exampledata/config/pipeline.yml
@@ -2,7 +2,7 @@ version: 1
process_count: 1
timeout: 0.1
logger:
level: DEBUG
level: INFO

metrics:
enabled: true
