From 8bbb38e676881a13187261f6cd6b833d92c8298d Mon Sep 17 00:00:00 2001 From: dtrai2 <95028228+dtrai2@users.noreply.github.com> Date: Tue, 5 Nov 2024 14:50:15 +0100 Subject: [PATCH] fix store_offsets call when last_valid_record is None (#693) - Prevent store_offsets from being called before the first message is pulled. - Add unit test to ensure store_offsets is not called incorrectly. - Can happen on rebalancing before first message poll --- CHANGELOG.md | 7 +++++-- logprep/connector/confluent_kafka/input.py | 3 +++ tests/unit/connector/test_confluent_kafka_input.py | 11 ++++++++++- 3 files changed, 18 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c5f71bc8b..3c6620e4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ ### Improvements ### Bugfix +- fix `confluent_kafka.store_offsets` if `last_valid_record` is `None`, can happen if a rebalancing happens + before the first message was pulled. + ## 14.0.0 ### Breaking @@ -53,7 +56,7 @@ * fixes a bug not increasing but decreasing timeout throttle factor of ThrottlingQueue * handle DecodeError and unexpected Exceptions on requests in `http_input` separately -* fixes unbound local error in http input connector +* fixes unbound local error in http input connector ## 13.1.1 ### Improvements @@ -97,7 +100,7 @@ ### Bugfix -* This release limits the mysql-connector-python dependency to have version less the 9 +* This release limits the mysql-connector-python dependency to have version less the 9 ## 13.0.0 diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 53ce16f96..0ce2340f7 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -473,6 +473,9 @@ def batch_finished_callback(self) -> None: """ if self._enable_auto_offset_store: return + # in case the ConfluentKafkaInput._revoke_callback is triggered before the first message was polled + if not self._last_valid_record: + return try: self._consumer.store_offsets(message=self._last_valid_record) except KafkaException as error: diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index e2c2ebd2d..fc284e4f8 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -108,6 +108,15 @@ def test_batch_finished_callback_calls_store_offsets(self, _): kafka_consumer.store_offsets.assert_called() kafka_consumer.store_offsets.assert_called_with(message=message) + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_batch_finished_callback_calls_store_offsets(self, _): + input_config = deepcopy(self.CONFIG) + kafka_input = Factory.create({"test": input_config}) + kafka_consumer = kafka_input._consumer + kafka_input._last_valid_record = None + kafka_input.batch_finished_callback() + kafka_consumer.store_offsets.assert_not_called() + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") def test_batch_finished_callback_raises_input_warning_on_kafka_exception(self, _): input_config = deepcopy(self.CONFIG) @@ -119,7 +128,7 @@ def raise_generator(return_sequence): return list(reversed(return_sequence)).pop() kafka_consumer.store_offsets.side_effect = raise_generator(return_sequence) - kafka_input._last_valid_records = {0: "message"} + kafka_input._last_valid_record = {0: "message"} with pytest.raises(InputWarning): kafka_input.batch_finished_callback()