Skip to content

Commit

Permalink
Add missing SSL config to kafka admin clients (#710)
Browse files Browse the repository at this point in the history
* Add missing SSL config to kafka admin clients
  • Loading branch information
ppcad authored Nov 27, 2024
1 parent e976e88 commit af153a2
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 8 deletions.
12 changes: 9 additions & 3 deletions logprep/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ def _kafka_config(self) -> dict:
DEFAULTS.update({"client.id": getfqdn()})
DEFAULTS.update(
{
"group.instance.id": f"{getfqdn().strip('.')}-Pipeline{self.pipeline_index}-pid{os.getpid()}"
"group.instance.id": f"{getfqdn().strip('.')}-"
f"Pipeline{self.pipeline_index}-pid{os.getpid()}"
}
)
return DEFAULTS | self._config.kafka_config | injected_config
Expand All @@ -288,6 +289,9 @@ def _admin(self) -> AdminClient:
confluent_kafka admin client object
"""
admin_config = {"bootstrap.servers": self._config.kafka_config["bootstrap.servers"]}
for key, value in self._config.kafka_config.items():
if key.startswith(("security.", "ssl.")):
admin_config[key] = value
return AdminClient(admin_config)

@cached_property
Expand Down Expand Up @@ -375,7 +379,8 @@ def _commit_callback(
if offset in SPECIAL_OFFSETS:
offset = 0
labels = {
"description": f"topic: {self._config.topic} - partition: {topic_partition.partition}"
"description": f"topic: {self._config.topic} - "
f"partition: {topic_partition.partition}"
}
self.metrics.committed_offsets.add_with_labels(offset, labels)

Expand Down Expand Up @@ -473,7 +478,8 @@ def batch_finished_callback(self) -> None:
"""
if self._enable_auto_offset_store:
return
# in case the ConfluentKafkaInput._revoke_callback is triggered before the first message was polled
# in case the ConfluentKafkaInput._revoke_callback is triggered before the first message
# was polled
if not self._last_valid_record:
return
try:
Expand Down
8 changes: 7 additions & 1 deletion logprep/connector/confluent_kafka/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ def _admin(self) -> AdminClient:
confluent_kafka admin client object
"""
admin_config = {"bootstrap.servers": self._config.kafka_config["bootstrap.servers"]}
for key, value in self._config.kafka_config.items():
if key.startswith(("security.", "ssl.")):
admin_config[key] = value
return AdminClient(admin_config)

@cached_property
Expand Down Expand Up @@ -266,7 +269,10 @@ def describe(self) -> str:
"""
base_description = super().describe()
return f"{base_description} - Kafka Output: {self._config.kafka_config.get('bootstrap.servers')}"
return (
f"{base_description} - Kafka Output: "
f"{self._config.kafka_config.get('bootstrap.servers')}"
)

def store(self, document: dict) -> Optional[bool]:
"""Store a document in the producer topic.
Expand Down
36 changes: 33 additions & 3 deletions tests/unit/connector/test_confluent_kafka_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from unittest import mock

import pytest
from confluent_kafka import OFFSET_BEGINNING, KafkaError, KafkaException, Message
from confluent_kafka import OFFSET_BEGINNING, KafkaError, KafkaException

from logprep.abc.input import (
CriticalInputError,
Expand Down Expand Up @@ -71,7 +71,8 @@ def test_get_next_raises_critical_input_exception_for_invalid_confluent_kafka_re
mock_record.error = mock.MagicMock(
return_value=KafkaError(
error=3,
reason="Subscribed topic not available: (Test Instance Name) : Broker: Unknown topic or partition",
reason="Subscribed topic not available: (Test Instance Name) : "
"Broker: Unknown topic or partition",
fatal=False,
retriable=False,
txn_requires_abort=False,
Expand Down Expand Up @@ -109,7 +110,7 @@ def test_batch_finished_callback_calls_store_offsets(self, _):
kafka_consumer.store_offsets.assert_called_with(message=message)

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_batch_finished_callback_calls_store_offsets(self, _):
def test_batch_finished_callback_does_not_call_store_offsets(self, _):
input_config = deepcopy(self.CONFIG)
kafka_input = Factory.create({"test": input_config})
kafka_consumer = kafka_input._consumer
Expand Down Expand Up @@ -424,3 +425,32 @@ def test_health_counts_metrics_on_kafka_exception(self):
self.object._consumer.list_topics.side_effect = KafkaException("test error")
assert not self.object.health()
assert self.object.metrics.number_of_errors == 1

@pytest.mark.parametrize(
    ["kafka_config_update", "expected_admin_client_config"],
    [
        # keys without a security-related prefix must NOT reach the admin client
        ({}, {"bootstrap.servers": "testserver:9092"}),
        ({"statistics.foo": "bar"}, {"bootstrap.servers": "testserver:9092"}),
        # keys prefixed "security." or "ssl." must be forwarded alongside bootstrap
        (
            {"security.foo": "bar"},
            {"bootstrap.servers": "testserver:9092", "security.foo": "bar"},
        ),
        (
            {"ssl.foo": "bar"},
            {"bootstrap.servers": "testserver:9092", "ssl.foo": "bar"},
        ),
        (
            {"security.foo": "bar", "ssl.foo": "bar"},
            {
                "bootstrap.servers": "testserver:9092",
                "security.foo": "bar",
                "ssl.foo": "bar",
            },
        ),
    ],
)
@mock.patch("logprep.connector.confluent_kafka.input.AdminClient")
def test_set_security_related_config_in_admin_client(
    self, admin_client, kafka_config_update, expected_admin_client_config
):
    """Admin client config must contain bootstrap plus every security/ssl setting."""
    config = deepcopy(self.CONFIG)
    config["kafka_config"].update(kafka_config_update)
    connector = Factory.create({"input_connector": config})
    # accessing the cached property instantiates the (mocked) AdminClient
    _ = connector._admin
    admin_client.assert_called_with(expected_admin_client_config)
30 changes: 29 additions & 1 deletion tests/unit/connector/test_confluent_kafka_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
# pylint: disable=wrong-import-position
# pylint: disable=wrong-import-order
# pylint: disable=attribute-defined-outside-init
# pylint: disable=no-self-use

import json
from copy import deepcopy
Expand Down Expand Up @@ -168,3 +167,32 @@ def test_shutdown_logs_and_counts_error_if_queue_not_fully_flushed(self):
def test_health_returns_bool(self):
with mock.patch.object(self.object, "_admin"):
super().test_health_returns_bool()

@pytest.mark.parametrize(
    ["kafka_config_update", "expected_admin_client_config"],
    [
        # keys without a security-related prefix must NOT reach the admin client
        ({}, {"bootstrap.servers": "localhost:9092"}),
        ({"statistics.foo": "bar"}, {"bootstrap.servers": "localhost:9092"}),
        # keys prefixed "security." or "ssl." must be forwarded alongside bootstrap
        (
            {"security.foo": "bar"},
            {"bootstrap.servers": "localhost:9092", "security.foo": "bar"},
        ),
        (
            {"ssl.foo": "bar"},
            {"bootstrap.servers": "localhost:9092", "ssl.foo": "bar"},
        ),
        (
            {"security.foo": "bar", "ssl.foo": "bar"},
            {
                "bootstrap.servers": "localhost:9092",
                "security.foo": "bar",
                "ssl.foo": "bar",
            },
        ),
    ],
)
@mock.patch("logprep.connector.confluent_kafka.output.AdminClient")
def test_set_security_related_config_in_admin_client(
    self, admin_client, kafka_config_update, expected_admin_client_config
):
    """Admin client config must contain bootstrap plus every security/ssl setting."""
    config = deepcopy(self.CONFIG)
    config["kafka_config"].update(kafka_config_update)
    connector = Factory.create({"output_connector": config})
    # accessing the cached property instantiates the (mocked) AdminClient
    _ = connector._admin
    admin_client.assert_called_with(expected_admin_client_config)

0 comments on commit af153a2

Please sign in to comment.