From a4f18279468779ab66c4291bf92939dc5bf243a2 Mon Sep 17 00:00:00 2001 From: dtrai2 Date: Thu, 31 Oct 2024 15:10:56 +0100 Subject: [PATCH 1/2] fix store_offsets call when last_valid_record is None - Prevent store_offsets from being called before the first message is pulled. - Add unit test to ensure store_offsets is not called incorrectly. --- CHANGELOG.md | 7 +++++-- logprep/connector/confluent_kafka/input.py | 3 +++ tests/unit/connector/test_confluent_kafka_input.py | 9 +++++++++ 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c5f71bc8b..3c6620e4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ ### Improvements ### Bugfix +- fix `confluent_kafka.store_offsets` if `last_valid_record` is `None`, can happen if a rebalancing happens + before the first message was pulled. + ## 14.0.0 ### Breaking @@ -53,7 +56,7 @@ * fixes a bug not increasing but decreasing timeout throttle factor of ThrottlingQueue * handle DecodeError and unexpected Exceptions on requests in `http_input` separately -* fixes unbound local error in http input connector +* fixes unbound local error in http input connector ## 13.1.1 ### Improvements @@ -97,7 +100,7 @@ ### Bugfix -* This release limits the mysql-connector-python dependency to have version less the 9 +* This release limits the mysql-connector-python dependency to have version less the 9 ## 13.0.0 diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py index 53ce16f96..0ce2340f7 100644 --- a/logprep/connector/confluent_kafka/input.py +++ b/logprep/connector/confluent_kafka/input.py @@ -473,6 +473,9 @@ def batch_finished_callback(self) -> None: """ if self._enable_auto_offset_store: return + # in case the ConfluentKafkaInput._revoke_callback is triggered before the first message was polled + if not self._last_valid_record: + return try: self._consumer.store_offsets(message=self._last_valid_record) except KafkaException as error: diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index e2c2ebd2d..da560c642 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -108,6 +108,15 @@ def test_batch_finished_callback_calls_store_offsets(self, _): kafka_consumer.store_offsets.assert_called() kafka_consumer.store_offsets.assert_called_with(message=message) + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") + def test_batch_finished_callback_calls_store_offsets(self, _): + input_config = deepcopy(self.CONFIG) + kafka_input = Factory.create({"test": input_config}) + kafka_consumer = kafka_input._consumer + kafka_input._last_valid_record = None + kafka_input.batch_finished_callback() + kafka_consumer.store_offsets.assert_not_called() + @mock.patch("logprep.connector.confluent_kafka.input.Consumer") def test_batch_finished_callback_raises_input_warning_on_kafka_exception(self, _): input_config = deepcopy(self.CONFIG) From 0570dc4385321a6c392f4a8c8e597da62b3a84a9 Mon Sep 17 00:00:00 2001 From: dtrai2 Date: Mon, 4 Nov 2024 14:33:25 +0100 Subject: [PATCH 2/2] fix typo in variable name in test_confluent_kafka_input.py - corrected variable name from `_last_valid_records` to `_last_valid_record` --- tests/unit/connector/test_confluent_kafka_input.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/connector/test_confluent_kafka_input.py b/tests/unit/connector/test_confluent_kafka_input.py index da560c642..fc284e4f8 100644 --- a/tests/unit/connector/test_confluent_kafka_input.py +++ b/tests/unit/connector/test_confluent_kafka_input.py @@ -128,7 +128,7 @@ def raise_generator(return_sequence): return list(reversed(return_sequence)).pop() kafka_consumer.store_offsets.side_effect = raise_generator(return_sequence) - kafka_input._last_valid_records = {0: "message"} + kafka_input._last_valid_record = {0: "message"} with pytest.raises(InputWarning): kafka_input.batch_finished_callback()