Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add option to ignore missing fields in field_manager #457

Merged
merged 6 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,14 @@

* add a preprocessor to enrich by systems env variables
* add option to define rules inline in pipeline config under processor configs `generic_rules` or `specific_rules`

### Improvements

* `pre_detector` processor now adds the field `creation_timestamp` to pre-detections.
It contains the time at which a pre-detection was created by the processor.
* add `prometheus` and `grafana` to the quickstart setup to support development
* reimplemented kafka input connector
- move kafka config options to `kafka_config` dictionary

### Features

* add option to `field_manager` to ignore missing source fields to suppress warnings and failure tags
* add `ignore_missing_fields` behavior to `calculator`, `concatenator`, `dissector`, `grokker`, `ip_informer`, `selective_extractor`
dtrai2 marked this conversation as resolved.
Show resolved Hide resolved
* kafka input connector
- implemented manual commit behaviour if `enable.auto.commit: false`
- implemented on_commit callback to check for errors during commit
- implemented statistics callback to collect metrics from underlying librdkafka library
- implemented per partition offset metrics
- get logs and handle errors from underlying librdkafka library

* kafka output connector
- implemented statistics callback to collect metrics from underlying librdkafka library
- get logs and handle errors from underlying librdkafka library
Expand All @@ -37,8 +27,10 @@ It contains the time at which a pre-detection was created by the processor.

* `pre_detector` processor now adds the field `creation_timestamp` to pre-detections.
It contains the time at which a pre-detection was created by the processor.
* add `prometheus` and `grafana` to the quickstart setup to support development
* provide confluent kafka test setup to run tests against a real kafka cluster


### Bugfix

* fix CVE-2023-37920 Removal of e-Tugra root certificate
Expand Down
2 changes: 2 additions & 0 deletions logprep/abc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ def _has_missing_values(self, event, rule, source_field_dict):
dict(filter(lambda x: x[1] in [None, ""], source_field_dict.items())).keys()
)
if missing_fields:
if rule.ignore_missing_fields:
return True
error = BaseException(f"{self.name}: no value for fields: {missing_fields}")
self._handle_warning_error(event, rule, error)
return True
Expand Down
3 changes: 3 additions & 0 deletions logprep/processor/amides/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,6 @@ class Config(FieldManagerRule.Config):
)
target_field: str = field(validator=validators.instance_of(str), default="amides")
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(
init=False, repr=False, eq=False, default=False, validator=validators.instance_of(bool)
)
4 changes: 4 additions & 0 deletions logprep/processor/calculator/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ class Config(FieldManagerRule.Config):
"""
timeout: int = field(validator=validators.instance_of(int), converter=int, default=1)
"""The maximum time in seconds for the calculation. Defaults to :code:`1`"""
ignore_missing_fields: bool = field(validator=validators.instance_of(bool), default=False)
"""If set to :code:`True` missing fields will be ignored, no warning is logged,
and the event is not tagged with a failure tag. As soon as one field is missing
no calculation is performed at all. Defaults to :code:`False`"""
mapping: dict = field(default="", init=False, repr=False, eq=False)

def __attrs_post_init__(self):
Expand Down
6 changes: 3 additions & 3 deletions logprep/processor/concatenator/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@

.. automodule:: logprep.processor.concatenator.rule
"""
from logprep.abc.processor import Processor
from logprep.processor.concatenator.rule import ConcatenatorRule
from logprep.processor.field_manager.processor import FieldManager
from logprep.util.helper import get_dotted_field_value


Expand All @@ -38,7 +38,7 @@ def __init__(self, name: str, message: str):
super().__init__(f"Concatenator ({name}): {message}")


class Concatenator(Processor):
class Concatenator(FieldManager):
"""Concatenates a list of source fields into a new target field."""

rule_class = ConcatenatorRule
Expand All @@ -56,11 +56,11 @@ def _apply_rules(self, event, rule: ConcatenatorRule):
rule :
Currently applied concatenator rule.
"""

source_field_values = []
for source_field in rule.source_fields:
field_value = get_dotted_field_value(event, source_field)
source_field_values.append(field_value)
self._handle_missing_fields(event, rule, rule.source_fields, source_field_values)

source_field_values = [field for field in source_field_values if field is not None]
target_value = f"{rule.separator}".join(source_field_values)
Expand Down
2 changes: 2 additions & 0 deletions logprep/processor/datetime_extractor/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ class Config(FieldManagerRule.Config):
source_fields: list = field(
validator=[
validators.instance_of(list),
validators.max_len(1),
validators.deep_iterable(member_validator=validators.instance_of(str)),
],
)
"""The fields from where to get the values which should be processed."""
target_field: str = field(validator=validators.instance_of(str))
"""The field where to write the processed values to. """
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)
8 changes: 5 additions & 3 deletions logprep/processor/dissector/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@
"""
from typing import Callable, List, Tuple

from logprep.abc.processor import Processor
from logprep.processor.dissector.rule import DissectorRule
from logprep.util.helper import get_dotted_field_value, add_field_to
from logprep.processor.field_manager.processor import FieldManager
from logprep.util.helper import add_field_to, get_dotted_field_value


