From 85ef357a3deb8d2fc807667cc38ed303de435491 Mon Sep 17 00:00:00 2001 From: Jarkko Jaakola Date: Tue, 14 May 2024 10:08:09 +0300 Subject: [PATCH] fix: consume only one record at a time after startup If Karapace would expect to consume multiple records after startup, there is extra latency of the consumption timeout as consumer does not return control back to schema reader until it happens. This would cause extra latency when storing schema. On normal operating mode after startup there expectation is to process single record at a time and return control immediately to schema reader when record is available. --- karapace/schema_reader.py | 18 +++++++++++++----- tests/unit/test_schema_reader.py | 32 +++++++++++++++++++++++++++++++- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 6b4047227..88600f70c 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -58,6 +58,16 @@ KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS: Final = 2.0 SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS: Final = 5.0 +# For good startup performance the consumption of multiple +# records for each consume round is essential. +# Consumer default is 1 message for each consume call and after +# startup the default is a good value. If consumer would expect +# more messages it would return control back after timeout and +# making schema storing latency to be `processing time + timeout`. +MAX_MESSAGES_TO_CONSUME_ON_STARTUP: Final = 1000 +MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP: Final = 1 +MESSAGE_CONSUME_TIMEOUT_SECONDS: Final = 0.2 + # Metric names METRIC_SCHEMA_TOPIC_RECORDS_PROCESSED_COUNT: Final = "karapace_schema_reader_records_processed" METRIC_SCHEMA_TOPIC_RECORDS_PER_KEYMODE_GAUGE: Final = "karapace_schema_reader_records_per_keymode" @@ -120,11 +130,8 @@ def __init__( ) -> None: Thread.__init__(self, name="schema-reader") self.master_coordinator = master_coordinator - self.timeout_s = 0.2 - # Consumer default is 1 message for each consume call - # For good startup performance the consumption of multiple - # records for each consume round is essential. - self.max_messages_to_process = 1000 + self.timeout_s = MESSAGE_CONSUME_TIMEOUT_SECONDS + self.max_messages_to_process = MAX_MESSAGES_TO_CONSUME_ON_STARTUP self.config = config self.database = database @@ -301,6 +308,7 @@ def _is_ready(self) -> bool: self.startup_previous_processed_offset = self.offset ready = self.offset >= self._highest_offset if ready: + self.max_messages_to_process = MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP LOG.info("Ready in %s seconds", time.monotonic() - self.start_time) return ready diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index 12a404ac5..052c3ef7f 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -10,7 +10,13 @@ from karapace.config import DEFAULTS from karapace.in_memory_database import InMemoryDatabase from karapace.offset_watcher import OffsetWatcher -from karapace.schema_reader import KafkaSchemaReader, OFFSET_EMPTY, OFFSET_UNINITIALIZED +from karapace.schema_reader import ( + KafkaSchemaReader, + MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP, + MAX_MESSAGES_TO_CONSUME_ON_STARTUP, + OFFSET_EMPTY, + OFFSET_UNINITIALIZED, +) from tests.base_testcase import BaseTestCase from unittest.mock import Mock @@ -154,3 +160,27 @@ def test_readiness_check(testcase: ReadinessTestCase) -> None: schema_reader.handle_messages() assert schema_reader.ready is testcase.expected + + +def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None: + key_formatter_mock = Mock() + consumer_mock = Mock() + consumer_mock.consume.return_value = [] + # Return tuple (beginning, end), end offset is the next upcoming record offset + consumer_mock.get_watermark_offsets.return_value = (0, 1) + + offset_watcher = OffsetWatcher() + schema_reader = KafkaSchemaReader( + config=DEFAULTS, + offset_watcher=offset_watcher, + key_formatter=key_formatter_mock, + master_coordinator=None, + database=InMemoryDatabase(), + ) + schema_reader.consumer = consumer_mock + schema_reader.offset = 0 + assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP + + schema_reader.handle_messages() + assert schema_reader.ready is True + assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP