Skip to content

Commit

Permalink
Refactor component handlers (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
disrupted authored Dec 21, 2022
1 parent 32b504e commit d16651f
Show file tree
Hide file tree
Showing 66 changed files with 465 additions and 362 deletions.
1 change: 1 addition & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
environment: development
broker: "http://k8kafka-cp-kafka-headless.kpops.svc.cluster.local:9092"
kafka_connect_host: "http://localhost:8083"
kafka_rest_host: "http://localhost:8082"
schema_registry_url: "http://localhost:8081"
5 changes: 0 additions & 5 deletions kpops/cli/exception.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,2 @@
class PipelineStateNotInitializedException(Exception):
def __init__(self, message="PipelineState was not initialized correctly"):
super().__init__(message)


class ClassNotFoundError(Exception):
"""Similar to builtin ModuleNotFoundError; class doesn't exist inside module."""
68 changes: 23 additions & 45 deletions kpops/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

from kpops.cli.custom_formatter import CustomFormatter
from kpops.cli.pipeline_config import ENV_PREFIX, PipelineConfig
from kpops.cli.pipeline_handlers import PipelineHandlers
from kpops.cli.registry import Registry
from kpops.pipeline_deployer.kafka_connect.handler import ConnectorHandler
from kpops.pipeline_deployer.schema_handler.schema_handler import SchemaHandler
from kpops.pipeline_deployer.streams_bootstrap.handler import AppHandler
from kpops.pipeline_deployer.topic.handler import TopicHandler
from kpops.pipeline_deployer.topic.proxy_wrapper import ProxyWrapper
from kpops.component_handlers import ComponentHandlers
from kpops.component_handlers.kafka_connect.handler import ConnectorHandler
from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler
from kpops.component_handlers.streams_bootstrap.handler import AppHandler
from kpops.component_handlers.topic.handler import TopicHandler
from kpops.component_handlers.topic.proxy_wrapper import ProxyWrapper
from kpops.pipeline_generator.pipeline import Pipeline

if TYPE_CHECKING:
Expand Down Expand Up @@ -90,7 +90,7 @@
log = logging.getLogger("")


def enrich_pipeline(
def setup_pipeline(
pipeline_base_dir: Path,
pipeline_path: Path,
components_module: str | None,
Expand All @@ -101,28 +101,23 @@ def enrich_pipeline(
registry.find_components(components_module)
registry.find_components("kpops.components")

handlers = setup_handlers(components_module, pipeline_config)
return Pipeline.load_from_yaml(
base_dir=pipeline_base_dir,
path=pipeline_path,
registry=registry,
config=pipeline_config,
pipeline_base_dir, pipeline_path, registry, pipeline_config, handlers
)


def setup_handlers(
components_module: str | None, config: PipelineConfig
) -> PipelineHandlers:
) -> ComponentHandlers:
schema_handler = SchemaHandler.load_schema_handler(components_module, config)
app_handler = AppHandler.from_pipeline_config(pipeline_config=config)
connector_handler = ConnectorHandler.from_pipeline_config(pipeline_config=config)
wrapper = ProxyWrapper(pipeline_config=config)
topic_handler = TopicHandler(proxy_wrapper=wrapper)

return PipelineHandlers(
schema_handler=schema_handler,
app_handler=app_handler,
connector_handler=connector_handler,
topic_handler=topic_handler,
app_handler = AppHandler.from_pipeline_config(config)
connector_handler = ConnectorHandler.from_pipeline_config(config)
proxy_wrapper = ProxyWrapper(config)
topic_handler = TopicHandler(proxy_wrapper)

return ComponentHandlers(
schema_handler, app_handler, connector_handler, topic_handler
)


Expand Down Expand Up @@ -196,27 +191,13 @@ def run_destroy_clean_reset(
):
pipeline_config = create_pipeline_config(config, defaults, verbose)
pipeline = setup_pipeline(
pipeline_base_dir, components_module, pipeline_config, pipeline_path
pipeline_base_dir, pipeline_path, components_module, pipeline_config
)
pipeline_handlers = setup_handlers(components_module, config=pipeline_config)
for component in reversed(get_steps_to_apply(pipeline, steps)):
log_action("Destroy", component)
component.pipeline_handlers = pipeline_handlers
component.destroy(dry_run=dry_run, clean=clean, delete_outputs=delete_outputs)


def setup_pipeline(
pipeline_base_dir: Path,
components_module: str | None,
pipeline_config: PipelineConfig,
pipeline_path: Path,
) -> Pipeline:
pipeline = enrich_pipeline(
pipeline_base_dir, pipeline_path, components_module, pipeline_config
)
return pipeline


def create_pipeline_config(
config: Path, defaults: Path, verbose: bool
) -> PipelineConfig:
Expand Down Expand Up @@ -245,7 +226,7 @@ def generate(
verbose: bool = typer.Option(False, help="Enable verbose printing"),
):
pipeline_config = create_pipeline_config(config, defaults, verbose)
pipeline = enrich_pipeline(
pipeline = setup_pipeline(
pipeline_base_dir, pipeline_path, components_module, pipeline_config
)
if print_yaml:
Expand All @@ -272,17 +253,14 @@ def deploy(
):
pipeline_config = create_pipeline_config(config, defaults, verbose)
pipeline = setup_pipeline(
pipeline_base_dir, components_module, pipeline_config, pipeline_path
pipeline_base_dir, pipeline_path, components_module, pipeline_config
)

steps_to_apply = get_steps_to_apply(pipeline, steps)
# init handlers (like schema handler)
pipeline_handlers = setup_handlers(components_module, config=pipeline_config)

for component in steps_to_apply:
pipeline_component = component
pipeline_component.pipeline_handlers = pipeline_handlers
log_action("Deploy", pipeline_component)
pipeline_component.deploy(dry_run=dry_run)
log_action("Deploy", component)
component.deploy(dry_run=dry_run)


@app.command(help="Destroy pipeline steps")
Expand Down
2 changes: 1 addition & 1 deletion kpops/cli/pipeline_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from pydantic import BaseConfig, BaseSettings, Field
from pydantic.env_settings import SettingsSourceCallable

from kpops.pipeline_deployer.streams_bootstrap.helm_wrapper import HelmConfig
from kpops.component_handlers.streams_bootstrap.helm_wrapper import HelmConfig
from kpops.utils.yaml_loading import load_yaml_file

ENV_PREFIX = "KPOPS_"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@

from typing import TYPE_CHECKING

from kpops.pipeline_deployer.kafka_connect.handler import ConnectorHandler
from kpops.pipeline_deployer.streams_bootstrap.handler import AppHandler
from kpops.component_handlers.kafka_connect.handler import ConnectorHandler
from kpops.component_handlers.streams_bootstrap.handler import AppHandler

if TYPE_CHECKING:
from kpops.pipeline_deployer.schema_handler.schema_handler import SchemaHandler
from kpops.component_handlers.schema_handler.schema_handler import SchemaHandler
from kpops.component_handlers.topic.handler import TopicHandler


class PipelineHandlers:
class ComponentHandlers:
def __init__(
self,
schema_handler: SchemaHandler | None,
app_handler: AppHandler,
connector_handler: ConnectorHandler,
topic_handler,
):
topic_handler: TopicHandler,
) -> None:
self.schema_handler = schema_handler
self.app_handler = app_handler
self.connector_handler = connector_handler
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

import requests

from kpops.pipeline_deployer.kafka_connect.exception import (
from kpops.component_handlers.kafka_connect.exception import (
ConnectorNotFoundException,
KafkaConnectError,
)
from kpops.pipeline_deployer.kafka_connect.model import (
from kpops.component_handlers.kafka_connect.model import (
KafkaConnectConfig,
KafkaConnectConfigErrorResponse,
KafkaConnectResponse,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kpops.pipeline_deployer.utils.exception import RequestsException
from kpops.component_handlers.utils.exception import RequestsException


class ConnectorNotFoundException(Exception):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

from pydantic import BaseModel

from kpops.pipeline_deployer.kafka_connect.connect_wrapper import ConnectWrapper
from kpops.pipeline_deployer.kafka_connect.exception import ConnectorNotFoundException
from kpops.pipeline_deployer.kafka_connect.model import (
from kpops.component_handlers.kafka_connect.connect_wrapper import ConnectWrapper
from kpops.component_handlers.kafka_connect.exception import ConnectorNotFoundException
from kpops.component_handlers.kafka_connect.model import (
KafkaConnectConfig,
KafkaConnectorType,
)
from kpops.pipeline_deployer.kafka_connect.timeout import timeout
from kpops.pipeline_deployer.streams_bootstrap.helm_wrapper import (
from kpops.component_handlers.kafka_connect.timeout import timeout
from kpops.component_handlers.streams_bootstrap.helm_wrapper import (
HelmCommandConfig,
HelmWrapper,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
from kpops.cli.exception import ClassNotFoundError
from kpops.cli.pipeline_config import PipelineConfig
from kpops.cli.registry import find_class
from kpops.components.base_components.models.to_section import ToSection
from kpops.pipeline_deployer.schema_handler.schema_provider import (
from kpops.component_handlers.schema_handler.schema_provider import (
Schema,
SchemaProvider,
)
from kpops.components.base_components.models.to_section import ToSection
from kpops.utils.colorify import greenify, magentaify, yellowify

log = logging.getLogger("SchemaHandler")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
from pathlib import Path
from typing import TYPE_CHECKING

from kpops.pipeline_deployer.streams_bootstrap.helm_wrapper import (
from kpops.component_handlers.streams_bootstrap.helm_wrapper import (
HelmCommandConfig,
HelmWrapper,
)
from kpops.pipeline_deployer.streams_bootstrap.streams_bootstrap_application_type import (
from kpops.component_handlers.streams_bootstrap.streams_bootstrap_application_type import (
ApplicationType,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import yaml
from pydantic import BaseModel, BaseSettings, Field

from kpops.pipeline_deployer.streams_bootstrap.exception import ReleaseNotFoundException
from kpops.component_handlers.streams_bootstrap.exception import (
ReleaseNotFoundException,
)
from kpops.utils.dict_differ import render_diff

log = logging.getLogger("Helm")
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kpops.pipeline_deployer.utils.exception import RequestsException
from kpops.component_handlers.utils.exception import RequestsException


class TopicNotFoundException(Exception):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import logging

from kpops.components.base_components.models.to_section import TopicConfig, ToSection
from kpops.pipeline_deployer.topic.exception import TopicNotFoundException
from kpops.pipeline_deployer.topic.model import (
from kpops.component_handlers.topic.exception import TopicNotFoundException
from kpops.component_handlers.topic.model import (
TopicConfigResponse,
TopicResponse,
TopicSpec,
)
from kpops.pipeline_deployer.topic.proxy_wrapper import HEADERS, ProxyWrapper
from kpops.pipeline_deployer.topic.utils import (
from kpops.component_handlers.topic.proxy_wrapper import HEADERS, ProxyWrapper
from kpops.component_handlers.topic.utils import (
get_effective_config,
parse_and_compare_topic_configs,
parse_rest_proxy_topic_config,
)
from kpops.components.base_components.models.to_section import TopicConfig, ToSection
from kpops.utils.colorify import greenify, magentaify, yellowify
from kpops.utils.dict_differ import Diff, DiffType, get_diff, render_diff

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import requests

from kpops.cli.pipeline_config import PipelineConfig
from kpops.pipeline_deployer.topic.exception import (
from kpops.component_handlers.topic.exception import (
KafkaRestProxyError,
TopicNotFoundException,
)
from kpops.pipeline_deployer.topic.model import (
from kpops.component_handlers.topic.model import (
BrokerConfigResponse,
TopicConfigResponse,
TopicResponse,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from kpops.pipeline_deployer.topic.model import (
from kpops.component_handlers.topic.model import (
BrokerConfigResponse,
KafkaTopicConfigSource,
TopicConfigResponse,
Expand Down
File renamed without changes.
File renamed without changes.
8 changes: 5 additions & 3 deletions kpops/components/base_components/base_defaults_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from pydantic import BaseConfig, BaseModel, Field

from kpops.cli.pipeline_config import PipelineConfig
from kpops.component_handlers import ComponentHandlers
from kpops.utils.yaml_loading import load_yaml_file

log = logging.getLogger("PipelineComponentEnricher")
Expand All @@ -18,11 +19,12 @@
class BaseDefaultsComponent(BaseModel):
_type: str = Field(..., alias="type")

enrich: bool = False
config: PipelineConfig
enrich: bool = Field(default=False, exclude=True)
config: PipelineConfig = Field(default=..., exclude=True)
handlers: ComponentHandlers = Field(default=..., exclude=True)

class Config(BaseConfig):
fields = {"enrich": {"exclude": True}, "config": {"exclude": True}} # type: ignore
arbitrary_types_allowed = True

def __init__(self, **kwargs):
if kwargs.get("enrich", True):
Expand Down
Loading

0 comments on commit d16651f

Please sign in to comment.