diff --git a/logprep/connector/confluent_kafka/input.py b/logprep/connector/confluent_kafka/input.py
index 72e836448..53ce16f96 100644
--- a/logprep/connector/confluent_kafka/input.py
+++ b/logprep/connector/confluent_kafka/input.py
@@ -44,6 +44,7 @@
     OFFSET_STORED,
     Consumer,
     KafkaException,
+    Message,
     TopicPartition,
 )
 from confluent_kafka.admin import AdminClient
@@ -244,13 +245,13 @@ class Config(Input.Config):

         """

-    _last_valid_records: dict
+    _last_valid_record: Message

-    __slots__ = ["_last_valid_records"]
+    __slots__ = ["_last_valid_record"]

     def __init__(self, name: str, configuration: "Connector.Config") -> None:
         super().__init__(name, configuration)
-        self._last_valid_records = {}
+        self._last_valid_record = None

     @property
     def _kafka_config(self) -> dict:
@@ -418,7 +419,7 @@ def _get_raw_event(self, timeout: float) -> bytearray:
             raise CriticalInputError(
                 self, "A confluent-kafka record contains an error code", str(kafka_error)
             )
-        self._last_valid_records[message.partition()] = message
+        self._last_valid_record = message
         labels = {"description": f"topic: {self._config.topic} - partition: {message.partition()}"}
         self.metrics.current_offsets.add_with_labels(message.offset() + 1, labels)
         return message.value()
@@ -467,17 +468,15 @@ def _enable_auto_commit(self) -> bool:
         return self._config.kafka_config.get("enable.auto.commit") == "true"

     def batch_finished_callback(self) -> None:
-        """Store offsets for each kafka partition in `self._last_valid_records`.
-        Should be called by output connectors if they are finished processing
-        a batch of records.
+        """Store the offset of the last message referenced by `self._last_valid_record`.
+        Should be called after delivering the current message to the output or error queue.
         """
         if self._enable_auto_offset_store:
             return
-        for message in self._last_valid_records.values():
-            try:
-                self._consumer.store_offsets(message=message)
-            except KafkaException as error:
-                raise InputWarning(self, f"{error}, {message}") from error
+        try:
+            self._consumer.store_offsets(message=self._last_valid_record)
+        except KafkaException as error:
+            raise InputWarning(self, f"{error}, {self._last_valid_record}") from error

     def _assign_callback(self, consumer, topic_partitions):
         for topic_partition in topic_partitions:
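
For context, here is a minimal standalone sketch (not part of the patch) of the manual offset-storage pattern the changed `batch_finished_callback` relies on: with `enable.auto.offset.store` disabled, each consumed message must be marked explicitly via `Consumer.store_offsets(message=...)` after it has been handed off downstream, and only then does the periodic auto-commit pick it up. The broker address, topic, group id, and the `deliver` helper below are placeholders, not values from logprep.

```python
# Sketch of manual offset storage with confluent_kafka; assumes a local broker
# and placeholder topic/group names.
from confluent_kafka import Consumer, KafkaException


def deliver(raw_event: bytes) -> None:
    """Stand-in for handing the event to an output connector."""
    print(raw_event)


consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",  # placeholder
        "group.id": "example-group",            # placeholder
        "enable.auto.commit": "true",           # offsets are committed periodically ...
        "enable.auto.offset.store": "false",    # ... but only after we store them explicitly
    }
)
consumer.subscribe(["example-topic"])           # placeholder

try:
    while True:
        message = consumer.poll(timeout=1.0)
        if message is None or message.error():
            continue
        deliver(message.value())
        try:
            # Mark this message's offset as eligible for the next auto-commit,
            # mirroring what batch_finished_callback does with _last_valid_record.
            consumer.store_offsets(message=message)
        except KafkaException as error:
            print(f"failed to store offset: {error}")
finally:
    consumer.close()
```

Storing only the most recently delivered message, as the patch does, is sufficient here because Kafka offsets are cumulative per partition: committing offset N acknowledges every earlier offset on that partition as well.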