Skip to content

Commit

Permalink
let add_field_to always raise FieldExistsWarning on failure
Browse files Browse the repository at this point in the history
- Catch and handle FieldExistsWarning to raise CriticalInputError.
  • Loading branch information
dtrai2 committed Nov 7, 2024
1 parent 44c0166 commit 3bde77a
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 57 deletions.
42 changes: 17 additions & 25 deletions logprep/abc/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from logprep.abc.connector import Connector
from logprep.abc.exceptions import LogprepException
from logprep.metrics.metrics import Metric
from logprep.processor.base.exceptions import FieldExistsWarning
from logprep.util.helper import add_field_to, get_dotted_field_value
from logprep.util.time import UTC, TimeParser
from logprep.util.validators import dict_structure_validator
Expand Down Expand Up @@ -280,16 +281,19 @@ def get_next(self, timeout: float) -> dict | None:
self.metrics.number_of_processed_events += 1
if not isinstance(event, dict):
raise CriticalInputError(self, "not a dict", event)
if self._add_hmac:
event = self._add_hmac_to(event, raw_event)
if self._add_version_info:
self._add_version_information_to_event(event)
if self._add_log_arrival_time_information:
self._add_arrival_time_information_to_event(event)
if self._add_log_arrival_timedelta_information:
self._add_arrival_timedelta_information_to_event(event)
if self._add_env_enrichment:
self._add_env_enrichment_to_event(event)
try:
if self._add_hmac:
event = self._add_hmac_to(event, raw_event)
if self._add_version_info:
self._add_version_information_to_event(event)
if self._add_log_arrival_time_information:
self._add_arrival_time_information_to_event(event)
if self._add_log_arrival_timedelta_information:
self._add_arrival_timedelta_information_to_event(event)
if self._add_env_enrichment:
self._add_env_enrichment_to_event(event)
except FieldExistsWarning as error:
raise CriticalInputError(self, error.args[0], event) from error
return event

def batch_finished_callback(self):
Expand Down Expand Up @@ -331,7 +335,7 @@ def _add_version_information_to_event(self, event: dict):
add_field_to(event, target_field, self._config._version_information)
# pylint: enable=protected-access

def _add_hmac_to(self, event_dict, raw_event) -> Tuple[dict, str]:
def _add_hmac_to(self, event_dict, raw_event) -> dict:
"""
Calculates an HMAC (Hash-based message authentication code) based on a given target field
and adds it to the given event. If the target field has the value '<RAW_MSG>' the full raw
Expand All @@ -357,7 +361,7 @@ def _add_hmac_to(self, event_dict, raw_event) -> Tuple[dict, str]:
------
CriticalInputError
If the hmac could not be added to the event because the desired output field already
exists or cant't be found.
exists or can't be found.
"""
hmac_options = self._config.preprocessing.get("hmac", {})
hmac_target_field_name = hmac_options.get("target")
Expand All @@ -382,17 +386,5 @@ def _add_hmac_to(self, event_dict, raw_event) -> Tuple[dict, str]:
).hexdigest()
compressed = zlib.compress(received_orig_message, level=-1)
hmac_output = {"hmac": hmac, "compressed_base64": base64.b64encode(compressed).decode()}
add_was_successful = add_field_to(
event_dict,
hmac_options.get("output_field"),
hmac_output,
)
if not add_was_successful:
raise CriticalInputError(
self,
f"Couldn't add the hmac to the input event as the desired "
f"output field '{hmac_options.get('output_field')}' already "
f"exist.",
event_dict,
)
add_field_to(event_dict, hmac_options.get("output_field"), hmac_output)
return event_dict
11 changes: 8 additions & 3 deletions logprep/processor/base/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def __init__(self, message: str, rule: "Rule"):
class ProcessingWarning(Warning):
"""A warning occurred - log the warning, but continue processing the event."""

