Skip to content

Commit

Permalink
add ability to set format as dict in preprocessor
Browse files Browse the repository at this point in the history
  • Loading branch information
djkhl committed Feb 27, 2025
1 parent 350fa37 commit e6115ad
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 5 deletions.
12 changes: 9 additions & 3 deletions logprep/abc/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class Config(Connector.Config):
"log_arrival_time_target_field": Optional[str],
"log_arrival_timedelta": Optional[TimeDeltaConfig],
"enrich_by_env_variables": Optional[dict],
"add_full_event_to_target_field": Optional[str],
"add_full_event_to_target_field": Optional[dict],
},
),
],
Expand Down Expand Up @@ -338,11 +338,17 @@ def _add_arrival_time_information_to_event(self, event: dict):

def _write_full_event_to_target_field(self, event_dict: dict, raw_event: bytearray):
    """Replace the content of ``event_dict`` with the complete original event.

    The behavior is driven by the ``add_full_event_to_target_field``
    preprocessing option, which is read as a mapping with the keys
    ``format`` and ``target_field``.

    Parameters
    ----------
    event_dict : dict
        The parsed event. It is cleared in place and afterwards only holds
        the complete event under the configured target field.
    raw_event : bytearray
        The raw bytes of the event. If ``None``, the raw representation is
        produced by encoding ``event_dict`` with ``self._encoder``.
    """
    target = self._config.preprocessing.get("add_full_event_to_target_field")
    if raw_event is None:
        raw_event = self._encoder.encode(event_dict)
    raw_text = raw_event.decode("utf-8")
    # Compare by value: ``is`` tests object identity, which only happens to
    # work for interned string literals and is not guaranteed for values
    # loaded from a configuration file.
    if target["format"] == "dict":
        complete_event = self._decoder.decode(raw_text)
    else:
        # Any other format stores the raw text as a JSON-encoded string.
        complete_event = json.dumps(raw_text)
    # Drop all existing fields so that only the target field remains.
    event_dict.clear()
    add_fields_to(
        event_dict, fields={target["target_field"]: complete_event}, overwrite_target=True
    )

def _add_arrival_timedelta_information_to_event(self, event: dict):
log_arrival_timedelta_config = self._config.preprocessing.get("log_arrival_timedelta")
Expand Down
25 changes: 23 additions & 2 deletions tests/unit/connector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,13 @@ def test_pipeline_preprocessing_does_not_add_timestamp_delta_if_not_configured(s
result = connector.get_next(0.01)
assert "arrival_time" in result

def test_add_full_event_to_target_field(self):
def test_add_full_event_to_target_field_with_string_format(self):
preprocessing_config = {
"preprocessing": {
"add_full_event_to_target_field": "event.original",
"add_full_event_to_target_field": {
"format": "str",
"target_field": "event.original",
},
}
}
connector_config = deepcopy(self.CONFIG)
Expand All @@ -456,6 +459,24 @@ def test_add_full_event_to_target_field(self):
expected = {"event": {"original": '"{\\"any\\":\\"content\\"}"'}}
assert result == expected, f"{expected} is not the same as {result}"

def test_add_full_event_to_target_field_with_dict_format(self):
    """With format ``dict`` the full event is nested as a dict under ``event.original``."""
    full_event_option = {
        "format": "dict",
        "target_field": "event.original",
    }
    config = deepcopy(self.CONFIG)
    config.update({"preprocessing": {"add_full_event_to_target_field": full_event_option}})
    connector = Factory.create({"test connector": config})
    # The stubbed input returns a parsed event and no raw bytes.
    connector._get_event = mock.MagicMock(return_value=({"any": "content"}, None))
    expected = {"event": {"original": {"any": "content"}}}
    result = connector.get_next(0.01)
    assert result == expected, f"{expected} is not the same as {result}"

def test_pipeline_preprocessing_does_not_add_timestamp_delta_if_configured_but_log_arrival_timestamp_not(
self,
):
Expand Down

0 comments on commit e6115ad

Please sign in to comment.