From ab24575dede3c21f6ea22de30e695691d32d3026 Mon Sep 17 00:00:00 2001 From: Elia Migliore Date: Wed, 28 Aug 2024 12:31:01 +0200 Subject: [PATCH] WIP: 1. now the schema_coordinator wait until the schema_reader reached the last message in the topic before declaring himself as master 2. changed the `_ready` flag of the schema_reader to be protected by a lock since now also the schema_coordinator can reset the `_ready` flag 3. set the close of the coordinator in the `close` method instead of in the run method --- .../coordinator/master_coordinator.py | 21 +++++-- .../coordinator/schema_coordinator.py | 14 +++-- src/karapace/schema_reader.py | 55 ++++++++++++++----- src/karapace/schema_registry.py | 9 ++- src/karapace/schema_registry_apis.py | 5 +- src/karapace/typing.py | 11 ++++ tests/integration/conftest.py | 1 - tests/integration/test_master_coordinator.py | 10 ++++ tests/integration/test_schema_coordinator.py | 12 ++++ tests/integration/test_schema_reader.py | 4 ++ tests/unit/test_schema_reader.py | 12 ++-- tests/unit/test_schema_registry_api.py | 2 +- 12 files changed, 121 insertions(+), 35 deletions(-) diff --git a/src/karapace/coordinator/master_coordinator.py b/src/karapace/coordinator/master_coordinator.py index eb9c03eb9..8d0e9b738 100644 --- a/src/karapace/coordinator/master_coordinator.py +++ b/src/karapace/coordinator/master_coordinator.py @@ -13,14 +13,17 @@ from karapace.config import Config from karapace.coordinator.schema_coordinator import SchemaCoordinator, SchemaCoordinatorStatus from karapace.kafka.types import DEFAULT_REQUEST_TIMEOUT_MS +from karapace.typing import SchemaReaderStoppper from threading import Thread from typing import Final import asyncio import logging +import time __all__ = ("MasterCoordinator",) + LOG = logging.getLogger(__name__) @@ -42,6 +45,10 @@ def __init__(self, config: Config) -> None: self._sc: SchemaCoordinator | None = None self._thread: Thread = Thread(target=self._start_loop, daemon=True) self._loop: asyncio.AbstractEventLoop | None = None + self._schema_reader_stopper: SchemaReaderStoppper | None = None + + def set_stoppper(self, schema_reader_stopper: SchemaReaderStoppper) -> None: + self._schema_reader_stopper = schema_reader_stopper @property def schema_coordinator(self) -> SchemaCoordinator | None: @@ -84,14 +91,17 @@ async def _async_loop(self) -> None: self._sc = self.init_schema_coordinator() while self._running: if self._sc.ready(): - return + break await asyncio.sleep(0.5) - + # todo: wait a condition variable or a lock. LOG.info("Closing master_coordinator") if self._sc: await self._sc.close() - if self._loop: - self._loop.close() + while self._loop is not None and not self._loop.is_closed(): + self._loop.stop() + if not self._loop.is_running(): + self._loop.close() + time.sleep(0.5) if self._kafka_client: await self._kafka_client.close() @@ -119,8 +129,10 @@ def init_kafka_client(self) -> AIOKafkaClient: def init_schema_coordinator(self) -> SchemaCoordinator: assert self._kafka_client is not None + assert self._schema_reader_stopper is not None schema_coordinator = SchemaCoordinator( client=self._kafka_client, + schema_reader_stopper=self._schema_reader_stopper, election_strategy=self._config.get("master_election_strategy", "lowest"), group_id=self._config["group_id"], hostname=self._config["advertised_hostname"], @@ -159,3 +171,4 @@ def get_master_info(self) -> tuple[bool | None, str | None]: async def close(self) -> None: self._running = False + # todo set the condition variable or lock. diff --git a/src/karapace/coordinator/schema_coordinator.py b/src/karapace/coordinator/schema_coordinator.py index eed301a30..7fd662590 100644 --- a/src/karapace/coordinator/schema_coordinator.py +++ b/src/karapace/coordinator/schema_coordinator.py @@ -27,7 +27,7 @@ from aiokafka.util import create_future, create_task from collections.abc import Coroutine, Sequence from karapace.dataclasses import default_dataclass -from karapace.typing import JsonData +from karapace.typing import JsonData, SchemaReaderStoppper from karapace.utils import json_decode, json_encode from karapace.version import __version__ from typing import Any, Final @@ -123,6 +123,7 @@ class SchemaCoordinator: def __init__( self, client: AIOKafkaClient, + schema_reader_stopper: SchemaReaderStoppper, hostname: str, port: int, scheme: str, @@ -147,6 +148,7 @@ def __init__( self.scheme: Final = scheme self.master_eligibility: Final = master_eligibility self.master_url: str | None = None + self._schema_reader_stopper = schema_reader_stopper self._are_we_master: bool | None = False # a value that its strictly higher than any clock, so we are sure # we are never going to consider this the leader without explictly passing @@ -212,7 +214,7 @@ def are_we_master(self) -> bool | None: LOG.warning("No new elections performed yet.") return None - if not self._ready: + if not self._ready or not self._schema_reader_stopper.ready(): return False if self._are_we_master and self._initial_election_sec is not None: @@ -224,7 +226,7 @@ def are_we_master(self) -> bool | None: self._initial_election_sec = None # this is the last point in time were we wait till to the end of the log queue for new # incoming messages. - self._ready = False # todo: wrong, its not the _ready flag we should change, we should change the same + self._schema_reader_stopper.set_not_ready() # flag that its set at startup, fix this return False @@ -485,7 +487,7 @@ async def _on_join_complete( # was a master change, the time before acting its always respect # to which was the previous master (if we were master no need # to wait more before acting) - self._ready = False # todo: wrong, its not the _ready flag we should change, we should change the same + self._schema_reader_stopper.set_not_ready() # flag that its set at startup, fix this # `time.monotonic()` because we don't want the time to go back or forward because of e.g. ntp self._initial_election_sec = time.monotonic() @@ -506,7 +508,7 @@ async def _on_join_complete( self.master_url = None self._are_we_master = False else: - LOG.info("We are not elected as master", member_id) + LOG.info("We are not elected as master") self.master_url = master_url self._are_we_master = False self._ready = True @@ -519,6 +521,7 @@ def coordinator_dead(self) -> None: """ if self._coordinator_dead_fut is not None and self.coordinator_id is not None: LOG.warning("Marking the coordinator dead (node %s)for group %s.", self.coordinator_id, self.group_id) + self._are_we_master = False self.coordinator_id = None self._coordinator_dead_fut.set_result(None) @@ -526,6 +529,7 @@ def reset_generation(self) -> None: """Coordinator did not recognize either generation or member_id. Will need to re-join the group. """ + self._are_we_master = False self.generation = OffsetCommitRequest.DEFAULT_GENERATION_ID self.member_id = JoinGroupRequest[0].UNKNOWN_MEMBER_ID self.request_rejoin() diff --git a/src/karapace/schema_reader.py b/src/karapace/schema_reader.py index b20487631..62b0bec75 100644 --- a/src/karapace/schema_reader.py +++ b/src/karapace/schema_reader.py @@ -43,9 +43,9 @@ from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents from karapace.statsd import StatsClient -from karapace.typing import JsonObject, SchemaId, Subject, Version +from karapace.typing import JsonObject, SchemaId, SchemaReaderStoppper, Subject, Version from karapace.utils import json_decode, JSONDecodeError, shutdown -from threading import Event, Thread +from threading import Event, Lock, Thread from typing import Final import asyncio @@ -129,7 +129,7 @@ def _create_admin_client_from_config(config: Config) -> KafkaAdminClient: ) -class KafkaSchemaReader(Thread): +class KafkaSchemaReader(Thread, SchemaReaderStoppper): def __init__( self, config: Config, @@ -167,7 +167,10 @@ def __init__( # old stale version that has not been deleted yet.) self.offset = OFFSET_UNINITIALIZED self._highest_offset = OFFSET_UNINITIALIZED - self.ready = False + # when a master its elected as master we should read the last arrived messages at least + # once. This lock prevent the concurrent modification of the `ready` flag. + self._ready_lock = Lock() + self._ready = False # This event controls when the Reader should stop running, it will be # set by another thread (e.g. `KarapaceSchemaRegistry`) @@ -319,9 +322,10 @@ def _get_beginning_offset(self) -> int: return OFFSET_UNINITIALIZED def _is_ready(self) -> bool: - if self.ready: - return True - + """ + Always call `_is_ready` only if `self._ready` is False. + Removed the check since now with the Lock the lookup it's a costly operation. + """ assert self.consumer is not None, "Thread must be started" try: @@ -365,6 +369,14 @@ def _is_ready(self) -> bool: def highest_offset(self) -> int: return max(self._highest_offset, self._offset_watcher.greatest_offset()) + def ready(self) -> bool: + with self._ready_lock: + return self._ready + + def set_not_ready(self) -> None: + with self._ready_lock: + self._ready = False + @staticmethod def _parse_message_value(raw_value: str | bytes) -> JsonObject | None: value = json_decode(raw_value) @@ -376,10 +388,8 @@ def _parse_message_value(raw_value: str | bytes) -> JsonObject | None: def handle_messages(self) -> None: assert self.consumer is not None, "Thread must be started" - msgs: list[Message] = self.consumer.consume(timeout=self.timeout_s, num_messages=self.max_messages_to_process) - if self.ready is False: - self.ready = self._is_ready() + self._update_is_ready_flag() watch_offsets = False if self.master_coordinator is not None: @@ -433,9 +443,10 @@ def consume_messages(self, msgs: list[Message], watch_offsets: bool) -> None: # Default keymode is CANONICAL and preferred unless any data consumed # has key in non-canonical format. If keymode is set to DEPRECATED_KARAPACE # the subsequent keys are omitted from detection. - if not self.ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL: - if msg_keymode == KeyMode.DEPRECATED_KARAPACE: - self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE) + with self._ready_lock: + if not self._ready and self.key_formatter.get_keymode() == KeyMode.CANONICAL: + if msg_keymode == KeyMode.DEPRECATED_KARAPACE: + self.key_formatter.set_keymode(KeyMode.DEPRECATED_KARAPACE) value = None message_value = msg.value() @@ -461,14 +472,28 @@ def consume_messages(self, msgs: list[Message], watch_offsets: bool) -> None: else: schema_records_processed_keymode_deprecated_karapace += 1 - if self.ready and watch_offsets: - self._offset_watcher.offset_seen(self.offset) + with self._ready_lock: + if self._ready and watch_offsets: + self._offset_watcher.offset_seen(self.offset) self._report_schema_metrics( schema_records_processed_keymode_canonical, schema_records_processed_keymode_deprecated_karapace, ) + def _update_is_ready_flag(self) -> None: + update_ready_flag = False + + # to keep the lock as few as possible. + with self._ready_lock: + if self._ready is False: + update_ready_flag = True + + if update_ready_flag: + new_ready_flag = self._is_ready() + with self._ready_lock: + self._ready = new_ready_flag + def _report_schema_metrics( self, schema_records_processed_keymode_canonical: int, diff --git a/src/karapace/schema_registry.py b/src/karapace/schema_registry.py index 67f58fddd..614b4a6ad 100644 --- a/src/karapace/schema_registry.py +++ b/src/karapace/schema_registry.py @@ -58,6 +58,13 @@ def __init__(self, config: Config) -> None: master_coordinator=self.mc, database=self.database, ) + # very ugly, left as a placeholder, since we have a bidirectional + # dependency it means that the two objects needs to be one (aka the + # mc should create the KafkaSchemaReader and inject the stopper inside + # the schema_coordinator. Left as it is to reason together to the implementation + # since semantically it's the same, after we agree on the solution proceeding with + # the refactor) + self.mc.set_stoppper(self.schema_reader) self.schema_lock = asyncio.Lock() self._master_lock = asyncio.Lock() @@ -96,7 +103,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | are_we_master, master_url = self.mc.get_master_info() if are_we_master is None: LOG.info("No master set: %r, url: %r", are_we_master, master_url) - elif not ignore_readiness and self.schema_reader.ready is False: + elif not ignore_readiness and self.schema_reader.ready() is False: LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready) else: return are_we_master, master_url diff --git a/src/karapace/schema_registry_apis.py b/src/karapace/schema_registry_apis.py index 06375e6c8..12cb3b4a1 100644 --- a/src/karapace/schema_registry_apis.py +++ b/src/karapace/schema_registry_apis.py @@ -84,6 +84,7 @@ class SchemaErrorMessages(Enum): class KarapaceSchemaRegistryController(KarapaceBase): def __init__(self, config: Config) -> None: + # the `not_ready_handler` its wrong, its not expecting an async method the receiver. super().__init__(config=config, not_ready_handler=self._forward_if_not_ready_to_serve) self._auth: HTTPAuthorizer | None = None @@ -104,7 +105,7 @@ async def schema_registry_health(self) -> HealthCheck: if self._auth is not None: resp["schema_registry_authfile_timestamp"] = self._auth.authfile_last_modified resp["schema_registry_ready"] = self.schema_registry.schema_reader.ready - if self.schema_registry.schema_reader.ready: + if self.schema_registry.schema_reader.ready(): resp["schema_registry_startup_time_sec"] = ( self.schema_registry.schema_reader.last_check - self._process_start_time ) @@ -141,7 +142,7 @@ def _check_authorization(self, user: User | None, operation: Operation, resource self.r(body={"message": "Forbidden"}, content_type=JSON_CONTENT_TYPE, status=HTTPStatus.FORBIDDEN) async def _forward_if_not_ready_to_serve(self, request: HTTPRequest) -> None: - if self.schema_registry.schema_reader.ready: + if self.schema_registry.schema_reader.ready(): pass else: # Not ready, still loading the state. diff --git a/src/karapace/typing.py b/src/karapace/typing.py index 1268db001..f7fba570f 100644 --- a/src/karapace/typing.py +++ b/src/karapace/typing.py @@ -4,6 +4,7 @@ """ from __future__ import annotations +from abc import ABC, abstractmethod from collections.abc import Mapping, Sequence from enum import Enum, unique from karapace.errors import InvalidVersion @@ -102,3 +103,13 @@ def value(self) -> int: @property def is_latest(self) -> bool: return self.value == self.MINUS_1_VERSION_TAG + + +class SchemaReaderStoppper(ABC): + @abstractmethod + def ready(self) -> bool: + pass + + @abstractmethod + def set_not_ready(self) -> None: + pass diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index c264befb1..89b5a74f5 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -36,7 +36,6 @@ from tests.integration.utils.synchronization import lock_path_for from tests.integration.utils.zookeeper import configure_and_start_zk from tests.utils import repeat_until_master_is_available, repeat_until_successful_request -from typing import AsyncIterator, Iterator from urllib.parse import urlparse import asyncio diff --git a/tests/integration/test_master_coordinator.py b/tests/integration/test_master_coordinator.py index 0f04c8a15..62cc4392d 100644 --- a/tests/integration/test_master_coordinator.py +++ b/tests/integration/test_master_coordinator.py @@ -6,6 +6,7 @@ """ from karapace.config import set_config_defaults from karapace.coordinator.master_coordinator import MasterCoordinator +from karapace.typing import SchemaReaderStoppper from tests.integration.utils.kafka_server import KafkaServers from tests.integration.utils.network import allocate_port from tests.integration.utils.rest_client import RetryRestClient @@ -16,8 +17,17 @@ import pytest +class AlwaysAvailableSchemaReaderStoppper(SchemaReaderStoppper): + def ready(self) -> bool: + return True + + def set_not_ready(self) -> None: + pass + + async def init_admin(config): mc = MasterCoordinator(config=config) + mc.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) await mc.start() return mc diff --git a/tests/integration/test_schema_coordinator.py b/tests/integration/test_schema_coordinator.py index 6ed6c3368..15c432d10 100644 --- a/tests/integration/test_schema_coordinator.py +++ b/tests/integration/test_schema_coordinator.py @@ -23,6 +23,7 @@ from karapace.utils import json_encode from karapace.version import __version__ from tenacity import retry, stop_after_delay, TryAgain, wait_fixed +from tests.integration.test_master_coordinator import AlwaysAvailableSchemaReaderStoppper from tests.integration.utils.kafka_server import KafkaServers from tests.utils import new_random_name from typing import Final @@ -56,6 +57,7 @@ async def fixture_admin( ) -> AsyncGenerator: coordinator = SchemaCoordinator( mocked_client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -116,6 +118,7 @@ async def test_coordinator_workflow( waiting_time_before_acting_as_master_sec = 5 coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host-1", 10101, "https", @@ -136,12 +139,14 @@ async def test_coordinator_workflow( assert not coordinator.are_we_master() # the waiting_time_before_acting_as_master_ms await asyncio.sleep(10) + assert not coordinator.are_we_master(), "last fetch before being available as master" assert coordinator.are_we_master(), f"after {waiting_time_before_acting_as_master_sec} seconds we can act as a master" # Check if adding an additional coordinator will rebalance correctly client2 = await _get_client(kafka_servers=kafka_servers) coordinator2 = SchemaCoordinator( client2, + AlwaysAvailableSchemaReaderStoppper(), "test-host-2", 10100, "https", @@ -174,6 +179,7 @@ async def test_coordinator_workflow( assert not secondary.are_we_master(), "also the second cannot be immediately a master" # after that time the primary can act as a master await asyncio.sleep(waiting_time_before_acting_as_master_sec) + assert not primary.are_we_master(), "Last fetch before being available as master" assert primary.are_we_master() assert not secondary.are_we_master() @@ -414,6 +420,7 @@ async def test_coordinator_metadata_update(client: AIOKafkaClient) -> None: try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -456,6 +463,7 @@ async def test_coordinator__send_req(client: AIOKafkaClient) -> None: try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -494,6 +502,7 @@ async def test_coordinator_ensure_coordinator_known(client: AIOKafkaClient) -> N try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -575,6 +584,7 @@ async def test_coordinator__do_heartbeat(client: AIOKafkaClient) -> None: try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -663,6 +673,7 @@ async def test_coordinator__heartbeat_routine(client: AIOKafkaClient) -> None: try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", @@ -734,6 +745,7 @@ async def test_coordinator__coordination_routine(client: AIOKafkaClient) -> None try: coordinator = SchemaCoordinator( client, + AlwaysAvailableSchemaReaderStoppper(), "test-host", 10101, "https", diff --git a/tests/integration/test_schema_reader.py b/tests/integration/test_schema_reader.py index 4d00a5581..c5516951a 100644 --- a/tests/integration/test_schema_reader.py +++ b/tests/integration/test_schema_reader.py @@ -15,6 +15,7 @@ from karapace.schema_reader import KafkaSchemaReader from karapace.utils import json_encode from tests.base_testcase import BaseTestCase +from tests.integration.test_master_coordinator import AlwaysAvailableSchemaReaderStoppper from tests.integration.utils.kafka_server import KafkaServers from tests.schemas.json_schemas import FALSE_SCHEMA, TRUE_SCHEMA from tests.utils import create_group_name_factory, create_subject_name_factory, new_random_name, new_topic @@ -70,6 +71,7 @@ async def test_regression_soft_delete_schemas_should_be_registered( } ) master_coordinator = MasterCoordinator(config=config) + master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) try: await master_coordinator.start() database = InMemoryDatabase() @@ -162,6 +164,7 @@ async def test_regression_config_for_inexisting_object_should_not_throw( } ) master_coordinator = MasterCoordinator(config=config) + master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) try: await master_coordinator.start() database = InMemoryDatabase() @@ -266,6 +269,7 @@ async def test_key_format_detection( } ) master_coordinator = MasterCoordinator(config=config) + master_coordinator.set_stoppper(AlwaysAvailableSchemaReaderStoppper()) try: await master_coordinator.start() key_formatter = KeyFormatter() diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index 552fa0be7..4b8cc7c28 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -173,7 +173,7 @@ def test_readiness_check(testcase: ReadinessTestCase) -> None: schema_reader.offset = testcase.cur_offset schema_reader.handle_messages() - assert schema_reader.ready is testcase.expected + assert schema_reader.ready() is testcase.expected def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None: @@ -196,7 +196,7 @@ def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None: assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_ON_STARTUP schema_reader.handle_messages() - assert schema_reader.ready is True + assert schema_reader.ready() is True assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP @@ -242,16 +242,16 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche schema_reader.handle_messages() assert schema_reader.offset == 1 - assert schema_reader.ready is False + assert schema_reader.ready() is False schema_reader.handle_messages() assert schema_reader.offset == 2 - assert schema_reader.ready is False + assert schema_reader.ready() is False schema_reader.handle_messages() assert schema_reader.offset == 3 - assert schema_reader.ready is False + assert schema_reader.ready() is False schema_reader.handle_messages() # call last time to call _is_ready() assert schema_reader.offset == 3 - assert schema_reader.ready is True + assert schema_reader.ready() is True assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/test_schema_registry_api.py index 7fcecd47e..b4d87f35b 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/test_schema_registry_api.py @@ -33,7 +33,7 @@ async def test_validate_schema_request_body() -> None: async def test_forward_when_not_ready() -> None: with patch("karapace.schema_registry_apis.KarapaceSchemaRegistry") as schema_registry_class: schema_reader_mock = Mock(spec=KafkaSchemaReader) - ready_property_mock = PropertyMock(return_value=False) + ready_property_mock = PropertyMock(return_value=lambda: False) schema_registry = AsyncMock(spec=KarapaceSchemaRegistry) type(schema_reader_mock).ready = ready_property_mock schema_registry.schema_reader = schema_reader_mock