diff --git a/CHANGELOG.md b/CHANGELOG.md index 81ee9d10f..8fa19da4a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,8 @@ ### Bugfix * ensure `logprep.abc.Component.Config` is immutable and can be applied multiple times +* remove lost callback reassign behavior from `kafka_input` connector +* remove manual commit option from `kafka_input` connector ## 13.1.2 ### Bugfix diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 3052f8b83..1b2bfc6c2 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -32,7 +32,7 @@ from functools import cached_property, partial from socket import getfqdn from types import MappingProxyType -from typing import Callable, Optional, Tuple, Union +from typing import Optional, Tuple, Union import msgspec from attrs import define, field, validators @@ -229,6 +229,11 @@ class Config(Input.Config): - bootstrap.servers (STRING): a comma separated list of kafka brokers - group.id (STRING): a unique identifier for the consumer group + The following keys are injected by the connector and should not be set: + + - "enable.auto.offset.store" is set to "false", + - "enable.auto.commit" is set to "true", + For additional configuration options see the official: `librdkafka configuration <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>`_. @@ -256,6 +261,8 @@ def _kafka_config(self) -> dict: """ injected_config = { "logger": logger, + "enable.auto.offset.store": "false", + "enable.auto.commit": "true", "on_commit": self._commit_callback, "stats_cb": self._stats_callback, "error_cb": self._error_callback, @@ -448,21 +455,15 @@ def _enable_auto_commit(self) -> bool: return self._config.kafka_config.get("enable.auto.commit") == "true" def batch_finished_callback(self) -> None: - """Store offsets for each kafka partition in `self._last_valid_records` - and if configured commit them. Should be called by output connectors if - they are finished processing a batch of records. 
+ """Store offsets for each kafka partition in `self._last_valid_records`. + Should be called by output connectors if they are finished processing + a batch of records. """ if self._enable_auto_offset_store: return - self._handle_offsets(self._consumer.store_offsets) - if not self._enable_auto_commit: - self._handle_offsets(self._consumer.commit) - self._last_valid_records.clear() - - def _handle_offsets(self, offset_handler: Callable) -> None: for message in self._last_valid_records.values(): try: - offset_handler(message=message) + self._consumer.store_offsets(message=message) except KafkaException as error: raise InputWarning(self, f"{error}, {message}") from error @@ -503,8 +504,6 @@ def _lost_callback(self, consumer, topic_partitions): topic_partition.topic, topic_partition.partition, ) - topic_partition.offset = OFFSET_STORED - self._consumer.assign(topic_partitions) def shut_down(self) -> None: """Close consumer, which also commits kafka offsets.""" diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index 1f1042ae0..adac8003e 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -98,51 +98,20 @@ def test_shut_down_calls_consumer_close(self, _): self.object.shut_down() kafka_consumer.close.assert_called() - @pytest.mark.parametrize( - "settings,handlers", - [ - ( - {"enable.auto.offset.store": "false", "enable.auto.commit": "true"}, - ("store_offsets",), - ), - ( - {"enable.auto.offset.store": "false", "enable.auto.commit": "false"}, - ("store_offsets", "commit"), - ), - ({"enable.auto.offset.store": "true", "enable.auto.commit": "false"}, None), - ({"enable.auto.offset.store": "true", "enable.auto.commit": "true"}, None), - ], - ) @mock.patch("logprep.connector.confluent_kafka.input.Consumer") - def test_batch_finished_callback_calls_offsets_handler_for_setting(self, _, settings, handlers): + def 
test_batch_finished_callback_calls_store_offsets(self, _): input_config = deepcopy(self.CONFIG) - input_config["kafka_config"] |= settings kafka_input = Factory.create({"test": input_config}) kafka_consumer = kafka_input._consumer message = "test message" kafka_input._last_valid_records = {0: message} kafka_input.batch_finished_callback() - if handlers is None: - assert kafka_consumer.commit.call_count == 0 - assert kafka_consumer.store_offsets.call_count == 0 - else: - for handler in handlers: - getattr(kafka_consumer, handler).assert_called() - getattr(kafka_consumer, handler).assert_called_with(message=message) + kafka_consumer.store_offsets.assert_called() + kafka_consumer.store_offsets.assert_called_with(message=message) - @pytest.mark.parametrize( - "settings,handler", - [ - ({"enable.auto.offset.store": "false", "enable.auto.commit": "true"}, "store_offsets"), - ({"enable.auto.offset.store": "false", "enable.auto.commit": "false"}, "commit"), - ], - ) @mock.patch("logprep.connector.confluent_kafka.input.Consumer") - def test_batch_finished_callback_raises_input_warning_on_kafka_exception( - self, _, settings, handler - ): + def test_batch_finished_callback_raises_input_warning_on_kafka_exception(self, _): input_config = deepcopy(self.CONFIG) - input_config["kafka_config"] |= settings kafka_input = Factory.create({"test": input_config}) kafka_consumer = kafka_input._consumer return_sequence = [KafkaException("test error"), None] @@ -150,7 +119,7 @@ def test_batch_finished_callback_raises_input_warning_on_kafka_exception( def raise_generator(return_sequence): return list(reversed(return_sequence)).pop() - getattr(kafka_consumer, handler).side_effect = raise_generator(return_sequence) + kafka_consumer.store_offsets.side_effect = raise_generator(return_sequence) kafka_input._last_valid_records = {0: "message"} with pytest.raises(InputWarning): kafka_input.batch_finished_callback() @@ -286,6 +255,20 @@ def test_default_config_is_injected(self, mock_consumer): _ = 
self.object._consumer mock_consumer.assert_called_with(injected_config) + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_auto_offset_store_and_auto_commit_are_managed_by_connector(self, mock_consumer): + config = deepcopy(self.CONFIG) + config["kafka_config"] |= { + "enable.auto.offset.store": "true", + "enable.auto.commit": "false", + } + kafka_input = Factory.create({"test": config}) + _ = kafka_input._consumer + mock_consumer.assert_called() + injected_config = mock_consumer.call_args[0][0] + assert injected_config.get("enable.auto.offset.store") == "false" + assert injected_config.get("enable.auto.commit") == "true" + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") def test_client_id_can_be_overwritten(self, mock_consumer): input_config = deepcopy(self.CONFIG) @@ -338,17 +321,12 @@ def test_offset_metrics_not_initialized_with_default_label_values(self, metric_n assert len(metric_object.samples) == 0 @mock.patch("logprep.connector.confluent_kafka.input.Consumer") - def test_lost_callback_reassings_to_partitions(self, mock_consumer): - mock_partitions = [mock.MagicMock()] - self.object._consumer.assign = mock.MagicMock() - self.object._lost_callback(mock_consumer, mock_partitions) - self.object._consumer.assign.assert_called_with(mock_partitions) - - @mock.patch("logprep.connector.confluent_kafka.input.Consumer") - def test_lost_callback_counts_warnings(self, mock_consumer): + def test_lost_callback_counts_warnings_and_logs(self, mock_consumer): self.object.metrics.number_of_warnings = 0 mock_partitions = [mock.MagicMock()] - self.object._lost_callback(mock_consumer, mock_partitions) + with mock.patch("logging.Logger.warning") as mock_warning: + self.object._lost_callback(mock_consumer, mock_partitions) + mock_warning.assert_called() assert self.object.metrics.number_of_warnings == 1 @mock.patch("logprep.connector.confluent_kafka.input.Consumer")