Skip to content

Commit

Permalink
validate kafka_config
Browse files Browse the repository at this point in the history
  • Loading branch information
ekneg54 committed Oct 11, 2023
1 parent f6f6e8c commit 50689a2
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 4 deletions.
6 changes: 4 additions & 2 deletions logprep/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
auto.offset.reset: "earliest"
"""
# pylint: enable=line-too-long
from functools import cached_property
from functools import cached_property, partial
from logging import Logger
from socket import getfqdn
from typing import Callable, Optional, Tuple, Union
Expand All @@ -53,6 +53,7 @@
Input,
WarningInputError,
)
from logprep.util.validators import dict_with_keys_validator

DEFAULTS = {
"enable.auto.offset.store": "false",
Expand Down Expand Up @@ -175,7 +176,8 @@ class Config(Input.Config):
key_validator=validators.instance_of(str),
value_validator=validators.instance_of(str),
),
],
partial(dict_with_keys_validator, expected_keys=["bootstrap.servers", "group.id"]),
]
)
""" Kafka configuration for the kafka client.
At minimum the following keys must be set:
Expand Down
5 changes: 3 additions & 2 deletions logprep/connector/confluent_kafka/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@

import json
from datetime import datetime
from functools import cached_property
from functools import cached_property, partial
from logging import Logger
from socket import getfqdn
from typing import Optional

from attrs import define, field, validators
from confluent_kafka import KafkaException, Producer

from logprep.abc.connector import Connector
from logprep.abc.output import CriticalOutputError, FatalOutputError, Output
from logprep.util.validators import dict_with_keys_validator

DEFAULTS = {
"request.required.acks": "-1",
Expand Down Expand Up @@ -108,6 +108,7 @@ class Config(Output.Config):
key_validator=validators.instance_of(str),
value_validator=validators.instance_of((str, dict)),
),
partial(dict_with_keys_validator, expected_keys=["bootstrap.servers"]),
],
factory=dict,
)
Expand Down
9 changes: 9 additions & 0 deletions tests/unit/connector/test_confluent_kafka_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
WarningInputError,
)
from logprep.factory import Factory
from logprep.factory_error import InvalidConfigurationError
from tests.unit.connector.base import BaseInputTestCase
from tests.unit.connector.test_confluent_kafka_common import (
CommonConfluentKafkaTestCase,
Expand Down Expand Up @@ -288,3 +289,11 @@ def test_raises_fatal_input_error_if_poll_raises_runtime_error(self, _):
self.object._consumer.poll.side_effect = RuntimeError("test error")
with pytest.raises(FatalInputError, match="test error"):
self.object.get_next(0.01)

def test_raises_value_error_if_mandatory_parameters_not_set(self):
config = deepcopy(self.CONFIG)
config.get("kafka_config").pop("bootstrap.servers")
config.get("kafka_config").pop("group.id")
expected_error_message = r"keys are missing: {'(bootstrap.servers|group.id)', '(bootstrap.servers|group.id)'}" # pylint: disable=line-too-long
with pytest.raises(InvalidConfigurationError, match=expected_error_message):
Factory.create({"test": config}, logger=self.logger)
9 changes: 9 additions & 0 deletions tests/unit/connector/test_confluent_kafka_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@

import json
import socket
from copy import deepcopy
from pathlib import Path
from unittest import mock

import pytest

from logprep.abc.output import CriticalOutputError, FatalOutputError
from logprep.factory import Factory
from logprep.factory_error import InvalidConfigurationError
from tests.unit.connector.base import BaseOutputTestCase
from tests.unit.connector.test_confluent_kafka_common import (
CommonConfluentKafkaTestCase,
Expand Down Expand Up @@ -164,3 +166,10 @@ def test_metrics_expose_returns_data(self):
}
# pylint: enable=line-too-long
assert self.object.metrics.expose() == expected

def test_raises_value_error_if_mandatory_parameters_not_set(self):
config = deepcopy(self.CONFIG)
config.get("kafka_config").pop("bootstrap.servers")
expected_error_message = r"keys are missing: {'bootstrap.servers'}"
with pytest.raises(InvalidConfigurationError, match=expected_error_message):
Factory.create({"test": config}, logger=self.logger)

0 comments on commit 50689a2

Please sign in to comment.