diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 0ce2340f7..c3625bd99 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -273,7 +273,8 @@ def _kafka_config(self) -> dict: DEFAULTS.update({"client.id": getfqdn()}) DEFAULTS.update( { - "group.instance.id": f"{getfqdn().strip('.')}-Pipeline{self.pipeline_index}-pid{os.getpid()}" + "group.instance.id": f"{getfqdn().strip('.')}-" + f"Pipeline{self.pipeline_index}-pid{os.getpid()}" } ) return DEFAULTS | self._config.kafka_config | injected_config @@ -288,6 +289,9 @@ def _admin(self) -> AdminClient: confluent_kafka admin client object """ admin_config = {"bootstrap.servers": self._config.kafka_config["bootstrap.servers"]} + for key, value in self._config.kafka_config.items(): + if key.startswith(("security.", "ssl.")): + admin_config[key] = value return AdminClient(admin_config) @cached_property @@ -375,7 +379,8 @@ def _commit_callback( if offset in SPECIAL_OFFSETS: offset = 0 labels = { - "description": f"topic: {self._config.topic} - partition: {topic_partition.partition}" + "description": f"topic: {self._config.topic} - " + f"partition: {topic_partition.partition}" } self.metrics.committed_offsets.add_with_labels(offset, labels) @@ -473,7 +478,8 @@ def batch_finished_callback(self) -> None: """ if self._enable_auto_offset_store: return - # in case the ConfluentKafkaInput._revoke_callback is triggered before the first message was polled + # in case the ConfluentKafkaInput._revoke_callback is triggered before the first message + # was polled if not self._last_valid_record: return try: diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py index 2c02a323a..dca6b545e 100644 --- a/logprep/connector/confluent_kafka/output.py +++ b/logprep/connector/confluent_kafka/output.py @@ -212,6 +212,9 @@ def _admin(self) -> AdminClient: confluent_kafka admin client object """ admin_config = {"bootstrap.servers": self._config.kafka_config["bootstrap.servers"]} + for key, value in self._config.kafka_config.items(): + if key.startswith(("security.", "ssl.")): + admin_config[key] = value return AdminClient(admin_config) @cached_property @@ -266,7 +269,10 @@ def describe(self) -> str: """ base_description = super().describe() - return f"{base_description} - Kafka Output: {self._config.kafka_config.get('bootstrap.servers')}" + return ( + f"{base_description} - Kafka Output: " + f"{self._config.kafka_config.get('bootstrap.servers')}" + ) def store(self, document: dict) -> Optional[bool]: """Store a document in the producer topic. diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index d2cf0344d..50c5d4acf 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -9,7 +9,7 @@ from unittest import mock import pytest -from confluent_kafka import OFFSET_BEGINNING, KafkaError, KafkaException, Message +from confluent_kafka import OFFSET_BEGINNING, KafkaError, KafkaException from logprep.abc.input import ( CriticalInputError, @@ -71,7 +71,8 @@ def test_get_next_raises_critical_input_exception_for_invalid_confluent_kafka_re mock_record.error = mock.MagicMock( return_value=KafkaError( error=3, - reason="Subscribed topic not available: (Test Instance Name) : Broker: Unknown topic or partition", + reason="Subscribed topic not available: (Test Instance Name) : " + "Broker: Unknown topic or partition", fatal=False, retriable=False, txn_requires_abort=False, @@ -109,7 +110,7 @@ def test_batch_finished_callback_calls_store_offsets(self, _): kafka_consumer.store_offsets.assert_called_with(message=message) @mock.patch("logprep.connector.confluent_kafka.input.Consumer") - def test_batch_finished_callback_calls_store_offsets(self, _): + def test_batch_finished_callback_does_not_call_store_offsets(self, _): input_config = deepcopy(self.CONFIG) kafka_input = Factory.create({"test": input_config}) kafka_consumer = kafka_input._consumer @@ -424,3 +425,32 @@ def test_health_counts_metrics_on_kafka_exception(self): self.object._consumer.list_topics.side_effect = KafkaException("test error") assert not self.object.health() assert self.object.metrics.number_of_errors == 1 + + @pytest.mark.parametrize( + ["kafka_config_update", "expected_admin_client_config"], + [ + ({}, {"bootstrap.servers": "testserver:9092"}), + ({"statistics.foo": "bar"}, {"bootstrap.servers": "testserver:9092"}), + ( + {"security.foo": "bar"}, + {"bootstrap.servers": "testserver:9092", "security.foo": "bar"}, + ), + ( + {"ssl.foo": "bar"}, + {"bootstrap.servers": "testserver:9092", "ssl.foo": "bar"}, + ), + ( + {"security.foo": "bar", "ssl.foo": "bar"}, + {"bootstrap.servers": "testserver:9092", "security.foo": "bar", "ssl.foo": "bar"}, + ), + ], + ) + @mock.patch("logprep.connector.confluent_kafka.input.AdminClient") + def test_set_security_related_config_in_admin_client( + self, admin_client, kafka_config_update, expected_admin_client_config + ): + new_kafka_config = deepcopy(self.CONFIG) + new_kafka_config["kafka_config"].update(kafka_config_update) + input_connector = Factory.create({"input_connector": new_kafka_config}) + _ = input_connector._admin + admin_client.assert_called_with(expected_admin_client_config) diff --git a/tests/unit/connector/test_confluent_kafka_output.py b/tests/unit/connector/test_confluent_kafka_output.py index 35cb590c4..37b629fdb 100644 --- a/tests/unit/connector/test_confluent_kafka_output.py +++ b/tests/unit/connector/test_confluent_kafka_output.py @@ -3,7 +3,6 @@ # pylint: disable=wrong-import-position # pylint: disable=wrong-import-order # pylint: disable=attribute-defined-outside-init -# pylint: disable=no-self-use import json from copy import deepcopy @@ -168,3 +167,32 @@ def test_shutdown_logs_and_counts_error_if_queue_not_fully_flushed(self): def test_health_returns_bool(self): with mock.patch.object(self.object, "_admin"): super().test_health_returns_bool() + + @pytest.mark.parametrize( + ["kafka_config_update", "expected_admin_client_config"], + [ + ({}, {"bootstrap.servers": "localhost:9092"}), + ({"statistics.foo": "bar"}, {"bootstrap.servers": "localhost:9092"}), + ( + {"security.foo": "bar"}, + {"bootstrap.servers": "localhost:9092", "security.foo": "bar"}, + ), + ( + {"ssl.foo": "bar"}, + {"bootstrap.servers": "localhost:9092", "ssl.foo": "bar"}, + ), + ( + {"security.foo": "bar", "ssl.foo": "bar"}, + {"bootstrap.servers": "localhost:9092", "security.foo": "bar", "ssl.foo": "bar"}, + ), + ], + ) + @mock.patch("logprep.connector.confluent_kafka.output.AdminClient") + def test_set_security_related_config_in_admin_client( + self, admin_client, kafka_config_update, expected_admin_client_config + ): + new_kafka_config = deepcopy(self.CONFIG) + new_kafka_config["kafka_config"].update(kafka_config_update) + output_connector = Factory.create({"output_connector": new_kafka_config}) + _ = output_connector._admin + admin_client.assert_called_with(expected_admin_client_config)