class Dissector(Processor):
class Dissector(FieldManager):
"""A processor that tokenizes field values to new fields and converts datatypes"""

rule_class = DissectorRule
Expand Down Expand Up @@ -67,6 +67,8 @@ def _get_mappings(self, event, rule) -> List[Tuple[Callable, dict, str, str, str
current_field = source_field
loop_content = get_dotted_field_value(event, current_field)
if loop_content is None:
if rule.ignore_missing_fields:
continue
error = BaseException(
f"dissector: mapping field '{source_field}' does not exist"
)
Expand Down
3 changes: 2 additions & 1 deletion logprep/processor/domain_label_extractor/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
:inherited-members:
:noindex:
"""
from attr import field, validators, define
from attr import define, field, validators

from logprep.processor.field_manager.rule import FieldManagerRule

Expand All @@ -75,3 +75,4 @@ class Config(FieldManagerRule.Config):
)
"""The fields from where to get the values which should be processed."""
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)
1 change: 1 addition & 0 deletions logprep/processor/domain_resolver/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ class Config(FieldManagerRule.Config):
)
"""The field where to write the processor output to. Defaults to :code:`resolved_ip`"""
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)
7 changes: 6 additions & 1 deletion logprep/processor/field_manager/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,15 @@ def _overwrite_with_list_from_source_field_values(self, *args):
add_and_overwrite(event, target_field, target_field_value)

def _handle_missing_fields(self, event, rule, source_fields, field_values):
if rule.ignore_missing_fields:
return False
if None in field_values:
error = self._get_missing_fields_error(source_fields, field_values)
self._handle_warning_error(
event, rule, error, failure_tags=["_field_manager_missing_field_warning"]
event,
rule,
error,
failure_tags=[f"_{self.rule_class.rule_type}_missing_field_warning"],
)
return True
return False
Expand Down
7 changes: 7 additions & 0 deletions logprep/processor/field_manager/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ class Config(Rule.Config):
If the target field does not exist, a new field will be added with the
source field value as list. Defaults to :code:`False`.
"""
ignore_missing_fields: bool = field(validator=validators.instance_of(bool), default=False)
"""If set to :code:`True` missing fields will be ignored, no warning is logged and the event
is not tagged with the failure tag. Defaults to :code:`False`"""

def __attrs_post_init__(self):
# ensures no split operations during processing
Expand Down Expand Up @@ -161,4 +164,8 @@ def overwrite_target(self):
def extend_target_list(self):
return self._config.extend_target_list

@property
def ignore_missing_fields(self):
return self._config.ignore_missing_fields

# pylint: enable=missing-function-docstring
1 change: 1 addition & 0 deletions logprep/processor/geoip_enricher/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class Config(FieldManagerRule.Config):
description: '...'
"""
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)

@property
def customize_target_subfields(self) -> dict: # pylint: disable=missing-function-docstring
Expand Down
8 changes: 7 additions & 1 deletion logprep/processor/grokker/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@
from attrs import define, field, validators

from logprep.abc.processor import Processor
from logprep.processor.base.exceptions import FieldExistsWarning, ProcessingWarning, ProcessingError
from logprep.processor.base.exceptions import (
FieldExistsWarning,
ProcessingError,
ProcessingWarning,
)
from logprep.processor.grokker.rule import GrokkerRule
from logprep.util.getter import GetterFactory
from logprep.util.helper import add_field_to, get_dotted_field_value
Expand Down Expand Up @@ -66,6 +70,8 @@ def _apply_rules(self, event: dict, rule: GrokkerRule):
for dotted_field, grok in rule.actions.items():
field_value = get_dotted_field_value(event, dotted_field)
if field_value is None:
if rule.ignore_missing_fields:
continue
error = BaseException(f"{self.name}: missing source_field: '{dotted_field}'")
self._handle_warning_error(event=event, rule=rule, error=error)
continue
Expand Down
2 changes: 2 additions & 0 deletions logprep/processor/ip_informer/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class IpInformer(FieldManager):
rule_class = IpInformerRule

def _apply_rules(self, event: dict, rule: IpInformerRule) -> None:
source_field_values = self._get_field_values(event, rule.source_fields)
self._handle_missing_fields(event, rule, rule.source_fields, source_field_values)
self._processing_warnings = []
ip_address_list = self._get_flat_ip_address_list(event, rule)
results = self._get_results(ip_address_list, rule)
Expand Down
1 change: 1 addition & 0 deletions logprep/processor/key_checker/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,4 @@ class Config(FieldManagerRule.Config):
target_field: str = field(validator=validators.instance_of(str))
"""The field where to write the processed values to. """
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)
1 change: 1 addition & 0 deletions logprep/processor/list_comparison/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class Config(FieldManagerRule.Config):
e.g., :code:`${<your environment variable>}`. The special key :code:`${LOGPREP_LIST}`
will be filled by this processor. """
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)

def __init__(self, filter_rule: FilterExpression, config: dict):
super().__init__(filter_rule, config)
Expand Down
1 change: 1 addition & 0 deletions logprep/processor/requester/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class Config(FieldManagerRule.Config):
cert: str = field(validator=validators.instance_of(str), default="")
"""(Optional) SSL client certificate as path to ssl client cert file (.pem)."""
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)

