diff --git a/.dockerignore b/.dockerignore index 57efb59ad..4b946a334 100644 --- a/.dockerignore +++ b/.dockerignore @@ -10,7 +10,6 @@ !LICENSE !pyproject.toml !setup.py -!container/start.sh !container/healthcheck.py # Ignore some files in source directories. diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 8b151f124..d585d3611 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -14,6 +14,7 @@ env: FORCE_COLOR: 1 PIP_PROGRESS_BAR: off PYTHONUNBUFFERED: 1 + KARAPACE_DOTENV: ${{ github.workspace }}/karapace.config.env jobs: tests: @@ -44,11 +45,11 @@ jobs: - run: make unit-tests env: COVERAGE_FILE: ".coverage.${{ matrix.python-version }}" - PYTEST_ARGS: "--cov=karapace --cov-append --numprocesses 4" + PYTEST_ARGS: "--cov=src --cov-append --numprocesses 4" - run: make integration-tests env: COVERAGE_FILE: ".coverage.${{ matrix.python-version }}" - PYTEST_ARGS: "--cov=karapace --cov-append --random-order --numprocesses 4" + PYTEST_ARGS: "--cov=src --cov-append --random-order --numprocesses 4" - name: Archive logs uses: actions/upload-artifact@v4 diff --git a/bin/smoke-test-registry.sh b/bin/smoke-test-registry.sh index 71f4e4fc7..477651a20 100755 --- a/bin/smoke-test-registry.sh +++ b/bin/smoke-test-registry.sh @@ -6,6 +6,7 @@ for ((i = 0; i <= retries; i++)); do response=$( curl --silent --verbose --fail --request POST \ --header 'Content-Type: application/vnd.schemaregistry.v1+json' \ + --header 'Authorization: Basic YWRtaW46YWRtaW4=' \ --data '{"schema": "{\"type\": \"record\", \"name\": \"Obj\", \"fields\":[{\"name\": \"age\", \"type\": \"int\"}]}"}' \ http://localhost:8081/subjects/test-key/versions ) diff --git a/container/Dockerfile b/container/Dockerfile index 2e1544319..55ca06e1c 100644 --- a/container/Dockerfile +++ b/container/Dockerfile @@ -55,10 +55,6 @@ RUN apt-get update \ COPY --from=builder /venv /venv ENV PATH="/venv/bin:$PATH" -COPY ./container/start.sh /opt/karapace -RUN chmod 500 /opt/karapace/start.sh \ - && chown karapace:karapace /opt/karapace/start.sh - COPY ./container/healthcheck.py /opt/karapace WORKDIR /opt/karapace diff --git a/container/compose.yml b/container/compose.yml index f17c21eb3..ed633663f 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -4,7 +4,7 @@ services: zookeeper: image: confluentinc/cp-zookeeper:latest ports: - - "2181:2181" + - 2181:2181 environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 @@ -14,8 +14,8 @@ services: depends_on: - zookeeper ports: - - "9101:9101" # JMX - - "9092:9092" # Kafka + - 9101:9101 # JMX + - 9092:9092 # Kafka environment: # Listeners: # PLAINTEXT_HOST -> Expose kafka to the host network @@ -23,7 +23,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://karapace-registry:8081 + KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://karapace-schema-registry:8081 # Metrics: KAFKA_JMX_PORT: 9101 KAFKA_JMX_HOSTNAME: localhost @@ -54,51 +54,44 @@ services: KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 6000 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" - karapace-registry: + karapace-schema-registry: image: ghcr.io/aiven-open/karapace:develop build: context: .. dockerfile: container/Dockerfile entrypoint: - - /bin/bash - - /opt/karapace/start.sh - - registry + - python3 + - -m + - schema_registry depends_on: - kafka ports: - - "8081:8081" + - 8081:8081 volumes: - - ./karapace.env:/opt/karapace/karapace.env + - ./karapace.registry.env:/opt/karapace/karapace.env environment: KARAPACE_DOTENV: /opt/karapace/karapace.env + KARAPACE_PORT: 8081 - karapace-rest: + karapace-rest-proxy: image: ghcr.io/aiven-open/karapace:develop build: context: .. dockerfile: container/Dockerfile entrypoint: - - /bin/bash - - /opt/karapace/start.sh - - rest + - python3 + - -m + - karapace.karapace_all depends_on: - kafka - - karapace-registry + - karapace-schema-registry ports: - - "8082:8082" + - 8082:8082 + volumes: + - ./karapace.rest.env:/opt/karapace/karapace.env environment: + KARAPACE_DOTENV: /opt/karapace/karapace.env KARAPACE_PORT: 8082 - KARAPACE_HOST: 0.0.0.0 - KARAPACE_ADVERTISED_HOSTNAME: karapace-rest - KARAPACE_BOOTSTRAP_URI: kafka:29092 - KARAPACE_REGISTRY_HOST: karapace-registry - KARAPACE_REGISTRY_PORT: 8081 - KARAPACE_ADMIN_METADATA_MAX_AGE: 0 - KARAPACE_LOG_LEVEL: WARNING - KARAPACE_STATSD_HOST: statsd-exporter - KARAPACE_STATSD_PORT: 8125 - KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false - KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true prometheus: image: prom/prometheus diff --git a/container/karapace.env b/container/karapace.registry.env similarity index 95% rename from container/karapace.env rename to container/karapace.registry.env index 6cee31a2e..849d6237c 100644 --- a/container/karapace.env +++ b/container/karapace.registry.env @@ -15,7 +15,7 @@ FETCH_MIN_BYTES=1 GROUP_ID=karapace-schema-registry HOST=0.0.0.0 PORT=8081 -REGISTRY_HOST=karapace-registry +REGISTRY_HOST=karapace-schema-registry REGISTRY_PORT=8081 REST_AUTHORIZATION=False LOG_HANDLER=stdout @@ -35,6 +35,7 @@ PRODUCER_MAX_REQUEST_SIZE=1048576 SESSION_TIMEOUT_MS=10000 KARAPACE_REST=False KARAPACE_REGISTRY=True +KARAPACE_PORT=8081 NAME_STRATEGY=topic_name NAME_STRATEGY_VALIDATION=True MASTER_ELECTION_STRATEGY=lowest diff --git a/container/karapace.rest.env b/container/karapace.rest.env new file mode 100644 index 000000000..1e4c60352 --- /dev/null +++ b/container/karapace.rest.env @@ -0,0 +1,51 @@ +KARAPACE_DOTENV=/opt/karapace/karapace.env +ACCESS_LOGS_DEBUG=False +# ACCESS_LOG_CLASS=karapace.utils.DebugAccessLogger +ACCESS_LOG_CLASS=aiohttp.web_log.AccessLogger +ADVERTISED_HOSTNAME=karapace-rest-proxy +ADVERTISED_PORT=8082 +ADVERTISED_PROTOCOL=http +BOOTSTRAP_URI=kafka:29092 +CLIENT_ID=karapace-rest-proxy +COMPATIBILITY=BACKWARD +CONNECTIONS_MAX_IDLE_MS=15000 +CONSUMER_ENABLE_AUTO_COMMIT=True +CONSUMER_REQUEST_TIMEOUT_MS=11000 +CONSUMER_REQUEST_MAX_BYTES=67108864 +CONSUMER_IDLE_DISCONNECT_TIMEOUT=0 +FETCH_MIN_BYTES=1 +GROUP_ID=karapace-rest-proxy +HOST=0.0.0.0 +PORT=8082 +REGISTRY_HOST=karapace-schema-registry +REGISTRY_PORT=8081 +REST_AUTHORIZATION=False +LOG_HANDLER=stdout +LOG_LEVEL=DEBUG +LOG_FORMAT=%(asctime)s [%(threadName)s] %(filename)s:%(funcName)s:%(lineno)d %(message)s +MASTER_ELIGIBILITY=True +REPLICATION_FACTOR=1 +SECURITY_PROTOCOL=PLAINTEXT +SSL_CHECK_HOSTNAME=True +TOPIC_NAME=_schemas +METADATA_MAX_AGE_MS=60000 +ADMIN_METADATA_MAX_AGE=5 +PRODUCER_ACKS=1 +PRODUCER_COUNT=5 +PRODUCER_LINGER_MS=100 +PRODUCER_MAX_REQUEST_SIZE=1048576 +SESSION_TIMEOUT_MS=10000 +KARAPACE_REST=True +KARAPACE_REGISTRY=False +KARAPACE_PORT=8082 +NAME_STRATEGY=topic_name +NAME_STRATEGY_VALIDATION=True +MASTER_ELECTION_STRATEGY=lowest +PROTOBUF_RUNTIME_DIRECTORY=runtime +STATSD_HOST=statsd-exporter +STATSD_PORT=8125 +KAFKA_SCHEMA_READER_STRICT_MODE=False +KAFKA_RETRIABLE_ERRORS_SILENCED=True +USE_PROTOBUF_FORMATTER=False +HTTP_REQUEST_MAX_SIZE=1048576 +TAGS='{ "app": "karapace-rest-proxy" }' diff --git a/container/start.sh b/container/start.sh deleted file mode 100755 index a00f045e0..000000000 --- a/container/start.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env bash -set -Eeuo pipefail - -# Configuration is done using environment variables. The environment variable -# names are the same as the configuration keys, all letters in caps, and always -# start with `KARAPACE_`. - -# In the code below the expression ${var+isset} is used to check if the -# variable was defined, and ${var-isunset} if not. -# -# Ref: https://pubs.opengroup.org/onlinepubs/9699919799/utilities/V3_chap02.html#tag_18_06_02 - -case $1 in -rest) - # Reexport variables for compatibility - [[ -n ${KARAPACE_REST_ADVERTISED_HOSTNAME+isset} ]] && export KARAPACE_ADVERTISED_HOSTNAME="${KARAPACE_REST_ADVERTISED_HOSTNAME}" - [[ -n ${KARAPACE_REST_BOOTSTRAP_URI+isset} ]] && export KARAPACE_BOOTSTRAP_URI="${KARAPACE_REST_BOOTSTRAP_URI}" - [[ -n ${KARAPACE_REST_REGISTRY_HOST+isset} ]] && export KARAPACE_REGISTRY_HOST="${KARAPACE_REST_REGISTRY_HOST}" - [[ -n ${KARAPACE_REST_REGISTRY_PORT+isset} ]] && export KARAPACE_REGISTRY_PORT="${KARAPACE_REST_REGISTRY_PORT}" - [[ -n ${KARAPACE_REST_HOST+isset} ]] && export KARAPACE_HOST="${KARAPACE_REST_HOST}" - [[ -n ${KARAPACE_REST_PORT+isset} ]] && export KARAPACE_PORT="${KARAPACE_REST_PORT}" - [[ -n ${KARAPACE_REST_ADMIN_METADATA_MAX_AGE+isset} ]] && export KARAPACE_ADMIN_METADATA_MAX_AGE="${KARAPACE_REST_ADMIN_METADATA_MAX_AGE}" - [[ -n ${KARAPACE_REST_LOG_LEVEL+isset} ]] && export KARAPACE_LOG_LEVEL="${KARAPACE_REST_LOG_LEVEL}" - export KARAPACE_REST=1 - echo "{}" >/opt/karapace/rest.config.json - - echo "Starting Karapace REST API" - exec python3 -m karapace.karapace_all /opt/karapace/rest.config.json - ;; -registry) - exec python3 -m schema_registry - ;; -*) - echo "usage: start-karapace.sh " - exit 0 - ;; -esac - -wait diff --git a/pyproject.toml b/pyproject.toml index 8cd2b27cc..6512d9d23 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -113,7 +113,7 @@ include-package-data = true where = ["src"] [tool.setuptools.package-data] -schema_registry = ["*.yaml"] +karapace = ["*.yaml"] [tool.setuptools_scm] version_file = "src/karapace/version.py" diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index c749d0098..532ed048d 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -58,7 +58,7 @@ configargparse==1.7 # via locust confluent-kafka==2.4.0 # via karapace (/karapace/pyproject.toml) -coverage[toml]==7.6.7 +coverage[toml]==7.6.8 # via pytest-cov cramjam==2.9.0 # via python-snappy @@ -104,7 +104,7 @@ gevent==24.11.1 # via # geventhttpclient # locust -geventhttpclient==2.3.1 +geventhttpclient==2.3.3 # via locust greenlet==3.1.1 # via gevent diff --git a/src/karapace/auth/auth.py b/src/karapace/auth.py similarity index 96% rename from src/karapace/auth/auth.py rename to src/karapace/auth.py index ccaab30e6..cfc566cdf 100644 --- a/src/karapace/auth/auth.py +++ b/src/karapace/auth.py @@ -8,7 +8,7 @@ from dataclasses import dataclass, field from enum import Enum, unique from hmac import compare_digest -from karapace.config import InvalidConfiguration +from karapace.config import Config, InvalidConfiguration from karapace.statsd import StatsClient from karapace.utils import json_decode, json_encode from typing import Protocol @@ -205,14 +205,12 @@ def check_authorization_any(self, user: User | None, operation: Operation, resou class HTTPAuthorizer(ACLAuthorizer, AuthenticatorAndAuthorizer): - def __init__(self, filename: str) -> None: + def __init__(self, config: Config) -> None: super().__init__() - self._auth_filename: str = filename + self._auth_filename: str = config.registry_authfile self._auth_mtime: float = -1 self._refresh_auth_task: asyncio.Task | None = None self._refresh_auth_awatch_stop_event = asyncio.Event() - # Once first, can raise if file not valid - self._load_authfile() @property def authfile_last_modified(self) -> float: @@ -221,6 +219,7 @@ def authfile_last_modified(self) -> float: @override async def start(self, stats: StatsClient) -> None: """Start authfile refresher task""" + self._load_authfile() async def _refresh_authfile() -> None: """Reload authfile, but keep old auth data if loading fails""" @@ -294,6 +293,14 @@ def authenticate(self, *, username: str, password: str) -> User: return user +def get_authorizer( + config: Config, + http_authorizer: HTTPAuthorizer, + no_auth_authorizer: NoAuthAndAuthz, +) -> AuthenticatorAndAuthorizer: + return http_authorizer if config.registry_authfile else no_auth_authorizer + + def main() -> int: parser = argparse.ArgumentParser(prog="karapace_mkpasswd", description="Karapace password hasher") parser.add_argument("-u", "--user", help="Username", type=str) diff --git a/src/karapace/auth/__init__.py b/src/karapace/auth/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/karapace/auth/dependencies.py b/src/karapace/auth/dependencies.py deleted file mode 100644 index 671769714..000000000 --- a/src/karapace/auth/dependencies.py +++ /dev/null @@ -1,77 +0,0 @@ -""" -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - -from fastapi import Depends, HTTPException, Security, status -from fastapi.security import HTTPBasic, HTTPBasicCredentials -from fastapi.security.base import SecurityBase -from karapace.auth.auth import AuthenticationError, AuthenticatorAndAuthorizer, HTTPAuthorizer, NoAuthAndAuthz, User -from schema_registry.dependencies.config_dependency import ConfigDependencyManager -from typing import Annotated, Optional - -import logging - -LOG = logging.getLogger(__name__) - - -class AuthorizationDependencyManager: - AUTHORIZER: AuthenticatorAndAuthorizer | None = None - AUTH_SET: bool = False - SECURITY: SecurityBase | None = None - - @classmethod - def get_authorizer(cls) -> AuthenticatorAndAuthorizer: - if AuthorizationDependencyManager.AUTH_SET: - assert AuthorizationDependencyManager.AUTHORIZER - return AuthorizationDependencyManager.AUTHORIZER - - config = ConfigDependencyManager.get_config() - if config.registry_authfile: - AuthorizationDependencyManager.AUTHORIZER = HTTPAuthorizer(config.registry_authfile) - else: - # TODO: remove the need for empty authorization logic. - AuthorizationDependencyManager.AUTHORIZER = NoAuthAndAuthz() - AuthorizationDependencyManager.AUTH_SET = True - return AuthorizationDependencyManager.AUTHORIZER - - -AuthenticatorAndAuthorizerDep = Annotated[AuthenticatorAndAuthorizer, Depends(AuthorizationDependencyManager.get_authorizer)] - -# TODO Karapace can have authentication/authorization enabled or disabled. This code needs cleanup and better -# injection mechanism, this is fast workaround for optional user authentication and authorization. -SECURITY: SecurityBase | None = None -config = ConfigDependencyManager.get_config() -if config.registry_authfile: - SECURITY = HTTPBasic(auto_error=False) - - def get_current_user( - credentials: Annotated[Optional[HTTPBasicCredentials], Security(SECURITY)], - authorizer: AuthenticatorAndAuthorizerDep, - ) -> User: - if authorizer and not credentials: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail={"message": "Unauthorized"}, - headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'}, - ) - assert authorizer is not None - assert credentials is not None - username: str = credentials.username - password: str = credentials.password - try: - return authorizer.authenticate(username=username, password=password) - except AuthenticationError: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail={"message": "Unauthorized"}, - headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'}, - ) - -else: - - def get_current_user() -> None: - return None - - -CurrentUserDep = Annotated[Optional[User], Depends(get_current_user)] diff --git a/src/schema_registry/base_config.yaml b/src/karapace/base_config.yaml similarity index 100% rename from src/schema_registry/base_config.yaml rename to src/karapace/base_config.yaml diff --git a/src/karapace/config.py b/src/karapace/config.py index 511c25897..4ace0fc74 100644 --- a/src/karapace/config.py +++ b/src/karapace/config.py @@ -7,23 +7,32 @@ from __future__ import annotations from collections.abc import Mapping +from copy import deepcopy from karapace.constants import DEFAULT_AIOHTTP_CLIENT_MAX_SIZE, DEFAULT_PRODUCER_MAX_REQUEST, DEFAULT_SCHEMA_TOPIC from karapace.typing import ElectionStrategy, NameStrategy from karapace.utils import json_encode from pathlib import Path -from pydantic import BaseSettings +from pydantic import BaseModel, BaseSettings, PyObject +from typing import Final import logging import os import socket import ssl +KARAPACE_ROOT: Final[Path] = Path(__file__).parent +KARAPACE_BASE_CONFIG_YAML_PATH: Final[Path] = KARAPACE_ROOT / "base_config.yaml" + HOSTNAME = socket.gethostname() +class KarapaceTags(BaseModel): + app: str = "Karapace" + + class Config(BaseSettings): access_logs_debug: bool = False - access_log_class: type | None = None + access_log_class: PyObject | None = None advertised_hostname: str | None = None advertised_port: int | None = None advertised_protocol: str = "http" @@ -58,6 +67,7 @@ class Config(BaseSettings): master_eligibility: bool = True replication_factor: int = 1 security_protocol: str = "PLAINTEXT" + ssl_ciphers: str | None = None ssl_cafile: str | None = None ssl_certfile: str | None = None ssl_keyfile: str | None = None @@ -88,9 +98,9 @@ class Config(BaseSettings): kafka_schema_reader_strict_mode: bool = False kafka_retriable_errors_silenced: bool = True use_protobuf_formatter: bool = False + sentry_dsn: str | None = None - sentry: Mapping[str, object] | None = None - tags: Mapping[str, object] | None = None + tags: KarapaceTags = KarapaceTags() # add rest uri if not set # f"{new_config['advertised_protocol']}://{new_config['advertised_hostname']}:{new_config['advertised_port']}" @@ -117,6 +127,40 @@ def to_env_str(self) -> str: env_lines.append(f"{key.upper()}={value}") return "\n".join(env_lines) + def set_config_defaults(self, new_config: Mapping[str, str]) -> Config: + config = deepcopy(self) + for key, value in new_config.items(): + setattr(config, key, value) + + # Fallback to default port if `advertised_port` is not set + if config.advertised_port is None: + config.advertised_port = new_config["port"] + + # Fallback to `advertised_*` constructed URI if not set + if config.rest_base_uri is None: + config.rest_base_uri = f"{config.advertised_protocol}://{config.advertised_hostname}:{config.advertised_port}" + + # Set the aiohttp client max size if REST Proxy is enabled and producer max request configuration is altered + # from default and aiohttp client max size is not set + # Use the http request max size from the configuration without altering if set. + if ( + config.karapace_rest + and config.producer_max_request_size > DEFAULT_PRODUCER_MAX_REQUEST + and config.http_request_max_size is None + ): + # REST Proxy API configuration for producer max request size must be taken into account + # also for the aiohttp.web.Application client max size. + # Always add the aiohttp default client max size as the headroom above the producer max request size. + # The input JSON size for REST Proxy is not easy to estimate, lot of small records in single request has + # a lot of overhead due to JSON structure. + config.http_request_max_size = config.producer_max_request_size + DEFAULT_AIOHTTP_CLIENT_MAX_SIZE + elif config.http_request_max_size is None: + # Set the default aiohttp client max size + config.http_request_max_size = DEFAULT_AIOHTTP_CLIENT_MAX_SIZE + + validate_config(config) + return config + # class ConfigDefaults(Config, total=False): # ... @@ -145,45 +189,6 @@ def parse_env_value(value: str) -> str | int | bool: return value -def set_config_defaults(config: Config) -> Config: - # Fallback to default port if `advertised_port` is not set - if config["advertised_port"] is None: - config["advertised_port"] = new_config["port"] - - # Fallback to `advertised_*` constructed URI if not set - if new_config["rest_base_uri"] is None: - new_config[ - "rest_base_uri" - ] = f"{new_config['advertised_protocol']}://{new_config['advertised_hostname']}:{new_config['advertised_port']}" - - # Tag app should always be karapace - new_config.setdefault("tags", {}) - new_config["tags"]["app"] = "Karapace" - - # Set the aiohttp client max size if REST Proxy is enabled and producer max request configuration is altered from default - # and aiohttp client max size is not set - # Use the http request max size from the configuration without altering if set. - if ( - new_config["karapace_rest"] - and new_config["producer_max_request_size"] > DEFAULT_PRODUCER_MAX_REQUEST - and new_config["http_request_max_size"] is None - ): - # REST Proxy API configuration for producer max request size must be taken into account - # also for the aiohttp.web.Application client max size. - # Always add the aiohttp default client max size as the headroom above the producer max request size. - # The input JSON size for REST Proxy is not easy to estimate, lot of small records in single request has - # a lot of overhead due to JSON structure. - new_config["http_request_max_size"] = new_config["producer_max_request_size"] + DEFAULT_AIOHTTP_CLIENT_MAX_SIZE - elif new_config["http_request_max_size"] is None: - # Set the default aiohttp client max size - new_config["http_request_max_size"] = DEFAULT_AIOHTTP_CLIENT_MAX_SIZE - - # set_settings_from_environment(new_config) - set_sentry_dsn_from_environment(new_config) - validate_config(new_config) - return new_config - - # def set_settings_from_environment(config: Config) -> None: # """The environment variables have precedence and overwrite the configuration settings.""" # for config_name in DEFAULTS: @@ -218,7 +223,7 @@ def set_sentry_dsn_from_environment(config: Config) -> None: def validate_config(config: Config) -> None: - master_election_strategy = config["master_election_strategy"] + master_election_strategy = config.master_election_strategy try: ElectionStrategy(master_election_strategy.lower()) except ValueError: @@ -227,7 +232,7 @@ def validate_config(config: Config) -> None: f"Invalid master election strategy: {master_election_strategy}, valid values are {valid_strategies}" ) from None - name_strategy = config["name_strategy"] + name_strategy = config.name_strategy try: NameStrategy(name_strategy) except ValueError: @@ -236,7 +241,7 @@ def validate_config(config: Config) -> None: f"Invalid default name strategy: {name_strategy}, valid values are {valid_strategies}" ) from None - if config["rest_authorization"] and config["sasl_bootstrap_uri"] is None: + if config.rest_authorization and config.sasl_bootstrap_uri is None: raise InvalidConfiguration( "Using 'rest_authorization' requires configuration value for 'sasl_bootstrap_uri' to be set" ) @@ -253,17 +258,10 @@ def write_env_file(dot_env_path: Path, config: Config) -> None: def read_env_file(env_file_path: str) -> Config: return Config(_env_file=env_file_path, _env_file_encoding="utf-8") - Config() - try: - config = json_decode(config_handler) - except JSONDecodeError as ex: - raise InvalidConfiguration("Configuration is not a valid JSON") from ex - return set_config_defaults(config) - def create_client_ssl_context(config: Config) -> ssl.SSLContext | None: # taken from conn.py, as it adds a lot more logic to the context configuration than the initial version - if config["security_protocol"] in ("PLAINTEXT", "SASL_PLAINTEXT"): + if config.security_protocol in ("PLAINTEXT", "SASL_PLAINTEXT"): return None ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS) ssl_context.options |= ssl.OP_NO_SSLv2 @@ -271,30 +269,30 @@ def create_client_ssl_context(config: Config) -> ssl.SSLContext | None: ssl_context.options |= ssl.OP_NO_TLSv1 ssl_context.options |= ssl.OP_NO_TLSv1_1 ssl_context.verify_mode = ssl.CERT_OPTIONAL - if config["ssl_check_hostname"]: + if config.ssl_check_hostname: ssl_context.check_hostname = True - if config["ssl_cafile"]: - ssl_context.load_verify_locations(config["ssl_cafile"]) + if config.ssl_cafile: + ssl_context.load_verify_locations(config.ssl_cafile) ssl_context.verify_mode = ssl.CERT_REQUIRED - if config["ssl_certfile"] and config["ssl_keyfile"]: + if config.ssl_certfile and config.ssl_keyfile: ssl_context.load_cert_chain( - certfile=config["ssl_certfile"], - keyfile=config["ssl_keyfile"], - password=config["ssl_password"], + certfile=config.ssl_certfile, + keyfile=config.ssl_keyfile, + password=config.ssl_password, ) - if config["ssl_crlfile"]: + if config.ssl_crlfile: if not hasattr(ssl, "VERIFY_CRL_CHECK_LEAF"): raise RuntimeError("This version of Python does not support ssl_crlfile!") - ssl_context.load_verify_locations(config["ssl_crlfile"]) + ssl_context.load_verify_locations(config.ssl_crlfile) ssl_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF - if config.get("ssl_ciphers"): - ssl_context.set_ciphers(config["ssl_ciphers"]) + if config.ssl_ciphers: + ssl_context.set_ciphers(config.ssl_ciphers) return ssl_context def create_server_ssl_context(config: Config) -> ssl.SSLContext | None: - tls_certfile = config["server_tls_certfile"] - tls_keyfile = config["server_tls_keyfile"] + tls_certfile = config.server_tls_certfile + tls_keyfile = config.server_tls_keyfile if tls_certfile is None: if tls_keyfile is None: # Neither config value set, do not use TLS diff --git a/src/karapace/container.py b/src/karapace/container.py new file mode 100644 index 000000000..7c71e99ca --- /dev/null +++ b/src/karapace/container.py @@ -0,0 +1,40 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from dependency_injector import containers, providers +from karapace.auth import get_authorizer, HTTPAuthorizer, NoAuthAndAuthz +from karapace.config import Config +from karapace.forward_client import ForwardClient +from karapace.instrumentation.prometheus import PrometheusInstrumentation +from karapace.schema_registry import KarapaceSchemaRegistry +from karapace.statsd import StatsClient + + +class KarapaceContainer(containers.DeclarativeContainer): + base_config = providers.Configuration() + config = providers.Singleton( + Config, + _env_file=base_config.karapace.env_file, + _env_file_encoding=base_config.karapace.env_file_encoding, + ) + + statsd = providers.Singleton(StatsClient, config=config) + + no_auth_authorizer = providers.Singleton(NoAuthAndAuthz) + + http_authorizer = providers.Singleton(HTTPAuthorizer, config=config) + + schema_registry = providers.Singleton(KarapaceSchemaRegistry, config=config) + + forward_client = providers.Singleton(ForwardClient) + + authorizer = providers.Factory( + get_authorizer, + config=config, + http_authorizer=http_authorizer, + no_auth_authorizer=no_auth_authorizer, + ) + + prometheus = providers.Singleton(PrometheusInstrumentation) diff --git a/src/karapace/instrumentation/prometheus.py b/src/karapace/instrumentation/prometheus.py index 1336b4ab0..90d260057 100644 --- a/src/karapace/instrumentation/prometheus.py +++ b/src/karapace/instrumentation/prometheus.py @@ -22,6 +22,7 @@ class PrometheusInstrumentation: METRICS_ENDPOINT_PATH: Final[str] = "/metrics" + CONTENT_TYPE_LATEST: Final[str] = "text/plain; version=0.0.4; charset=utf-8" START_TIME_REQUEST_KEY: Final[str] = "start_time" registry: Final[CollectorRegistry] = CollectorRegistry() diff --git a/src/karapace/kafka_rest_apis/__init__.py b/src/karapace/kafka_rest_apis/__init__.py index f41f53842..49d5093a3 100644 --- a/src/karapace/kafka_rest_apis/__init__.py +++ b/src/karapace/kafka_rest_apis/__init__.py @@ -18,6 +18,7 @@ from collections import namedtuple from confluent_kafka.error import KafkaException from contextlib import AsyncExitStack +from copy import deepcopy from http import HTTPStatus from karapace.config import Config from karapace.errors import InvalidSchema @@ -304,7 +305,8 @@ async def get_user_proxy(self, request: HTTPRequest) -> "UserRestProxy": key = auth_header if self.proxies.get(key) is None: - config = self.config.copy() + # config = self.config.copy() + config = deepcopy(self.config) config.bootstrap_uri = config.sasl_bootstrap_uri config.security_protocol = ( "SASL_SSL" if config.security_protocol in ("SSL", "SASL_SSL") else "SASL_PLAINTEXT" diff --git a/src/karapace/karapace_all.py b/src/karapace/karapace_all.py index c176bc337..29eeb0d20 100644 --- a/src/karapace/karapace_all.py +++ b/src/karapace/karapace_all.py @@ -4,61 +4,35 @@ """ from __future__ import annotations -from contextlib import closing +from dependency_injector.wiring import inject, Provide from karapace import version as karapace_version -from karapace.config import read_config +from karapace.config import Config, KARAPACE_BASE_CONFIG_YAML_PATH +from karapace.container import KarapaceContainer from karapace.instrumentation.prometheus import PrometheusInstrumentation from karapace.kafka_rest_apis import KafkaRest -from karapace.logging import configure_logging -from karapace.rapu import RestApp -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController +from karapace.logging import configure_logging, log_config_without_secrets import argparse import logging import sys -class KarapaceAll(KafkaRest, KarapaceSchemaRegistryController): - pass - - -def main() -> int: +@inject +def main( + config: Config = Provide[KarapaceContainer.config], + prometheus: PrometheusInstrumentation = Provide[KarapaceContainer.prometheus], +) -> int: parser = argparse.ArgumentParser(prog="karapace", description="Karapace: Your Kafka essentials in one tool") parser.add_argument("--version", action="version", help="show program version", version=karapace_version.__version__) - parser.add_argument("config_file", help="configuration file path", type=argparse.FileType()) - arg = parser.parse_args() - - with closing(arg.config_file): - config = read_config(arg.config_file) - + parser.parse_args() configure_logging(config=config) + log_config_without_secrets(config=config) - app: RestApp - if config["karapace_rest"] and config["karapace_registry"]: - info_str = "both services" - app = KarapaceAll(config=config) - elif config["karapace_rest"]: - info_str = "karapace rest" - app = KafkaRest(config=config) - elif config["karapace_registry"]: - info_str = "karapace schema registry" - app = KarapaceSchemaRegistryController(config=config) - else: - print("Both rest and registry options are disabled, exiting") - return 1 - - info_str_separator = "=" * 100 - logging.log(logging.INFO, "\n%s\nStarting %s\n%s", info_str_separator, info_str, info_str_separator) - - config_without_secrets = {} - for key, value in config.items(): - if "password" in key: - value = "****" - config_without_secrets[key] = value - logging.log(logging.DEBUG, "Config %r", config_without_secrets) + logging.info("\n%s\nStarting %s\n%s", ("=" * 100), "Starting Karapace Rest Proxy", ("=" * 100)) + app = KafkaRest(config=config) try: - PrometheusInstrumentation.setup_metrics(app=app) + prometheus.setup_metrics(app=app) app.run() # `close` will be called by the callback `close_by_app` set by `KarapaceBase` except Exception as ex: # pylint: disable-broad-except app.stats.unexpected_exception(ex=ex, where="karapace") @@ -67,4 +41,7 @@ def main() -> int: if __name__ == "__main__": + container = KarapaceContainer() + container.base_config.from_yaml(KARAPACE_BASE_CONFIG_YAML_PATH, envs_required=True, required=True) + container.wire(modules=[__name__]) sys.exit(main()) diff --git a/src/karapace/logging.py b/src/karapace/logging.py index ad6656bc3..a8521601e 100644 --- a/src/karapace/logging.py +++ b/src/karapace/logging.py @@ -31,6 +31,7 @@ def configure_logging(*, config: Config) -> None: logging.root.addHandler(root_handler) logging.root.setLevel(config.log_level) + logging.getLogger("aiohttp.access").setLevel(config.log_level) logging.getLogger("uvicorn.error").setLevel(config.log_level) diff --git a/src/karapace/protobuf/io.py b/src/karapace/protobuf/io.py index 36c76e491..89cdd26f1 100644 --- a/src/karapace/protobuf/io.py +++ b/src/karapace/protobuf/io.py @@ -97,7 +97,7 @@ def get_protobuf_class_instance( class_name: str, cfg: Config, ) -> _ProtobufModel: - directory = Path(cfg["protobuf_runtime_directory"]) + directory = Path(cfg.protobuf_runtime_directory) deps_list = crawl_dependencies(schema) root_class_name = "" for value in deps_list.values(): diff --git a/src/karapace/sentry/__init__.py b/src/karapace/sentry/__init__.py index 8c3b173e5..8aaf572bf 100644 --- a/src/karapace/sentry/__init__.py +++ b/src/karapace/sentry/__init__.py @@ -1,14 +1,14 @@ from __future__ import annotations -from karapace.sentry.sentry_client_api import KarapaceSentryConfig, SentryClientAPI, SentryNoOpClient +from karapace.sentry.sentry_client_api import SentryClientAPI, SentryNoOpClient import logging LOG = logging.getLogger(__name__) -def _get_sentry_noop_client(sentry_config: KarapaceSentryConfig) -> SentryClientAPI: - return SentryNoOpClient(sentry_config=sentry_config) +def _get_sentry_noop_client(sentry_dsn: str) -> SentryClientAPI: + return SentryNoOpClient(sentry_dsn=sentry_dsn) _get_sentry_client = _get_sentry_noop_client @@ -18,13 +18,13 @@ def _get_sentry_noop_client(sentry_config: KarapaceSentryConfig) -> SentryClient from karapace.sentry.sentry_client import SentryClient # If Sentry SDK can be imported in SentryClient the Sentry SDK can be initialized. - def _get_actual_sentry_client(sentry_config: KarapaceSentryConfig) -> SentryClientAPI: - return SentryClient(sentry_config=sentry_config) + def _get_actual_sentry_client(sentry_dsn: str) -> SentryClientAPI: + return SentryClient(sentry_dsn=sentry_dsn) _get_sentry_client = _get_actual_sentry_client except ImportError: LOG.warning("Cannot enable Sentry.io sending: importing 'sentry_sdk' failed") -def get_sentry_client(sentry_config: KarapaceSentryConfig) -> SentryClientAPI: - return _get_sentry_client(sentry_config=sentry_config) +def get_sentry_client(sentry_dsn: str) -> SentryClientAPI: + return _get_sentry_client(sentry_dsn=sentry_dsn) diff --git a/src/karapace/sentry/sentry_client.py b/src/karapace/sentry/sentry_client.py index c4dc99d33..88b47d0fb 100644 --- a/src/karapace/sentry/sentry_client.py +++ b/src/karapace/sentry/sentry_client.py @@ -5,34 +5,32 @@ from __future__ import annotations from collections.abc import Mapping -from karapace.sentry.sentry_client_api import KarapaceSentryConfig, SentryClientAPI +from karapace.sentry.sentry_client_api import SentryClientAPI # The Sentry SDK is optional, omit pylint import error import sentry_sdk class SentryClient(SentryClientAPI): - def __init__(self, sentry_config: KarapaceSentryConfig) -> None: - super().__init__(sentry_config=sentry_config) + def __init__(self, sentry_dsn: str) -> None: + super().__init__(sentry_dsn=sentry_dsn) self._initialize_sentry() def _initialize_sentry(self) -> None: - sentry_config = ( - dict(self.sentry_config) - if self.sentry_config is not None - else { - "ignore_errors": [ - "ClientConnectorError", # aiohttp - "ClientPayloadError", # aiohttp - "ConnectionRefusedError", # kafka (asyncio) - "ConnectionResetError", # kafka, requests - "IncompleteReadError", # kafka (asyncio) - "ServerDisconnectedError", # aiohttp - "ServerTimeoutError", # aiohttp - "TimeoutError", # kafka - ] - } - ) + sentry_config = { + "dsn": self.sentry_dsn, + "default_integrations": False, + "ignore_errors": [ + "ClientConnectorError", # aiohttp + "ClientPayloadError", # aiohttp + "ConnectionRefusedError", # kafka (asyncio) + "ConnectionResetError", # kafka, requests + "IncompleteReadError", # kafka (asyncio) + "ServerDisconnectedError", # aiohttp + "ServerTimeoutError", # aiohttp + "TimeoutError", # kafka + ], + } # If the DSN is not in the config or in SENTRY_DSN environment variable # the Sentry client does not send any events. diff --git a/src/karapace/sentry/sentry_client_api.py b/src/karapace/sentry/sentry_client_api.py index 4ca9575c8..6ed166eff 100644 --- a/src/karapace/sentry/sentry_client_api.py +++ b/src/karapace/sentry/sentry_client_api.py @@ -5,14 +5,11 @@ from __future__ import annotations from collections.abc import Mapping -from typing_extensions import TypeAlias - -KarapaceSentryConfig: TypeAlias = "Mapping[str, object] | None" class SentryClientAPI: - def __init__(self, sentry_config: KarapaceSentryConfig) -> None: - self.sentry_config = sentry_config or {} + def __init__(self, sentry_dsn: str) -> None: + self.sentry_dsn = sentry_dsn def unexpected_exception( self, diff --git a/src/karapace/statsd.py b/src/karapace/statsd.py index 13b0db0a4..a29562d8d 100644 --- a/src/karapace/statsd.py +++ b/src/karapace/statsd.py @@ -29,7 +29,7 @@ def __init__(self, config: Config) -> None: self._dest_addr: Final = (config.statsd_host, config.statsd_port) self._socket: Final = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self._tags: Final = config.tags or {} - self.sentry_client: Final = get_sentry_client(sentry_config=(config.sentry or None)) + self.sentry_client: Final = get_sentry_client(sentry_dsn=config.sentry_dsn) @contextmanager def timing_manager(self, metric: str, tags: dict | None = None) -> Iterator[None]: diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 31a972576..2f9b8a712 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -2,76 +2,44 @@ Copyright (c) 2024 Aiven Ltd See LICENSE for details """ -from collections.abc import AsyncGenerator -from contextlib import asynccontextmanager -from fastapi import FastAPI, Depends -from karapace import version as karapace_version -from karapace.auth.auth import AuthenticatorAndAuthorizer -from karapace.auth.dependencies import AuthorizationDependencyManager -from karapace.config import Config -from schema_registry.dependencies.schema_registry_dependency import SchemaRegistryDependencyManager -from schema_registry.dependencies.stats_dependeny import StatsDependencyManager -from karapace.logging import configure_logging, log_config_without_secrets -from karapace.schema_registry import KarapaceSchemaRegistry -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from schema_registry.http_handlers import setup_exception_handlers -from schema_registry.middlewares import setup_middlewares -from schema_registry.routers import setup_routers -from typing import Final +from karapace.config import KARAPACE_BASE_CONFIG_YAML_PATH +from karapace.container import KarapaceContainer from schema_registry.container import SchemaRegistryContainer -from dependency_injector.wiring import Provide, inject +from schema_registry.factory import create_karapace_application, karapace_schema_registry_lifespan +from schema_registry.routers import compatibility, config, health, metrics, mode, schemas, subjects -import logging +import schema_registry.factory +import schema_registry.schema_registry_apis +import schema_registry.user import uvicorn -from pathlib import Path - -SCHEMA_REGISTRY_ROOT: Final[Path] = Path(__file__).parent - - -@asynccontextmanager -async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]: - schema_registry: KarapaceSchemaRegistry | None = None - authorizer: AuthenticatorAndAuthorizer | None = None - try: - schema_registry = await SchemaRegistryDependencyManager.get_schema_registry() - await schema_registry.start() - await schema_registry.get_master() - authorizer = AuthorizationDependencyManager.get_authorizer() - if authorizer is not None: - await authorizer.start(StatsDependencyManager.get_stats()) - yield - finally: - if schema_registry: - await schema_registry.close() - if authorizer: - await authorizer.close() - - -@inject -def create_karapace_application(*, config: Config = Depends(Provide[SchemaRegistryContainer.config])) -> FastAPI: - configure_logging(config=config) - log_config_without_secrets(config=config) - logging.info("Starting Karapace Schema Registry (%s)", karapace_version.__version__) - - app = FastAPI(lifespan=lifespan) - setup_routers(app=app) - setup_exception_handlers(app=app) - setup_middlewares(app=app) - - FastAPIInstrumentor.instrument_app(app) - - return app - if __name__ == "__main__": - container = SchemaRegistryContainer() - container.base_config.from_yaml(f"{SCHEMA_REGISTRY_ROOT / 'base_config.yaml'}", envs_required=True, required=True) - container.wire(modules=[__name__,]) + container = KarapaceContainer() + container.base_config.from_yaml(KARAPACE_BASE_CONFIG_YAML_PATH, envs_required=True, required=True) + container.wire( + modules=[ + __name__, + schema_registry.schema_registry_apis, + ] + ) + + schema_registry_container = SchemaRegistryContainer(karapace_container=container) + schema_registry_container.wire( + modules=[ + __name__, + schema_registry.factory, + schema_registry.user, + health, + mode, + compatibility, + schemas, + subjects, + config, + metrics, + ] + ) - app = create_karapace_application() + app = create_karapace_application(config=container.config(), lifespan=karapace_schema_registry_lifespan) uvicorn.run( - app, - host=container.config().host, - port=container.config().port, - log_level=container.config().log_level.lower() + app, host=container.config().host, port=container.config().port, log_level=container.config().log_level.lower() ) diff --git a/src/schema_registry/container.py b/src/schema_registry/container.py index ad66de53e..b93bc4139 100644 --- a/src/schema_registry/container.py +++ b/src/schema_registry/container.py @@ -4,15 +4,15 @@ """ from dependency_injector import containers, providers -from dependency_injector.wiring import Provide, inject - -from karapace.config import Config +from karapace.container import KarapaceContainer +from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController class SchemaRegistryContainer(containers.DeclarativeContainer): - base_config = providers.Configuration() - config = providers.Singleton( - Config, - _env_file=base_config.karapace.env_file, - _env_file_encoding=base_config.karapace.env_file_encoding, + karapace_container = providers.Container(KarapaceContainer) + schema_registry_controller = providers.Singleton( + KarapaceSchemaRegistryController, + config=karapace_container.config, + schema_registry=karapace_container.schema_registry, + stats=karapace_container.statsd, ) diff --git a/src/schema_registry/dependencies/config_dependency.py b/src/schema_registry/dependencies/config_dependency.py deleted file mode 100644 index 9c299b725..000000000 --- a/src/schema_registry/dependencies/config_dependency.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - -from fastapi import Depends -from karapace.config import Config -from typing import Annotated - -import os - -env_file = os.environ.get("KARAPACE_DOTENV", None) - - -class ConfigDependencyManager: - CONFIG = Config(_env_file=env_file, _env_file_encoding="utf-8") - - @classmethod - def get_config(cls) -> Config: - return ConfigDependencyManager.CONFIG - - -ConfigDep = Annotated[Config, Depends(ConfigDependencyManager.get_config)] diff --git a/src/schema_registry/dependencies/controller_dependency.py b/src/schema_registry/dependencies/controller_dependency.py deleted file mode 100644 index 60da9bf29..000000000 --- a/src/schema_registry/dependencies/controller_dependency.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - - -from fastapi import Depends -from schema_registry.dependencies.config_dependency import ConfigDep -from schema_registry.dependencies.schema_registry_dependency import SchemaRegistryDep -from schema_registry.dependencies.stats_dependeny import StatsDep -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController -from typing import Annotated - - -async def get_controller( - config: ConfigDep, - stats: StatsDep, - schema_registry: SchemaRegistryDep, -) -> KarapaceSchemaRegistryController: - return KarapaceSchemaRegistryController(config=config, schema_registry=schema_registry, stats=stats) - - -KarapaceSchemaRegistryControllerDep = Annotated[KarapaceSchemaRegistryController, Depends(get_controller)] diff --git a/src/schema_registry/dependencies/forward_client_dependency.py b/src/schema_registry/dependencies/forward_client_dependency.py deleted file mode 100644 index 57459c371..000000000 --- a/src/schema_registry/dependencies/forward_client_dependency.py +++ /dev/null @@ -1,20 +0,0 @@ -""" -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - -from fastapi import Depends -from karapace.forward_client import ForwardClient -from typing import Annotated - -FORWARD_CLIENT: ForwardClient | None = None - - -def get_forward_client() -> ForwardClient: - global FORWARD_CLIENT - if not FORWARD_CLIENT: - FORWARD_CLIENT = ForwardClient() - return FORWARD_CLIENT - - -ForwardClientDep = Annotated[ForwardClient, Depends(get_forward_client)] diff --git a/src/schema_registry/dependencies/schema_registry_dependency.py b/src/schema_registry/dependencies/schema_registry_dependency.py deleted file mode 100644 index 4823a4cc1..000000000 --- a/src/schema_registry/dependencies/schema_registry_dependency.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - -from fastapi import Depends -from schema_registry.dependencies.config_dependency import ConfigDependencyManager -from karapace.schema_registry import KarapaceSchemaRegistry -from typing import Annotated - - -class SchemaRegistryDependencyManager: - SCHEMA_REGISTRY: KarapaceSchemaRegistry | None = None - - @classmethod - async def get_schema_registry(cls) -> KarapaceSchemaRegistry: - if not SchemaRegistryDependencyManager.SCHEMA_REGISTRY: - SchemaRegistryDependencyManager.SCHEMA_REGISTRY = KarapaceSchemaRegistry( - config=ConfigDependencyManager.get_config() - ) - return SchemaRegistryDependencyManager.SCHEMA_REGISTRY - - -SchemaRegistryDep = Annotated[KarapaceSchemaRegistry, Depends(SchemaRegistryDependencyManager.get_schema_registry)] diff --git a/src/schema_registry/dependencies/stats_dependeny.py b/src/schema_registry/dependencies/stats_dependeny.py deleted file mode 100644 index b619443c5..000000000 --- a/src/schema_registry/dependencies/stats_dependeny.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - - -from fastapi import Depends -from schema_registry.dependencies.config_dependency import ConfigDependencyManager -from karapace.statsd import StatsClient -from typing import Annotated - - -class StatsDependencyManager: - STATS_CLIENT: StatsClient | None = None - - @classmethod - def get_stats(cls) -> StatsClient: - if not StatsDependencyManager.STATS_CLIENT: - StatsDependencyManager.STATS_CLIENT = StatsClient(config=ConfigDependencyManager.get_config()) - return StatsDependencyManager.STATS_CLIENT - - -StatsDep = Annotated[StatsClient, Depends(StatsDependencyManager.get_stats)] diff --git a/src/schema_registry/factory.py b/src/schema_registry/factory.py new file mode 100644 index 000000000..662b06d59 --- /dev/null +++ b/src/schema_registry/factory.py @@ -0,0 +1,57 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" +from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager +from dependency_injector.wiring import inject, Provide +from fastapi import Depends, FastAPI +from karapace import version as karapace_version +from karapace.auth import AuthenticatorAndAuthorizer +from karapace.config import Config +from karapace.logging import configure_logging, log_config_without_secrets +from karapace.schema_registry import KarapaceSchemaRegistry +from karapace.statsd import StatsClient +from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from schema_registry.container import SchemaRegistryContainer +from schema_registry.http_handlers import setup_exception_handlers +from schema_registry.middlewares import setup_middlewares +from schema_registry.routers.setup import setup_routers + +import logging + + +@asynccontextmanager +@inject +async def karapace_schema_registry_lifespan( + _: FastAPI, + stastd: StatsClient = Depends(Provide[SchemaRegistryContainer.karapace_container.statsd]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), +) -> AsyncGenerator[None, None]: + try: + await schema_registry.start() + await schema_registry.get_master() + await authorizer.start(stats=stastd) + + yield + finally: + if schema_registry: + await schema_registry.close() + if authorizer: + await authorizer.close() + + +def create_karapace_application(*, config: Config, lifespan: AsyncGenerator[None, None]) -> FastAPI: + configure_logging(config=config) + log_config_without_secrets(config=config) + logging.info("Starting Karapace Schema Registry (%s)", karapace_version.__version__) + + app = FastAPI(lifespan=lifespan) + setup_routers(app=app) + setup_exception_handlers(app=app) + setup_middlewares(app=app) + + FastAPIInstrumentor.instrument_app(app) + + return app diff --git a/src/schema_registry/routers/__init__.py b/src/schema_registry/routers/__init__.py index e077a1551..f53be7121 100644 --- a/src/schema_registry/routers/__init__.py +++ b/src/schema_registry/routers/__init__.py @@ -2,22 +2,3 @@ Copyright (c) 2024 Aiven Ltd See LICENSE for details """ - -from fastapi import FastAPI -from schema_registry.routers.compatibility_router import compatibility_router -from schema_registry.routers.config_router import config_router -from schema_registry.routers.health_router import health_router -from schema_registry.routers.mode_router import mode_router -from schema_registry.routers.root_router import root_router -from schema_registry.routers.schemas_router import schemas_router -from schema_registry.routers.subjects_router import subjects_router - - -def setup_routers(app: FastAPI) -> None: - app.include_router(compatibility_router) - app.include_router(config_router) - app.include_router(health_router) - app.include_router(mode_router) - app.include_router(root_router) - app.include_router(schemas_router) - app.include_router(subjects_router) diff --git a/src/schema_registry/routers/compatibility_router.py b/src/schema_registry/routers/compatibility.py similarity index 57% rename from src/schema_registry/routers/compatibility_router.py rename to src/schema_registry/routers/compatibility.py index 108773981..0e91e3625 100644 --- a/src/schema_registry/routers/compatibility_router.py +++ b/src/schema_registry/routers/compatibility.py @@ -3,13 +3,16 @@ See LICENSE for details """ -from fastapi import APIRouter -from karapace.auth.auth import Operation -from karapace.auth.dependencies import AuthenticatorAndAuthorizerDep, CurrentUserDep -from schema_registry.dependencies.controller_dependency import KarapaceSchemaRegistryControllerDep +from dependency_injector.wiring import inject, Provide +from fastapi import APIRouter, Depends +from karapace.auth import AuthenticatorAndAuthorizer, Operation, User from karapace.typing import Subject +from schema_registry.container import SchemaRegistryContainer from schema_registry.routers.errors import unauthorized from schema_registry.routers.requests import CompatibilityCheckResponse, SchemaRequest +from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController +from schema_registry.user import get_current_user +from typing import Annotated compatibility_router = APIRouter( prefix="/compatibility", @@ -19,13 +22,14 @@ @compatibility_router.post("/subjects/{subject}/versions/{version}", response_model_exclude_none=True) +@inject async def compatibility_post( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, version: str, # TODO support actual Version object schema_request: SchemaRequest, + user: Annotated[User, Depends(get_current_user)], + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityCheckResponse: if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): raise unauthorized() diff --git a/src/schema_registry/routers/config_router.py b/src/schema_registry/routers/config.py similarity index 52% rename from src/schema_registry/routers/config_router.py rename to src/schema_registry/routers/config.py index e7d6c22b4..d9944d5f4 100644 --- a/src/schema_registry/routers/config_router.py +++ b/src/schema_registry/routers/config.py @@ -3,15 +3,18 @@ See LICENSE for details """ -from fastapi import APIRouter, Request -from karapace.auth.auth import Operation -from karapace.auth.dependencies import AuthenticatorAndAuthorizerDep, CurrentUserDep -from schema_registry.dependencies.controller_dependency import KarapaceSchemaRegistryControllerDep -from schema_registry.dependencies.forward_client_dependency import ForwardClientDep -from schema_registry.dependencies.schema_registry_dependency import SchemaRegistryDep +from dependency_injector.wiring import inject, Provide +from fastapi import APIRouter, Depends, Request +from karapace.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.forward_client import ForwardClient +from karapace.schema_registry import KarapaceSchemaRegistry from karapace.typing import Subject +from schema_registry.container import SchemaRegistryContainer from schema_registry.routers.errors import no_primary_url_error, unauthorized from schema_registry.routers.requests import CompatibilityLevelResponse, CompatibilityRequest, CompatibilityResponse +from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController +from schema_registry.user import get_current_user +from typing import Annotated config_router = APIRouter( prefix="/config", @@ -21,10 +24,11 @@ @config_router.get("") +@inject async def config_get( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, + user: Annotated[User, Depends(get_current_user)], + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityLevelResponse: if authorizer and not authorizer.check_authorization(user, Operation.Read, "Config:"): raise unauthorized() @@ -33,14 +37,15 @@ async def config_get( @config_router.put("") +@inject async def config_put( request: Request, - controller: KarapaceSchemaRegistryControllerDep, - schema_registry: SchemaRegistryDep, - forward_client: ForwardClientDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, compatibility_level_request: CompatibilityRequest, + user: Annotated[User, Depends(get_current_user)], + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, "Config:"): raise unauthorized() @@ -56,11 +61,11 @@ async def config_put( @config_router.get("/{subject}") async def config_get_subject( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, + user: Annotated[User, Depends(get_current_user)], defaultToGlobal: bool = False, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityLevelResponse: if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): raise unauthorized() @@ -71,13 +76,13 @@ async def config_get_subject( @config_router.put("/{subject}") async def config_set_subject( request: Request, - controller: KarapaceSchemaRegistryControllerDep, - schema_registry: SchemaRegistryDep, - forward_client: ForwardClientDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, compatibility_level_request: CompatibilityRequest, + user: Annotated[User, Depends(get_current_user)], + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -94,12 +99,12 @@ async def config_set_subject( @config_router.delete("/{subject}") async def config_delete_subject( request: Request, - controller: KarapaceSchemaRegistryControllerDep, - schema_registry: SchemaRegistryDep, - forward_client: ForwardClientDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, + user: Annotated[User, Depends(get_current_user)], + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() diff --git a/src/schema_registry/routers/health_router.py b/src/schema_registry/routers/health.py similarity index 84% rename from src/schema_registry/routers/health_router.py rename to src/schema_registry/routers/health.py index 36a3c6975..df3a8822f 100644 --- a/src/schema_registry/routers/health_router.py +++ b/src/schema_registry/routers/health.py @@ -3,9 +3,11 @@ See LICENSE for details """ -from fastapi import APIRouter, HTTPException, status -from schema_registry.dependencies.schema_registry_dependency import SchemaRegistryDep +from dependency_injector.wiring import inject, Provide +from fastapi import APIRouter, Depends, HTTPException, status +from karapace.schema_registry import KarapaceSchemaRegistry from pydantic import BaseModel +from schema_registry.container import SchemaRegistryContainer class HealthStatus(BaseModel): @@ -33,8 +35,9 @@ class HealthCheck(BaseModel): @health_router.get("") +@inject async def health( - schema_registry: SchemaRegistryDep, + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), ) -> HealthCheck: starttime = 0.0 if schema_registry.schema_reader.ready: diff --git a/src/schema_registry/routers/metrics.py b/src/schema_registry/routers/metrics.py new file mode 100644 index 000000000..23b4b39f8 --- /dev/null +++ b/src/schema_registry/routers/metrics.py @@ -0,0 +1,24 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from dependency_injector.wiring import inject, Provide +from fastapi import APIRouter, Depends, Response +from karapace.instrumentation.prometheus import PrometheusInstrumentation +from pydantic import BaseModel +from schema_registry.container import SchemaRegistryContainer + +metrics_router = APIRouter( + prefix=PrometheusInstrumentation.METRICS_ENDPOINT_PATH, + tags=["metrics"], + responses={404: {"description": "Not found"}}, +) + + +@metrics_router.get("") +@inject +async def metrics( + prometheus: PrometheusInstrumentation = Depends(Provide[SchemaRegistryContainer.karapace_container.prometheus]), +) -> BaseModel: + return Response(content=await prometheus.serve_metrics(), media_type=prometheus.CONTENT_TYPE_LATEST) diff --git a/src/schema_registry/routers/mode.py b/src/schema_registry/routers/mode.py new file mode 100644 index 000000000..45e0d1479 --- /dev/null +++ b/src/schema_registry/routers/mode.py @@ -0,0 +1,46 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from dependency_injector.wiring import inject, Provide +from fastapi import APIRouter, Depends +from karapace.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.typing import Subject +from schema_registry.container import SchemaRegistryContainer +from schema_registry.routers.errors import unauthorized +from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController +from schema_registry.user import get_current_user +from typing import Annotated + +mode_router = APIRouter( + prefix="/mode", + tags=["mode"], + responses={404: {"description": "Not found"}}, +) + + +@mode_router.get("") +@inject +async def mode_get( + user: Annotated[User, Depends(get_current_user)], + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), +): + if authorizer and not authorizer.check_authorization(user, Operation.Read, "Config:"): + raise unauthorized() + + return await controller.get_global_mode() + + +@mode_router.get("/{subject}") +async def mode_get_subject( + subject: Subject, + user: Annotated[User, Depends(get_current_user)], + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), +): + if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): + raise unauthorized() + + return await controller.get_subject_mode(subject=subject) diff --git a/src/schema_registry/routers/mode_router.py b/src/schema_registry/routers/mode_router.py deleted file mode 100644 index 5b6fca0c9..000000000 --- a/src/schema_registry/routers/mode_router.py +++ /dev/null @@ -1,42 +0,0 @@ -""" -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - -from fastapi import APIRouter -from karapace.auth.auth import Operation -from karapace.auth.dependencies import AuthenticatorAndAuthorizerDep, CurrentUserDep -from schema_registry.dependencies.controller_dependency import KarapaceSchemaRegistryControllerDep -from karapace.typing import Subject -from schema_registry.routers.errors import unauthorized - -mode_router = APIRouter( - prefix="/mode", - tags=["mode"], - responses={404: {"description": "Not found"}}, -) - - -@mode_router.get("") -async def mode_get( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, -): - if authorizer and not authorizer.check_authorization(user, Operation.Read, "Config:"): - raise unauthorized() - - return await controller.get_global_mode() - - -@mode_router.get("/{subject}") -async def mode_get_subject( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, - subject: Subject, -): - if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): - raise unauthorized() - - return await controller.get_subject_mode(subject=subject) diff --git a/src/schema_registry/routers/root_router.py b/src/schema_registry/routers/root.py similarity index 100% rename from src/schema_registry/routers/root_router.py rename to src/schema_registry/routers/root.py diff --git a/src/schema_registry/routers/schemas_router.py b/src/schema_registry/routers/schemas.py similarity index 57% rename from src/schema_registry/routers/schemas_router.py rename to src/schema_registry/routers/schemas.py index 048d52e15..d7af4cd2b 100644 --- a/src/schema_registry/routers/schemas_router.py +++ b/src/schema_registry/routers/schemas.py @@ -3,10 +3,14 @@ See LICENSE for details """ -from fastapi import APIRouter -from karapace.auth.dependencies import AuthenticatorAndAuthorizerDep, CurrentUserDep -from schema_registry.dependencies.controller_dependency import KarapaceSchemaRegistryControllerDep +from dependency_injector.wiring import inject, Provide +from fastapi import APIRouter, Depends +from karapace.auth import AuthenticatorAndAuthorizer, User +from schema_registry.container import SchemaRegistryContainer from schema_registry.routers.requests import SchemaListingItem, SchemasResponse, SubjectVersion +from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController +from schema_registry.user import get_current_user +from typing import Annotated schemas_router = APIRouter( prefix="/schemas", @@ -17,12 +21,13 @@ # TODO is this needed? Is this actually the ids/schema/id/schema?? @schemas_router.get("") +@inject async def schemas_get_list( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, + user: Annotated[User, Depends(get_current_user)], deleted: bool = False, latestOnly: bool = False, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[SchemaListingItem]: return await controller.schemas_list( deleted=deleted, @@ -33,14 +38,15 @@ async def schemas_get_list( @schemas_router.get("/ids/{schema_id}", response_model_exclude_none=True) +@inject async def schemas_get( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, + user: Annotated[User, Depends(get_current_user)], schema_id: str, # TODO: type to actual type includeSubjects: bool = False, # TODO: include subjects? fetchMaxId: bool = False, # TODO: fetch max id? format: str = "", + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> SchemasResponse: return await controller.schemas_get( schema_id=schema_id, @@ -61,12 +67,13 @@ async def schemas_get( @schemas_router.get("/ids/{schema_id}/versions") +@inject async def schemas_get_versions( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, + user: Annotated[User, Depends(get_current_user)], schema_id: str, deleted: bool = False, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[SubjectVersion]: return await controller.schemas_get_versions( schema_id=schema_id, @@ -77,7 +84,8 @@ async def schemas_get_versions( @schemas_router.get("/types") +@inject async def schemas_get_subjects_list( - controller: KarapaceSchemaRegistryControllerDep, + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[str]: return await controller.schemas_types() diff --git a/src/schema_registry/routers/setup.py b/src/schema_registry/routers/setup.py new file mode 100644 index 000000000..fe0b6be9b --- /dev/null +++ b/src/schema_registry/routers/setup.py @@ -0,0 +1,25 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from fastapi import FastAPI +from schema_registry.routers.compatibility import compatibility_router +from schema_registry.routers.config import config_router +from schema_registry.routers.health import health_router +from schema_registry.routers.metrics import metrics_router +from schema_registry.routers.mode import mode_router +from schema_registry.routers.root import root_router +from schema_registry.routers.schemas import schemas_router +from schema_registry.routers.subjects import subjects_router + + +def setup_routers(app: FastAPI) -> None: + app.include_router(compatibility_router) + app.include_router(config_router) + app.include_router(health_router) + app.include_router(mode_router) + app.include_router(root_router) + app.include_router(schemas_router) + app.include_router(subjects_router) + app.include_router(metrics_router) diff --git a/src/schema_registry/routers/subjects_router.py b/src/schema_registry/routers/subjects.py similarity index 56% rename from src/schema_registry/routers/subjects_router.py rename to src/schema_registry/routers/subjects.py index 29e58e840..766329795 100644 --- a/src/schema_registry/routers/subjects_router.py +++ b/src/schema_registry/routers/subjects.py @@ -3,15 +3,18 @@ See LICENSE for details """ -from fastapi import APIRouter, Request -from karapace.auth.auth import Operation -from karapace.auth.dependencies import AuthenticatorAndAuthorizerDep, CurrentUserDep -from schema_registry.dependencies.controller_dependency import KarapaceSchemaRegistryControllerDep -from schema_registry.dependencies.forward_client_dependency import ForwardClientDep -from schema_registry.dependencies.schema_registry_dependency import SchemaRegistryDep +from dependency_injector.wiring import inject, Provide +from fastapi import APIRouter, Depends, Request +from karapace.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.forward_client import ForwardClient +from karapace.schema_registry import KarapaceSchemaRegistry from karapace.typing import Subject +from schema_registry.container import SchemaRegistryContainer from schema_registry.routers.errors import no_primary_url_error, unauthorized from schema_registry.routers.requests import SchemaIdResponse, SchemaRequest, SchemaResponse, SubjectSchemaVersionResponse +from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController +from schema_registry.user import get_current_user +from typing import Annotated import logging @@ -26,11 +29,12 @@ @subjects_router.get("") +@inject async def subjects_get( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, + user: Annotated[User, Depends(get_current_user)], deleted: bool = False, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[str]: return await controller.subjects_list( deleted=deleted, @@ -40,14 +44,15 @@ async def subjects_get( @subjects_router.post("/{subject}", response_model_exclude_none=True) +@inject async def subjects_subject_post( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, + user: Annotated[User, Depends(get_current_user)], schema_request: SchemaRequest, deleted: bool = False, normalize: bool = False, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> SchemaResponse: if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): raise unauthorized() @@ -61,15 +66,16 @@ async def subjects_subject_post( @subjects_router.delete("/{subject}") +@inject async def subjects_subject_delete( request: Request, - controller: KarapaceSchemaRegistryControllerDep, - schema_registry: SchemaRegistryDep, - forward_client: ForwardClientDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, + user: Annotated[User, Depends(get_current_user)], permanent: bool = False, + forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[int]: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -84,15 +90,16 @@ async def subjects_subject_delete( @subjects_router.post("/{subject}/versions") +@inject async def subjects_subject_versions_post( request: Request, - controller: KarapaceSchemaRegistryControllerDep, - forward_client: ForwardClientDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, schema_request: SchemaRequest, + user: Annotated[User, Depends(get_current_user)], + forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), normalize: bool = False, + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> SchemaIdResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -109,12 +116,13 @@ async def subjects_subject_versions_post( @subjects_router.get("/{subject}/versions") +@inject async def subjects_subject_versions_list( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, + user: Annotated[User, Depends(get_current_user)], deleted: bool = False, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[int]: if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): raise unauthorized() @@ -123,13 +131,14 @@ async def subjects_subject_versions_list( @subjects_router.get("/{subject}/versions/{version}", response_model_exclude_none=True) +@inject async def subjects_subject_version_get( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, version: str, + user: Annotated[User, Depends(get_current_user)], deleted: bool = False, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> SubjectSchemaVersionResponse: if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): raise unauthorized() @@ -138,16 +147,17 @@ async def subjects_subject_version_get( @subjects_router.delete("/{subject}/versions/{version}") +@inject async def subjects_subject_version_delete( request: Request, - controller: KarapaceSchemaRegistryControllerDep, - schema_registry: SchemaRegistryDep, - forward_client: ForwardClientDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, version: str, + user: Annotated[User, Depends(get_current_user)], permanent: bool = False, + forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> int: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -162,12 +172,13 @@ async def subjects_subject_version_delete( @subjects_router.get("/{subject}/versions/{version}/schema") +@inject async def subjects_subject_version_schema_get( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, version: str, + user: Annotated[User, Depends(get_current_user)], + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> dict: if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): raise unauthorized() @@ -176,12 +187,13 @@ async def subjects_subject_version_schema_get( @subjects_router.get("/{subject}/versions/{version}/referencedby") +@inject async def subjects_subject_version_referenced_by( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, version: str, + user: Annotated[User, Depends(get_current_user)], + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[int]: if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): raise unauthorized() diff --git a/src/schema_registry/schema_registry_apis.py b/src/schema_registry/schema_registry_apis.py index a11e29be2..cc9a01bb2 100644 --- a/src/schema_registry/schema_registry_apis.py +++ b/src/schema_registry/schema_registry_apis.py @@ -5,13 +5,14 @@ from __future__ import annotations from avro.errors import SchemaParseException -from fastapi import HTTPException, Request, Response, status -from karapace.auth.auth import Operation, User -from karapace.auth.dependencies import AuthenticatorAndAuthorizerDep +from dependency_injector.wiring import inject, Provide +from fastapi import Depends, HTTPException, Request, Response, status +from karapace.auth import AuthenticatorAndAuthorizer, Operation, User from karapace.compatibility import CompatibilityModes from karapace.compatibility.jsonschema.checks import is_incompatible from karapace.compatibility.schema_compatibility import SchemaCompatibility from karapace.config import Config +from karapace.container import KarapaceContainer from karapace.errors import ( IncompatibleSchema, InvalidReferences, @@ -64,6 +65,9 @@ class KarapaceSchemaRegistryController: def __init__(self, config: Config, schema_registry: KarapaceSchemaRegistry, stats: StatsClient) -> None: # super().__init__(config=config, not_ready_handler=self._forward_if_not_ready_to_serve) + print("+++++++++========") + print(schema_registry) + self.config = config self._process_start_time = time.monotonic() self.stats = stats @@ -141,13 +145,14 @@ async def compatibility_check( return CompatibilityCheckResponse(is_compatible=False, messages=list(result.messages)) return CompatibilityCheckResponse(is_compatible=True) + @inject async def schemas_list( self, *, deleted: bool, latest_only: bool, user: User | None, - authorizer: AuthenticatorAndAuthorizerDep | None, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]), ) -> list[SchemaListingItem]: schemas = await self.schema_registry.schemas_list(include_deleted=deleted, latest_only=latest_only) response_schemas: list[SchemaListingItem] = [] @@ -171,6 +176,7 @@ async def schemas_list( return response_schemas + @inject async def schemas_get( self, *, @@ -179,7 +185,7 @@ async def schemas_get( include_subjects: bool, format_serialized: str, user: User | None, - authorizer: AuthenticatorAndAuthorizerDep, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]), ) -> SchemasResponse: try: parsed_schema_id = SchemaId(int(schema_id)) @@ -213,6 +219,8 @@ def _has_subject_with_id() -> bool: ) schema = self.schema_registry.schemas_get(parsed_schema_id, fetch_max_id=fetch_max_id) + print("+++++++++========") + print(schema) if not schema: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, @@ -249,13 +257,14 @@ def _has_subject_with_id() -> bool: maxId=maxId, ) + @inject async def schemas_get_versions( self, *, schema_id: str, deleted: bool, user: User | None, - authorizer: AuthenticatorAndAuthorizerDep, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]), ) -> list[SubjectVersion]: try: schema_id_int = SchemaId(int(schema_id)) @@ -370,11 +379,12 @@ async def config_subject_delete( self.schema_registry.send_config_subject_delete_message(subject=Subject(subject)) return CompatibilityResponse(compatibility=self.schema_registry.schema_reader.config.compatibility) + @inject async def subjects_list( self, deleted: bool, user: User | None, - authorizer: AuthenticatorAndAuthorizerDep | None, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]), ) -> list[str]: subjects = [str(subject) for subject in self.schema_registry.database.find_subjects(include_deleted=deleted)] if authorizer: diff --git a/src/schema_registry/user.py b/src/schema_registry/user.py new file mode 100644 index 000000000..0155c4665 --- /dev/null +++ b/src/schema_registry/user.py @@ -0,0 +1,35 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from dependency_injector.wiring import inject, Provide +from fastapi import Depends, HTTPException, status +from fastapi.security import HTTPBasicCredentials +from karapace.auth import AuthenticationError, AuthenticatorAndAuthorizer, User +from schema_registry.container import SchemaRegistryContainer + + +@inject +async def get_current_user( + credentials: HTTPBasicCredentials | None, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), +) -> User: + if authorizer and not credentials: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail={"message": "Unauthorized"}, + headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'}, + ) + assert authorizer is not None + assert credentials is not None + username: str = credentials.username + password: str = credentials.password + try: + return authorizer.authenticate(username=username, password=password) + except AuthenticationError as exc: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail={"message": "Unauthorized"}, + headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'}, + ) from exc diff --git a/tests/conftest.py b/tests/conftest.py index d62663633..91fb0b02d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,7 +3,10 @@ See LICENSE for details """ from avro.compatibility import SchemaCompatibilityResult +from karapace.config import KARAPACE_BASE_CONFIG_YAML_PATH +from karapace.container import KarapaceContainer from pathlib import Path +from schema_registry.container import SchemaRegistryContainer from tempfile import mkstemp from typing import Optional @@ -179,3 +182,15 @@ def fixture_tmp_file(): path = Path(str_path) yield path path.unlink() + + +@pytest.fixture(name="karapace_container", scope="session") +def fixture_karapace_container() -> KarapaceContainer: + container = KarapaceContainer() + container.base_config.from_yaml(KARAPACE_BASE_CONFIG_YAML_PATH, envs_required=True, required=True) + return container + + +@pytest.fixture +def schema_registry_container(karapace_container: KarapaceContainer) -> SchemaRegistryContainer: + return SchemaRegistryContainer(karapace_container=karapace_container) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index bbfb05b1f..55edc0559 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -152,8 +152,8 @@ def create_kafka_server( data_dir = session_datadir / "kafka" log_dir = session_logdir / "kafka" - data_dir.mkdir(parents=True) - log_dir.mkdir(parents=True) + data_dir.mkdir(parents=True, exist_ok=True) + log_dir.mkdir(parents=True, exist_ok=True) kafka_config = KafkaConfig( datadir=str(data_dir), logdir=str(log_dir), diff --git a/tests/integration/utils/zookeeper.py b/tests/integration/utils/zookeeper.py index 5dffcfeca..1ffb798db 100644 --- a/tests/integration/utils/zookeeper.py +++ b/tests/integration/utils/zookeeper.py @@ -25,7 +25,7 @@ def configure_and_start_zk(config: ZKConfig, kafka_description: KafkaDescription zk_dir = Path(config.path) cfg_path = zk_dir / "zoo.cfg" logs_dir = zk_dir / "logs" - logs_dir.mkdir(parents=True) + logs_dir.mkdir(parents=True, exist_ok=True) zoo_cfg = { # Number of milliseconds of each tick diff --git a/tests/unit/backup/test_api.py b/tests/unit/backup/test_api.py index 983beb786..3df4d028d 100644 --- a/tests/unit/backup/test_api.py +++ b/tests/unit/backup/test_api.py @@ -5,7 +5,6 @@ from __future__ import annotations from aiokafka.errors import KafkaError, TopicAlreadyExistsError -from karapace import config from karapace.backup.api import ( _admin, _consumer, @@ -22,6 +21,7 @@ from karapace.backup.errors import BackupError, PartitionCountError from karapace.config import Config from karapace.constants import DEFAULT_SCHEMA_TOPIC +from karapace.container import KarapaceContainer from karapace.kafka.consumer import KafkaConsumer, PartitionMetadata from karapace.kafka.producer import KafkaProducer from pathlib import Path @@ -41,10 +41,12 @@ class TestAdmin: @mock.patch("time.sleep", autospec=True) @patch_admin_new - def test_retries_on_kafka_error(self, admin_new: MagicMock, sleep_mock: MagicMock) -> None: + def test_retries_on_kafka_error( + self, admin_new: MagicMock, sleep_mock: MagicMock, karapace_container: KarapaceContainer + ) -> None: admin_mock = admin_new.return_value admin_new.side_effect = [KafkaError("1"), KafkaError("2"), admin_mock] - with _admin(config.DEFAULTS) as admin: + with _admin(karapace_container.config()) as admin: assert admin is admin_mock assert sleep_mock.call_count == 2 # proof that we waited between retries @@ -56,41 +58,48 @@ def test_reraises_unknown_exceptions( admin_new: MagicMock, sleep_mock: MagicMock, e: type[BaseException], + karapace_container: KarapaceContainer, ) -> None: admin_new.side_effect = e - with pytest.raises(e), _admin(config.DEFAULTS): + with pytest.raises(e), _admin(karapace_container.config()): pass assert sleep_mock.call_count == 0 # proof that we did not retry class TestHandleRestoreTopic: @patch_admin_new - def test_calls_admin_create_topics(self, admin_new: MagicMock) -> None: + def test_calls_admin_create_topics(self, admin_new: MagicMock, karapace_container: KarapaceContainer) -> None: new_topic: MagicMock = admin_new.return_value.new_topic topic_configs = {"cleanup.policy": "compact"} - _maybe_create_topic(DEFAULT_SCHEMA_TOPIC, config=config.DEFAULTS, replication_factor=1, topic_configs=topic_configs) + _maybe_create_topic( + DEFAULT_SCHEMA_TOPIC, config=karapace_container.config(), replication_factor=1, topic_configs=topic_configs + ) new_topic.assert_called_once_with( DEFAULT_SCHEMA_TOPIC, num_partitions=1, - replication_factor=config.DEFAULTS["replication_factor"], + replication_factor=karapace_container.config().replication_factor, config=topic_configs, ) @patch_admin_new - def test_gracefully_handles_topic_already_exists_error(self, admin_new: MagicMock) -> None: + def test_gracefully_handles_topic_already_exists_error( + self, admin_new: MagicMock, karapace_container: KarapaceContainer + ) -> None: new_topic: MagicMock = admin_new.return_value.new_topic new_topic.side_effect = TopicAlreadyExistsError() - _maybe_create_topic(DEFAULT_SCHEMA_TOPIC, config=config.DEFAULTS, replication_factor=1, topic_configs={}) + _maybe_create_topic(DEFAULT_SCHEMA_TOPIC, config=karapace_container.config(), replication_factor=1, topic_configs={}) new_topic.assert_called_once() @patch_admin_new - def test_retries_for_kafka_errors(self, admin_new: MagicMock) -> None: + def test_retries_for_kafka_errors(self, admin_new: MagicMock, karapace_container: KarapaceContainer) -> None: new_topic: MagicMock = admin_new.return_value.new_topic new_topic.side_effect = [KafkaError("1"), KafkaError("2"), None] with mock.patch("time.sleep", autospec=True): - _maybe_create_topic(DEFAULT_SCHEMA_TOPIC, config=config.DEFAULTS, replication_factor=1, topic_configs={}) + _maybe_create_topic( + DEFAULT_SCHEMA_TOPIC, config=karapace_container.config(), replication_factor=1, topic_configs={} + ) assert new_topic.call_count == 3 @@ -98,17 +107,19 @@ def test_retries_for_kafka_errors(self, admin_new: MagicMock) -> None: def test_noop_for_custom_name_on_legacy_versions( self, admin_new: MagicMock, + karapace_container: KarapaceContainer, ) -> None: new_topic: MagicMock = admin_new.return_value.new_topic assert "custom-name" != DEFAULT_SCHEMA_TOPIC instruction = RestoreTopicLegacy(topic_name="custom-name", partition_count=1) - _handle_restore_topic_legacy(instruction, config.DEFAULTS) + _handle_restore_topic_legacy(instruction, karapace_container.config()) new_topic.assert_not_called() @patch_admin_new def test_allows_custom_name_on_v3( self, admin_new: MagicMock, + karapace_container: KarapaceContainer, ) -> None: new_topic: MagicMock = admin_new.return_value.new_topic topic_name = "custom-name" @@ -117,7 +128,7 @@ def test_allows_custom_name_on_v3( instruction = RestoreTopic( topic_name="custom-name", partition_count=1, replication_factor=2, topic_configs=topic_configs ) - _handle_restore_topic(instruction, config.DEFAULTS) + _handle_restore_topic(instruction, karapace_container.config()) new_topic.assert_called_once_with(topic_name, num_partitions=1, replication_factor=2, config=topic_configs) @@ -125,11 +136,12 @@ def test_allows_custom_name_on_v3( def test_skip_topic_creation( self, admin_new: MagicMock, + karapace_container: KarapaceContainer, ) -> None: new_topic: MagicMock = admin_new.return_value.new_topic _handle_restore_topic( RestoreTopic(topic_name="custom-name", partition_count=1, replication_factor=2, topic_configs={}), - config.DEFAULTS, + karapace_container.config(), skip_topic_creation=True, ) _handle_restore_topic_legacy( @@ -137,7 +149,7 @@ def test_skip_topic_creation( topic_name="custom-name", partition_count=1, ), - config.DEFAULTS, + karapace_container.config(), skip_topic_creation=True, ) @@ -171,11 +183,12 @@ def test_auto_closing( client_class: type[KafkaConsumer | KafkaProducer], partitions_method: FunctionType, close_method_name: str, + karapace_container: KarapaceContainer, ) -> None: with mock.patch(f"{client_class.__module__}.{client_class.__qualname__}.__new__", autospec=True) as client_ctor: client_mock = client_ctor.return_value getattr(client_mock, partitions_method.__name__).return_value = self._partition_metadata() - with ctx_mng(config.DEFAULTS, "topic") as client: + with ctx_mng(karapace_container.config(), "topic") as client: assert client is client_mock assert getattr(client_mock, close_method_name).call_count == 1 @@ -194,12 +207,13 @@ def test_raises_partition_count_error_for_unexpected_count( partitions_method: FunctionType, partition_count: int, close_method_name: str, + karapace_container: KarapaceContainer, ) -> None: with mock.patch(f"{client_class.__module__}.{client_class.__qualname__}.__new__", autospec=True) as client_ctor: client_mock = client_ctor.return_value getattr(client_mock, partitions_method.__name__).return_value = self._partition_metadata(partition_count) with pytest.raises(PartitionCountError): - with ctx_mng(config.DEFAULTS, "topic") as client: + with ctx_mng(karapace_container.config(), "topic") as client: assert client == client_mock assert getattr(client_mock, close_method_name).call_count == 1 @@ -271,6 +285,6 @@ def test_returns_option_if_given(self) -> None: fake_config = cast(Config, {}) assert normalize_topic_name("some-topic", fake_config) == "some-topic" - def test_defaults_to_config(self) -> None: - fake_config = cast(Config, {"topic_name": "default-topic"}) + def test_defaults_to_config(self, karapace_container: KarapaceContainer) -> None: + fake_config = karapace_container.config().set_config_defaults({"topic_name": "default-topic"}) assert normalize_topic_name(None, fake_config) == "default-topic" diff --git a/tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py b/tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py index b47fb5e02..d1227fbc2 100644 --- a/tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py +++ b/tests/unit/kafka_rest_apis/test_rest_proxy_cluster_metadata_cache.py @@ -3,7 +3,8 @@ Copyright (c) 2024 Aiven Ltd See LICENSE for details """ -from karapace.config import DEFAULTS + +from karapace.container import KarapaceContainer from karapace.kafka_rest_apis import UserRestProxy from karapace.serialization import SchemaRegistrySerializer from unittest.mock import patch @@ -11,10 +12,10 @@ import copy -def user_rest_proxy(max_age_metadata: int = 5) -> UserRestProxy: - configs = {**DEFAULTS, **{"admin_metadata_max_age": max_age_metadata}} - serializer = SchemaRegistrySerializer(configs) - return UserRestProxy(configs, 1, serializer, auth_expiry=None, verify_connection=False) +def user_rest_proxy(karapace_container: KarapaceContainer, max_age_metadata: int = 5) -> UserRestProxy: + config = karapace_container.config().set_config_defaults({"admin_metadata_max_age": max_age_metadata}) + serializer = SchemaRegistrySerializer(config=config) + return UserRestProxy(config, 1, serializer, auth_expiry=None, verify_connection=False) EMPTY_REPLY = { @@ -158,8 +159,8 @@ def user_rest_proxy(max_age_metadata: int = 5) -> UserRestProxy: } -async def test_cache_is_evicted_after_expiration_global_initially() -> None: - proxy = user_rest_proxy() +async def test_cache_is_evicted_after_expiration_global_initially(karapace_container: KarapaceContainer) -> None: + proxy = user_rest_proxy(karapace_container) with patch( "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=EMPTY_REPLY ) as mocked_cluster_metadata: @@ -167,8 +168,8 @@ async def test_cache_is_evicted_after_expiration_global_initially() -> None: mocked_cluster_metadata.assert_called_once_with(None) # "initially the metadata are always old" -async def test_no_topic_means_all_metadata() -> None: - proxy = user_rest_proxy() +async def test_no_topic_means_all_metadata(karapace_container: KarapaceContainer) -> None: + proxy = user_rest_proxy(karapace_container) with patch( "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=EMPTY_REPLY ) as mocked_cluster_metadata: @@ -176,8 +177,8 @@ async def test_no_topic_means_all_metadata() -> None: mocked_cluster_metadata.assert_called_once_with(None) -async def test_cache_is_evicted_after_expiration_global() -> None: - proxy = user_rest_proxy(max_age_metadata=10) +async def test_cache_is_evicted_after_expiration_global(karapace_container: KarapaceContainer) -> None: + proxy = user_rest_proxy(karapace_container, max_age_metadata=10) proxy._global_metadata_birth = 0 with patch( "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=EMPTY_REPLY @@ -187,8 +188,8 @@ async def test_cache_is_evicted_after_expiration_global() -> None: mocked_cluster_metadata.assert_called_once_with(None) # "metadata old require a refresh" -async def test_global_cache_is_used_for_single_topic() -> None: - proxy = user_rest_proxy(max_age_metadata=10) +async def test_global_cache_is_used_for_single_topic(karapace_container: KarapaceContainer) -> None: + proxy = user_rest_proxy(karapace_container, max_age_metadata=10) proxy._global_metadata_birth = 0 with patch( "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=ALL_TOPIC_REQUEST @@ -214,8 +215,8 @@ async def test_global_cache_is_used_for_single_topic() -> None: ), "the result should still be cached since we marked it as ready at time 11 and we are at 14" -async def test_cache_is_evicted_if_one_topic_is_expired() -> None: - proxy = user_rest_proxy(max_age_metadata=10) +async def test_cache_is_evicted_if_one_topic_is_expired(karapace_container: KarapaceContainer) -> None: + proxy = user_rest_proxy(karapace_container, max_age_metadata=10) proxy._global_metadata_birth = 0 with patch( "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=ALL_TOPIC_REQUEST @@ -234,8 +235,8 @@ async def test_cache_is_evicted_if_one_topic_is_expired() -> None: assert mocked_cluster_metadata.call_count == 1, "topic_b should be evicted" -async def test_cache_is_evicted_if_a_topic_was_never_queries() -> None: - proxy = user_rest_proxy(max_age_metadata=10) +async def test_cache_is_evicted_if_a_topic_was_never_queries(karapace_container: KarapaceContainer) -> None: + proxy = user_rest_proxy(karapace_container, max_age_metadata=10) proxy._global_metadata_birth = 0 with patch( "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=ALL_TOPIC_REQUEST @@ -254,8 +255,8 @@ async def test_cache_is_evicted_if_a_topic_was_never_queries() -> None: assert mocked_cluster_metadata.call_count == 1, "topic_b is not present in the cache, should call the refresh" -async def test_cache_is_used_if_topic_requested_is_updated() -> None: - proxy = user_rest_proxy(max_age_metadata=10) +async def test_cache_is_used_if_topic_requested_is_updated(karapace_container: KarapaceContainer) -> None: + proxy = user_rest_proxy(karapace_container, max_age_metadata=10) proxy._global_metadata_birth = 0 with patch( "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=TOPIC_REQUEST @@ -272,8 +273,8 @@ async def test_cache_is_used_if_topic_requested_is_updated() -> None: assert mocked_cluster_metadata.call_count == 0, "topic_a cache its present, should be used" -async def test_update_global_cache() -> None: - proxy = user_rest_proxy(max_age_metadata=10) +async def test_update_global_cache(karapace_container: KarapaceContainer) -> None: + proxy = user_rest_proxy(karapace_container, max_age_metadata=10) proxy._global_metadata_birth = 0 with patch( "karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=TOPIC_REQUEST @@ -292,8 +293,8 @@ async def test_update_global_cache() -> None: assert mocked_cluster_metadata.call_count == 0, "should call the server since the cache its expired" -async def test_update_topic_cache_do_not_evict_all_the_global_cache() -> None: - proxy = user_rest_proxy(max_age_metadata=10) +async def test_update_topic_cache_do_not_evict_all_the_global_cache(karapace_container: KarapaceContainer) -> None: + proxy = user_rest_proxy(karapace_container, max_age_metadata=10) proxy._global_metadata_birth = 0 proxy._cluster_metadata = ALL_TOPIC_REQUEST proxy._cluster_metadata_topic_birth = {"topic_a": 0, "topic_b": 200, "__consumer_offsets": 200} @@ -317,8 +318,10 @@ async def test_update_topic_cache_do_not_evict_all_the_global_cache() -> None: ), "we should call the server since the previous time of caching for the topic_a was 0" -async def test_update_local_cache_does_not_evict_all_the_global_cache_if_no_new_data() -> None: - proxy = user_rest_proxy(max_age_metadata=10) +async def test_update_local_cache_does_not_evict_all_the_global_cache_if_no_new_data( + karapace_container: KarapaceContainer, +) -> None: + proxy = user_rest_proxy(karapace_container, max_age_metadata=10) proxy._global_metadata_birth = 0 proxy._cluster_metadata_complete = True proxy._cluster_metadata = ALL_TOPIC_REQUEST @@ -346,8 +349,10 @@ async def test_update_local_cache_does_not_evict_all_the_global_cache_if_no_new_ ), "we should call the server since the previous time of caching for the topic_a was 0" -async def test_update_local_cache_not_evict_all_the_global_cache_if_changed_replica_data() -> None: - proxy = user_rest_proxy(max_age_metadata=10) +async def test_update_local_cache_not_evict_all_the_global_cache_if_changed_replica_data( + karapace_container: KarapaceContainer, +) -> None: + proxy = user_rest_proxy(karapace_container, max_age_metadata=10) proxy._global_metadata_birth = 0 proxy._cluster_metadata_complete = True proxy._cluster_metadata = ALL_TOPIC_REQUEST @@ -360,8 +365,10 @@ async def test_update_local_cache_not_evict_all_the_global_cache_if_changed_repl assert not proxy._cluster_metadata_complete, "new replica data incoming, should update the global metadata next!" -async def test_update_local_cache_not_evict_all_the_global_cache_if_new_topic_data() -> None: - proxy = user_rest_proxy(max_age_metadata=10) +async def test_update_local_cache_not_evict_all_the_global_cache_if_new_topic_data( + karapace_container: KarapaceContainer, +) -> None: + proxy = user_rest_proxy(karapace_container, max_age_metadata=10) proxy._global_metadata_birth = 0 proxy._cluster_metadata_complete = True proxy._cluster_metadata = ALL_TOPIC_REQUEST @@ -374,8 +381,10 @@ async def test_update_local_cache_not_evict_all_the_global_cache_if_new_topic_da assert not proxy._cluster_metadata_complete, "new topic data incoming, should update the global metadata next!" -async def test_update_local_cache_not_evict_all_the_global_cache_if_new_broker_data() -> None: - proxy = user_rest_proxy(max_age_metadata=10) +async def test_update_local_cache_not_evict_all_the_global_cache_if_new_broker_data( + karapace_container: KarapaceContainer, +) -> None: + proxy = user_rest_proxy(karapace_container, max_age_metadata=10) proxy._global_metadata_birth = 0 proxy._cluster_metadata_complete = True proxy._cluster_metadata = ALL_TOPIC_REQUEST diff --git a/tests/unit/protobuf/test_protoc.py b/tests/unit/protobuf/test_protoc.py index f044f1abe..d61648d9e 100644 --- a/tests/unit/protobuf/test_protoc.py +++ b/tests/unit/protobuf/test_protoc.py @@ -2,7 +2,7 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from karapace import config +from karapace.container import KarapaceContainer from karapace.protobuf.io import calculate_class_name from karapace.protobuf.kotlin_wrapper import trim_margin @@ -14,7 +14,7 @@ log = logging.getLogger(__name__) -def test_protoc() -> None: +def test_protoc(karapace_container: KarapaceContainer) -> None: proto: str = """ |syntax = "proto3"; |package com.instaclustr.protobuf; @@ -28,7 +28,7 @@ def test_protoc() -> None: """ proto = trim_margin(proto) - directory = config.DEFAULTS["protobuf_runtime_directory"] + directory = karapace_container.config().protobuf_runtime_directory proto_name = calculate_class_name(str(proto)) proto_path = f"{directory}/{proto_name}.proto" class_path = f"{directory}/{proto_name}_pb2.py" diff --git a/tests/unit/test_authentication.py b/tests/unit/test_authentication.py index 40abc5c01..9834865fb 100644 --- a/tests/unit/test_authentication.py +++ b/tests/unit/test_authentication.py @@ -4,8 +4,9 @@ """ from __future__ import annotations +from collections.abc import Mapping from http import HTTPStatus -from karapace.config import ConfigDefaults, set_config_defaults +from karapace.container import KarapaceContainer from karapace.kafka_rest_apis.authentication import ( get_auth_config_from_header, get_expiration_time_from_header, @@ -13,6 +14,7 @@ SimpleOauthTokenProvider, ) from karapace.rapu import HTTPResponse, JSON_CONTENT_TYPE +from typing import Any import base64 import datetime @@ -31,11 +33,11 @@ def _assert_unauthorized_http_response(http_response: HTTPResponse) -> None: "auth_header", (None, "Digest foo=bar"), ) -def test_get_auth_config_from_header_raises_unauthorized_on_invalid_header(auth_header: str | None) -> None: - config = set_config_defaults({}) - +def test_get_auth_config_from_header_raises_unauthorized_on_invalid_header( + karapace_container: KarapaceContainer, auth_header: str | None +) -> None: with pytest.raises(HTTPResponse) as exc_info: - get_auth_config_from_header(auth_header, config) + get_auth_config_from_header(auth_header, karapace_container.config()) _assert_unauthorized_http_response(exc_info.value) @@ -66,9 +68,12 @@ def test_get_auth_config_from_header_raises_unauthorized_on_invalid_header(auth_ ), ) def test_get_auth_config_from_header( - auth_header: str, config_override: ConfigDefaults, expected_auth_config: ConfigDefaults + karapace_container: KarapaceContainer, + auth_header: str, + config_override: Mapping[str, Any], + expected_auth_config: Mapping[str, Any], ) -> None: - config = set_config_defaults(config_override) + config = karapace_container.config().set_config_defaults(new_config=config_override) auth_config = get_auth_config_from_header(auth_header, config) assert auth_config == expected_auth_config @@ -109,9 +114,11 @@ def test_simple_oauth_token_provider_returns_configured_token_and_expiry() -> No assert token_provider.token_with_expiry() == (token, expiry_timestamp) -def test_get_client_auth_parameters_from_config_sasl_plain() -> None: - config = set_config_defaults( - {"sasl_mechanism": "PLAIN", "sasl_plain_username": "username", "sasl_plain_password": "password"} +def test_get_client_auth_parameters_from_config_sasl_plain( + karapace_container: KarapaceContainer, +) -> None: + config = karapace_container.config().set_config_defaults( + new_config={"sasl_mechanism": "PLAIN", "sasl_plain_username": "username", "sasl_plain_password": "password"}, ) client_auth_params = get_kafka_client_auth_parameters_from_config(config) @@ -123,10 +130,14 @@ def test_get_client_auth_parameters_from_config_sasl_plain() -> None: } -def test_get_client_auth_parameters_from_config_oauth() -> None: +def test_get_client_auth_parameters_from_config_oauth( + karapace_container: KarapaceContainer, +) -> None: expiry_timestamp = 1697013997 token = jwt.encode({"exp": expiry_timestamp}, "secret") - config = set_config_defaults({"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": token}) + config = karapace_container.config().set_config_defaults( + new_config={"sasl_mechanism": "OAUTHBEARER", "sasl_oauth_token": token} + ) client_auth_params = get_kafka_client_auth_parameters_from_config(config) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py index b8475e1c6..79ce7da78 100644 --- a/tests/unit/test_config.py +++ b/tests/unit/test_config.py @@ -4,55 +4,55 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from karapace.config import set_config_defaults from karapace.constants import DEFAULT_AIOHTTP_CLIENT_MAX_SIZE, DEFAULT_PRODUCER_MAX_REQUEST +from karapace.container import KarapaceContainer -def test_http_request_max_size() -> None: - config = set_config_defaults( +def test_http_request_max_size(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( { "karapace_rest": False, "producer_max_request_size": DEFAULT_PRODUCER_MAX_REQUEST + 1024, } ) - assert config["http_request_max_size"] == DEFAULT_AIOHTTP_CLIENT_MAX_SIZE + assert config.http_request_max_size == DEFAULT_AIOHTTP_CLIENT_MAX_SIZE - config = set_config_defaults( + config = karapace_container.config().set_config_defaults( { "karapace_rest": False, "http_request_max_size": 1024, } ) - assert config["http_request_max_size"] == 1024 + assert config.http_request_max_size == 1024 - config = set_config_defaults( + config = karapace_container.config().set_config_defaults( { "karapace_rest": True, } ) - assert config["http_request_max_size"] == DEFAULT_AIOHTTP_CLIENT_MAX_SIZE + assert config.http_request_max_size == DEFAULT_AIOHTTP_CLIENT_MAX_SIZE - config = set_config_defaults( + config = karapace_container.config().set_config_defaults( { "karapace_rest": True, "producer_max_request_size": 1024, } ) - assert config["http_request_max_size"] == DEFAULT_AIOHTTP_CLIENT_MAX_SIZE + assert config.http_request_max_size == DEFAULT_AIOHTTP_CLIENT_MAX_SIZE - config = set_config_defaults( + config = karapace_container.config().set_config_defaults( { "karapace_rest": True, "producer_max_request_size": DEFAULT_PRODUCER_MAX_REQUEST + 1024, } ) - assert config["http_request_max_size"] == DEFAULT_PRODUCER_MAX_REQUEST + 1024 + DEFAULT_AIOHTTP_CLIENT_MAX_SIZE + assert config.http_request_max_size == DEFAULT_PRODUCER_MAX_REQUEST + 1024 + DEFAULT_AIOHTTP_CLIENT_MAX_SIZE - config = set_config_defaults( + config = karapace_container.config().set_config_defaults( { "karapace_rest": True, "producer_max_request_size": DEFAULT_PRODUCER_MAX_REQUEST + 1024, "http_request_max_size": 1024, } ) - assert config["http_request_max_size"] == 1024 + assert config.http_request_max_size == 1024 diff --git a/tests/unit/test_in_memory_database.py b/tests/unit/test_in_memory_database.py index a3720940d..2a0156567 100644 --- a/tests/unit/test_in_memory_database.py +++ b/tests/unit/test_in_memory_database.py @@ -7,8 +7,8 @@ from collections import defaultdict from collections.abc import Iterable, Sequence from confluent_kafka.cimpl import KafkaError -from karapace.config import DEFAULTS from karapace.constants import DEFAULT_SCHEMA_TOPIC +from karapace.container import KarapaceContainer from karapace.in_memory_database import InMemoryDatabase, KarapaceDatabase, Subject, SubjectData from karapace.kafka.types import Timestamp from karapace.key_format import KeyFormatter @@ -214,7 +214,7 @@ def compute_schema_id_to_subjects( return schema_id_to_duplicated_subjects -def test_can_ingest_schemas_from_log() -> None: +def test_can_ingest_schemas_from_log(karapace_container: KarapaceContainer) -> None: """ Test for the consistency of a backup, this checks that each SchemaID its unique in the backup. The format of the log its the one obtained by running: @@ -228,7 +228,7 @@ def test_can_ingest_schemas_from_log() -> None: database = WrappedInMemoryDatabase() schema_reader = KafkaSchemaReader( - config=DEFAULTS, + config=karapace_container.config(), offset_watcher=OffsetWatcher(), key_formatter=KeyFormatter(), master_coordinator=None, diff --git a/tests/unit/test_kafka_error_handler.py b/tests/unit/test_kafka_error_handler.py index 45e9fea1b..183205137 100644 --- a/tests/unit/test_kafka_error_handler.py +++ b/tests/unit/test_kafka_error_handler.py @@ -3,6 +3,7 @@ See LICENSE for details """ from _pytest.logging import LogCaptureFixture +from karapace.container import KarapaceContainer from karapace.errors import CorruptKafkaRecordException from karapace.kafka_error_handler import KafkaErrorHandler, KafkaErrorLocation @@ -12,11 +13,13 @@ @pytest.fixture(name="kafka_error_handler") -def fixture_kafka_error_handler() -> KafkaErrorHandler: - config = { - "kafka_schema_reader_strict_mode": False, - "kafka_retriable_errors_silenced": True, - } +def fixture_kafka_error_handler(karapace_container: KarapaceContainer) -> KafkaErrorHandler: + config = karapace_container.config().set_config_defaults( + { + "kafka_schema_reader_strict_mode": False, + "kafka_retriable_errors_silenced": True, + } + ) return KafkaErrorHandler(config=config) diff --git a/tests/unit/test_protobuf_serialization.py b/tests/unit/test_protobuf_serialization.py index ee2586d63..1cb013538 100644 --- a/tests/unit/test_protobuf_serialization.py +++ b/tests/unit/test_protobuf_serialization.py @@ -2,7 +2,7 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from karapace.config import read_config +from karapace.container import KarapaceContainer from karapace.dependency import Dependency from karapace.protobuf.kotlin_wrapper import trim_margin from karapace.schema_models import ParsedTypedSchema, SchemaType, Versioner @@ -11,11 +11,11 @@ InvalidMessageHeader, InvalidMessageSchema, InvalidPayload, + SchemaRegistryClient, SchemaRegistrySerializer, START_BYTE, ) from karapace.typing import Subject -from pathlib import Path from tests.utils import schema_protobuf, test_fail_objects_protobuf, test_objects_protobuf from unittest.mock import call, Mock @@ -27,16 +27,16 @@ log = logging.getLogger(__name__) -async def make_ser_deser(config_path: str, mock_client) -> SchemaRegistrySerializer: - with open(config_path, encoding="utf8") as handler: - config = read_config(handler) - serializer = SchemaRegistrySerializer(config=config) +async def make_ser_deser( + karapace_container: KarapaceContainer, mock_client: SchemaRegistryClient +) -> SchemaRegistrySerializer: + serializer = SchemaRegistrySerializer(config=karapace_container.config()) await serializer.registry_client.close() serializer.registry_client = mock_client return serializer -async def test_happy_flow(default_config_path: Path): +async def test_happy_flow(karapace_container: KarapaceContainer): mock_protobuf_registry_client = Mock() schema_for_id_one_future = asyncio.Future() schema_for_id_one_future.set_result( @@ -49,7 +49,7 @@ async def test_happy_flow(default_config_path: Path): ) mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future - serializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client) + serializer = await make_ser_deser(karapace_container, mock_protobuf_registry_client) assert len(serializer.ids_to_schemas) == 0 schema = await serializer.get_schema_for_subject("top") for o in test_objects_protobuf: @@ -62,7 +62,7 @@ async def test_happy_flow(default_config_path: Path): assert mock_protobuf_registry_client.method_calls == [call.get_schema("top"), call.get_schema_for_id(1)] -async def test_happy_flow_references(default_config_path: Path): +async def test_happy_flow_references(karapace_container: KarapaceContainer): no_ref_schema_str = """ |syntax = "proto3"; | @@ -117,7 +117,7 @@ async def test_happy_flow_references(default_config_path: Path): get_latest_schema_future.set_result((1, ref_schema, Versioner.V(1))) mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future - serializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client) + serializer = await make_ser_deser(karapace_container, mock_protobuf_registry_client) assert len(serializer.ids_to_schemas) == 0 schema = await serializer.get_schema_for_subject("top") for o in test_objects: @@ -130,7 +130,7 @@ async def test_happy_flow_references(default_config_path: Path): assert mock_protobuf_registry_client.method_calls == [call.get_schema("top"), call.get_schema_for_id(1)] -async def test_happy_flow_references_two(default_config_path: Path): +async def test_happy_flow_references_two(karapace_container: KarapaceContainer): no_ref_schema_str = """ |syntax = "proto3"; | @@ -204,7 +204,7 @@ async def test_happy_flow_references_two(default_config_path: Path): get_latest_schema_future.set_result((1, ref_schema_two, Versioner.V(1))) mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future - serializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client) + serializer = await make_ser_deser(karapace_container, mock_protobuf_registry_client) assert len(serializer.ids_to_schemas) == 0 schema = await serializer.get_schema_for_subject("top") for o in test_objects: @@ -217,7 +217,7 @@ async def test_happy_flow_references_two(default_config_path: Path): assert mock_protobuf_registry_client.method_calls == [call.get_schema("top"), call.get_schema_for_id(1)] -async def test_serialization_fails(default_config_path: Path): +async def test_serialization_fails(karapace_container: KarapaceContainer): mock_protobuf_registry_client = Mock() get_latest_schema_future = asyncio.Future() get_latest_schema_future.set_result( @@ -225,7 +225,7 @@ async def test_serialization_fails(default_config_path: Path): ) mock_protobuf_registry_client.get_schema.return_value = get_latest_schema_future - serializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client) + serializer = await make_ser_deser(karapace_container, mock_protobuf_registry_client) with pytest.raises(InvalidMessageSchema): schema = await serializer.get_schema_for_subject("top") await serializer.serialize(schema, test_fail_objects_protobuf[0]) @@ -240,10 +240,10 @@ async def test_serialization_fails(default_config_path: Path): assert mock_protobuf_registry_client.method_calls == [call.get_schema("top")] -async def test_deserialization_fails(default_config_path: Path): +async def test_deserialization_fails(karapace_container: KarapaceContainer): mock_protobuf_registry_client = Mock() - deserializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client) + deserializer = await make_ser_deser(karapace_container, mock_protobuf_registry_client) invalid_header_payload = struct.pack(">bII", 1, 500, 500) with pytest.raises(InvalidMessageHeader): await deserializer.deserialize(invalid_header_payload) @@ -259,10 +259,10 @@ async def test_deserialization_fails(default_config_path: Path): assert mock_protobuf_registry_client.method_calls == [call.get_schema_for_id(500)] -async def test_deserialization_fails2(default_config_path: Path): +async def test_deserialization_fails2(karapace_container: KarapaceContainer): mock_protobuf_registry_client = Mock() - deserializer = await make_ser_deser(default_config_path, mock_protobuf_registry_client) + deserializer = await make_ser_deser(karapace_container, mock_protobuf_registry_client) invalid_header_payload = struct.pack(">bII", 1, 500, 500) with pytest.raises(InvalidMessageHeader): await deserializer.deserialize(invalid_header_payload) diff --git a/tests/unit/test_rapu.py b/tests/unit/test_rapu.py index cde68e2be..ba5c77e8c 100644 --- a/tests/unit/test_rapu.py +++ b/tests/unit/test_rapu.py @@ -5,7 +5,7 @@ from _pytest.logging import LogCaptureFixture from aiohttp.client_exceptions import ClientConnectionError from aiohttp.web import Request -from karapace.config import DEFAULTS +from karapace.container import KarapaceContainer from karapace.karapace import KarapaceBase from karapace.rapu import HTTPRequest, REST_ACCEPT_RE, REST_CONTENT_TYPE_RE from karapace.statsd import StatsClient @@ -167,12 +167,14 @@ def test_content_type_re(): @pytest.mark.parametrize("connection_error", (ConnectionError(), ClientConnectionError())) -async def test_raise_connection_error_handling(connection_error: BaseException) -> None: +async def test_raise_connection_error_handling( + karapace_container: KarapaceContainer, connection_error: BaseException +) -> None: request_mock = Mock(spec=Request) request_mock.read.side_effect = connection_error callback_mock = Mock() - app = KarapaceBase(config=DEFAULTS) + app = KarapaceBase(config=karapace_container.config()) response = await app._handle_request( # pylint: disable=protected-access request=request_mock, @@ -185,8 +187,8 @@ async def test_raise_connection_error_handling(connection_error: BaseException) callback_mock.assert_not_called() -async def test_close_by_app(caplog: LogCaptureFixture) -> None: - app = KarapaceBase(config=DEFAULTS) +async def test_close_by_app(caplog: LogCaptureFixture, karapace_container: KarapaceContainer) -> None: + app = KarapaceBase(config=karapace_container.config()) app.stats = Mock(spec=StatsClient) with caplog.at_level(logging.WARNING, logger="karapace.rapu"): diff --git a/tests/unit/test_rest_auth.py b/tests/unit/test_rest_auth.py index 86bb14b8a..ad2d54057 100644 --- a/tests/unit/test_rest_auth.py +++ b/tests/unit/test_rest_auth.py @@ -5,7 +5,7 @@ """ from __future__ import annotations -from karapace.config import set_config_defaults +from karapace.container import KarapaceContainer from karapace.kafka_rest_apis import AUTH_EXPIRY_TOLERANCE, KafkaRest, UserRestProxy from unittest.mock import call, Mock @@ -34,8 +34,8 @@ def _create_mock_proxy( return proxy -async def test_rest_proxy_janitor_expiring_credentials() -> None: - config = set_config_defaults( +async def test_rest_proxy_janitor_expiring_credentials(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( { "rest_authorization": True, "sasl_bootstrap_uri": "localhost:9094", @@ -92,8 +92,8 @@ async def test_rest_proxy_janitor_expiring_credentials() -> None: assert unused_proxy_expiring_later_than_tolerance.method_calls == [call.num_consumers(), call.aclose()] -async def test_rest_proxy_janitor_default() -> None: - config = set_config_defaults( +async def test_rest_proxy_janitor_default(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( { "rest_authorization": True, "sasl_bootstrap_uri": "localhost:9094", @@ -148,8 +148,8 @@ async def test_rest_proxy_janitor_default() -> None: assert active_proxy_with_consumers.method_calls == [call.num_consumers()] -async def test_rest_proxy_janitor_destructive() -> None: - config = set_config_defaults( +async def test_rest_proxy_janitor_destructive(karapace_container: KarapaceContainer) -> None: + config = karapace_container.config().set_config_defaults( { "rest_authorization": True, "sasl_bootstrap_uri": "localhost:9094", diff --git a/tests/unit/test_schema_reader.py b/tests/unit/test_schema_reader.py index 552fa0be7..093cab333 100644 --- a/tests/unit/test_schema_reader.py +++ b/tests/unit/test_schema_reader.py @@ -9,7 +9,7 @@ from concurrent.futures import Future, ThreadPoolExecutor from confluent_kafka import Message from dataclasses import dataclass -from karapace.config import DEFAULTS +from karapace.container import KarapaceContainer from karapace.errors import CorruptKafkaRecordException, ShutdownException from karapace.in_memory_database import InMemoryDatabase from karapace.kafka.consumer import KafkaConsumer @@ -154,7 +154,7 @@ class ReadinessTestCase(BaseTestCase): ), ], ) -def test_readiness_check(testcase: ReadinessTestCase) -> None: +def test_readiness_check(testcase: ReadinessTestCase, karapace_container: KarapaceContainer) -> None: key_formatter_mock = Mock() consumer_mock = Mock() consumer_mock.consume.return_value = [] @@ -163,7 +163,7 @@ def test_readiness_check(testcase: ReadinessTestCase) -> None: offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( - config=DEFAULTS, + config=karapace_container.config(), offset_watcher=offset_watcher, key_formatter=key_formatter_mock, master_coordinator=None, @@ -176,7 +176,7 @@ def test_readiness_check(testcase: ReadinessTestCase) -> None: assert schema_reader.ready is testcase.expected -def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None: +def test_num_max_messages_to_consume_moved_to_one_after_ready(karapace_container: KarapaceContainer) -> None: key_formatter_mock = Mock() consumer_mock = Mock() consumer_mock.consume.return_value = [] @@ -185,7 +185,7 @@ def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None: offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( - config=DEFAULTS, + config=karapace_container.config(), offset_watcher=offset_watcher, key_formatter=key_formatter_mock, master_coordinator=None, @@ -200,7 +200,9 @@ def test_num_max_messages_to_consume_moved_to_one_after_ready() -> None: assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP -def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_schemas_topic() -> None: +def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_schemas_topic( + karapace_container: KarapaceContainer, +) -> None: key_formatter_mock = Mock(spec=KeyFormatter) consumer_mock = Mock(spec=KafkaConsumer) @@ -230,7 +232,7 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( - config=DEFAULTS, + config=karapace_container.config(), offset_watcher=offset_watcher, key_formatter=key_formatter_mock, master_coordinator=None, @@ -255,7 +257,7 @@ def test_schema_reader_can_end_to_ready_state_if_last_message_is_invalid_in_sche assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP -def test_soft_deleted_schema_storing() -> None: +def test_soft_deleted_schema_storing(karapace_container: KarapaceContainer) -> None: """This tests a case when _schemas has been compacted and only the soft deleted version of the schema is present. """ @@ -287,7 +289,7 @@ def test_soft_deleted_schema_storing() -> None: offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( - config=DEFAULTS, + config=karapace_container.config(), offset_watcher=offset_watcher, key_formatter=key_formatter_mock, master_coordinator=None, @@ -302,14 +304,14 @@ def test_soft_deleted_schema_storing() -> None: assert soft_deleted_stored_schema is not None -def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture) -> None: +def test_handle_msg_delete_subject_logs(caplog: LogCaptureFixture, karapace_container: KarapaceContainer) -> None: database_mock = Mock(spec=InMemoryDatabase) database_mock.find_subject.return_value = True database_mock.find_subject_schemas.return_value = { Version(1): "SchemaVersion" } # `SchemaVersion` is an actual object, simplified for test schema_reader = KafkaSchemaReader( - config=DEFAULTS, + config=karapace_container.config(), offset_watcher=OffsetWatcher(), key_formatter=KeyFormatter(), master_coordinator=None, @@ -376,7 +378,9 @@ class HealthCheckTestCase(BaseTestCase): ), ], ) -async def test_schema_reader_health_check(testcase: HealthCheckTestCase, monkeypatch: MonkeyPatch) -> None: +async def test_schema_reader_health_check( + testcase: HealthCheckTestCase, monkeypatch: MonkeyPatch, karapace_container: KarapaceContainer +) -> None: offset_watcher = OffsetWatcher() key_formatter_mock = Mock() admin_client_mock = Mock() @@ -386,10 +390,10 @@ async def test_schema_reader_health_check(testcase: HealthCheckTestCase, monkeyp emtpy_future.set_exception(testcase.check_topic_error) else: emtpy_future.set_result(None) - admin_client_mock.describe_topics.return_value = {DEFAULTS["topic_name"]: emtpy_future} + admin_client_mock.describe_topics.return_value = {karapace_container.config().topic_name: emtpy_future} schema_reader = KafkaSchemaReader( - config=DEFAULTS, + config=karapace_container.config(), offset_watcher=offset_watcher, key_formatter=key_formatter_mock, master_coordinator=None, @@ -415,7 +419,9 @@ class KafkaMessageHandlingErrorTestCase(BaseTestCase): @pytest.fixture(name="schema_reader_with_consumer_messages_factory") -def fixture_schema_reader_with_consumer_messages_factory() -> Callable[[tuple[list[Message]]], KafkaSchemaReader]: +def fixture_schema_reader_with_consumer_messages_factory( + karapace_container: KarapaceContainer, +) -> Callable[[tuple[list[Message]]], KafkaSchemaReader]: def factory(consumer_messages: tuple[list[Message]]) -> KafkaSchemaReader: key_formatter_mock = Mock(spec=KeyFormatter) consumer_mock = Mock(spec=KafkaConsumer) @@ -425,8 +431,7 @@ def factory(consumer_messages: tuple[list[Message]]) -> KafkaSchemaReader: consumer_mock.get_watermark_offsets.return_value = (0, 4) # Update the config to run the schema reader in strict mode so errors can be raised - config = DEFAULTS.copy() - config["kafka_schema_reader_strict_mode"] = True + config = karapace_container.config().set_config_defaults({"kafka_schema_reader_strict_mode": True}) offset_watcher = OffsetWatcher() schema_reader = KafkaSchemaReader( diff --git a/tests/unit/test_schema_registry_api.py b/tests/unit/test_schema_registry_api.py index a9dc897e2..f21f47097 100644 --- a/tests/unit/test_schema_registry_api.py +++ b/tests/unit/test_schema_registry_api.py @@ -2,64 +2,79 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from aiohttp.test_utils import TestClient, TestServer -from karapace.config import DEFAULTS, set_config_defaults +from fastapi.exceptions import HTTPException from karapace.rapu import HTTPResponse +from karapace.schema_models import SchemaType, ValidatedTypedSchema from karapace.schema_reader import KafkaSchemaReader -from karapace.schema_registry import KarapaceSchemaRegistry -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController -from unittest.mock import ANY, AsyncMock, Mock, patch, PropertyMock +from schema_registry.container import SchemaRegistryContainer +from unittest.mock import Mock, patch, PropertyMock import asyncio +import json import pytest +TYPED_AVRO_SCHEMA = ValidatedTypedSchema.parse( + SchemaType.AVRO, + json.dumps( + { + "namespace": "io.aiven.data", + "name": "Test", + "type": "record", + "fields": [ + { + "name": "attr1", + "type": ["null", "string"], + } + ], + } + ), +) -async def test_validate_schema_request_body() -> None: - controller = KarapaceSchemaRegistryController(config=set_config_defaults(DEFAULTS)) - controller._validate_schema_request_body( # pylint: disable=W0212 - "application/json", {"schema": "{}", "schemaType": "JSON", "references": [], "metadata": {}, "ruleSet": {}} +async def test_validate_schema_request_body(schema_registry_container: SchemaRegistryContainer) -> None: + schema_registry_container.schema_registry_controller()._validate_schema_type( # pylint: disable=W0212 + {"schema": "{}", "schemaType": "JSON", "references": [], "metadata": {}, "ruleSet": {}} ) - with pytest.raises(HTTPResponse) as exc_info: - controller._validate_schema_request_body( # pylint: disable=W0212 - "application/json", - {"schema": "{}", "schemaType": "JSON", "references": [], "unexpected_field_name": {}, "ruleSet": {}}, + with pytest.raises(HTTPException) as exc_info: + schema_registry_container.schema_registry_controller()._validate_schema_type( # pylint: disable=W0212 + {"schema": "{}", "schemaType": "DOES_NOT_EXIST", "references": [], "unexpected_field_name": {}, "ruleSet": {}}, ) - assert exc_info.type is HTTPResponse - assert str(exc_info.value) == "HTTPResponse 422" + assert exc_info.type is HTTPException + assert str(exc_info.value) == "422: {'error_code': 422, 'message': 'Invalid schemaType DOES_NOT_EXIST'}" -async def test_forward_when_not_ready() -> None: - with patch("schema_registry.schema_registry_apis.KarapaceSchemaRegistry") as schema_registry_class: +async def test_forward_when_not_ready(schema_registry_container: SchemaRegistryContainer) -> None: + with patch("karapace.container.KarapaceSchemaRegistry") as schema_registry_class: schema_reader_mock = Mock(spec=KafkaSchemaReader) ready_property_mock = PropertyMock(return_value=False) - schema_registry = AsyncMock(spec=KarapaceSchemaRegistry) type(schema_reader_mock).ready = ready_property_mock - schema_registry.schema_reader = schema_reader_mock - schema_registry_class.return_value = schema_registry + schema_registry_class.schema_reader = schema_reader_mock - schema_registry.get_master.return_value = (False, "http://primary-url") + schema_registry_class.schemas_get.return_value = TYPED_AVRO_SCHEMA + schema_registry_class.get_master.return_value = (False, "http://primary-url") close_future_result = asyncio.Future() close_future_result.set_result(True) close_func = Mock() close_func.return_value = close_future_result - schema_registry.close = close_func + schema_registry_class.close = close_func + + schema_registry_container.karapace_container().schema_registry = schema_registry_class + controller = schema_registry_container.schema_registry_controller() + controller.schema_registry = schema_registry_class - controller = KarapaceSchemaRegistryController(config=set_config_defaults(DEFAULTS)) mock_forward_func_future = asyncio.Future() mock_forward_func_future.set_exception(HTTPResponse({"mock": "response"})) mock_forward_func = Mock() mock_forward_func.return_value = mock_forward_func_future controller._forward_request_remote = mock_forward_func # pylint: disable=protected-access - test_server = TestServer(controller.app) - async with TestClient(test_server) as client: - await client.get("/schemas/ids/1", headers={"Content-Type": "application/json"}) - - ready_property_mock.assert_called_once() - schema_registry.get_master.assert_called_once() - mock_forward_func.assert_called_once_with( - request=ANY, body=None, url="http://primary-url/schemas/ids/1", content_type="application/json", method="GET" - ) + assert await controller.schemas_get( + schema_id=1, + include_subjects=False, + fetch_max_id=False, + format_serialized="", + user=None, + authorizer=None, + ) diff --git a/tests/unit/test_serialization.py b/tests/unit/test_serialization.py index a21d3bc00..041df44ab 100644 --- a/tests/unit/test_serialization.py +++ b/tests/unit/test_serialization.py @@ -2,8 +2,7 @@ Copyright (c) 2023 Aiven Ltd See LICENSE for details """ -from karapace.client import Path -from karapace.config import DEFAULTS, read_config +from karapace.container import KarapaceContainer from karapace.schema_models import SchemaType, ValidatedTypedSchema, Versioner from karapace.serialization import ( flatten_unions, @@ -12,6 +11,7 @@ InvalidMessageHeader, InvalidMessageSchema, InvalidPayload, + SchemaRegistryClient, SchemaRegistrySerializer, START_BYTE, write_value, @@ -109,16 +109,16 @@ ) -async def make_ser_deser(config_path: str, mock_client) -> SchemaRegistrySerializer: - with open(config_path, encoding="utf8") as handler: - config = read_config(handler) - serializer = SchemaRegistrySerializer(config=config) +async def make_ser_deser( + karapace_container: KarapaceContainer, mock_client: SchemaRegistryClient +) -> SchemaRegistrySerializer: + serializer = SchemaRegistrySerializer(config=karapace_container.config()) await serializer.registry_client.close() serializer.registry_client = mock_client return serializer -async def test_happy_flow(default_config_path: Path): +async def test_happy_flow(karapace_container: KarapaceContainer): mock_registry_client = Mock() get_latest_schema_future = asyncio.Future() get_latest_schema_future.set_result((1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), Versioner.V(1))) @@ -127,7 +127,7 @@ async def test_happy_flow(default_config_path: Path): schema_for_id_one_future.set_result((ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), [Subject("stub")])) mock_registry_client.get_schema_for_id.return_value = schema_for_id_one_future - serializer = await make_ser_deser(default_config_path, mock_registry_client) + serializer = await make_ser_deser(karapace_container, mock_registry_client) assert len(serializer.ids_to_schemas) == 0 schema = await serializer.get_schema_for_subject(Subject("top")) for o in test_objects_avro: @@ -213,7 +213,7 @@ def test_flatten_unions_map() -> None: assert flatten_unions(typed_schema.schema, record) == flatten_record -def test_avro_json_write_invalid() -> None: +def test_avro_json_write_invalid(karapace_container: KarapaceContainer) -> None: schema = { "namespace": "io.aiven.data", "name": "Test", @@ -236,10 +236,10 @@ def test_avro_json_write_invalid() -> None: for record in records: with pytest.raises(avro.errors.AvroTypeException): - write_value(DEFAULTS, typed_schema, bio, record) + write_value(karapace_container.config(), typed_schema, bio, record) -def test_avro_json_write_accepts_json_encoded_data_without_tagged_unions() -> None: +def test_avro_json_write_accepts_json_encoded_data_without_tagged_unions(karapace_container: KarapaceContainer) -> None: """Backwards compatibility test for Avro data using JSON encoding. The initial behavior of the API was incorrect, and it accept data with @@ -299,24 +299,24 @@ def test_avro_json_write_accepts_json_encoded_data_without_tagged_unions() -> No buffer_a = io.BytesIO() buffer_b = io.BytesIO() - write_value(DEFAULTS, typed_schema, buffer_a, properly_tagged_encoding_a) - write_value(DEFAULTS, typed_schema, buffer_b, missing_tag_encoding_a) + write_value(karapace_container.config(), typed_schema, buffer_a, properly_tagged_encoding_a) + write_value(karapace_container.config(), typed_schema, buffer_b, missing_tag_encoding_a) assert buffer_a.getbuffer() == buffer_b.getbuffer() buffer_a = io.BytesIO() buffer_b = io.BytesIO() - write_value(DEFAULTS, typed_schema, buffer_a, properly_tagged_encoding_b) - write_value(DEFAULTS, typed_schema, buffer_b, missing_tag_encoding_b) + write_value(karapace_container.config(), typed_schema, buffer_a, properly_tagged_encoding_b) + write_value(karapace_container.config(), typed_schema, buffer_b, missing_tag_encoding_b) assert buffer_a.getbuffer() == buffer_b.getbuffer() -async def test_serialization_fails(default_config_path: Path): +async def test_serialization_fails(karapace_container: KarapaceContainer): mock_registry_client = Mock() get_latest_schema_future = asyncio.Future() get_latest_schema_future.set_result((1, ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), Versioner.V(1))) mock_registry_client.get_schema.return_value = get_latest_schema_future - serializer = await make_ser_deser(default_config_path, mock_registry_client) + serializer = await make_ser_deser(karapace_container, mock_registry_client) with pytest.raises(InvalidMessageSchema): schema = await serializer.get_schema_for_subject(Subject("topic")) await serializer.serialize(schema, {"foo": "bar"}) @@ -324,13 +324,13 @@ async def test_serialization_fails(default_config_path: Path): assert mock_registry_client.method_calls == [call.get_schema("topic")] -async def test_deserialization_fails(default_config_path: Path): +async def test_deserialization_fails(karapace_container: KarapaceContainer): mock_registry_client = Mock() schema_for_id_one_future = asyncio.Future() schema_for_id_one_future.set_result((ValidatedTypedSchema.parse(SchemaType.AVRO, schema_avro_json), [Subject("stub")])) mock_registry_client.get_schema_for_id.return_value = schema_for_id_one_future - deserializer = await make_ser_deser(default_config_path, mock_registry_client) + deserializer = await make_ser_deser(karapace_container, mock_registry_client) invalid_header_payload = struct.pack(">bII", 1, 500, 500) with pytest.raises(InvalidMessageHeader): await deserializer.deserialize(invalid_header_payload)