revert exception signature and add rule to add_field_to method as argument
dtrai2 committed Nov 13, 2024
1 parent 434216e commit a5d953c
Showing 24 changed files with 80 additions and 53 deletions.
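Taken together, the hunks below make two changes: the active rule is now handed to the field-writing helpers (add_field_to, add_and_overwrite) as an argument, and the warning signatures are reverted so the rule again precedes the event. A minimal sketch of the resulting helper call convention, assuming the helpers live in logprep.util.helper and keep the defaults implied by the call sites (both assumptions, not verified against the full source):

# Sketch only: the import path and defaults are assumptions inferred from this diff.
from logprep.util.helper import add_and_overwrite, add_field_to


def write_result(event: dict, rule) -> None:
    # `rule` is now passed right after the fields mapping, presumably so that a
    # FieldExistsWarning raised inside the helper can reference the responsible rule.
    add_field_to(
        event,
        fields={"parsed.level": "info"},
        rule=rule,
        extends_lists=False,
        overwrite_target_field=True,
    )
    # add_and_overwrite gains the same argument.
    add_and_overwrite(event, fields={"tags": ["example"]}, rule=rule)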
6 changes: 3 additions & 3 deletions logprep/abc/processor.py
@@ -359,15 +359,15 @@ def _handle_warning_error(self, event, rule, error, failure_tags=None):
new_field = {"tags": sorted(list({*failure_tags}))}
else:
new_field = {"tags": sorted(list({*tags, *failure_tags}))}
add_and_overwrite(event, new_field)
add_and_overwrite(event, new_field, rule)
if isinstance(error, ProcessingWarning):
if error.tags:
tags = tags if tags else []
new_field = {"tags": sorted(list({*error.tags, *tags, *failure_tags}))}
add_and_overwrite(event, new_field)
add_and_overwrite(event, new_field, rule)
self.result.warnings.append(error)
else:
self.result.warnings.append(ProcessingWarning(str(error), event, rule))
self.result.warnings.append(ProcessingWarning(str(error), rule, event))

def _has_missing_values(self, event, rule, source_field_dict):
missing_fields = list(
11 changes: 3 additions & 8 deletions logprep/processor/base/exceptions.py
@@ -70,7 +70,7 @@ def __init__(self, message: str, rule: "Rule"):
class ProcessingWarning(Warning):
"""A warning occurred - log the warning, but continue processing the event."""

def __init__(self, message: str, event: dict, rule: "Rule" = None, tags: List[str] = None):
def __init__(self, message: str, rule: "Rule | None", event: dict, tags: List[str] = None):
self.tags = tags if tags else []
if rule:
rule.metrics.number_of_warnings += 1
@@ -82,16 +82,11 @@ def __init__(self, message: str, event: dict, rule: "Rule" = None, tags: List[st
class FieldExistsWarning(ProcessingWarning):
"""Raised if field already exists."""

def __init__(
self,
event: dict,
skipped_fields: List[str],
rule: "Rule" = None,
):
def __init__(self, rule: "Rule | None", event: dict, skipped_fields: List[str]):
self.skipped_fields = skipped_fields
message = (
"The following fields could not be written, because "
"one or more subfields existed and could not be extended: "
f"{', '.join(skipped_fields)}"
)
super().__init__(message, event, rule)
super().__init__(message, rule, event)
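With the revert, both warnings again take the rule before the event: ProcessingWarning(message, rule, event, tags=...) and FieldExistsWarning(rule, event, skipped_fields). A short usage sketch; the function and field names are illustrative, not from this commit:

from typing import List

from logprep.processor.base.exceptions import FieldExistsWarning, ProcessingWarning


def report_conflicts(event: dict, rule, skipped_fields: List[str]) -> None:
    """Illustrative only: raise the warnings with the reverted argument order."""
    if skipped_fields:
        # rule first, then the event, then the fields that could not be written
        raise FieldExistsWarning(rule, event, skipped_fields)
    # message, then the rule (which may be None), then the event, plus optional tags
    raise ProcessingWarning("nothing to write", rule, event, tags=["_example_failure"])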
4 changes: 2 additions & 2 deletions logprep/processor/dissector/processor.py
@@ -85,12 +85,12 @@ def _get_mappings(self, event, rule) -> List[Tuple[Callable, dict, dict, str, in
if strip_char:
content = content.strip(strip_char)
field = {target_field: content}
yield rule_action, event, field, separator, position
yield rule_action, event, field, separator, rule, position

def _apply_convert_datatype(self, event, rule):
for target_field, converter in rule.convert_actions:
try:
target_value = converter(get_dotted_field_value(event, target_field))
add_field_to(event, {target_field: target_value}, overwrite_target_field=True)
add_field_to(event, {target_field: target_value}, rule, overwrite_target_field=True)
except ValueError as error:
self._handle_warning_error(event, rule, error)
10 changes: 7 additions & 3 deletions logprep/processor/domain_label_extractor/processor.py
@@ -130,7 +130,9 @@ def _apply_rules(self, event, rule: DomainLabelExtractorRule):

if self._is_valid_ip(domain):
tagging_field.append(f"ip_in_{rule.source_fields[0].replace('.', '_')}")
add_and_overwrite(event, fields={self._config.tagging_field_name: tagging_field})
add_and_overwrite(
event, fields={self._config.tagging_field_name: tagging_field}, rule=rule
)
return

labels = self._tld_extractor(domain)
@@ -140,10 +142,12 @@ def _apply_rules(self, event, rule: DomainLabelExtractorRule):
f"{rule.target_field}.top_level_domain": labels.suffix,
f"{rule.target_field}.subdomain": labels.subdomain,
}
add_field_to(event, fields, overwrite_target_field=rule.overwrite_target)
add_field_to(event, fields, rule, overwrite_target_field=rule.overwrite_target)
else:
tagging_field.append(f"invalid_domain_in_{rule.source_fields[0].replace('.', '_')}")
add_and_overwrite(event, fields={self._config.tagging_field_name: tagging_field})
add_and_overwrite(
event, fields={self._config.tagging_field_name: tagging_field}, rule=rule
)

@staticmethod
def _is_valid_ip(domain):
20 changes: 12 additions & 8 deletions logprep/processor/field_manager/processor.py
@@ -78,7 +78,11 @@ def _apply_mapping(self, event, rule, rule_args):
return
source_field_values, targets = self._filter_missing_fields(source_field_values, targets)
add_field_to(
event, dict(zip(targets, source_field_values)), extend_target_list, overwrite_target
event,
dict(zip(targets, source_field_values)),
rule,
extend_target_list,
overwrite_target,
)
if rule.delete_source_fields:
for dotted_field in source_fields:
@@ -105,7 +109,7 @@ def _write_to_single_target(self, args, extend_target_list, overwrite_target, ru
case State(
extend=True, overwrite=True, single_source_element=False, target_is_list=False
):
add_and_overwrite(event, fields={target_field: source_fields_values})
add_and_overwrite(event, fields={target_field: source_fields_values}, rule=rule)
return

case State(
@@ -117,35 +121,35 @@
):
flattened_source_fields = self._overwrite_from_source_values(source_fields_values)
source_fields_values = [*flattened_source_fields]
add_and_overwrite(event, fields={target_field: source_fields_values})
add_and_overwrite(event, fields={target_field: source_fields_values}, rule=rule)
return

case State(extend=True, overwrite=False, target_is_list=False, target_is_none=True):
add_and_overwrite(event, fields={target_field: source_fields_values})
add_and_overwrite(event, fields={target_field: source_fields_values}, rule=rule)
return

case State(extend=True, overwrite=False, target_is_list=False):
source_fields_values = [target_field_value, *source_fields_values]
add_and_overwrite(event, fields={target_field: source_fields_values})
add_and_overwrite(event, fields={target_field: source_fields_values}, rule=rule)
return

case State(
extend=True, overwrite=False, single_source_element=False, target_is_list=True
):
flattened_source_fields = self._overwrite_from_source_values(source_fields_values)
source_fields_values = [*target_field_value, *flattened_source_fields]
add_and_overwrite(event, fields={target_field: source_fields_values})
add_and_overwrite(event, fields={target_field: source_fields_values}, rule=rule)
return

case State(overwrite=True, extend=True):
flattened_source_fields = self._overwrite_from_source_values(source_fields_values)
source_fields_values = [*flattened_source_fields]
add_and_overwrite(event, fields={target_field: source_fields_values})
add_and_overwrite(event, fields={target_field: source_fields_values}, rule=rule)
return

case _:
field = {target_field: source_fields_values}
add_field_to(event, field, state.extend, state.overwrite)
add_field_to(event, field, rule, state.extend, state.overwrite)

def _overwrite_from_source_values(self, source_fields_values):
duplicates = []
2 changes: 1 addition & 1 deletion logprep/processor/generic_adder/processor.py
@@ -230,7 +230,7 @@ def _apply_rules(self, event: dict, rule: GenericAdderRule):
self._update_db_table()
items_to_add = self._get_items_to_add_from_db(event, rule)
if items_to_add:
add_field_to(event, items_to_add, rule.extend_target_list, rule.overwrite_target)
add_field_to(event, items_to_add, rule, rule.extend_target_list, rule.overwrite_target)

def _get_items_to_add_from_db(self, event: dict, rule: GenericAdderRule) -> dict | None:
"""Get the sub part of the value from the event using a regex pattern"""
3 changes: 2 additions & 1 deletion logprep/processor/generic_resolver/processor.py
@@ -62,13 +62,14 @@ def _apply_rules(self, event, rule):
add_field_to(
event,
fields={target_field: content},
rule=rule,
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
)
except FieldExistsWarning as error:
conflicting_fields.extend(error.skipped_fields)
if conflicting_fields:
raise FieldExistsWarning(event, conflicting_fields, rule)
raise FieldExistsWarning(rule, event, conflicting_fields)

def _find_content_of_first_matching_pattern(self, rule, source_field_value):
if rule.resolve_from_file:
1 change: 1 addition & 0 deletions logprep/processor/geoip_enricher/processor.py
@@ -135,6 +135,7 @@ def _apply_rules(self, event, rule):
add_field_to(
event,
fields,
rule=rule,
extends_lists=False,
overwrite_target_field=rule.overwrite_target,
)
3 changes: 2 additions & 1 deletion logprep/processor/grokker/processor.py
@@ -88,13 +88,14 @@ def _apply_rules(self, event: dict, rule: GrokkerRule):
add_field_to(
event,
result,
rule=rule,
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
)
if self._handle_missing_fields(event, rule, rule.actions.keys(), source_values):
return
if not matches:
raise ProcessingWarning("no grok pattern matched", event, rule)
raise ProcessingWarning("no grok pattern matched", rule, event)

def setup(self):
"""Loads the action mapping. Has to be called before processing"""
3 changes: 2 additions & 1 deletion logprep/processor/hyperscan_resolver/processor.py
@@ -122,14 +122,15 @@ def _apply_rules(self, event: dict, rule: HyperscanResolverRule):
add_field_to(
event,
fields={resolve_target: dest_val},
rule=rule,
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
)
except FieldExistsWarning as error:
conflicting_fields.extend(error.skipped_fields)
self._handle_missing_fields(event, rule, rule.field_mapping.keys(), source_values)
if conflicting_fields:
raise FieldExistsWarning(event, conflicting_fields, rule)
raise FieldExistsWarning(rule, event, conflicting_fields)

@staticmethod
def _match_with_hyperscan(hyperscan_db: Database, src_val: str) -> list:
2 changes: 1 addition & 1 deletion logprep/processor/ip_informer/processor.py
@@ -54,7 +54,7 @@ def _apply_rules(self, event: dict, rule: IpInformerRule) -> None:
if results:
self._write_target_field(event, rule, results)
for msg, error in self._processing_warnings:
raise ProcessingWarning(msg, event, rule) from error
raise ProcessingWarning(msg, rule, event) from error

def _get_results(self, ip_address_list: Iterable, rule: IpInformerRule) -> dict:
results = [(ip, self._ip_properties(ip, rule)) for ip in ip_address_list]
4 changes: 2 additions & 2 deletions logprep/processor/labeler/processor.py
@@ -74,10 +74,10 @@ def setup(self):
def _apply_rules(self, event, rule):
"""Applies the rule to the current event"""
fields = {key: value for key, value in rule.prefixed_label.items()}
add_field_to(event, fields, extends_lists=True)
add_field_to(event, fields, rule=rule, extends_lists=True)
# convert sets into sorted lists
fields = {
key: sorted(set(get_dotted_field_value(event, key)))
for key, _ in rule.prefixed_label.items()
}
add_field_to(event, fields, overwrite_target_field=True)
add_field_to(event, fields, rule=rule, overwrite_target_field=True)
2 changes: 1 addition & 1 deletion logprep/processor/list_comparison/processor.py
@@ -74,7 +74,7 @@ def _apply_rules(self, event, rule):
comparison_result, comparison_key = self._list_comparison(rule, event)
if comparison_result is not None:
fields = {f"{rule.target_field}.{comparison_key}": comparison_result}
add_field_to(event, fields, extends_lists=True)
add_field_to(event, fields, rule=rule, extends_lists=True)

def _list_comparison(self, rule: ListComparisonRule, event: dict):
"""
4 changes: 2 additions & 2 deletions logprep/processor/pre_detector/processor.py
@@ -105,8 +105,8 @@ def normalize_timestamp(self, rule: PreDetectorRule, timestamp: str) -> str:
except TimeParserException as error:
raise ProcessingWarning(
"Could not parse timestamp",
self.result.event,
rule,
self.result.event,
tags=["_pre_detector_timeparsing_failure"],
) from error

@@ -126,7 +126,7 @@ def _get_detection_result(self, event: dict, rule: PreDetectorRule):
pre_detection_id = get_dotted_field_value(event, "pre_detection_id")
if pre_detection_id is None:
pre_detection_id = str(uuid4())
add_field_to(event, {"pre_detection_id": pre_detection_id})
add_field_to(event, {"pre_detection_id": pre_detection_id}, rule=rule)
detection_result = self._generate_detection_result(pre_detection_id, event, rule)
self.result.data.append((detection_result, self._config.outputs))

4 changes: 3 additions & 1 deletion logprep/processor/pseudonymizer/processor.py
@@ -264,7 +264,9 @@ def _apply_rules(self, event: dict, rule: PseudonymizerRule):
]
else:
field_value = self._pseudonymize_field(rule, dotted_field, regex, field_value)
add_field_to(event, fields={dotted_field: field_value}, overwrite_target_field=True)
add_field_to(
event, fields={dotted_field: field_value}, rule=rule, overwrite_target_field=True
)
if "@timestamp" in event:
for pseudonym, _ in self.result.data:
pseudonym["@timestamp"] = event["@timestamp"]
4 changes: 3 additions & 1 deletion logprep/processor/requester/processor.py
@@ -73,6 +73,7 @@ def _handle_response(self, event, rule, response):
add_field_to(
event,
fields={rule.target_field: self._get_result(response)},
rule=rule,
extends_lists=rule.extend_target_list,
overwrite_target_field=rule.overwrite_target,
)
@@ -86,13 +87,14 @@ def _handle_response(self, event, rule, response):
add_field_to(
event,
dict(zip(targets, contents)),
rule,
rule.extend_target_list,
rule.overwrite_target,
)
except FieldExistsWarning as error:
conflicting_fields.extend(error.skipped_fields)
if conflicting_fields:
raise FieldExistsWarning(event, conflicting_fields, rule)
raise FieldExistsWarning(rule, event, conflicting_fields)

def _request(self, event, rule, kwargs):
try:
2 changes: 1 addition & 1 deletion logprep/processor/selective_extractor/processor.py
@@ -64,5 +64,5 @@ def _apply_rules(self, event: dict, rule: SelectiveExtractorRule):
}
if flattened_fields:
filtered_event = {}
add_field_to(filtered_event, flattened_fields)
add_field_to(filtered_event, flattened_fields, rule)
self.result.data.append((filtered_event, rule.outputs))
2 changes: 1 addition & 1 deletion logprep/processor/string_splitter/processor.py
@@ -42,6 +42,6 @@ def _apply_rules(self, event: dict, rule: StringSplitterRule):
source_field_content = get_dotted_field_value(event, source_field)
self._handle_missing_fields(event, rule, rule.source_fields, [source_field_content])
if not isinstance(source_field_content, str):
raise ProcessingWarning(f"source_field '{source_field}' is not a string", event, rule)
raise ProcessingWarning(f"source_field '{source_field}' is not a string", rule, event)
result = source_field_content.split(rule.delimeter)
self._write_target_field(event, rule, result)
5 changes: 4 additions & 1 deletion logprep/processor/template_replacer/processor.py
@@ -115,7 +115,10 @@ def _perform_replacement(self, event: dict, replacement: str, rule: TemplateRepl
"""
overwrite = get_dotted_field_value(event, self._target_field) is not None
add_field_to(
event, fields={self._target_field: replacement}, overwrite_target_field=overwrite
event,
fields={self._target_field: replacement},
rule=rule,
overwrite_target_field=overwrite,
)

def setup(self):
2 changes: 1 addition & 1 deletion logprep/processor/timestamper/processor.py
@@ -61,4 +61,4 @@ def _apply_rules(self, event, rule):
parsed_successfully = True
break
if not parsed_successfully:
raise ProcessingWarning(str("Could not parse timestamp"), event, rule)
raise ProcessingWarning(str("Could not parse timestamp"), rule, event)