def __attrs_post_init__(self):
url_fields = re.findall(FIELD_PATTERN, self.url)
Expand Down
1 change: 1 addition & 0 deletions logprep/processor/selective_extractor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ def _apply_rules(self, event: dict, rule: SelectiveExtractorRule):

"""
flattened_fields = get_source_fields_dict(event, rule)
self._handle_missing_fields(event, rule, flattened_fields.keys(), flattened_fields.values())
flattened_fields = {
dotted_field: content
for dotted_field, content in flattened_fields.items()
Expand Down
3 changes: 3 additions & 0 deletions logprep/processor/selective_extractor/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ class Config(FieldManagerRule.Config):
extract_from_file: str = field(validator=validators.instance_of(str), default="", eq=False)
"""The path or url to a file with a flat list of fields to extract.
For string format see :ref:`getters`."""
ignore_missing_fields: bool = field(validator=validators.instance_of(bool), default=True)
"""If set to :code:`True` missing fields will be ignored, no warning is logged and the event
is not tagged with the failure tag. Defaults to :code:`True`"""
target_field: str = field(default="", init=False, repr=False, eq=False)
overwrite_target: bool = field(default=False, init=False, repr=False, eq=False)
extend_target_list: bool = field(default=False, init=False, repr=False, eq=False)
Expand Down
1 change: 1 addition & 0 deletions logprep/processor/string_splitter/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class Config(FieldManagerRule.Config):
delimeter: str = field(validator=validators.instance_of(str), default=" ")
"""The delimeter for splitting. Defaults to whitespace"""
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)
dtrai2 marked this conversation as resolved.
Show resolved Hide resolved

@property
def delimeter(self):
Expand Down
1 change: 1 addition & 0 deletions logprep/processor/timestamp_differ/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class Config(FieldManagerRule.Config):
"""(Optional) Specifies whether the unit (s, ms, ns) should be part of the output.
Defaults to :code:`False`."""
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)
dtrai2 marked this conversation as resolved.
Show resolved Hide resolved

def __attrs_post_init__(self):
field_format_str = re.findall(FIELD_PATTERN, self.diff)
Expand Down
1 change: 1 addition & 0 deletions logprep/processor/timestamper/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class Config(FieldManagerRule.Config):
)
""" timezone for target_field. defaults to :code:`UTC`"""
mapping: dict = field(default="", init=False, repr=False, eq=False)
ignore_missing_fields: bool = field(default=False, init=False, repr=False, eq=False)
dtrai2 marked this conversation as resolved.
Show resolved Hide resolved

@property
def source_format(self):
Expand Down
13 changes: 13 additions & 0 deletions tests/unit/processor/calculator/test_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,19 @@
{"duration": "0.01"},
{"duration": 10000.0},
),
(
"Ignore missing source fields",
{
"filter": "duration",
"calculator": {
"calc": "${missing_field} * 10e5",
"target_field": "duration",
"ignore_missing_fields": True,
},
},
{"duration": "0.01"},
{"duration": "0.01"},
),
ekneg54 marked this conversation as resolved.
Show resolved Hide resolved
]


Expand Down
21 changes: 21 additions & 0 deletions tests/unit/processor/concatenator/test_concatenator.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def specific_rules_dirs(self):
"separator": "-",
"overwrite_target": False,
"delete_source_fields": False,
"ignore_missing_fields": True,
},
},
{"field": {"a": "first", "b": "second"}},
Expand Down Expand Up @@ -143,6 +144,26 @@ def specific_rules_dirs(self):
},
{"field": {"c": "another one"}, "target_field": "first-second"},
),
(
"ignore missing fields",
{
"filter": "field.a",
"concatenator": {
"source_fields": ["field.a", "field.b", "other_field.c"],
"target_field": "target_field",
"separator": "-",
"overwrite_target": False,
"delete_source_fields": False,
"ignore_missing_fields": True,
},
},
{"field": {"a": "first"}, "other_field": {"c": "third"}},
{
"field": {"a": "first"},
"other_field": {"c": "third"},
"target_field": "first-third",
},
),
],
)
def test_for_expected_output(self, test_case, rule, document, expected_output):
Expand Down
18 changes: 18 additions & 0 deletions tests/unit/processor/dissector/test_dissector.py
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,24 @@
"sys_type": "system_monitor",
},
),
(
"ignore missing fields",
{
"filter": "message",
"dissector": {
"mapping": {
"message": "%{sys_type}",
"does_not_exist": "%{sys_type}",
},
"ignore_missing_fields": True,
},
},
{"message": "system_monitor"},
{
"message": "system_monitor",
"sys_type": "system_monitor",
},
),
]
failure_test_cases = [ # testcase, rule, event, expected
(
Expand Down
Loading
Loading