From facd1ea844e90026855fc2ca345ade4019dbc303 Mon Sep 17 00:00:00 2001 From: ekneg54 Date: Mon, 2 Oct 2023 20:28:55 +0000 Subject: [PATCH] add env variable enrichment preprocessor --- logprep/abc/input.py | 28 ++++++++++++++++++++++++ tests/unit/connector/base.py | 41 ++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/logprep/abc/input.py b/logprep/abc/input.py index 9112d44d5..507792c1a 100644 --- a/logprep/abc/input.py +++ b/logprep/abc/input.py @@ -4,6 +4,7 @@ import base64 import hashlib +import os import zlib from abc import abstractmethod from functools import partial @@ -102,6 +103,17 @@ class TimeDeltaConfig: The calculation will be the arrival time minus the time of this reference field.""" +@define(kw_only=True) +class EnvEnrichmentConfig: + """Enrichment Configurations + Works only if the preprocessor enrich_by_env_variable is set.""" + + target_field: field(validator=[validators.instance_of(str), lambda _, __, x: bool(x)]) + """Defines the fieldname to which the env variable value should be written to.""" + variable_name: field(validator=[validators.instance_of(str), lambda _, __, x: bool(x)]) + """Defines the name of the env variable that should be used for the enrichment.""" + + class Input(Connector): """Connect to a source for log data.""" @@ -119,6 +131,7 @@ class Config(Connector.Config): "hmac": Optional[HmacConfig], "log_arrival_time_target_field": Optional[str], "log_arrival_timedelta": Optional[TimeDeltaConfig], + "enrich_by_env_variables": Optional[dict], }, ), ], @@ -216,6 +229,11 @@ def _add_log_arrival_timedelta_information(self): ) return log_arrival_time_target_field_present & log_arrival_timedelta_present + @property + def _add_env_enrichment(self): + """Check and return if the env enrichment should be added to the event.""" + return bool(self._config.preprocessing.get("enrich_by_env_variables")) + def _get_raw_event(self, timeout: float) -> bytearray: # pylint: disable=unused-argument """Implements the details how to get the raw event @@ -278,12 +296,22 @@ def get_next(self, timeout: float) -> Tuple[Optional[dict], Optional[str]]: self._add_arrival_time_information_to_event(event) if self._add_log_arrival_timedelta_information: self._add_arrival_timedelta_information_to_event(event) + if self._add_env_enrichment: + self._add_env_enrichment_to_event(event) self.metrics.number_of_processed_events += 1 return event, non_critical_error_msg def batch_finished_callback(self): """Can be called by output connectors after processing a batch of one or more records.""" + def _add_env_enrichment_to_event(self, event: dict): + """Add the env enrichment information to the event""" + enrichments = self._config.preprocessing.get("enrich_by_env_variables") + if not enrichments: + return + for target_field, variable_name in enrichments.items(): + add_field_to(event, target_field, os.environ.get(variable_name, "")) + def _add_arrival_time_information_to_event(self, event: dict): now = TimeParser.now() target_field = self._config.preprocessing.get("log_arrival_time_target_field") diff --git a/tests/unit/connector/base.py b/tests/unit/connector/base.py index ed58e1837..7c68c9709 100644 --- a/tests/unit/connector/base.py +++ b/tests/unit/connector/base.py @@ -3,6 +3,7 @@ # pylint: disable=line-too-long import base64 import json +import os import zlib from copy import deepcopy from logging import getLogger @@ -480,6 +481,46 @@ def test_get_next_returns_event_with_active_time_measurement(self): TimeMeasurement.TIME_MEASUREMENT_ENABLED = False TimeMeasurement.APPEND_TO_EVENT = False + def test_preprocessing_enriches_by_env_variable(self): + preprocessing_config = { + "preprocessing": { + "enrich_by_env_variables": { + "enriched_field": "TEST_ENV_VARIABLE", + }, + } + } + connector_config = deepcopy(self.CONFIG) + connector_config.update(preprocessing_config) + connector = Factory.create({"test connector": connector_config}, logger=self.logger) + test_event = {"any": "content"} + os.environ["TEST_ENV_VARIABLE"] = "test_value" + connector._get_event = mock.MagicMock(return_value=(test_event, None)) + result, _ = connector.get_next(0.01) + assert result == {"any": "content", "enriched_field": "test_value"} + + def test_preprocessing_enriches_by_multiple_env_variables(self): + preprocessing_config = { + "preprocessing": { + "enrich_by_env_variables": { + "enriched_field1": "TEST_ENV_VARIABLE_FOO", + "enriched_field2": "TEST_ENV_VARIABLE_BAR", + }, + } + } + connector_config = deepcopy(self.CONFIG) + connector_config.update(preprocessing_config) + connector = Factory.create({"test connector": connector_config}, logger=self.logger) + test_event = {"any": "content"} + os.environ["TEST_ENV_VARIABLE_FOO"] = "test_value_foo" + os.environ["TEST_ENV_VARIABLE_BAR"] = "test_value_bar" + connector._get_event = mock.MagicMock(return_value=(test_event, None)) + result, _ = connector.get_next(0.01) + assert result == { + "any": "content", + "enriched_field1": "test_value_foo", + "enriched_field2": "test_value_bar", + } + class BaseOutputTestCase(BaseConnectorTestCase): def test_is_output_instance(self):