From 78d4b24c7e9f3b139ae592b02d27c17cf5754df3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?J=C3=B6rg=20Zimmermann?= <101292599+ekneg54@users.noreply.github.com>
Date: Wed, 2 Oct 2024 11:57:14 +0200
Subject: [PATCH] Ensure kafka output flushes queue on shutdown (#679)

* fix timeout settings in kafka output

* add tests for shutdown behavior

* Update CHANGELOG.md
---
 CHANGELOG.md                                  |  1 +
 logprep/connector/confluent_kafka/output.py   | 41 +++++++++++++++----
 .../connector/test_confluent_kafka_output.py  |  9 ++++
 3 files changed, 43 insertions(+), 8 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 642c305b0..e82f58a0b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@
 * add manual how to use local images with minikube example setup to documentation
 * move `Configuration` to top level of documentation
 * add `CONTRIBUTING` file
+* sets the default for `flush_timeout` and `send_timeout` in `kafka_output` connector to `0` seconds
 
 ### Bugfix
 
diff --git a/logprep/connector/confluent_kafka/output.py b/logprep/connector/confluent_kafka/output.py
index 06e95ddfb..1550b4b02 100644
--- a/logprep/connector/confluent_kafka/output.py
+++ b/logprep/connector/confluent_kafka/output.py
@@ -148,10 +148,22 @@ class Config(Output.Config):
 
         topic: str = field(validator=validators.instance_of(str))
         """The topic into which the processed events should be written to."""
-        error_topic: str
-        """The topic into which events should be written that couldn't be processed successfully."""
-        flush_timeout: float
-        send_timeout: int = field(validator=validators.instance_of(int), default=0)
+        error_topic: str = field(validator=validators.instance_of(str))
+        """The topic into which the failed events should be written to."""
+        flush_timeout: float = field(
+            validator=validators.instance_of(float), converter=float, default=0
+        )
+        """The maximum time in seconds to wait for the producer to flush the messages
+        to kafka broker. If the buffer is full, the producer will block until the buffer
+        is empty or the timeout is reached. This implies that the producer does not
+        wait for all messages to be send to the broker, if the timeout is reached
+        before the buffer is empty. Default is :code:`0`.
+        """
+        send_timeout: float = field(
+            validator=validators.instance_of(float), converter=float, default=0
+        )
+        """The maximum time in seconds to wait for an answer from the broker on polling.
+        Default is :code:`0`."""
         kafka_config: Optional[MappingProxyType] = field(
             validator=[
                 validators.instance_of(MappingProxyType),
@@ -286,7 +298,7 @@ def store_custom(self, document: dict, target: str) -> None:
             self._producer.poll(self._config.send_timeout)
             self.metrics.number_of_processed_events += 1
         except BufferError:
-            # block program until buffer is empty
+            # block program until buffer is empty or timeout is reached
             self._producer.flush(timeout=self._config.flush_timeout)
         except BaseException as error:
             raise CriticalOutputError(
@@ -327,9 +339,22 @@ def store_failed(
         self._producer.flush(timeout=self._config.flush_timeout)
 
     def shut_down(self) -> None:
-        """ensures that all messages are flushed"""
-        if self._producer is not None:
-            self._producer.flush(self._config.flush_timeout)
+        """ensures that all messages are flushed. According to
+        https://confluent-kafka-python.readthedocs.io/en/latest/#confluent_kafka.Producer.flush
+        flush without the timeout parameter will block until all messages are delivered.
+        This ensures no messages will get lost on shutdown.
+        """
+        if self._producer is None:
+            return
+        remaining_messages = self._producer.flush()
+        if remaining_messages:
+            self.metrics.number_of_errors += 1
+            logger.error(
+                "Flushing producer timed out. %s messages are still in the buffer.",
+                remaining_messages,
+            )
+        else:
+            logger.info("Producer flushed successfully. %s messages remaining.", remaining_messages)
 
     def health(self) -> bool:
         """Check the health of kafka producer."""
diff --git a/tests/unit/connector/test_confluent_kafka_output.py b/tests/unit/connector/test_confluent_kafka_output.py
index b4bda2df6..6cdd4c964 100644
--- a/tests/unit/connector/test_confluent_kafka_output.py
+++ b/tests/unit/connector/test_confluent_kafka_output.py
@@ -195,3 +195,12 @@ def test_health_counts_metrics_on_kafka_exception(self):
         self.object._producer.list_topics.side_effect = KafkaException("test error")
         assert not self.object.health()
         assert self.object.metrics.number_of_errors == 1
+
+    def test_shutdown_logs_and_counts_error_if_queue_not_fully_flushed(self):
+        self.object.metrics.number_of_errors = 0
+        self.object._producer = mock.MagicMock()
+        self.object._producer.flush.return_value = 1
+        with mock.patch("logging.Logger.error") as mock_error:
+            self.object.shut_down()
+            mock_error.assert_called()
+        self.object.metrics.number_of_errors = 1