From cb2338b4959c419f2ead95df3e0baadba1e7112c Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Thu, 12 Oct 2023 09:57:28 +0000 Subject: [PATCH] WIP --- logprep/abc/component.py | 4 +- logprep/abc/input.py | 2 +- logprep/abc/processor.py | 33 +++++++------ logprep/connector/confluent_kafka/output.py | 2 +- logprep/connector/console/output.py | 2 +- logprep/connector/dummy/output.py | 4 +- logprep/connector/elasticsearch/output.py | 6 +-- logprep/connector/jsonl/output.py | 2 +- logprep/connector/s3/output.py | 4 +- logprep/framework/pipeline.py | 15 ++---- logprep/framework/rule_tree/rule_tree.py | 54 ++++++++++++++++----- logprep/metrics/metrics.py | 5 +- quickstart/exampledata/config/pipeline.yml | 2 +- 13 files changed, 81 insertions(+), 54 deletions(-) diff --git a/logprep/abc/component.py b/logprep/abc/component.py index 9724d44ef..13fa07d11 100644 --- a/logprep/abc/component.py +++ b/logprep/abc/component.py @@ -32,7 +32,6 @@ class Metrics: number_of_processed_events: Metric = Metric( type=MetricType.COUNTER, description="", - labels={"pipeline": "1"}, name=f"{_prefix}number_of_processed_events", ) """Number of events that were processed by the processor""" @@ -41,7 +40,8 @@ def __attrs_post_init__(self): for attribute in asdict(self): attribute = getattr(self, attribute) if isinstance(attribute, Metric): - attribute.labels |= self._labels + attribute.labels = self._labels + attribute.labels |= {"component": type(self).__module__.split(".")[-1]} attribute.tracker = attribute.type( name=attribute.name, documentation=attribute.description, diff --git a/logprep/abc/input.py b/logprep/abc/input.py index 750085f2d..7edbb1197 100644 --- a/logprep/abc/input.py +++ b/logprep/abc/input.py @@ -290,7 +290,7 @@ def get_next(self, timeout: float) -> Tuple[Optional[dict], Optional[str]]: self._add_arrival_timedelta_information_to_event(event) if self._add_env_enrichment: self._add_env_enrichment_to_event(event) - self.metrics.number_of_processed_events += 1 + # self.metrics.number_of_processed_events += 1 return event, non_critical_error_msg def batch_finished_callback(self): diff --git a/logprep/abc/processor.py b/logprep/abc/processor.py index 7d7ff417c..5455f2212 100644 --- a/logprep/abc/processor.py +++ b/logprep/abc/processor.py @@ -10,7 +10,7 @@ from attr import define, field, validators from logprep.abc.component import Component -from logprep.framework.rule_tree.rule_tree import RuleTree +from logprep.framework.rule_tree.rule_tree import RuleTree, RuleTreeType from logprep.processor.base.exceptions import ( FieldExistsWarning, ProcessingCriticalError, @@ -92,7 +92,6 @@ class Metrics(Component.Metrics): "rule_class", "has_custom_tests", "metrics", - "metric_labels", "_event", "_specific_tree", "_generic_tree", @@ -101,7 +100,6 @@ class Metrics(Component.Metrics): rule_class: "Rule" has_custom_tests: bool metrics: "Processor.Metrics" - metric_labels: dict _event: dict _specific_tree: RuleTree _generic_tree: RuleTree @@ -109,12 +107,15 @@ class Metrics(Component.Metrics): def __init__(self, name: str, configuration: "Processor.Config", logger: Logger): super().__init__(name, configuration, logger) - self.metric_labels, specific_tree_labels, generic_tree_labels = self._create_metric_labels() self._specific_tree = RuleTree( - config_path=self._config.tree_config, metric_labels=specific_tree_labels + processor_name=self.name, + processor_config=self._config, + rule_tree_type=RuleTreeType.SPECIFIC, ) self._generic_tree = RuleTree( - config_path=self._config.tree_config, metric_labels=generic_tree_labels + processor_name=self.name, + processor_config=self._config, + rule_tree_type=RuleTreeType.GENERIC, ) self.load_rules( generic_rules_targets=self._config.generic_rules, @@ -127,16 +128,6 @@ def __init__(self, name: str, configuration: "Processor.Config", logger: Logger) ) self.has_custom_tests = False - def _create_metric_labels(self): - """Reads out the metrics from the configuration and sets up labels for the rule trees""" - metric_labels = self._config.metric_labels - metric_labels.update({"processor": self.name}) - specif_tree_labels = copy.deepcopy(metric_labels) - specif_tree_labels.update({"rule_tree": "specific"}) - generic_tree_labels = copy.deepcopy(metric_labels) - generic_tree_labels.update({"rule_tree": "generic"}) - return metric_labels, specif_tree_labels, generic_tree_labels - @property def _specific_rules(self): """Returns all specific rules @@ -167,6 +158,16 @@ def rules(self): """ return [*self._generic_rules, *self._specific_rules] + @property + def metric_labels(self) -> dict: + """Returns the metric labels + + Returns + ------- + metric_labels: dict + """ + return {"type": self._config.type, "name": self.name} + @TimeMeasurement.measure_time() def process(self, event: dict): """Process a log event by calling the implemented `process` method of the diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py index 2ab784966..790b68fd1 100644 --- a/logprep/connector/confluent_kafka/output.py +++ b/logprep/connector/confluent_kafka/output.py @@ -158,7 +158,7 @@ def store(self, document: dict) -> Optional[bool]: configured input """ self.store_custom(document, self._config.topic) - self.metrics.number_of_processed_events += 1 + # self.metrics.number_of_processed_events += 1 if self.input_connector: self.input_connector.batch_finished_callback() diff --git a/logprep/connector/console/output.py b/logprep/connector/console/output.py index 9e2a6298d..bf468529e 100644 --- a/logprep/connector/console/output.py +++ b/logprep/connector/console/output.py @@ -25,7 +25,7 @@ class ConsoleOutput(Output): def store(self, document: dict): pprint(document) - self.metrics.number_of_processed_events += 1 + # self.metrics.number_of_processed_events += 1 if self.input_connector: self.input_connector.batch_finished_callback() diff --git a/logprep/connector/dummy/output.py b/logprep/connector/dummy/output.py index ea9694835..df1300d86 100644 --- a/logprep/connector/dummy/output.py +++ b/logprep/connector/dummy/output.py @@ -19,7 +19,7 @@ from logging import Logger from typing import List -from attr import field, define +from attr import define, field from attrs import validators from logprep.abc.output import Output @@ -85,7 +85,7 @@ def store(self, document: dict): if exception is not None: raise Exception(exception) self.events.append(document) - self.metrics.number_of_processed_events += 1 + # self.metrics.number_of_processed_events += 1 if self.input_connector: self.input_connector.batch_finished_callback() diff --git a/logprep/connector/elasticsearch/output.py b/logprep/connector/elasticsearch/output.py index 41c4258bf..e19545106 100644 --- a/logprep/connector/elasticsearch/output.py +++ b/logprep/connector/elasticsearch/output.py @@ -35,7 +35,7 @@ import ssl from functools import cached_property from logging import Logger -from typing import List, Optional, Tuple, Union, Pattern +from typing import List, Optional, Pattern, Tuple, Union import elasticsearch as search from attr import define, field @@ -210,7 +210,7 @@ def store(self, document: dict): document = self._build_failed_index_document(document, "Missing index in document") self._add_dates(document) - self.metrics.number_of_processed_events += 1 + # self.metrics.number_of_processed_events += 1 self._write_to_search_context(document) def store_custom(self, document: dict, target: str): @@ -230,7 +230,7 @@ def store_custom(self, document: dict, target: str): """ document["_index"] = target self._add_dates(document) - self.metrics.number_of_processed_events += 1 + # self.metrics.number_of_processed_events += 1 self._write_to_search_context(document) def store_failed(self, error_message: str, document_received: dict, document_processed: dict): diff --git a/logprep/connector/jsonl/output.py b/logprep/connector/jsonl/output.py index b21405858..8d7bf1e9e 100644 --- a/logprep/connector/jsonl/output.py +++ b/logprep/connector/jsonl/output.py @@ -79,7 +79,7 @@ def _write_json(filepath: str, line: dict): def store(self, document: dict): self.events.append(document) JsonlOutput._write_json(self._config.output_file, document) - self.metrics.number_of_processed_events += 1 + # self.metrics.number_of_processed_events += 1 if self.input_connector: self.input_connector.batch_finished_callback() diff --git a/logprep/connector/s3/output.py b/logprep/connector/s3/output.py index d9104f7fe..b0b1b3ea2 100644 --- a/logprep/connector/s3/output.py +++ b/logprep/connector/s3/output.py @@ -248,7 +248,7 @@ def store(self, document: dict): document : dict Document to store. """ - self.metrics.number_of_processed_events += 1 + # self.metrics.number_of_processed_events += 1 prefix_value = get_dotted_field_value(document, self._config.prefix_field) if prefix_value is None: @@ -284,7 +284,7 @@ def store_custom(self, document: dict, target: str): Prefix for the document. """ - self.metrics.number_of_processed_events += 1 + # self.metrics.number_of_processed_events += 1 self._write_to_s3_resource(document, target) def store_failed(self, error_message: str, document_received: dict, document_processed: dict): diff --git a/logprep/framework/pipeline.py b/logprep/framework/pipeline.py index 83c1184ae..6cc5e9011 100644 --- a/logprep/framework/pipeline.py +++ b/logprep/framework/pipeline.py @@ -210,9 +210,9 @@ def _event_version_information(self) -> dict: "configuration": self._logprep_config.get("version", "unset"), } - @cached_property + @property def _metric_labels(self) -> dict: - return {"pipeline": f"pipeline-{self.pipeline_index}"} + return {"pipeline": self._process_name} @cached_property def metrics(self) -> Metrics: @@ -240,7 +240,6 @@ def _output(self) -> dict[str, Output]: output_names = list(output_configs.keys()) outputs = {} for output_name in output_names: - output_configs[output_name]["metric_labels"] = self._metric_labels output_config = output_configs.get(output_name) outputs |= {output_name: Factory.create({output_name: output_config}, self.logger)} return outputs @@ -250,11 +249,6 @@ def _input(self) -> Input: input_connector_config = self._logprep_config.get("input") if input_connector_config is None: return None - connector_name = list(input_connector_config.keys())[0] - input_connector_config[connector_name]["metric_labels"] = self._metric_labels - input_connector_config[connector_name].update( - {"version_information": self._event_version_information} - ) return Factory.create(input_connector_config, self.logger) @_handle_pipeline_error @@ -281,8 +275,6 @@ def _setup(self): self.logger.info("Finished building pipeline") def _create_processor(self, entry: dict) -> "Processor": - processor_name = list(entry.keys())[0] - entry[processor_name]["metric_labels"] = self._metric_labels processor = Factory.create(entry, self.logger) processor.setup() self.logger.debug(f"Created '{processor}' processor") @@ -383,8 +375,7 @@ def process_event(self, event: dict): break if self._processing_counter: self._processing_counter.increment() - if self.metrics: - self.metrics.number_of_processed_events += 1 + self.metrics.number_of_processed_events += 1 return extra_outputs def _store_extra_data(self, extra_data: List[tuple]) -> None: diff --git a/logprep/framework/rule_tree/rule_tree.py b/logprep/framework/rule_tree/rule_tree.py index 2b4635dd8..3d2ce7795 100644 --- a/logprep/framework/rule_tree/rule_tree.py +++ b/logprep/framework/rule_tree/rule_tree.py @@ -1,5 +1,6 @@ """This module contains the rule tree functionality.""" +from enum import Enum from logging import Logger from typing import TYPE_CHECKING, List, Optional @@ -12,9 +13,19 @@ from logprep.util import getter if TYPE_CHECKING: + from logprep.abc.processor import Processor from logprep.processor.base.rule import Rule +class RuleTreeType(Enum): + """Types of rule trees.""" + + SPECIFIC = 1 + """Specific rule tree that is used to match specific rules.""" + GENERIC = 2 + """Generic rule tree that is used to match generic rules.""" + + class RuleTree: """Represent a set of rules using a rule tree model.""" @@ -49,19 +60,29 @@ def mean_processing_time(self): "rule_parser", "metrics", "priority_dict", + "rule_tree_type", "_rule_mapping", - "_config_path", + "_processor_config", + "_processor_name", "_root", ) rule_parser: Optional[RuleParser] metrics: RuleTreeMetrics priority_dict: dict + rule_tree_type: RuleTreeType _rule_mapping: dict - _config_path: str + _processor_name: str + _processor_config: "Processor.Config" _root: Node - def __init__(self, root: Node = None, config_path: str = None, metric_labels: dict = None): + def __init__( + self, + root: Node = None, + processor_name: str = None, + processor_config: "Processor.Config" = None, + rule_tree_type: RuleTreeType = None, + ): """Rule tree initialization function. Initializes a new rule tree with a given root node and a path to the tree's optional config @@ -72,23 +93,32 @@ def __init__(self, root: Node = None, config_path: str = None, metric_labels: di ---------- root: Node, optional Node that should be used as the new rule tree's root node. - config_path: str, optional - Path to the optional configuration file that contains the new rule tree's configuration. + processor_config: Processor.Config, optional + Configuration of the processor that uses the rule tree. """ self.rule_parser = None self._rule_mapping = {} - self._config_path = config_path + self._processor_config = processor_config + self._processor_name = processor_name + self.rule_tree_type = rule_tree_type self._setup() - if not metric_labels: - metric_labels = {"component": "rule_tree"} - self.metrics = self.RuleTreeMetrics(labels=metric_labels) + self.metrics = self.RuleTreeMetrics(labels=self.metric_labels) if root: self._root = root else: self._root = Node(None) + @property + def metric_labels(self) -> dict: + """Return metric labels.""" + return { + "rule_tree": self.rule_tree_type.name.lower(), + "processor": self._processor_name, + "processor_type": self._processor_config.type, + } + def _setup(self): """Basic setup of rule tree. @@ -98,8 +128,10 @@ def _setup(self): self.priority_dict = {} tag_map = {} - if self._config_path: - config_data = getter.GetterFactory.from_string(self._config_path).get_json() + if self._processor_config.tree_config: + config_data = getter.GetterFactory.from_string( + self._processor_config.tree_config + ).get_json() self.priority_dict = config_data["priority_dict"] tag_map = config_data["tag_map"] self.rule_parser = RuleParser(tag_map) diff --git a/logprep/metrics/metrics.py b/logprep/metrics/metrics.py index 1b1cc98db..beee537cf 100644 --- a/logprep/metrics/metrics.py +++ b/logprep/metrics/metrics.py @@ -1,4 +1,5 @@ """This module tracks, calculates, exposes and resets logprep metrics""" +import logging from enum import Enum from attr import asdict, define, field, validators @@ -61,12 +62,14 @@ class Metric: key_validator=validators.instance_of(str), value_validator=validators.instance_of(str), ), - ] + ], + default={}, ) tracker: object = field(default=None) def __add__(self, other): self.tracker.labels(**self.labels).inc(other) + logging.getLogger(__name__).info("Incremented metric %s by %s", self.name, self.labels) return self diff --git a/quickstart/exampledata/config/pipeline.yml b/quickstart/exampledata/config/pipeline.yml index 139baf908..0a374400d 100644 --- a/quickstart/exampledata/config/pipeline.yml +++ b/quickstart/exampledata/config/pipeline.yml @@ -2,7 +2,7 @@ version: 1 process_count: 1 timeout: 0.1 logger: - level: DEBUG + level: INFO metrics: enabled: true