diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 3c07ab3c4..7de895ba5 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -28,7 +28,7 @@ auto.offset.reset: "earliest" """ # pylint: enable=line-too-long -from functools import cached_property +from functools import cached_property, partial from logging import Logger from socket import getfqdn from typing import Callable, Optional, Tuple, Union @@ -53,6 +53,7 @@ Input, WarningInputError, ) +from logprep.util.validators import dict_with_keys_validator DEFAULTS = { "enable.auto.offset.store": "false", @@ -175,7 +176,8 @@ class Config(Input.Config): key_validator=validators.instance_of(str), value_validator=validators.instance_of(str), ), - ], + partial(dict_with_keys_validator, expected_keys=["bootstrap.servers", "group.id"]), + ] ) """ Kafka configuration for the kafka client. At minimum the following keys must be set: diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py index d828d7d3f..e429da22f 100644 --- a/logprep/connector/confluent_kafka/output.py +++ b/logprep/connector/confluent_kafka/output.py @@ -27,7 +27,7 @@ import json from datetime import datetime -from functools import cached_property +from functools import cached_property, partial from logging import Logger from socket import getfqdn from typing import Optional @@ -35,8 +35,8 @@ from attrs import define, field, validators from confluent_kafka import KafkaException, Producer -from logprep.abc.connector import Connector from logprep.abc.output import CriticalOutputError, FatalOutputError, Output +from logprep.util.validators import dict_with_keys_validator DEFAULTS = { "request.required.acks": "-1", @@ -108,6 +108,7 @@ class Config(Output.Config): key_validator=validators.instance_of(str), value_validator=validators.instance_of((str, dict)), ), + partial(dict_with_keys_validator, expected_keys=["bootstrap.servers"]), ], factory=dict, ) diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index 4a7724674..ee63a38ae 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -18,6 +18,7 @@ WarningInputError, ) from logprep.factory import Factory +from logprep.factory_error import InvalidConfigurationError from tests.unit.connector.base import BaseInputTestCase from tests.unit.connector.test_confluent_kafka_common import ( CommonConfluentKafkaTestCase, @@ -288,3 +289,11 @@ def test_raises_fatal_input_error_if_poll_raises_runtime_error(self, _): self.object._consumer.poll.side_effect = RuntimeError("test error") with pytest.raises(FatalInputError, match="test error"): self.object.get_next(0.01) + + def test_raises_value_error_if_mandatory_parameters_not_set(self): + config = deepcopy(self.CONFIG) + config.get("kafka_config").pop("bootstrap.servers") + config.get("kafka_config").pop("group.id") + expected_error_message = r"keys are missing: {'(bootstrap.servers|group.id)', '(bootstrap.servers|group.id)'}" # pylint: disable=line-too-long + with pytest.raises(InvalidConfigurationError, match=expected_error_message): + Factory.create({"test": config}, logger=self.logger) diff --git a/tests/unit/connector/test_confluent_kafka_output.py b/tests/unit/connector/test_confluent_kafka_output.py index e63031bb8..c9ba7d7e0 100644 --- a/tests/unit/connector/test_confluent_kafka_output.py +++ b/tests/unit/connector/test_confluent_kafka_output.py @@ -7,6 +7,7 @@ import json import socket +from copy import deepcopy from pathlib import Path from unittest import mock @@ -14,6 +15,7 @@ from logprep.abc.output import CriticalOutputError, FatalOutputError from logprep.factory import Factory +from logprep.factory_error import InvalidConfigurationError from tests.unit.connector.base import BaseOutputTestCase from tests.unit.connector.test_confluent_kafka_common import ( CommonConfluentKafkaTestCase, @@ -164,3 +166,10 @@ def test_metrics_expose_returns_data(self): } # pylint: enable=line-too-long assert self.object.metrics.expose() == expected + + def test_raises_value_error_if_mandatory_parameters_not_set(self): + config = deepcopy(self.CONFIG) + config.get("kafka_config").pop("bootstrap.servers") + expected_error_message = r"keys are missing: {'bootstrap.servers'}" + with pytest.raises(InvalidConfigurationError, match=expected_error_message): + Factory.create({"test": config}, logger=self.logger)