Skip to content

Commit

Permalink
add env variable enrichment preprocessor
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Oct 2, 2023
1 parent f7625a6 commit facd1ea
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 0 deletions.
28 changes: 28 additions & 0 deletions logprep/abc/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import base64
import hashlib
import os
import zlib
from abc import abstractmethod
from functools import partial
Expand Down Expand Up @@ -102,6 +103,17 @@ class TimeDeltaConfig:
The calculation will be the arrival time minus the time of this reference field."""


@define(kw_only=True)
class EnvEnrichmentConfig:
"""Enrichment Configurations
Works only if the preprocessor enrich_by_env_variable is set."""

target_field: field(validator=[validators.instance_of(str), lambda _, __, x: bool(x)])
"""Defines the fieldname to which the env variable value should be written to."""
variable_name: field(validator=[validators.instance_of(str), lambda _, __, x: bool(x)])
"""Defines the name of the env variable that should be used for the enrichment."""


class Input(Connector):
"""Connect to a source for log data."""

Expand All @@ -119,6 +131,7 @@ class Config(Connector.Config):
"hmac": Optional[HmacConfig],
"log_arrival_time_target_field": Optional[str],
"log_arrival_timedelta": Optional[TimeDeltaConfig],
"enrich_by_env_variables": Optional[dict],
},
),
],
Expand Down Expand Up @@ -216,6 +229,11 @@ def _add_log_arrival_timedelta_information(self):
)
return log_arrival_time_target_field_present & log_arrival_timedelta_present

@property
def _add_env_enrichment(self):
"""Check and return if the env enrichment should be added to the event."""
return bool(self._config.preprocessing.get("enrich_by_env_variables"))

def _get_raw_event(self, timeout: float) -> bytearray: # pylint: disable=unused-argument
"""Implements the details how to get the raw event
Expand Down Expand Up @@ -278,12 +296,22 @@ def get_next(self, timeout: float) -> Tuple[Optional[dict], Optional[str]]:
self._add_arrival_time_information_to_event(event)
if self._add_log_arrival_timedelta_information:
self._add_arrival_timedelta_information_to_event(event)
if self._add_env_enrichment:
self._add_env_enrichment_to_event(event)
self.metrics.number_of_processed_events += 1
return event, non_critical_error_msg

def batch_finished_callback(self):
"""Can be called by output connectors after processing a batch of one or more records."""

def _add_env_enrichment_to_event(self, event: dict):
"""Add the env enrichment information to the event"""
enrichments = self._config.preprocessing.get("enrich_by_env_variables")
if not enrichments:
return
for target_field, variable_name in enrichments.items():
add_field_to(event, target_field, os.environ.get(variable_name, ""))

def _add_arrival_time_information_to_event(self, event: dict):
now = TimeParser.now()
target_field = self._config.preprocessing.get("log_arrival_time_target_field")
Expand Down
41 changes: 41 additions & 0 deletions tests/unit/connector/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# pylint: disable=line-too-long
import base64
import json
import os
import zlib
from copy import deepcopy
from logging import getLogger
Expand Down Expand Up @@ -480,6 +481,46 @@ def test_get_next_returns_event_with_active_time_measurement(self):
TimeMeasurement.TIME_MEASUREMENT_ENABLED = False
TimeMeasurement.APPEND_TO_EVENT = False

def test_preprocessing_enriches_by_env_variable(self):
preprocessing_config = {
"preprocessing": {
"enrich_by_env_variables": {
"enriched_field": "TEST_ENV_VARIABLE",
},
}
}
connector_config = deepcopy(self.CONFIG)
connector_config.update(preprocessing_config)
connector = Factory.create({"test connector": connector_config}, logger=self.logger)
test_event = {"any": "content"}
os.environ["TEST_ENV_VARIABLE"] = "test_value"
connector._get_event = mock.MagicMock(return_value=(test_event, None))
result, _ = connector.get_next(0.01)
assert result == {"any": "content", "enriched_field": "test_value"}

def test_preprocessing_enriches_by_multiple_env_variables(self):
preprocessing_config = {
"preprocessing": {
"enrich_by_env_variables": {
"enriched_field1": "TEST_ENV_VARIABLE_FOO",
"enriched_field2": "TEST_ENV_VARIABLE_BAR",
},
}
}
connector_config = deepcopy(self.CONFIG)
connector_config.update(preprocessing_config)
connector = Factory.create({"test connector": connector_config}, logger=self.logger)
test_event = {"any": "content"}
os.environ["TEST_ENV_VARIABLE_FOO"] = "test_value_foo"
os.environ["TEST_ENV_VARIABLE_BAR"] = "test_value_bar"
connector._get_event = mock.MagicMock(return_value=(test_event, None))
result, _ = connector.get_next(0.01)
assert result == {
"any": "content",
"enriched_field1": "test_value_foo",
"enriched_field2": "test_value_bar",
}


class BaseOutputTestCase(BaseConnectorTestCase):
def test_is_output_instance(self):
Expand Down

0 comments on commit facd1ea

Please sign in to comment.