Skip to content

Commit

Permalink
Remove broken kafka implementations (#678)
Browse files — browse the repository at this point in the history
* remove broken kafka implementations
Loading branch information…
ekneg54 authored Oct 4, 2024
1 parent 72cb7f9 commit d34805f
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 58 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
### Bugfix

* ensure `logprep.abc.Component.Config` is immutable and can be applied multiple times
* remove lost callback reassign behavior from `kafka_input` connector
* remove manual commit option from `kafka_input` connector

## 13.1.2
### Bugfix
Expand Down
25 changes: 12 additions & 13 deletions logprep/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from functools import cached_property, partial
from socket import getfqdn
from types import MappingProxyType
from typing import Callable, Optional, Tuple, Union
from typing import Optional, Tuple, Union

import msgspec
from attrs import define, field, validators
Expand Down Expand Up @@ -229,6 +229,11 @@ class Config(Input.Config):
- bootstrap.servers (STRING): a comma separated list of kafka brokers
- group.id (STRING): a unique identifier for the consumer group
The following keys are injected by the connector and should not be set:
- "enable.auto.offset.store" is set to "false",
- "enable.auto.commit" is set to "true",
For additional configuration options see the official:
`librdkafka configuration <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>`_.
Expand Down Expand Up @@ -256,6 +261,8 @@ def _kafka_config(self) -> dict:
"""
injected_config = {
"logger": logger,
"enable.auto.offset.store": "false",
"enable.auto.commit": "true",
"on_commit": self._commit_callback,
"stats_cb": self._stats_callback,
"error_cb": self._error_callback,
Expand Down Expand Up @@ -448,21 +455,15 @@ def _enable_auto_commit(self) -> bool:
return self._config.kafka_config.get("enable.auto.commit") == "true"

def batch_finished_callback(self) -> None:
"""Store offsets for each kafka partition in `self._last_valid_records`
and if configured commit them. Should be called by output connectors if
they are finished processing a batch of records.
"""Store offsets for each kafka partition in `self._last_valid_records`.
Should be called by output connectors if they are finished processing
a batch of records.
"""
if self._enable_auto_offset_store:
return
self._handle_offsets(self._consumer.store_offsets)
if not self._enable_auto_commit:
self._handle_offsets(self._consumer.commit)
self._last_valid_records.clear()

def _handle_offsets(self, offset_handler: Callable) -> None:
for message in self._last_valid_records.values():
try:
offset_handler(message=message)
self._consumer.store_offsets(message=message)
except KafkaException as error:
raise InputWarning(self, f"{error}, {message}") from error

Expand Down Expand Up @@ -503,8 +504,6 @@ def _lost_callback(self, consumer, topic_partitions):
topic_partition.topic,
topic_partition.partition,
)
topic_partition.offset = OFFSET_STORED
self._consumer.assign(topic_partitions)

def shut_down(self) -> None:
"""Close consumer, which also commits kafka offsets."""
Expand Down
68 changes: 23 additions & 45 deletions tests/unit/connector/test_confluent_kafka_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,59 +98,28 @@ def test_shut_down_calls_consumer_close(self, _):
self.object.shut_down()
kafka_consumer.close.assert_called()

@pytest.mark.parametrize(
"settings,handlers",
[
(
{"enable.auto.offset.store": "false", "enable.auto.commit": "true"},
("store_offsets",),
),
(
{"enable.auto.offset.store": "false", "enable.auto.commit": "false"},
("store_offsets", "commit"),
),
({"enable.auto.offset.store": "true", "enable.auto.commit": "false"}, None),
({"enable.auto.offset.store": "true", "enable.auto.commit": "true"}, None),
],
)
@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_batch_finished_callback_calls_offsets_handler_for_setting(self, _, settings, handlers):
def test_batch_finished_callback_calls_store_offsets(self, _):
input_config = deepcopy(self.CONFIG)
input_config["kafka_config"] |= settings
kafka_input = Factory.create({"test": input_config})
kafka_consumer = kafka_input._consumer
message = "test message"
kafka_input._last_valid_records = {0: message}
kafka_input.batch_finished_callback()
if handlers is None:
assert kafka_consumer.commit.call_count == 0
assert kafka_consumer.store_offsets.call_count == 0
else:
for handler in handlers:
getattr(kafka_consumer, handler).assert_called()
getattr(kafka_consumer, handler).assert_called_with(message=message)
kafka_consumer.store_offsets.assert_called()
kafka_consumer.store_offsets.assert_called_with(message=message)

@pytest.mark.parametrize(
"settings,handler",
[
({"enable.auto.offset.store": "false", "enable.auto.commit": "true"}, "store_offsets"),
({"enable.auto.offset.store": "false", "enable.auto.commit": "false"}, "commit"),
],
)
@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_batch_finished_callback_raises_input_warning_on_kafka_exception(
self, _, settings, handler
):
def test_batch_finished_callback_raises_input_warning_on_kafka_exception(self, _):
input_config = deepcopy(self.CONFIG)
input_config["kafka_config"] |= settings
kafka_input = Factory.create({"test": input_config})
kafka_consumer = kafka_input._consumer
return_sequence = [KafkaException("test error"), None]

def raise_generator(return_sequence):
return list(reversed(return_sequence)).pop()

getattr(kafka_consumer, handler).side_effect = raise_generator(return_sequence)
kafka_consumer.store_offsets.side_effect = raise_generator(return_sequence)
kafka_input._last_valid_records = {0: "message"}
with pytest.raises(InputWarning):
kafka_input.batch_finished_callback()
Expand Down Expand Up @@ -286,6 +255,20 @@ def test_default_config_is_injected(self, mock_consumer):
_ = self.object._consumer
mock_consumer.assert_called_with(injected_config)

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_auto_offset_store_and_auto_commit_are_managed_by_connector(self, mock_consumer):
config = deepcopy(self.CONFIG)
config["kafka_config"] |= {
"enable.auto.offset.store": "true",
"enable.auto.commit": "false",
}
kafka_input = Factory.create({"test": config})
_ = kafka_input._consumer
mock_consumer.assert_called()
injected_config = mock_consumer.call_args[0][0]
assert injected_config.get("enable.auto.offset.store") == "false"
assert injected_config.get("enable.auto.commit") == "true"

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_client_id_can_be_overwritten(self, mock_consumer):
input_config = deepcopy(self.CONFIG)
Expand Down Expand Up @@ -338,17 +321,12 @@ def test_offset_metrics_not_initialized_with_default_label_values(self, metric_n
assert len(metric_object.samples) == 0

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_lost_callback_reassings_to_partitions(self, mock_consumer):
mock_partitions = [mock.MagicMock()]
self.object._consumer.assign = mock.MagicMock()
self.object._lost_callback(mock_consumer, mock_partitions)
self.object._consumer.assign.assert_called_with(mock_partitions)

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
def test_lost_callback_counts_warnings(self, mock_consumer):
def test_lost_callback_counts_warnings_and_logs(self, mock_consumer):
self.object.metrics.number_of_warnings = 0
mock_partitions = [mock.MagicMock()]
self.object._lost_callback(mock_consumer, mock_partitions)
with mock.patch("logging.Logger.warning") as mock_warning:
self.object._lost_callback(mock_consumer, mock_partitions)
mock_warning.assert_called()
assert self.object.metrics.number_of_warnings == 1

@mock.patch("logprep.connector.confluent_kafka.input.Consumer")
Expand Down

0 comments on commit d34805f

Please sign in to comment.