Skip to content

Commit

Permalink
fix: consume only one record at a time after startup
Browse files Browse the repository at this point in the history
If Karapace would expect to consume multiple records after startup,
there is extra latency of the consumption timeout as consumer does
not return control back to schema reader until it happens. This would
cause extra latency when storing schema.
On normal operating mode after startup there expectation is to
process single record at a time and return control immediately to
schema reader when record is available.
  • Loading branch information
jjaakola-aiven committed May 14, 2024
1 parent 669493c commit 85ef357
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 6 deletions.
18 changes: 13 additions & 5 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@
KAFKA_CLIENT_CREATION_TIMEOUT_SECONDS: Final = 2.0
SCHEMA_TOPIC_CREATION_TIMEOUT_SECONDS: Final = 5.0

# For good startup performance the consumption of multiple
# records for each consume round is essential.
# Consumer default is 1 message for each consume call and after
# startup the default is a good value. If consumer would expect
# more messages it would return control back after timeout and
# making schema storing latency to be `processing time + timeout`.
MAX_MESSAGES_TO_CONSUME_ON_STARTUP: Final = 1000
MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP: Final = 1
MESSAGE_CONSUME_TIMEOUT_SECONDS: Final = 0.2

# Metric names
METRIC_SCHEMA_TOPIC_RECORDS_PROCESSED_COUNT: Final = "karapace_schema_reader_records_processed"
METRIC_SCHEMA_TOPIC_RECORDS_PER_KEYMODE_GAUGE: Final = "karapace_schema_reader_records_per_keymode"
Expand Down Expand Up @@ -120,11 +130,8 @@ def __init__(
) -> None:
Thread.__init__(self, name="schema-reader")
self.master_coordinator = master_coordinator
self.timeout_s = 0.2
# Consumer default is 1 message for each consume call
# For good startup performance the consumption of multiple
# records for each consume round is essential.
self.max_messages_to_process = 1000
self.timeout_s = MESSAGE_CONSUME_TIMEOUT_SECONDS
self.max_messages_to_process = MAX_MESSAGES_TO_CONSUME_ON_STARTUP
self.config = config

self.database = database
Expand Down Expand Up @@ -301,6 +308,7 @@ def _is_ready(self) -> bool:
self.startup_previous_processed_offset = self.offset
ready = self.offset >= self._highest_offset
if ready:
self.max_messages_to_process = MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
LOG.info("Ready in %s seconds", time.monotonic() - self.start_time)
return ready

Expand Down
32 changes: 31 additions & 1 deletion tests/unit/test_schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
from karapace.config import DEFAULTS
from karapace.in_memory_database import InMemoryDatabase
from karapace.offset_watcher import OffsetWatcher
from karapace.schema_reader import KafkaSchemaReader, OFFSET_EMPTY, OFFSET_UNINITIALIZED
from karapace.schema_reader import (
KafkaSchemaReader,
MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP,
MAX_MESSAGES_TO_CONSUME_ON_STARTUP,
OFFSET_EMPTY,
OFFSET_UNINITIALIZED,
)
from tests.base_testcase import BaseTestCase
from unittest.mock import Mock

Expand Down Expand Up @@ -154,3 +160,27 @@ def test_readiness_check(testcase: ReadinessTestCase) -> None:

schema_reader.handle_messages()
assert schema_reader.ready is testcase.expected


def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None:
key_formatter_mock = Mock()
consumer_mock = Mock()
consumer_mock.consume.return_value = []
# Return tuple (beginning, end), end offset is the next upcoming record offset
consumer_mock.get_watermark_offsets.return_value = (0, 1)

offset_watcher = OffsetWatcher()
schema_reader = KafkaSchemaReader(
config=DEFAULTS,
offset_watcher=offset_watcher,
key_formatter=key_formatter_mock,
master_coordinator=None,
database=InMemoryDatabase(),
)
schema_reader.consumer = consumer_mock
schema_reader.offset = 0
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP

schema_reader.handle_messages()
assert schema_reader.ready is True
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP

0 comments on commit 85ef357

Please sign in to comment.