Skip to content

Commit

Permalink
Refactor ConfluentKafkaInput to store offsets for last message
Browse files Browse the repository at this point in the history
This commit refactors the ConfluentKafkaInput class to store the offset of only the last message, referenced by `_last_valid_record`. Previously, offsets were stored per kafka partition in the `_last_valid_records` dictionary, but now only the last valid record is kept. This change simplifies offset storage and reduces memory usage.

Code changes:
- Modified `ConfluentKafkaInput` class in `logprep/connector/confluent_kafka/input.py`
- Removed `_last_valid_records` dictionary and replaced it with `_last_valid_record` variable
- Updated `batch_finished_callback` method to store offsets for the last valid record
  • Loading branch information
ekneg54 committed Oct 28, 2024
1 parent d210506 commit 0070857
Showing 1 changed file with 11 additions and 12 deletions.
23 changes: 11 additions & 12 deletions logprep/connector/confluent_kafka/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
OFFSET_STORED,
Consumer,
KafkaException,
Message,
TopicPartition,
)
from confluent_kafka.admin import AdminClient
Expand Down Expand Up @@ -244,13 +245,13 @@ class Config(Input.Config):
"""

_last_valid_records: dict
_last_valid_record: Message

__slots__ = ["_last_valid_records"]
__slots__ = ["_last_valid_record"]

def __init__(self, name: str, configuration: "Connector.Config") -> None:
super().__init__(name, configuration)
self._last_valid_records = {}
self._last_valid_record = None

@property
def _kafka_config(self) -> dict:
Expand Down Expand Up @@ -418,7 +419,7 @@ def _get_raw_event(self, timeout: float) -> bytearray:
raise CriticalInputError(
self, "A confluent-kafka record contains an error code", str(kafka_error)
)
self._last_valid_records[message.partition()] = message
self._last_valid_record = message
labels = {"description": f"topic: {self._config.topic} - partition: {message.partition()}"}
self.metrics.current_offsets.add_with_labels(message.offset() + 1, labels)
return message.value()
Expand Down Expand Up @@ -467,17 +468,15 @@ def _enable_auto_commit(self) -> bool:
return self._config.kafka_config.get("enable.auto.commit") == "true"

def batch_finished_callback(self) -> None:
"""Store offsets for each kafka partition in `self._last_valid_records`.
Should be called by output connectors if they are finished processing
a batch of records.
"""Store offsets for last message referenced by `self._last_valid_records`.
Should be called after delivering the current message to the output or error queue.
"""
if self._enable_auto_offset_store:
return
for message in self._last_valid_records.values():
try:
self._consumer.store_offsets(message=message)
except KafkaException as error:
raise InputWarning(self, f"{error}, {message}") from error
try:
self._consumer.store_offsets(message=self._last_valid_record)
except KafkaException as error:
raise InputWarning(self, f"{error}, {self._last_valid_record}") from error

def _assign_callback(self, consumer, topic_partitions):
for topic_partition in topic_partitions:
Expand Down

0 comments on commit 0070857

Please sign in to comment.