def __init__(self, message: str, rule: "Rule", event: dict, tags: List[str] = None):
def __init__(self, message: str, event: dict, rule: "Rule" = None, tags: List[str] = None):
self.tags = tags if tags else []
if rule:
rule.metrics.number_of_warnings += 1
Expand All @@ -82,10 +82,15 @@ def __init__(self, message: str, rule: "Rule", event: dict, tags: List[str] = No
class FieldExistsWarning(ProcessingWarning):
"""Raised if field already exists."""

def __init__(self, rule: "Rule", event: dict, skipped_fields: List[str]):
def __init__(
self,
event: dict,
skipped_fields: List[str],
rule: "Rule" = None,
):
message = (
"The following fields could not be written, because "
"one or more subfields existed and could not be extended: "
f"{', '.join(skipped_fields)}"
)
super().__init__(message, rule, event)
super().__init__(message, event, rule)
21 changes: 10 additions & 11 deletions logprep/util/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from colorama import Back, Fore
from colorama.ansi import AnsiBack, AnsiFore

from logprep.processor.base.exceptions import FieldExistsWarning
from logprep.util.defaults import DEFAULT_CONFIG_LOCATION

if TYPE_CHECKING: # pragma: no cover
Expand Down Expand Up @@ -63,7 +64,6 @@ def add_field_to(
content,
extends_lists=False,
overwrite_output_field=False,
raise_on_failure=None,
):
"""
Add content to the output_field in the given event. Output_field can be a dotted subfield.
Expand All @@ -80,11 +80,13 @@ def add_field_to(
Flag that determines whether output_field lists should be extended
overwrite_output_field: bool
Flag that determines whether the output_field should be overwritten
Returns
Raises
------
bool
True if no conflicting fields were found during the process of the creation
of the dotted subfields, otherwise False.
ValueError
If both extends_lists and overwrite_output_field are set to True.
FieldExistsWarning
If the output field already exists and overwrite_output_field is False, or if extends_lists is True but
the existing field is not a list.
"""
if extends_lists and overwrite_output_field:
raise ValueError("An output field can't be overwritten and extended at the same time")
Expand All @@ -93,19 +95,16 @@ def add_field_to(
try:
target_parent = reduce(_add_and_not_overwrite_key, field_path)
except KeyError as error:
if raise_on_failure:
raise raise_on_failure from error
return
raise FieldExistsWarning(event, [output_field]) from error
if overwrite_output_field:
target_parent[target_key] = content
else:
existing_value = target_parent.get(target_key)
if existing_value is None:
target_parent[target_key] = content
if not extends_lists or not isinstance(existing_value, list):
if raise_on_failure:
raise raise_on_failure
return
if not extends_lists or not isinstance(existing_value, list):
raise FieldExistsWarning(event, [output_field])
if isinstance(content, list):
target_parent[target_key].extend(content)
else:
Expand Down
3 changes: 1 addition & 2 deletions tests/unit/connector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,7 @@ def test_get_next_with_hmac_result_in_already_existing_subfield(self):
connector._get_event = mock.MagicMock(
return_value=(test_event.copy(), raw_encoded_test_event)
)
non_critical_error_msg = "Couldn't add the hmac to the input event as the desired output field 'message' already exist."
with pytest.raises(CriticalInputError, match=non_critical_error_msg) as error:
with pytest.raises(CriticalInputError, match="could not be written") as error:
_ = connector.get_next(1)
assert error.value.raw_input == {"message": {"with_subfield": "content"}}

Expand Down
27 changes: 11 additions & 16 deletions tests/unit/util/test_helper_add_field.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# pylint: disable=missing-docstring
import pytest

from logprep.abc.exceptions import LogprepException
from logprep.processor.base.exceptions import FieldExistsWarning
from logprep.util.helper import add_field_to


Expand Down Expand Up @@ -31,15 +31,13 @@ def test_add_str_content_as_partially_new_dotted_subfield(self):

def test_provoke_str_duplicate_in_root_field(self):
document = {"source": {"ip": "8.8.8.8"}, "field": "exists already"}
error = LogprepException("test error")
with pytest.raises(LogprepException, match=r"test error"):
add_field_to(document, "field", "content", raise_on_failure=error)
with pytest.raises(FieldExistsWarning, match=r"could not be written"):
add_field_to(document, "field", "content")

def test_provoke_str_duplicate_in_dotted_subfield(self):
document = {"source": {"ip": "8.8.8.8"}, "sub": {"field": "exists already"}}
error = LogprepException("test error")
with pytest.raises(LogprepException, match=r"test error"):
add_field_to(document, "sub.field", "content", raise_on_failure=error)
with pytest.raises(FieldExistsWarning, match=r"could not be written"):
add_field_to(document, "sub.field", "content")

def test_add_dict_content_as_new_root_field(self):
document = {"source": {"ip": "8.8.8.8"}}
Expand All @@ -64,15 +62,13 @@ def test_add_dict_content_as_partially_new_dotted_subfield(self):

def test_provoke_dict_duplicate_in_root_field(self):
document = {"source": {"ip": "8.8.8.8"}, "field": {"already_existing": "dict"}}
error = LogprepException("test error")
with pytest.raises(LogprepException, match=r"test error"):
add_field_to(document, "field", {"dict": "content"}, raise_on_failure=error)
with pytest.raises(FieldExistsWarning, match=r"could not be written"):
add_field_to(document, "field", {"dict": "content"})

def test_provoke_dict_duplicate_in_dotted_subfield(self):
document = {"source": {"ip": "8.8.8.8"}, "sub": {"field": {"already_existing": "dict"}}}
error = LogprepException("test error")
with pytest.raises(LogprepException, match=r"test error"):
add_field_to(document, "sub.field", {"dict": "content"}, raise_on_failure=error)
with pytest.raises(FieldExistsWarning, match=r"could not be written"):
add_field_to(document, "sub.field", {"dict": "content"})

def test_add_field_to_overwrites_output_field_in_root_level(self):
document = {"some": "field", "output_field": "has already content"}
Expand Down Expand Up @@ -110,9 +106,8 @@ def test_add_field_to_raises_if_list_should_be_extended_and_overwritten_at_the_s
def test_returns_false_if_dotted_field_value_key_exists(self):
document = {"user": "Franz"}
content = ["user_inlist"]
error = LogprepException("test error")
with pytest.raises(LogprepException, match=r"test error"):
add_field_to(document, "user.in_list", content, raise_on_failure=error)
with pytest.raises(FieldExistsWarning, match=r"could not be written"):
add_field_to(document, "user.in_list", content)

def test_add_list_with_nested_keys(self):
testdict = {
Expand Down

0 comments on commit 3bde77a

Please sign in to comment.