fix store_offsets call when last_valid_record is None (#693)
- Prevent store_offsets from being called before the first message is pulled.
- Add unit test to ensure store_offsets is not called incorrectly.
- Can happen on rebalancing before first message poll
dtrai2 authored Nov 5, 2024
1 parent a0dc175 commit 8bbb38e
Showing 3 changed files with 18 additions and 3 deletions.
7 changes: 5 additions & 2 deletions CHANGELOG.md
@@ -6,6 +6,9 @@
 ### Improvements
 ### Bugfix

+- fix `confluent_kafka.store_offsets` if `last_valid_record` is `None`, can happen if a rebalancing happens
+  before the first message was pulled.
+
 ## 14.0.0
 ### Breaking

@@ -53,7 +56,7 @@

 * fixes a bug not increasing but decreasing timeout throttle factor of ThrottlingQueue
 * handle DecodeError and unexpected Exceptions on requests in `http_input` separately
-* fixes unbound local error in http input connector
+* fixes unbound local error in http input connector

 ## 13.1.1
 ### Improvements
@@ -97,7 +100,7 @@

 ### Bugfix

-* This release limits the mysql-connector-python dependency to have version less the 9
+* This release limits the mysql-connector-python dependency to have version less the 9

 ## 13.0.0

3 changes: 3 additions & 0 deletions logprep/connector/confluent_kafka/input.py
@@ -473,6 +473,9 @@ def batch_finished_callback(self) -> None:
         """
         if self._enable_auto_offset_store:
             return
+        # in case the ConfluentKafkaInput._revoke_callback is triggered before the first message was polled
+        if not self._last_valid_record:
+            return
         try:
             self._consumer.store_offsets(message=self._last_valid_record)
         except KafkaException as error:
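The guard added in this hunk can be illustrated in isolation. The following is a minimal sketch, not the connector's actual class: `OffsetStoringInput` and its constructor are hypothetical stand-ins, error handling is omitted, and the consumer is replaced by a `unittest.mock.MagicMock` so the example runs without a Kafka broker or an installed `confluent_kafka`.

```python
from unittest import mock


class OffsetStoringInput:
    """Hypothetical, simplified stand-in for the Kafka input connector."""

    def __init__(self, consumer, enable_auto_offset_store=False):
        self._consumer = consumer
        self._enable_auto_offset_store = enable_auto_offset_store
        # Stays None until the first message has been polled successfully.
        self._last_valid_record = None

    def batch_finished_callback(self):
        # With automatic offset storing, nothing has to be stored manually.
        if self._enable_auto_offset_store:
            return
        # A rebalance can trigger this callback before any message was polled;
        # in that case there is no record whose offset could be stored.
        if not self._last_valid_record:
            return
        self._consumer.store_offsets(message=self._last_valid_record)


consumer = mock.MagicMock()  # stands in for confluent_kafka.Consumer
kafka_input = OffsetStoringInput(consumer)

# Nothing polled yet: the guard returns before store_offsets is reached.
kafka_input.batch_finished_callback()
calls_before_first_poll = consumer.store_offsets.call_count

# Pretend a message was polled, then finish the batch again.
kafka_input._last_valid_record = mock.sentinel.message
kafka_input.batch_finished_callback()
calls_after_first_poll = consumer.store_offsets.call_count
```

In this sketch `store_offsets` is only reached once a record exists, which is the behaviour the commit message describes for the rebalancing case.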
11 changes: 10 additions & 1 deletion tests/unit/connector/test_confluent_kafka_input.py
@@ -108,6 +108,15 @@ def test_batch_finished_callback_calls_store_offsets(self, _):
         kafka_consumer.store_offsets.assert_called()
         kafka_consumer.store_offsets.assert_called_with(message=message)

+    @mock.patch("logprep.connector.confluent_kafka.input.Consumer")
+    def test_batch_finished_callback_calls_store_offsets(self, _):
+        input_config = deepcopy(self.CONFIG)
+        kafka_input = Factory.create({"test": input_config})
+        kafka_consumer = kafka_input._consumer
+        kafka_input._last_valid_record = None
+        kafka_input.batch_finished_callback()
+        kafka_consumer.store_offsets.assert_not_called()
+
     @mock.patch("logprep.connector.confluent_kafka.input.Consumer")
     def test_batch_finished_callback_raises_input_warning_on_kafka_exception(self, _):
         input_config = deepcopy(self.CONFIG)
@@ -119,7 +128,7 @@ def raise_generator(return_sequence):
             return list(reversed(return_sequence)).pop()

         kafka_consumer.store_offsets.side_effect = raise_generator(return_sequence)
-        kafka_input._last_valid_records = {0: "message"}
+        kafka_input._last_valid_record = {0: "message"}
         with pytest.raises(InputWarning):
             kafka_input.batch_finished_callback()
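The positive and negative test cases can be sketched without the project's fixtures. This is an illustrative example under stated assumptions: `store_last_offset` is a hypothetical helper that mirrors the guarded call, and the test names are hypothetical as well. They are deliberately distinct, because two methods with the same name in one Python class leave only the later definition, so a test runner would collect just one of them.

```python
import unittest
from unittest import mock


def store_last_offset(consumer, last_valid_record):
    """Hypothetical helper mirroring the guarded call from the connector."""
    if not last_valid_record:
        return
    consumer.store_offsets(message=last_valid_record)


class TestStoreLastOffset(unittest.TestCase):
    def test_calls_store_offsets_with_last_record(self):
        consumer = mock.MagicMock()
        store_last_offset(consumer, mock.sentinel.message)
        consumer.store_offsets.assert_called_with(message=mock.sentinel.message)

    def test_does_not_call_store_offsets_without_record(self):
        consumer = mock.MagicMock()
        store_last_offset(consumer, None)
        consumer.store_offsets.assert_not_called()


# Run both cases programmatically so the result can be inspected.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestStoreLastOffset)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Here `result.testsRun` reports how many test methods were actually collected, which is a quick way to notice a test that was shadowed by a later definition of the same name.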
