Pre-compile patterns, add cache and support ignoring case in generic resolver (#716)

* Pre-compile patterns, add cache and support ignoring case in generic resolver

* Black formatting

* Fix mistakes in generic resolver docstrings

* Clarify docstrings for generic resolver

* Refactor names in generic resolver

* Add test with dict resolve list value to generic resolver

* Add comments to generic resolver tests with cache metrics
ppcad authored Dec 5, 2024
1 parent af153a2 commit 4219101
Showing 4 changed files with 400 additions and 17 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -16,6 +16,9 @@
* fix `requester` documentation
* replace `BaseException` with `Exception` for custom errors
* refactor `generic_resolver` to validate rules on startup instead of application of each rule
* regex pattern lists for the `generic_resolver` are pre-compiled
* regex matching from lists in the `generic_resolver` is cached
* matching in the `generic_resolver` can be case-insensitive
* rewrite the helper method `add_field_to` such that it always raises a `FieldExistsWarning` instead of returning a bool.
* add new helper method `add_fields_to` to directly add multiple fields to one event
* refactored some processors to make use of the new helper methods
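The three new `generic_resolver` entries above correspond to two new processor options (`max_cache_entries`, `cache_metrics_interval`) and one new rule option (`ignore_case`). As a rough illustration — not part of this commit — the processor options could appear in a pipeline entry roughly like this, written as the Python dict the YAML configuration deserializes into; the processor name and values are made up:

```python
# Hypothetical generic_resolver pipeline entry, reduced to the options added in
# this commit. Required keys such as rule paths are omitted; values are made up.
pipeline_entry = {
    "my_generic_resolver": {
        "type": "generic_resolver",
        "max_cache_entries": 10_000,     # 0 disables the LRU cache
        "cache_metrics_interval": 100,   # refresh cache metrics every 100 events
    }
}
```

`ignore_case`, by contrast, is set per rule; see the `rule.py` changes further down.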
116 changes: 110 additions & 6 deletions logprep/processor/generic_resolver/processor.py
@@ -25,8 +25,13 @@
.. automodule:: logprep.processor.generic_resolver.rule
"""

import re
from functools import cached_property, lru_cache

from typing import Optional
from attrs import define, field, validators

from logprep.abc.processor import Processor
from logprep.metrics.metrics import GaugeMetric
from logprep.processor.base.exceptions import FieldExistsWarning
from logprep.processor.field_manager.processor import FieldManager
from logprep.processor.generic_resolver.rule import GenericResolverRule
@@ -36,8 +41,79 @@
class GenericResolver(FieldManager):
"""Resolve values in documents by referencing a mapping list."""

@define(kw_only=True)
class Config(Processor.Config):
"""GenericResolver config"""

max_cache_entries: Optional[int] = field(
validator=validators.optional(validators.instance_of(int)), default=0
)
"""(Optional) Size of cache for results when resolving form a list.
The cache can be disabled by setting it this option to :code:`0`."""
cache_metrics_interval: Optional[int] = field(
validator=validators.optional(validators.instance_of(int)), default=1
)
"""(Optional) Cache metrics won't be updated immediately.
Instead updating is skipped for a number of events before it's next update.
:code:`cache_metrics_interval` sets the number of events between updates (default: 1)."""

@define(kw_only=True)
class Metrics(FieldManager.Metrics):
"""Tracks statistics about the generic resolver"""

new_results: GaugeMetric = field(
factory=lambda: GaugeMetric(
description="Number of newly resolved values",
name="generic_resolver_new_results",
)
)
"""Number of newly resolved values"""

cached_results: GaugeMetric = field(
factory=lambda: GaugeMetric(
description="Number of values resolved from cache",
name="generic_resolver_cached_results",
)
)
"""Number of resolved values from cache"""
num_cache_entries: GaugeMetric = field(
factory=lambda: GaugeMetric(
description="Number of resolved values in cache",
name="generic_resolver_num_cache_entries",
)
)
"""Number of values in cache"""
cache_load: GaugeMetric = field(
factory=lambda: GaugeMetric(
description="Relative cache load.",
name="generic_resolver_cache_load",
)
)
"""Relative cache load."""

__slots__ = ["_cache_metrics_skip_count"]

_cache_metrics_skip_count: int

rule_class = GenericResolverRule

@property
def max_cache_entries(self):
"""Returns the configured number of max_cache_entries"""
return self._config.max_cache_entries

@property
def cache_metrics_interval(self):
"""Returns the configured cache_metrics_interval"""
return self._config.cache_metrics_interval

@cached_property
def _get_lru_cached_value_from_list(self):
"""Returns lru cashed method to retrieve values from list if configured"""
if self.max_cache_entries <= 0:
return self._resolve_value_from_list
return lru_cache(maxsize=self.max_cache_entries)(self._resolve_value_from_list)

def _apply_rules(self, event, rule):
"""Apply the given rule to the current event"""
source_field_values = [
@@ -68,18 +144,46 @@ def _apply_rules(self, event, rule):
)
except FieldExistsWarning as error:
conflicting_fields.extend(error.skipped_fields)

self._update_cache_metrics()

if conflicting_fields:
raise FieldExistsWarning(rule, event, conflicting_fields)

def _find_content_of_first_matching_pattern(self, rule, source_field_value):
    if rule.resolve_from_file:
-       pattern = f'^{rule.resolve_from_file["pattern"]}$'
        replacements = rule.resolve_from_file["additions"]
-       matches = re.match(pattern, source_field_value)
+       matches = rule.pattern.match(source_field_value)
        if matches:
-           content = replacements.get(matches.group("mapping"))
+           mapping = matches.group("mapping")
+           if rule.ignore_case:
+               mapping = mapping.upper()
+           content = replacements.get(mapping)
            if content:
                return content
-   for pattern, content in rule.resolve_list.items():
-       if re.search(pattern, source_field_value):
+   return self._get_lru_cached_value_from_list(rule, source_field_value)

def _resolve_value_from_list(
self, rule: GenericResolverRule, source_field_value: str
) -> Optional[str]:
for pattern, content in rule.compiled_resolve_list:
if pattern.search(source_field_value):
return content

def _update_cache_metrics(self):
if self.max_cache_entries <= 0:
return
self._cache_metrics_skip_count += 1
if self._cache_metrics_skip_count < self.cache_metrics_interval:
return
self._cache_metrics_skip_count = 0

cache_info = self._get_lru_cached_value_from_list.cache_info()
self.metrics.new_results += cache_info.misses
self.metrics.cached_results += cache_info.hits
self.metrics.num_cache_entries += cache_info.currsize
self.metrics.cache_load += cache_info.currsize / cache_info.maxsize

def setup(self):
super().setup()
self._cache_metrics_skip_count = 0
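The caching added above is plain `functools.lru_cache` around `_resolve_value_from_list`, with `cache_info()` feeding the new gauge metrics. A minimal, self-contained sketch of that pattern, using an invented resolve list and cache size:

```python
import re
from functools import lru_cache
from typing import Optional

# Invented resolve list: regex pattern -> value to add, pre-compiled once.
COMPILED = [(re.compile(p), v) for p, v in {r"foo\d+": "foo-match", r"bar": "bar-match"}.items()]

@lru_cache(maxsize=8)  # analogous to max_cache_entries
def resolve(value: str) -> Optional[str]:
    """Return the value of the first matching pattern; results are memoized per input."""
    for pattern, content in COMPILED:
        if pattern.search(value):
            return content
    return None

resolve("foo42")  # computed -> counted as a miss
resolve("foo42")  # served from the cache -> counted as a hit
info = resolve.cache_info()
# The processor feeds these counters into its gauges roughly like this:
new_results, cached_results = info.misses, info.hits
cache_load = info.currsize / info.maxsize
print(new_results, cached_results, cache_load)
```

In the processor, the cached callable also receives the rule object, so entries for different rules share one LRU without colliding.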
55 changes: 44 additions & 11 deletions logprep/processor/generic_resolver/rule.py
@@ -38,6 +38,22 @@
The resolver will check for the pattern and get the value captured by the :code:`mapping` group.
This captured value is then used in the list from the file.
:code:`ignore_case` can be set to ignore the case when matching values that will be resolved.
It is disabled by default. In the following example :code:`to_resolve: heLLo` would be resolved,
since :code:`ignore_case` is set to true.
.. code-block:: yaml
:linenos:
:caption: Example
filter: to_resolve
generic_resolver:
field_mapping:
to_resolve: resolved
resolve_list:
.*Hello.*: Greeting
ignore_case: true
In the following example :code:`to_resolve` will be checked by the
regex pattern :code:`\d*(?P<mapping>[a-z]+)\d*` and the list in :code:`path/to/resolve_mapping.yml`
will be used to add new fields.
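To make the file-based resolution case-insensitive, the commit compiles the pattern with `re.IGNORECASE` and upper-cases both the addition keys and the captured `mapping` group (see `__attrs_post_init__` and `_find_content_of_first_matching_pattern`). A small sketch of that behaviour, with an invented pattern and additions dict:

```python
import re

# Invented stand-ins for resolve_from_file's "pattern" and the file's additions.
ignore_case = True
pattern = re.compile(r"^\d*(?P<mapping>[a-z]+)\d*$", re.I if ignore_case else 0)
additions = {"ab": "added_value"}
if ignore_case:
    # Upper-case the lookup keys once so differently cased inputs hit the same entry.
    additions = {key.upper(): value for key, value in additions.items()}

match = pattern.match("12Ab34")
if match:
    mapping = match.group("mapping")
    if ignore_case:
        mapping = mapping.upper()
    print(additions.get(mapping))  # -> added_value
```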
@@ -72,7 +88,10 @@
:noindex:
"""

import re
from functools import cached_property
from pathlib import Path
from typing import Optional, Tuple, List

from attrs import define, field, validators

@@ -98,16 +117,7 @@ class Config(FieldManagerRule.Config):
]
)
"""Mapping in form of :code:`{SOURCE_FIELD: DESTINATION_FIELD}`"""
-resolve_list: dict = field(
-    validator=[
-        validators.instance_of(dict),
-        validators.deep_mapping(
-            key_validator=validators.instance_of(str),
-            value_validator=validators.instance_of(str),
-        ),
-    ],
-    factory=dict,
-)
+resolve_list: dict = field(validator=[validators.instance_of(dict)], factory=dict)
"""lookup mapping in form of
:code:`{REGEX_PATTERN_0: ADDED_VALUE_0, ..., REGEX_PATTERN_N: ADDED_VALUE_N}`"""
resolve_from_file: dict = field(
@@ -125,6 +135,8 @@ class Config(FieldManagerRule.Config):
a regex pattern which can be used to resolve values.
The resolve list in the file at :code:`path` is then used in conjunction with
the regex pattern in :code:`pattern`."""
ignore_case: Optional[bool] = field(validator=validators.instance_of(bool), default=False)
"""(Optional) Ignore case when matching resolve values. Defaults to :code:`False`."""

def __attrs_post_init__(self):
if self.resolve_from_file:
@@ -142,8 +154,11 @@ def __attrs_post_init__(self):
isinstance(value, str) for value in add_dict.values()
):
raise InvalidConfigurationError(
-   f"Additions file '{file_path}' must be a dictionary with string values! (Rule ID: '{self.id}')",
+   f"Additions file '{file_path}' must be a dictionary "
+   f"with string values! (Rule ID: '{self.id}')",
)
if self.ignore_case:
add_dict = {key.upper(): value for key, value in add_dict.items()}
self.resolve_from_file["additions"] = add_dict

@property
Expand All @@ -156,7 +171,25 @@ def resolve_list(self) -> dict:
"""Returns the resolve list"""
return self._config.resolve_list

@cached_property
def compiled_resolve_list(self) -> List[Tuple[re.Pattern, str]]:
"""Returns the resolve list with tuple pairs of compiled patterns and values"""
return [
(re.compile(pattern, re.I if self.ignore_case else 0), val)
for pattern, val in self._config.resolve_list.items()
]

@property
def resolve_from_file(self) -> dict:
"""Returns the resolve file"""
return self._config.resolve_from_file

@property
def ignore_case(self) -> bool:
"""Returns if the matching should be case-sensitive or not"""
return self._config.ignore_case

@cached_property
def pattern(self) -> re.Pattern:
"""Pattern used to resolve from file"""
return re.compile(f'^{self.resolve_from_file["pattern"]}$', re.I if self.ignore_case else 0)
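Both `compiled_resolve_list` and `pattern` above use `functools.cached_property`, so each rule compiles its regexes once on first access rather than on every event. A stripped-down sketch of that pre-compilation pattern; the class name and resolve list are illustrative only:

```python
import re
from functools import cached_property
from typing import List, Tuple

class ResolveRuleSketch:
    """Illustrative stand-in for GenericResolverRule's pre-compilation behaviour."""

    def __init__(self, resolve_list: dict, ignore_case: bool = False):
        self.resolve_list = resolve_list
        self.ignore_case = ignore_case

    @cached_property
    def compiled_resolve_list(self) -> List[Tuple[re.Pattern, str]]:
        # Compiled on first access only; later accesses reuse the cached list.
        flags = re.I if self.ignore_case else 0
        return [(re.compile(pattern, flags), value) for pattern, value in self.resolve_list.items()]

rule = ResolveRuleSketch({r".*hello.*": "Greeting"}, ignore_case=True)
for pattern, value in rule.compiled_resolve_list:
    if pattern.search("well, HELLO there"):
        print(value)  # -> Greeting
```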
