From 6f6661ef42dec25c37ac466ce01f7f330f838221 Mon Sep 17 00:00:00 2001 From: Emmanuel Evbuomwan Date: Fri, 22 Nov 2024 18:58:41 +0100 Subject: [PATCH] feat: wire more dependencies and modules we drop the dependency managers and use the DI containers and providers --- .dockerignore | 1 - container/Dockerfile | 4 - container/compose.yml | 37 +++----- .../{karapace.env => karapace.registry.env} | 2 +- container/karapace.rest.env | 47 ++++++++++ container/start.sh | 39 -------- pyproject.toml | 2 +- src/karapace/auth/auth.py | 17 ++-- src/karapace/auth/dependencies.py | 77 ---------------- .../base_config.yaml | 0 src/karapace/config.py | 46 +++++----- src/karapace/container.py | 37 ++++++++ src/karapace/karapace_all.py | 56 ++++-------- src/schema_registry/__main__.py | 70 +++++++++------ src/schema_registry/container.py | 16 ++-- .../dependencies/config_dependency.py | 23 ----- .../dependencies/controller_dependency.py | 23 ----- .../dependencies/forward_client_dependency.py | 20 ----- .../schema_registry_dependency.py | 24 ----- .../dependencies/stats_dependeny.py | 23 ----- src/schema_registry/routers/__init__.py | 19 ---- ...mpatibility_router.py => compatibility.py} | 18 ++-- .../routers/{config_router.py => config.py} | 59 +++++++------ .../routers/{health_router.py => health.py} | 9 +- src/schema_registry/routers/mode.py | 46 ++++++++++ src/schema_registry/routers/mode_router.py | 42 --------- .../routers/{root_router.py => root.py} | 0 .../routers/{schemas_router.py => schemas.py} | 34 ++++--- src/schema_registry/routers/setup.py | 23 +++++ .../{subjects_router.py => subjects.py} | 88 +++++++++++-------- src/schema_registry/schema_registry_apis.py | 19 ++-- src/schema_registry/user.py | 31 +++++++ 32 files changed, 434 insertions(+), 518 deletions(-) rename container/{karapace.env => karapace.registry.env} (96%) create mode 100644 container/karapace.rest.env delete mode 100755 container/start.sh delete mode 100644 src/karapace/auth/dependencies.py rename src/{schema_registry => karapace}/base_config.yaml (100%) create mode 100644 src/karapace/container.py delete mode 100644 src/schema_registry/dependencies/config_dependency.py delete mode 100644 src/schema_registry/dependencies/controller_dependency.py delete mode 100644 src/schema_registry/dependencies/forward_client_dependency.py delete mode 100644 src/schema_registry/dependencies/schema_registry_dependency.py delete mode 100644 src/schema_registry/dependencies/stats_dependeny.py rename src/schema_registry/routers/{compatibility_router.py => compatibility.py} (56%) rename src/schema_registry/routers/{config_router.py => config.py} (52%) rename src/schema_registry/routers/{health_router.py => health.py} (84%) create mode 100644 src/schema_registry/routers/mode.py delete mode 100644 src/schema_registry/routers/mode_router.py rename src/schema_registry/routers/{root_router.py => root.py} (100%) rename src/schema_registry/routers/{schemas_router.py => schemas.py} (57%) create mode 100644 src/schema_registry/routers/setup.py rename src/schema_registry/routers/{subjects_router.py => subjects.py} (56%) create mode 100644 src/schema_registry/user.py diff --git a/.dockerignore b/.dockerignore index 57efb59ad..4b946a334 100644 --- a/.dockerignore +++ b/.dockerignore @@ -10,7 +10,6 @@ !LICENSE !pyproject.toml !setup.py -!container/start.sh !container/healthcheck.py # Ignore some files in source directories. diff --git a/container/Dockerfile b/container/Dockerfile index 2e1544319..55ca06e1c 100644 --- a/container/Dockerfile +++ b/container/Dockerfile @@ -55,10 +55,6 @@ RUN apt-get update \ COPY --from=builder /venv /venv ENV PATH="/venv/bin:$PATH" -COPY ./container/start.sh /opt/karapace -RUN chmod 500 /opt/karapace/start.sh \ - && chown karapace:karapace /opt/karapace/start.sh - COPY ./container/healthcheck.py /opt/karapace WORKDIR /opt/karapace diff --git a/container/compose.yml b/container/compose.yml index f17c21eb3..154eb9fac 100644 --- a/container/compose.yml +++ b/container/compose.yml @@ -23,7 +23,7 @@ services: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://karapace-registry:8081 + KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://karapace-schema-registry:8081 # Metrics: KAFKA_JMX_PORT: 9101 KAFKA_JMX_HOSTNAME: localhost @@ -54,51 +54,42 @@ services: KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: 6000 KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" - karapace-registry: + karapace-schema-registry: image: ghcr.io/aiven-open/karapace:develop build: context: .. dockerfile: container/Dockerfile entrypoint: - - /bin/bash - - /opt/karapace/start.sh - - registry + - python3 + - -m + - schema_registry depends_on: - kafka ports: - "8081:8081" volumes: - - ./karapace.env:/opt/karapace/karapace.env + - ./karapace.registry.env:/opt/karapace/karapace.env environment: KARAPACE_DOTENV: /opt/karapace/karapace.env - karapace-rest: + karapace-rest-proxy: image: ghcr.io/aiven-open/karapace:develop build: context: .. dockerfile: container/Dockerfile entrypoint: - - /bin/bash - - /opt/karapace/start.sh - - rest + - python3 + - -m + - karapace.karapace_all depends_on: - kafka - - karapace-registry + - karapace-schema-registry ports: - "8082:8082" + volumes: + - ./karapace.rest.env:/opt/karapace/karapace.env environment: - KARAPACE_PORT: 8082 - KARAPACE_HOST: 0.0.0.0 - KARAPACE_ADVERTISED_HOSTNAME: karapace-rest - KARAPACE_BOOTSTRAP_URI: kafka:29092 - KARAPACE_REGISTRY_HOST: karapace-registry - KARAPACE_REGISTRY_PORT: 8081 - KARAPACE_ADMIN_METADATA_MAX_AGE: 0 - KARAPACE_LOG_LEVEL: WARNING - KARAPACE_STATSD_HOST: statsd-exporter - KARAPACE_STATSD_PORT: 8125 - KARAPACE_KAFKA_SCHEMA_READER_STRICT_MODE: false - KARAPACE_KAFKA_RETRIABLE_ERRORS_SILENCED: true + KARAPACE_DOTENV: /opt/karapace/karapace.env prometheus: image: prom/prometheus diff --git a/container/karapace.env b/container/karapace.registry.env similarity index 96% rename from container/karapace.env rename to container/karapace.registry.env index 6cee31a2e..119128b9f 100644 --- a/container/karapace.env +++ b/container/karapace.registry.env @@ -15,7 +15,7 @@ FETCH_MIN_BYTES=1 GROUP_ID=karapace-schema-registry HOST=0.0.0.0 PORT=8081 -REGISTRY_HOST=karapace-registry +REGISTRY_HOST=karapace-schema-registry REGISTRY_PORT=8081 REST_AUTHORIZATION=False LOG_HANDLER=stdout diff --git a/container/karapace.rest.env b/container/karapace.rest.env new file mode 100644 index 000000000..d5ae3a5b4 --- /dev/null +++ b/container/karapace.rest.env @@ -0,0 +1,47 @@ +KARAPACE_DOTENV=/opt/karapace/karapace.env +ACCESS_LOGS_DEBUG=False +ADVERTISED_HOSTNAME=karapace-rest-proxy +ADVERTISED_PORT=8082 +ADVERTISED_PROTOCOL=http +BOOTSTRAP_URI=kafka:29092 +CLIENT_ID=karapace-rest-proxy +COMPATIBILITY=BACKWARD +CONNECTIONS_MAX_IDLE_MS=15000 +CONSUMER_ENABLE_AUTO_COMMIT=True +CONSUMER_REQUEST_TIMEOUT_MS=11000 +CONSUMER_REQUEST_MAX_BYTES=67108864 +CONSUMER_IDLE_DISCONNECT_TIMEOUT=0 +FETCH_MIN_BYTES=1 +GROUP_ID=karapace-rest-proxy +HOST=0.0.0.0 +PORT=8081 +REGISTRY_HOST=karapace-schema-registry +REGISTRY_PORT=8081 +REST_AUTHORIZATION=False +LOG_HANDLER=stdout +LOG_LEVEL=DEBUG +LOG_FORMAT=%(asctime)s [%(threadName)s] %(filename)s:%(funcName)s:%(lineno)d %(message)s +MASTER_ELIGIBILITY=True +REPLICATION_FACTOR=1 +SECURITY_PROTOCOL=PLAINTEXT +SSL_CHECK_HOSTNAME=True +TOPIC_NAME=_schemas +METADATA_MAX_AGE_MS=60000 +ADMIN_METADATA_MAX_AGE=5 +PRODUCER_ACKS=1 +PRODUCER_COUNT=5 +PRODUCER_LINGER_MS=100 +PRODUCER_MAX_REQUEST_SIZE=1048576 +SESSION_TIMEOUT_MS=10000 +KARAPACE_REST=False +KARAPACE_REGISTRY=True +NAME_STRATEGY=topic_name +NAME_STRATEGY_VALIDATION=True +MASTER_ELECTION_STRATEGY=lowest +PROTOBUF_RUNTIME_DIRECTORY=runtime +STATSD_HOST=statsd-exporter +STATSD_PORT=8125 +KAFKA_SCHEMA_READER_STRICT_MODE=False +KAFKA_RETRIABLE_ERRORS_SILENCED=True +USE_PROTOBUF_FORMATTER=False +HTTP_REQUEST_MAX_SIZE=1048576 diff --git a/container/start.sh b/container/start.sh deleted file mode 100755 index a00f045e0..000000000 --- a/container/start.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/usr/bin/env bash -set -Eeuo pipefail - -# Configuration is done using environment variables. The environment variable -# names are the same as the configuration keys, all letters in caps, and always -# start with `KARAPACE_`. - -# In the code below the expression ${var+isset} is used to check if the -# variable was defined, and ${var-isunset} if not. -# -# Ref: https://pubs.opengroup.org/onlinepubs/9699919799/utilities/V3_chap02.html#tag_18_06_02 - -case $1 in -rest) - # Reexport variables for compatibility - [[ -n ${KARAPACE_REST_ADVERTISED_HOSTNAME+isset} ]] && export KARAPACE_ADVERTISED_HOSTNAME="${KARAPACE_REST_ADVERTISED_HOSTNAME}" - [[ -n ${KARAPACE_REST_BOOTSTRAP_URI+isset} ]] && export KARAPACE_BOOTSTRAP_URI="${KARAPACE_REST_BOOTSTRAP_URI}" - [[ -n ${KARAPACE_REST_REGISTRY_HOST+isset} ]] && export KARAPACE_REGISTRY_HOST="${KARAPACE_REST_REGISTRY_HOST}" - [[ -n ${KARAPACE_REST_REGISTRY_PORT+isset} ]] && export KARAPACE_REGISTRY_PORT="${KARAPACE_REST_REGISTRY_PORT}" - [[ -n ${KARAPACE_REST_HOST+isset} ]] && export KARAPACE_HOST="${KARAPACE_REST_HOST}" - [[ -n ${KARAPACE_REST_PORT+isset} ]] && export KARAPACE_PORT="${KARAPACE_REST_PORT}" - [[ -n ${KARAPACE_REST_ADMIN_METADATA_MAX_AGE+isset} ]] && export KARAPACE_ADMIN_METADATA_MAX_AGE="${KARAPACE_REST_ADMIN_METADATA_MAX_AGE}" - [[ -n ${KARAPACE_REST_LOG_LEVEL+isset} ]] && export KARAPACE_LOG_LEVEL="${KARAPACE_REST_LOG_LEVEL}" - export KARAPACE_REST=1 - echo "{}" >/opt/karapace/rest.config.json - - echo "Starting Karapace REST API" - exec python3 -m karapace.karapace_all /opt/karapace/rest.config.json - ;; -registry) - exec python3 -m schema_registry - ;; -*) - echo "usage: start-karapace.sh " - exit 0 - ;; -esac - -wait diff --git a/pyproject.toml b/pyproject.toml index 8cd2b27cc..6512d9d23 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -113,7 +113,7 @@ include-package-data = true where = ["src"] [tool.setuptools.package-data] -schema_registry = ["*.yaml"] +karapace = ["*.yaml"] [tool.setuptools_scm] version_file = "src/karapace/version.py" diff --git a/src/karapace/auth/auth.py b/src/karapace/auth/auth.py index ccaab30e6..cfc566cdf 100644 --- a/src/karapace/auth/auth.py +++ b/src/karapace/auth/auth.py @@ -8,7 +8,7 @@ from dataclasses import dataclass, field from enum import Enum, unique from hmac import compare_digest -from karapace.config import InvalidConfiguration +from karapace.config import Config, InvalidConfiguration from karapace.statsd import StatsClient from karapace.utils import json_decode, json_encode from typing import Protocol @@ -205,14 +205,12 @@ def check_authorization_any(self, user: User | None, operation: Operation, resou class HTTPAuthorizer(ACLAuthorizer, AuthenticatorAndAuthorizer): - def __init__(self, filename: str) -> None: + def __init__(self, config: Config) -> None: super().__init__() - self._auth_filename: str = filename + self._auth_filename: str = config.registry_authfile self._auth_mtime: float = -1 self._refresh_auth_task: asyncio.Task | None = None self._refresh_auth_awatch_stop_event = asyncio.Event() - # Once first, can raise if file not valid - self._load_authfile() @property def authfile_last_modified(self) -> float: @@ -221,6 +219,7 @@ def authfile_last_modified(self) -> float: @override async def start(self, stats: StatsClient) -> None: """Start authfile refresher task""" + self._load_authfile() async def _refresh_authfile() -> None: """Reload authfile, but keep old auth data if loading fails""" @@ -294,6 +293,14 @@ def authenticate(self, *, username: str, password: str) -> User: return user +def get_authorizer( + config: Config, + http_authorizer: HTTPAuthorizer, + no_auth_authorizer: NoAuthAndAuthz, +) -> AuthenticatorAndAuthorizer: + return http_authorizer if config.registry_authfile else no_auth_authorizer + + def main() -> int: parser = argparse.ArgumentParser(prog="karapace_mkpasswd", description="Karapace password hasher") parser.add_argument("-u", "--user", help="Username", type=str) diff --git a/src/karapace/auth/dependencies.py b/src/karapace/auth/dependencies.py deleted file mode 100644 index 671769714..000000000 --- a/src/karapace/auth/dependencies.py +++ /dev/null @@ -1,77 +0,0 @@ -""" -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - -from fastapi import Depends, HTTPException, Security, status -from fastapi.security import HTTPBasic, HTTPBasicCredentials -from fastapi.security.base import SecurityBase -from karapace.auth.auth import AuthenticationError, AuthenticatorAndAuthorizer, HTTPAuthorizer, NoAuthAndAuthz, User -from schema_registry.dependencies.config_dependency import ConfigDependencyManager -from typing import Annotated, Optional - -import logging - -LOG = logging.getLogger(__name__) - - -class AuthorizationDependencyManager: - AUTHORIZER: AuthenticatorAndAuthorizer | None = None - AUTH_SET: bool = False - SECURITY: SecurityBase | None = None - - @classmethod - def get_authorizer(cls) -> AuthenticatorAndAuthorizer: - if AuthorizationDependencyManager.AUTH_SET: - assert AuthorizationDependencyManager.AUTHORIZER - return AuthorizationDependencyManager.AUTHORIZER - - config = ConfigDependencyManager.get_config() - if config.registry_authfile: - AuthorizationDependencyManager.AUTHORIZER = HTTPAuthorizer(config.registry_authfile) - else: - # TODO: remove the need for empty authorization logic. - AuthorizationDependencyManager.AUTHORIZER = NoAuthAndAuthz() - AuthorizationDependencyManager.AUTH_SET = True - return AuthorizationDependencyManager.AUTHORIZER - - -AuthenticatorAndAuthorizerDep = Annotated[AuthenticatorAndAuthorizer, Depends(AuthorizationDependencyManager.get_authorizer)] - -# TODO Karapace can have authentication/authorization enabled or disabled. This code needs cleanup and better -# injection mechanism, this is fast workaround for optional user authentication and authorization. -SECURITY: SecurityBase | None = None -config = ConfigDependencyManager.get_config() -if config.registry_authfile: - SECURITY = HTTPBasic(auto_error=False) - - def get_current_user( - credentials: Annotated[Optional[HTTPBasicCredentials], Security(SECURITY)], - authorizer: AuthenticatorAndAuthorizerDep, - ) -> User: - if authorizer and not credentials: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail={"message": "Unauthorized"}, - headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'}, - ) - assert authorizer is not None - assert credentials is not None - username: str = credentials.username - password: str = credentials.password - try: - return authorizer.authenticate(username=username, password=password) - except AuthenticationError: - raise HTTPException( - status_code=status.HTTP_401_UNAUTHORIZED, - detail={"message": "Unauthorized"}, - headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'}, - ) - -else: - - def get_current_user() -> None: - return None - - -CurrentUserDep = Annotated[Optional[User], Depends(get_current_user)] diff --git a/src/schema_registry/base_config.yaml b/src/karapace/base_config.yaml similarity index 100% rename from src/schema_registry/base_config.yaml rename to src/karapace/base_config.yaml diff --git a/src/karapace/config.py b/src/karapace/config.py index 511c25897..6303542fb 100644 --- a/src/karapace/config.py +++ b/src/karapace/config.py @@ -12,12 +12,16 @@ from karapace.utils import json_encode from pathlib import Path from pydantic import BaseSettings +from typing import Final import logging import os import socket import ssl +KARAPACE_ROOT: Final[Path] = Path(__file__).parent +KARAPACE_BASE_CONFIG_YAML_PATH: Final[Path] = KARAPACE_ROOT / "base_config.yaml" + HOSTNAME = socket.gethostname() @@ -58,6 +62,7 @@ class Config(BaseSettings): master_eligibility: bool = True replication_factor: int = 1 security_protocol: str = "PLAINTEXT" + ssl_ciphers: str | None = None ssl_cafile: str | None = None ssl_certfile: str | None = None ssl_keyfile: str | None = None @@ -218,7 +223,7 @@ def set_sentry_dsn_from_environment(config: Config) -> None: def validate_config(config: Config) -> None: - master_election_strategy = config["master_election_strategy"] + master_election_strategy = config.master_election_strategy try: ElectionStrategy(master_election_strategy.lower()) except ValueError: @@ -227,7 +232,7 @@ def validate_config(config: Config) -> None: f"Invalid master election strategy: {master_election_strategy}, valid values are {valid_strategies}" ) from None - name_strategy = config["name_strategy"] + name_strategy = config.name_strategy try: NameStrategy(name_strategy) except ValueError: @@ -236,7 +241,7 @@ def validate_config(config: Config) -> None: f"Invalid default name strategy: {name_strategy}, valid values are {valid_strategies}" ) from None - if config["rest_authorization"] and config["sasl_bootstrap_uri"] is None: + if config.rest_authorization and config.sasl_bootstrap_uri is None: raise InvalidConfiguration( "Using 'rest_authorization' requires configuration value for 'sasl_bootstrap_uri' to be set" ) @@ -253,17 +258,10 @@ def write_env_file(dot_env_path: Path, config: Config) -> None: def read_env_file(env_file_path: str) -> Config: return Config(_env_file=env_file_path, _env_file_encoding="utf-8") - Config() - try: - config = json_decode(config_handler) - except JSONDecodeError as ex: - raise InvalidConfiguration("Configuration is not a valid JSON") from ex - return set_config_defaults(config) - def create_client_ssl_context(config: Config) -> ssl.SSLContext | None: # taken from conn.py, as it adds a lot more logic to the context configuration than the initial version - if config["security_protocol"] in ("PLAINTEXT", "SASL_PLAINTEXT"): + if config.security_protocol in ("PLAINTEXT", "SASL_PLAINTEXT"): return None ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS) ssl_context.options |= ssl.OP_NO_SSLv2 @@ -271,30 +269,30 @@ def create_client_ssl_context(config: Config) -> ssl.SSLContext | None: ssl_context.options |= ssl.OP_NO_TLSv1 ssl_context.options |= ssl.OP_NO_TLSv1_1 ssl_context.verify_mode = ssl.CERT_OPTIONAL - if config["ssl_check_hostname"]: + if config.ssl_check_hostname: ssl_context.check_hostname = True - if config["ssl_cafile"]: - ssl_context.load_verify_locations(config["ssl_cafile"]) + if config.ssl_cafile: + ssl_context.load_verify_locations(config.ssl_cafile) ssl_context.verify_mode = ssl.CERT_REQUIRED - if config["ssl_certfile"] and config["ssl_keyfile"]: + if config.ssl_certfile and config.ssl_keyfile: ssl_context.load_cert_chain( - certfile=config["ssl_certfile"], - keyfile=config["ssl_keyfile"], - password=config["ssl_password"], + certfile=config.ssl_certfile, + keyfile=config.ssl_keyfile, + password=config.ssl_password, ) - if config["ssl_crlfile"]: + if config.ssl_crlfile: if not hasattr(ssl, "VERIFY_CRL_CHECK_LEAF"): raise RuntimeError("This version of Python does not support ssl_crlfile!") - ssl_context.load_verify_locations(config["ssl_crlfile"]) + ssl_context.load_verify_locations(config.ssl_crlfile) ssl_context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF - if config.get("ssl_ciphers"): - ssl_context.set_ciphers(config["ssl_ciphers"]) + if config.ssl_ciphers: + ssl_context.set_ciphers(config.ssl_ciphers) return ssl_context def create_server_ssl_context(config: Config) -> ssl.SSLContext | None: - tls_certfile = config["server_tls_certfile"] - tls_keyfile = config["server_tls_keyfile"] + tls_certfile = config.server_tls_certfile + tls_keyfile = config.server_tls_keyfile if tls_certfile is None: if tls_keyfile is None: # Neither config value set, do not use TLS diff --git a/src/karapace/container.py b/src/karapace/container.py new file mode 100644 index 000000000..03b97126a --- /dev/null +++ b/src/karapace/container.py @@ -0,0 +1,37 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from dependency_injector import containers, providers +from karapace.auth.auth import get_authorizer, HTTPAuthorizer, NoAuthAndAuthz +from karapace.config import Config +from karapace.forward_client import ForwardClient +from karapace.schema_registry import KarapaceSchemaRegistry +from karapace.statsd import StatsClient + + +class KarapaceContainer(containers.DeclarativeContainer): + base_config = providers.Configuration() + config = providers.Singleton( + Config, + _env_file=base_config.karapace.env_file, + _env_file_encoding=base_config.karapace.env_file_encoding, + ) + + statsd = providers.Singleton(StatsClient, config=config) + + no_auth_authorizer = providers.Singleton(NoAuthAndAuthz) + + http_authorizer = providers.Singleton(HTTPAuthorizer, config=config) + + schema_registry = providers.Singleton(KarapaceSchemaRegistry, config=config) + + forward_client = providers.Singleton(ForwardClient) + + authorizer = providers.Factory( + get_authorizer, + config=config, + http_authorizer=http_authorizer, + no_auth_authorizer=no_auth_authorizer, + ) diff --git a/src/karapace/karapace_all.py b/src/karapace/karapace_all.py index c176bc337..36b91b42a 100644 --- a/src/karapace/karapace_all.py +++ b/src/karapace/karapace_all.py @@ -4,58 +4,29 @@ """ from __future__ import annotations -from contextlib import closing +from dependency_injector.wiring import inject, Provide from karapace import version as karapace_version -from karapace.config import read_config +from karapace.config import Config, KARAPACE_BASE_CONFIG_YAML_PATH +from karapace.container import KarapaceContainer from karapace.instrumentation.prometheus import PrometheusInstrumentation from karapace.kafka_rest_apis import KafkaRest -from karapace.logging import configure_logging -from karapace.rapu import RestApp -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController +from karapace.logging import configure_logging, log_config_without_secrets import argparse import logging import sys -class KarapaceAll(KafkaRest, KarapaceSchemaRegistryController): - pass - - -def main() -> int: +@inject +def main(config: Config = Provide[KarapaceContainer.config]) -> int: parser = argparse.ArgumentParser(prog="karapace", description="Karapace: Your Kafka essentials in one tool") parser.add_argument("--version", action="version", help="show program version", version=karapace_version.__version__) - parser.add_argument("config_file", help="configuration file path", type=argparse.FileType()) - arg = parser.parse_args() - - with closing(arg.config_file): - config = read_config(arg.config_file) - + parser.parse_args() configure_logging(config=config) + log_config_without_secrets(config=config) - app: RestApp - if config["karapace_rest"] and config["karapace_registry"]: - info_str = "both services" - app = KarapaceAll(config=config) - elif config["karapace_rest"]: - info_str = "karapace rest" - app = KafkaRest(config=config) - elif config["karapace_registry"]: - info_str = "karapace schema registry" - app = KarapaceSchemaRegistryController(config=config) - else: - print("Both rest and registry options are disabled, exiting") - return 1 - - info_str_separator = "=" * 100 - logging.log(logging.INFO, "\n%s\nStarting %s\n%s", info_str_separator, info_str, info_str_separator) - - config_without_secrets = {} - for key, value in config.items(): - if "password" in key: - value = "****" - config_without_secrets[key] = value - logging.log(logging.DEBUG, "Config %r", config_without_secrets) + logging.info("\n%s\nStarting %s\n%s", ("=" * 100), "Starting Karapace Rest Proxy", ("=" * 100)) + app = KafkaRest(config=config) try: PrometheusInstrumentation.setup_metrics(app=app) @@ -67,4 +38,11 @@ def main() -> int: if __name__ == "__main__": + container = KarapaceContainer() + container.base_config.from_yaml(KARAPACE_BASE_CONFIG_YAML_PATH, envs_required=True, required=True) + container.wire( + modules=[ + __name__, + ] + ) sys.exit(main()) diff --git a/src/schema_registry/__main__.py b/src/schema_registry/__main__.py index 31a972576..78b58b48a 100644 --- a/src/schema_registry/__main__.py +++ b/src/schema_registry/__main__.py @@ -4,41 +4,41 @@ """ from collections.abc import AsyncGenerator from contextlib import asynccontextmanager -from fastapi import FastAPI, Depends +from dependency_injector.wiring import inject, Provide +from fastapi import Depends, FastAPI from karapace import version as karapace_version from karapace.auth.auth import AuthenticatorAndAuthorizer -from karapace.auth.dependencies import AuthorizationDependencyManager -from karapace.config import Config -from schema_registry.dependencies.schema_registry_dependency import SchemaRegistryDependencyManager -from schema_registry.dependencies.stats_dependeny import StatsDependencyManager +from karapace.config import Config, KARAPACE_BASE_CONFIG_YAML_PATH +from karapace.container import KarapaceContainer from karapace.logging import configure_logging, log_config_without_secrets from karapace.schema_registry import KarapaceSchemaRegistry +from karapace.statsd import StatsClient from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor +from schema_registry.container import SchemaRegistryContainer from schema_registry.http_handlers import setup_exception_handlers from schema_registry.middlewares import setup_middlewares -from schema_registry.routers import setup_routers -from typing import Final -from schema_registry.container import SchemaRegistryContainer -from dependency_injector.wiring import Provide, inject +from schema_registry.routers import compatibility, config, health, mode, schemas, subjects +from schema_registry.routers.setup import setup_routers import logging +import schema_registry.schema_registry_apis +import schema_registry.user import uvicorn -from pathlib import Path - -SCHEMA_REGISTRY_ROOT: Final[Path] = Path(__file__).parent @asynccontextmanager -async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]: - schema_registry: KarapaceSchemaRegistry | None = None - authorizer: AuthenticatorAndAuthorizer | None = None +@inject +async def lifespan( + _: FastAPI, + stastd: StatsClient = Depends(Provide[KarapaceContainer.statsd]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[KarapaceContainer.schema_registry]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]), +) -> AsyncGenerator[None, None]: try: - schema_registry = await SchemaRegistryDependencyManager.get_schema_registry() await schema_registry.start() await schema_registry.get_master() - authorizer = AuthorizationDependencyManager.get_authorizer() - if authorizer is not None: - await authorizer.start(StatsDependencyManager.get_stats()) + await authorizer.start(stats=stastd) + yield finally: if schema_registry: @@ -48,7 +48,7 @@ async def lifespan(_: FastAPI) -> AsyncGenerator[None, None]: @inject -def create_karapace_application(*, config: Config = Depends(Provide[SchemaRegistryContainer.config])) -> FastAPI: +def create_karapace_application(*, config: Config = Depends(Provide[KarapaceContainer.config])) -> FastAPI: configure_logging(config=config) log_config_without_secrets(config=config) logging.info("Starting Karapace Schema Registry (%s)", karapace_version.__version__) @@ -64,14 +64,30 @@ def create_karapace_application(*, config: Config = Depends(Provide[SchemaRegist if __name__ == "__main__": - container = SchemaRegistryContainer() - container.base_config.from_yaml(f"{SCHEMA_REGISTRY_ROOT / 'base_config.yaml'}", envs_required=True, required=True) - container.wire(modules=[__name__,]) + container = KarapaceContainer() + container.base_config.from_yaml(KARAPACE_BASE_CONFIG_YAML_PATH, envs_required=True, required=True) + container.wire( + modules=[ + __name__, + schema_registry.schema_registry_apis, + schema_registry.user, + ] + ) + + schema_registry_container = SchemaRegistryContainer(karapace_container=container) + schema_registry_container.wire( + modules=[ + __name__, + health, + mode, + compatibility, + schemas, + subjects, + config, + ] + ) app = create_karapace_application() uvicorn.run( - app, - host=container.config().host, - port=container.config().port, - log_level=container.config().log_level.lower() + app, host=container.config().host, port=container.config().port, log_level=container.config().log_level.lower() ) diff --git a/src/schema_registry/container.py b/src/schema_registry/container.py index ad66de53e..b93bc4139 100644 --- a/src/schema_registry/container.py +++ b/src/schema_registry/container.py @@ -4,15 +4,15 @@ """ from dependency_injector import containers, providers -from dependency_injector.wiring import Provide, inject - -from karapace.config import Config +from karapace.container import KarapaceContainer +from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController class SchemaRegistryContainer(containers.DeclarativeContainer): - base_config = providers.Configuration() - config = providers.Singleton( - Config, - _env_file=base_config.karapace.env_file, - _env_file_encoding=base_config.karapace.env_file_encoding, + karapace_container = providers.Container(KarapaceContainer) + schema_registry_controller = providers.Singleton( + KarapaceSchemaRegistryController, + config=karapace_container.config, + schema_registry=karapace_container.schema_registry, + stats=karapace_container.statsd, ) diff --git a/src/schema_registry/dependencies/config_dependency.py b/src/schema_registry/dependencies/config_dependency.py deleted file mode 100644 index 9c299b725..000000000 --- a/src/schema_registry/dependencies/config_dependency.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - -from fastapi import Depends -from karapace.config import Config -from typing import Annotated - -import os - -env_file = os.environ.get("KARAPACE_DOTENV", None) - - -class ConfigDependencyManager: - CONFIG = Config(_env_file=env_file, _env_file_encoding="utf-8") - - @classmethod - def get_config(cls) -> Config: - return ConfigDependencyManager.CONFIG - - -ConfigDep = Annotated[Config, Depends(ConfigDependencyManager.get_config)] diff --git a/src/schema_registry/dependencies/controller_dependency.py b/src/schema_registry/dependencies/controller_dependency.py deleted file mode 100644 index 60da9bf29..000000000 --- a/src/schema_registry/dependencies/controller_dependency.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - - -from fastapi import Depends -from schema_registry.dependencies.config_dependency import ConfigDep -from schema_registry.dependencies.schema_registry_dependency import SchemaRegistryDep -from schema_registry.dependencies.stats_dependeny import StatsDep -from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController -from typing import Annotated - - -async def get_controller( - config: ConfigDep, - stats: StatsDep, - schema_registry: SchemaRegistryDep, -) -> KarapaceSchemaRegistryController: - return KarapaceSchemaRegistryController(config=config, schema_registry=schema_registry, stats=stats) - - -KarapaceSchemaRegistryControllerDep = Annotated[KarapaceSchemaRegistryController, Depends(get_controller)] diff --git a/src/schema_registry/dependencies/forward_client_dependency.py b/src/schema_registry/dependencies/forward_client_dependency.py deleted file mode 100644 index 57459c371..000000000 --- a/src/schema_registry/dependencies/forward_client_dependency.py +++ /dev/null @@ -1,20 +0,0 @@ -""" -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - -from fastapi import Depends -from karapace.forward_client import ForwardClient -from typing import Annotated - -FORWARD_CLIENT: ForwardClient | None = None - - -def get_forward_client() -> ForwardClient: - global FORWARD_CLIENT - if not FORWARD_CLIENT: - FORWARD_CLIENT = ForwardClient() - return FORWARD_CLIENT - - -ForwardClientDep = Annotated[ForwardClient, Depends(get_forward_client)] diff --git a/src/schema_registry/dependencies/schema_registry_dependency.py b/src/schema_registry/dependencies/schema_registry_dependency.py deleted file mode 100644 index 4823a4cc1..000000000 --- a/src/schema_registry/dependencies/schema_registry_dependency.py +++ /dev/null @@ -1,24 +0,0 @@ -""" -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - -from fastapi import Depends -from schema_registry.dependencies.config_dependency import ConfigDependencyManager -from karapace.schema_registry import KarapaceSchemaRegistry -from typing import Annotated - - -class SchemaRegistryDependencyManager: - SCHEMA_REGISTRY: KarapaceSchemaRegistry | None = None - - @classmethod - async def get_schema_registry(cls) -> KarapaceSchemaRegistry: - if not SchemaRegistryDependencyManager.SCHEMA_REGISTRY: - SchemaRegistryDependencyManager.SCHEMA_REGISTRY = KarapaceSchemaRegistry( - config=ConfigDependencyManager.get_config() - ) - return SchemaRegistryDependencyManager.SCHEMA_REGISTRY - - -SchemaRegistryDep = Annotated[KarapaceSchemaRegistry, Depends(SchemaRegistryDependencyManager.get_schema_registry)] diff --git a/src/schema_registry/dependencies/stats_dependeny.py b/src/schema_registry/dependencies/stats_dependeny.py deleted file mode 100644 index b619443c5..000000000 --- a/src/schema_registry/dependencies/stats_dependeny.py +++ /dev/null @@ -1,23 +0,0 @@ -""" -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - - -from fastapi import Depends -from schema_registry.dependencies.config_dependency import ConfigDependencyManager -from karapace.statsd import StatsClient -from typing import Annotated - - -class StatsDependencyManager: - STATS_CLIENT: StatsClient | None = None - - @classmethod - def get_stats(cls) -> StatsClient: - if not StatsDependencyManager.STATS_CLIENT: - StatsDependencyManager.STATS_CLIENT = StatsClient(config=ConfigDependencyManager.get_config()) - return StatsDependencyManager.STATS_CLIENT - - -StatsDep = Annotated[StatsClient, Depends(StatsDependencyManager.get_stats)] diff --git a/src/schema_registry/routers/__init__.py b/src/schema_registry/routers/__init__.py index e077a1551..f53be7121 100644 --- a/src/schema_registry/routers/__init__.py +++ b/src/schema_registry/routers/__init__.py @@ -2,22 +2,3 @@ Copyright (c) 2024 Aiven Ltd See LICENSE for details """ - -from fastapi import FastAPI -from schema_registry.routers.compatibility_router import compatibility_router -from schema_registry.routers.config_router import config_router -from schema_registry.routers.health_router import health_router -from schema_registry.routers.mode_router import mode_router -from schema_registry.routers.root_router import root_router -from schema_registry.routers.schemas_router import schemas_router -from schema_registry.routers.subjects_router import subjects_router - - -def setup_routers(app: FastAPI) -> None: - app.include_router(compatibility_router) - app.include_router(config_router) - app.include_router(health_router) - app.include_router(mode_router) - app.include_router(root_router) - app.include_router(schemas_router) - app.include_router(subjects_router) diff --git a/src/schema_registry/routers/compatibility_router.py b/src/schema_registry/routers/compatibility.py similarity index 56% rename from src/schema_registry/routers/compatibility_router.py rename to src/schema_registry/routers/compatibility.py index 108773981..9a721af33 100644 --- a/src/schema_registry/routers/compatibility_router.py +++ b/src/schema_registry/routers/compatibility.py @@ -3,13 +3,16 @@ See LICENSE for details """ -from fastapi import APIRouter -from karapace.auth.auth import Operation -from karapace.auth.dependencies import AuthenticatorAndAuthorizerDep, CurrentUserDep -from schema_registry.dependencies.controller_dependency import KarapaceSchemaRegistryControllerDep +from dependency_injector.wiring import inject, Provide +from fastapi import APIRouter, Depends +from karapace.auth.auth import AuthenticatorAndAuthorizer, Operation, User from karapace.typing import Subject +from schema_registry.container import SchemaRegistryContainer from schema_registry.routers.errors import unauthorized from schema_registry.routers.requests import CompatibilityCheckResponse, SchemaRequest +from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController +from schema_registry.user import get_current_user +from typing import Annotated compatibility_router = APIRouter( prefix="/compatibility", @@ -19,13 +22,14 @@ @compatibility_router.post("/subjects/{subject}/versions/{version}", response_model_exclude_none=True) +@inject async def compatibility_post( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, version: str, # TODO support actual Version object schema_request: SchemaRequest, + user: Annotated[User, Depends(get_current_user)], + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityCheckResponse: if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): raise unauthorized() diff --git a/src/schema_registry/routers/config_router.py b/src/schema_registry/routers/config.py similarity index 52% rename from src/schema_registry/routers/config_router.py rename to src/schema_registry/routers/config.py index e7d6c22b4..db0bf5e23 100644 --- a/src/schema_registry/routers/config_router.py +++ b/src/schema_registry/routers/config.py @@ -3,15 +3,18 @@ See LICENSE for details """ -from fastapi import APIRouter, Request -from karapace.auth.auth import Operation -from karapace.auth.dependencies import AuthenticatorAndAuthorizerDep, CurrentUserDep -from schema_registry.dependencies.controller_dependency import KarapaceSchemaRegistryControllerDep -from schema_registry.dependencies.forward_client_dependency import ForwardClientDep -from schema_registry.dependencies.schema_registry_dependency import SchemaRegistryDep +from dependency_injector.wiring import inject, Provide +from fastapi import APIRouter, Depends, Request +from karapace.auth.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.forward_client import ForwardClient +from karapace.schema_registry import KarapaceSchemaRegistry from karapace.typing import Subject +from schema_registry.container import SchemaRegistryContainer from schema_registry.routers.errors import no_primary_url_error, unauthorized from schema_registry.routers.requests import CompatibilityLevelResponse, CompatibilityRequest, CompatibilityResponse +from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController +from schema_registry.user import get_current_user +from typing import Annotated config_router = APIRouter( prefix="/config", @@ -21,10 +24,11 @@ @config_router.get("") +@inject async def config_get( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, + user: Annotated[User, Depends(get_current_user)], + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityLevelResponse: if authorizer and not authorizer.check_authorization(user, Operation.Read, "Config:"): raise unauthorized() @@ -33,14 +37,15 @@ async def config_get( @config_router.put("") +@inject async def config_put( request: Request, - controller: KarapaceSchemaRegistryControllerDep, - schema_registry: SchemaRegistryDep, - forward_client: ForwardClientDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, compatibility_level_request: CompatibilityRequest, + user: Annotated[User, Depends(get_current_user)], + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, "Config:"): raise unauthorized() @@ -56,11 +61,11 @@ async def config_put( @config_router.get("/{subject}") async def config_get_subject( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, + user: Annotated[User, Depends(get_current_user)], defaultToGlobal: bool = False, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityLevelResponse: if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): raise unauthorized() @@ -71,13 +76,13 @@ async def config_get_subject( @config_router.put("/{subject}") async def config_set_subject( request: Request, - controller: KarapaceSchemaRegistryControllerDep, - schema_registry: SchemaRegistryDep, - forward_client: ForwardClientDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, compatibility_level_request: CompatibilityRequest, + user: Annotated[User, Depends(get_current_user)], + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -94,12 +99,12 @@ async def config_set_subject( @config_router.delete("/{subject}") async def config_delete_subject( request: Request, - controller: KarapaceSchemaRegistryControllerDep, - schema_registry: SchemaRegistryDep, - forward_client: ForwardClientDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, + user: Annotated[User, Depends(get_current_user)], + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> CompatibilityResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() diff --git a/src/schema_registry/routers/health_router.py b/src/schema_registry/routers/health.py similarity index 84% rename from src/schema_registry/routers/health_router.py rename to src/schema_registry/routers/health.py index 36a3c6975..df3a8822f 100644 --- a/src/schema_registry/routers/health_router.py +++ b/src/schema_registry/routers/health.py @@ -3,9 +3,11 @@ See LICENSE for details """ -from fastapi import APIRouter, HTTPException, status -from schema_registry.dependencies.schema_registry_dependency import SchemaRegistryDep +from dependency_injector.wiring import inject, Provide +from fastapi import APIRouter, Depends, HTTPException, status +from karapace.schema_registry import KarapaceSchemaRegistry from pydantic import BaseModel +from schema_registry.container import SchemaRegistryContainer class HealthStatus(BaseModel): @@ -33,8 +35,9 @@ class HealthCheck(BaseModel): @health_router.get("") +@inject async def health( - schema_registry: SchemaRegistryDep, + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), ) -> HealthCheck: starttime = 0.0 if schema_registry.schema_reader.ready: diff --git a/src/schema_registry/routers/mode.py b/src/schema_registry/routers/mode.py new file mode 100644 index 000000000..e7933d050 --- /dev/null +++ b/src/schema_registry/routers/mode.py @@ -0,0 +1,46 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from dependency_injector.wiring import inject, Provide +from fastapi import APIRouter, Depends +from karapace.auth.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.typing import Subject +from schema_registry.container import SchemaRegistryContainer +from schema_registry.routers.errors import unauthorized +from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController +from schema_registry.user import get_current_user +from typing import Annotated + +mode_router = APIRouter( + prefix="/mode", + tags=["mode"], + responses={404: {"description": "Not found"}}, +) + + +@mode_router.get("") +@inject +async def mode_get( + user: Annotated[User, Depends(get_current_user)], + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), +): + if authorizer and not authorizer.check_authorization(user, Operation.Read, "Config:"): + raise unauthorized() + + return await controller.get_global_mode() + + +@mode_router.get("/{subject}") +async def mode_get_subject( + subject: Subject, + user: Annotated[User, Depends(get_current_user)], + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), +): + if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): + raise unauthorized() + + return await controller.get_subject_mode(subject=subject) diff --git a/src/schema_registry/routers/mode_router.py b/src/schema_registry/routers/mode_router.py deleted file mode 100644 index 5b6fca0c9..000000000 --- a/src/schema_registry/routers/mode_router.py +++ /dev/null @@ -1,42 +0,0 @@ -""" -Copyright (c) 2024 Aiven Ltd -See LICENSE for details -""" - -from fastapi import APIRouter -from karapace.auth.auth import Operation -from karapace.auth.dependencies import AuthenticatorAndAuthorizerDep, CurrentUserDep -from schema_registry.dependencies.controller_dependency import KarapaceSchemaRegistryControllerDep -from karapace.typing import Subject -from schema_registry.routers.errors import unauthorized - -mode_router = APIRouter( - prefix="/mode", - tags=["mode"], - responses={404: {"description": "Not found"}}, -) - - -@mode_router.get("") -async def mode_get( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, -): - if authorizer and not authorizer.check_authorization(user, Operation.Read, "Config:"): - raise unauthorized() - - return await controller.get_global_mode() - - -@mode_router.get("/{subject}") -async def mode_get_subject( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, - subject: Subject, -): - if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): - raise unauthorized() - - return await controller.get_subject_mode(subject=subject) diff --git a/src/schema_registry/routers/root_router.py b/src/schema_registry/routers/root.py similarity index 100% rename from src/schema_registry/routers/root_router.py rename to src/schema_registry/routers/root.py diff --git a/src/schema_registry/routers/schemas_router.py b/src/schema_registry/routers/schemas.py similarity index 57% rename from src/schema_registry/routers/schemas_router.py rename to src/schema_registry/routers/schemas.py index 048d52e15..30c84836b 100644 --- a/src/schema_registry/routers/schemas_router.py +++ b/src/schema_registry/routers/schemas.py @@ -3,10 +3,14 @@ See LICENSE for details """ -from fastapi import APIRouter -from karapace.auth.dependencies import AuthenticatorAndAuthorizerDep, CurrentUserDep -from schema_registry.dependencies.controller_dependency import KarapaceSchemaRegistryControllerDep +from dependency_injector.wiring import inject, Provide +from fastapi import APIRouter, Depends +from karapace.auth.auth import AuthenticatorAndAuthorizer, User +from schema_registry.container import SchemaRegistryContainer from schema_registry.routers.requests import SchemaListingItem, SchemasResponse, SubjectVersion +from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController +from schema_registry.user import get_current_user +from typing import Annotated schemas_router = APIRouter( prefix="/schemas", @@ -17,12 +21,13 @@ # TODO is this needed? Is this actually the ids/schema/id/schema?? @schemas_router.get("") +@inject async def schemas_get_list( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, + user: Annotated[User, Depends(get_current_user)], deleted: bool = False, latestOnly: bool = False, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[SchemaListingItem]: return await controller.schemas_list( deleted=deleted, @@ -33,14 +38,15 @@ async def schemas_get_list( @schemas_router.get("/ids/{schema_id}", response_model_exclude_none=True) +@inject async def schemas_get( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, + user: Annotated[User, Depends(get_current_user)], schema_id: str, # TODO: type to actual type includeSubjects: bool = False, # TODO: include subjects? fetchMaxId: bool = False, # TODO: fetch max id? format: str = "", + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> SchemasResponse: return await controller.schemas_get( schema_id=schema_id, @@ -61,12 +67,13 @@ async def schemas_get( @schemas_router.get("/ids/{schema_id}/versions") +@inject async def schemas_get_versions( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, + user: Annotated[User, Depends(get_current_user)], schema_id: str, deleted: bool = False, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[SubjectVersion]: return await controller.schemas_get_versions( schema_id=schema_id, @@ -77,7 +84,8 @@ async def schemas_get_versions( @schemas_router.get("/types") +@inject async def schemas_get_subjects_list( - controller: KarapaceSchemaRegistryControllerDep, + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[str]: return await controller.schemas_types() diff --git a/src/schema_registry/routers/setup.py b/src/schema_registry/routers/setup.py new file mode 100644 index 000000000..39a4c0149 --- /dev/null +++ b/src/schema_registry/routers/setup.py @@ -0,0 +1,23 @@ +""" +Copyright (c) 2024 Aiven Ltd +See LICENSE for details +""" + +from fastapi import FastAPI +from schema_registry.routers.compatibility import compatibility_router +from schema_registry.routers.config import config_router +from schema_registry.routers.health import health_router +from schema_registry.routers.mode import mode_router +from schema_registry.routers.root import root_router +from schema_registry.routers.schemas import schemas_router +from schema_registry.routers.subjects import subjects_router + + +def setup_routers(app: FastAPI) -> None: + app.include_router(compatibility_router) + app.include_router(config_router) + app.include_router(health_router) + app.include_router(mode_router) + app.include_router(root_router) + app.include_router(schemas_router) + app.include_router(subjects_router) diff --git a/src/schema_registry/routers/subjects_router.py b/src/schema_registry/routers/subjects.py similarity index 56% rename from src/schema_registry/routers/subjects_router.py rename to src/schema_registry/routers/subjects.py index 29e58e840..262b684ae 100644 --- a/src/schema_registry/routers/subjects_router.py +++ b/src/schema_registry/routers/subjects.py @@ -3,15 +3,18 @@ See LICENSE for details """ -from fastapi import APIRouter, Request -from karapace.auth.auth import Operation -from karapace.auth.dependencies import AuthenticatorAndAuthorizerDep, CurrentUserDep -from schema_registry.dependencies.controller_dependency import KarapaceSchemaRegistryControllerDep -from schema_registry.dependencies.forward_client_dependency import ForwardClientDep -from schema_registry.dependencies.schema_registry_dependency import SchemaRegistryDep +from dependency_injector.wiring import inject, Provide +from fastapi import APIRouter, Depends, Request +from karapace.auth.auth import AuthenticatorAndAuthorizer, Operation, User +from karapace.forward_client import ForwardClient +from karapace.schema_registry import KarapaceSchemaRegistry from karapace.typing import Subject +from schema_registry.container import SchemaRegistryContainer from schema_registry.routers.errors import no_primary_url_error, unauthorized from schema_registry.routers.requests import SchemaIdResponse, SchemaRequest, SchemaResponse, SubjectSchemaVersionResponse +from schema_registry.schema_registry_apis import KarapaceSchemaRegistryController +from schema_registry.user import get_current_user +from typing import Annotated import logging @@ -26,11 +29,12 @@ @subjects_router.get("") +@inject async def subjects_get( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, + user: Annotated[User, Depends(get_current_user)], deleted: bool = False, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[str]: return await controller.subjects_list( deleted=deleted, @@ -40,14 +44,15 @@ async def subjects_get( @subjects_router.post("/{subject}", response_model_exclude_none=True) +@inject async def subjects_subject_post( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, + user: Annotated[User, Depends(get_current_user)], schema_request: SchemaRequest, deleted: bool = False, normalize: bool = False, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> SchemaResponse: if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): raise unauthorized() @@ -61,15 +66,16 @@ async def subjects_subject_post( @subjects_router.delete("/{subject}") +@inject async def subjects_subject_delete( request: Request, - controller: KarapaceSchemaRegistryControllerDep, - schema_registry: SchemaRegistryDep, - forward_client: ForwardClientDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, + user: Annotated[User, Depends(get_current_user)], permanent: bool = False, + forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[int]: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -84,15 +90,16 @@ async def subjects_subject_delete( @subjects_router.post("/{subject}/versions") +@inject async def subjects_subject_versions_post( request: Request, - controller: KarapaceSchemaRegistryControllerDep, - forward_client: ForwardClientDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, schema_request: SchemaRequest, + user: Annotated[User, Depends(get_current_user)], + forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), normalize: bool = False, + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> SchemaIdResponse: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -109,12 +116,13 @@ async def subjects_subject_versions_post( @subjects_router.get("/{subject}/versions") +@inject async def subjects_subject_versions_list( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, + user: Annotated[User, Depends(get_current_user)], deleted: bool = False, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[int]: if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): raise unauthorized() @@ -123,13 +131,14 @@ async def subjects_subject_versions_list( @subjects_router.get("/{subject}/versions/{version}", response_model_exclude_none=True) +@inject async def subjects_subject_version_get( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, version: str, + user: Annotated[User, Depends(get_current_user)], deleted: bool = False, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> SubjectSchemaVersionResponse: if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): raise unauthorized() @@ -138,16 +147,17 @@ async def subjects_subject_version_get( @subjects_router.delete("/{subject}/versions/{version}") +@inject async def subjects_subject_version_delete( request: Request, - controller: KarapaceSchemaRegistryControllerDep, - schema_registry: SchemaRegistryDep, - forward_client: ForwardClientDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, version: str, + user: Annotated[User, Depends(get_current_user)], permanent: bool = False, + forward_client: ForwardClient = Depends(Provide[SchemaRegistryContainer.karapace_container.forward_client]), + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + schema_registry: KarapaceSchemaRegistry = Depends(Provide[SchemaRegistryContainer.karapace_container.schema_registry]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> int: if authorizer and not authorizer.check_authorization(user, Operation.Write, f"Subject:{subject}"): raise unauthorized() @@ -162,12 +172,13 @@ async def subjects_subject_version_delete( @subjects_router.get("/{subject}/versions/{version}/schema") +@inject async def subjects_subject_version_schema_get( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, version: str, + user: Annotated[User, Depends(get_current_user)], + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> dict: if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): raise unauthorized() @@ -176,12 +187,13 @@ async def subjects_subject_version_schema_get( @subjects_router.get("/{subject}/versions/{version}/referencedby") +@inject async def subjects_subject_version_referenced_by( - controller: KarapaceSchemaRegistryControllerDep, - user: CurrentUserDep, - authorizer: AuthenticatorAndAuthorizerDep, subject: Subject, version: str, + user: Annotated[User, Depends(get_current_user)], + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[SchemaRegistryContainer.karapace_container.authorizer]), + controller: KarapaceSchemaRegistryController = Depends(Provide[SchemaRegistryContainer.schema_registry_controller]), ) -> list[int]: if authorizer and not authorizer.check_authorization(user, Operation.Read, f"Subject:{subject}"): raise unauthorized() diff --git a/src/schema_registry/schema_registry_apis.py b/src/schema_registry/schema_registry_apis.py index a11e29be2..e68bb15df 100644 --- a/src/schema_registry/schema_registry_apis.py +++ b/src/schema_registry/schema_registry_apis.py @@ -5,13 +5,14 @@ from __future__ import annotations from avro.errors import SchemaParseException -from fastapi import HTTPException, Request, Response, status -from karapace.auth.auth import Operation, User -from karapace.auth.dependencies import AuthenticatorAndAuthorizerDep +from dependency_injector.wiring import inject, Provide +from fastapi import Depends, HTTPException, Request, Response, status +from karapace.auth.auth import AuthenticatorAndAuthorizer, Operation, User from karapace.compatibility import CompatibilityModes from karapace.compatibility.jsonschema.checks import is_incompatible from karapace.compatibility.schema_compatibility import SchemaCompatibility from karapace.config import Config +from karapace.container import KarapaceContainer from karapace.errors import ( IncompatibleSchema, InvalidReferences, @@ -141,13 +142,14 @@ async def compatibility_check( return CompatibilityCheckResponse(is_compatible=False, messages=list(result.messages)) return CompatibilityCheckResponse(is_compatible=True) + @inject async def schemas_list( self, *, deleted: bool, latest_only: bool, user: User | None, - authorizer: AuthenticatorAndAuthorizerDep | None, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]), ) -> list[SchemaListingItem]: schemas = await self.schema_registry.schemas_list(include_deleted=deleted, latest_only=latest_only) response_schemas: list[SchemaListingItem] = [] @@ -171,6 +173,7 @@ async def schemas_list( return response_schemas + @inject async def schemas_get( self, *, @@ -179,7 +182,7 @@ async def schemas_get( include_subjects: bool, format_serialized: str, user: User | None, - authorizer: AuthenticatorAndAuthorizerDep, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]), ) -> SchemasResponse: try: parsed_schema_id = SchemaId(int(schema_id)) @@ -249,13 +252,14 @@ def _has_subject_with_id() -> bool: maxId=maxId, ) + @inject async def schemas_get_versions( self, *, schema_id: str, deleted: bool, user: User | None, - authorizer: AuthenticatorAndAuthorizerDep, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]), ) -> list[SubjectVersion]: try: schema_id_int = SchemaId(int(schema_id)) @@ -370,11 +374,12 @@ async def config_subject_delete( self.schema_registry.send_config_subject_delete_message(subject=Subject(subject)) return CompatibilityResponse(compatibility=self.schema_registry.schema_reader.config.compatibility) + @inject async def subjects_list( self, deleted: bool, user: User | None, - authorizer: AuthenticatorAndAuthorizerDep | None, + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]), ) -> list[str]: subjects = [str(subject) for subject in self.schema_registry.database.find_subjects(include_deleted=deleted)] if authorizer: diff --git a/src/schema_registry/user.py b/src/schema_registry/user.py new file mode 100644 index 000000000..e1d195d3e --- /dev/null +++ b/src/schema_registry/user.py @@ -0,0 +1,31 @@ +from dependency_injector.wiring import inject, Provide +from fastapi import Depends, HTTPException, Security, status +from fastapi.security import HTTPBasic, HTTPBasicCredentials +from karapace.auth.auth import AuthenticationError, AuthenticatorAndAuthorizer, User +from karapace.container import KarapaceContainer +from typing import Annotated, Optional + + +@inject +async def get_current_user( + credentials: Annotated[Optional[HTTPBasicCredentials], Security(HTTPBasic(auto_error=False))], + authorizer: AuthenticatorAndAuthorizer = Depends(Provide[KarapaceContainer.authorizer]), +) -> User: + if authorizer and not credentials: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail={"message": "Unauthorized"}, + headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'}, + ) + assert authorizer is not None + assert credentials is not None + username: str = credentials.username + password: str = credentials.password + try: + return authorizer.authenticate(username=username, password=password) + except AuthenticationError: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail={"message": "Unauthorized"}, + headers={"WWW-Authenticate": 'Basic realm="Karapace Schema Registry"'}, + )