class ComponentState(Enum):
    """States recorded on a component context during component tree startup."""

    # Set by the context constructor
    initialized = auto()
    # Set right before the component's prepare() is awaited
    preparing = auto()
    # Set right before the child components are started
    starting_children = auto()
    # Set right before the component's start() is awaited
    starting = auto()
    # Set once the component (and its children) have finished starting
    started = auto()
    # NOTE(review): not assigned anywhere in the startup code shown here;
    # presumably set when the component's context is torn down - confirm.
    closed = auto()


@overload
async def start_component(
    component_class: type[TComponent],
    config: dict[str, Any] | None = ...,
    *,
    timeout: float | None = ...,
) -> TComponent: ...


@overload
async def start_component(
    component_class: str,
    config: dict[str, Any] | None = ...,
    *,
    timeout: float | None = ...,
) -> Component: ...


async def start_component(
    component_class: type[Component] | str,
    config: dict[str, Any] | None = None,
    *,
    timeout: float | None = 20,
) -> Component:
    """
    Start a component and its subcomponents.

    :param component_class: the root component class, an entry point name in the
        ``asphalt.components`` namespace or a ``modulename:varname`` reference
    :param config: configuration for the root component (and its child components)
    :param timeout: seconds to wait for all the components in the hierarchy to start
        (default: ``20``; set to ``None`` to disable timeout)
    :return: the started root component instance
    :raises RuntimeError: if this function is called without an active :class:`Context`
    :raises TimeoutError: if the startup of the component hierarchy takes more than
        ``timeout`` seconds
    :raises TypeError: if ``component_class`` is neither a :class:`Component` subclass
        or a string

    """
    try:
        current_context()
    except NoCurrentContext:
        raise RuntimeError(
            "start_component() requires an active Asphalt context"
        ) from None

    if config is None:
        config = {}
    elif not isinstance(config, MutableMapping):
        raise TypeError("config must be a dict (or any other mutable mapping) or None")

    # Instantiate the whole component tree up front, before anything is started
    root_component_context = _init_component("", {"type": component_class, **config})
    async with AsyncExitStack() as exit_stack:
        tg: TaskGroup | None = None
        # Only None disables the watchdog. A plain truthiness test would also
        # silently disable it for timeout=0, contradicting the documented contract.
        if timeout is not None:
            await exit_stack.enter_async_context(coalesce_exceptions())
            tg = await exit_stack.enter_async_context(create_task_group())
            tg.start_soon(
                _watch_component_tree_startup,
                root_component_context,
                timeout,
            )

        await _start_component(root_component_context, "")

        # Startup finished in time, so stop the watchdog task
        if tg is not None:
            tg.cancel_scope.cancel()

    return root_component_context._component


def _init_component(
    path: str,
    config: MutableMapping[str, Any],
    default_resource_name: str = "default",
) -> ComponentContext:
    """
    Instantiate a component and, recursively, all of its child components.

    :param path: dotted path of the component in the tree (empty string for the root)
    :param config: configuration of the component; the ``components`` and ``type``
        keys are popped from it and the rest is passed to the component class as
        keyword arguments
    :param default_resource_name: default resource name to store in the component's
        context
    :return: the context wrapping the new component and its child contexts
    :raises TypeError: if the declared type does not resolve to a :class:`Component`
        subclass, or if a child component's configuration is neither ``None`` nor a
        mutable mapping
    :raises ComponentStartError: if instantiating the component class fails

    """
    # Separate the child components from the config
    child_components_config = config.pop("components", {})

    # Resolve the type to a class
    component_type = config.pop("type")
    component_class = component_types.resolve(component_type)
    if not isclass(component_class) or not issubclass(component_class, Component):
        raise TypeError(
            f"{path or '(root)'}: the declared component type ({component_type!r}) "
            f"resolved to {component_class!r} which is not a subclass of Component"
        )

    # Instantiate the component
    logger.debug("Creating %s", format_component_name(path, component_class))
    try:
        component = component_class(**config)
    except Exception as exc:
        raise ComponentStartError("creating", path, component_class) from exc

    # Merge the overrides to the hard-coded configuration
    logger.debug("Created %s", format_component_name(path, component_class))
    child_components_config = merge_config(
        component._child_components, child_components_config
    )

    # Create the child components
    child_contexts: dict[str, ComponentContext] = {}
    for alias, child_config in child_components_config.items():
        child_path = f"{path}.{alias}" if path else alias

        if child_config is None:
            child_config = {}
        elif not isinstance(child_config, MutableMapping):
            raise TypeError(
                f"{child_path}: component configuration must be either None or a "
                f"dict (or any other mutable mapping type), not "
                f"{qualified_name(child_config)}"
            )

        # If the type was specified only via an alias, use that as a type
        child_config.setdefault("type", alias)

        # If the type contains a forward slash, split the latter part out of it
        if isinstance(child_config["type"], str) and "/" in child_config["type"]:
            child_config["type"] = child_config["type"].split("/")[0]

        # The part of the alias after the first slash names the child's default
        # resource
        if "/" in alias:
            child_default_resource_name = alias.split("/", 1)[1]
        else:
            child_default_resource_name = "default"

        # Note that the child contexts are keyed by their full dotted path
        child_contexts[child_path] = _init_component(
            child_path, child_config, child_default_resource_name
        )

    return ComponentContext(component, path, default_resource_name, child_contexts)


async def _start_component(context: ComponentContext, path: str) -> None:
    """
    Run the startup sequence of a component: ``prepare()``, children, ``start()``.

    The state and the currently awaited coroutine are recorded on ``context`` as the
    sequence progresses.

    :param context: the context of the component to start
    :param path: dotted path of the component (empty string for the root), used in
        log messages and errors
    :raises ComponentStartError: if the component's ``prepare()`` or ``start()``
        raises an exception

    """
    # Prevent add_component() from being called beyond this point
    component = context._component
    component._component_started = True

    component_class = type(component)

    # Call prepare() on the component itself, if it's implemented on the component
    # class
    if component_class.prepare is not Component.prepare:
        logger.debug("Calling prepare() of %s", format_component_name(path))
        context._state = ComponentState.preparing
        coro = context._coro = component.prepare(context)
        try:
            await coro
        except Exception as exc:
            raise ComponentStartError("preparing", path, component_class) from exc

        logger.debug("Returned from prepare() of %s", format_component_name(path))
        context._coro = None

    # Start the child components, if there are any
    if context._child_contexts:
        logger.debug("Starting the child components of %s", format_component_name(path))
        context._state = ComponentState.starting_children
        async with coalesce_exceptions(), create_task_group() as tg:
            for child_context in context._child_contexts.values():
                # Take the path from the child context itself. The mapping keys
                # are already full dotted paths, so prefixing them with this
                # component's path again would duplicate it (e.g. "a.a.b") for
                # anything nested deeper than one level.
                child_path = child_context._path
                tg.start_soon(
                    _start_component,
                    child_context,
                    child_path,
                    name=(
                        f"Starting component {child_path} "
                        f"({qualified_name(child_context._component)})"
                    ),
                )

    # Call start() on the component itself, if it's implemented on the component
    # class
    if component_class.start is not Component.start:
        context._state = ComponentState.starting
        logger.debug("Calling start() of %s", format_component_name(path))
        coro = context._coro = component.start(context)
        try:
            await coro
        except Exception as exc:
            raise ComponentStartError("starting", path, component_class) from exc

        logger.debug("Returned from start() of %s", format_component_name(path))
        context._coro = None

    context._state = ComponentState.started


def _create_status_summaries(subcontext: ComponentContext) -> list[str]:
    """Return one indented ``name: state`` line for a context and its unstarted descendants."""
    parts = (subcontext._path or "(root)").split(".")
    # The root gets no indentation; others are indented by their depth
    indent = " " * (len(parts) if subcontext._path else 0)
    state = subcontext._state.name.replace("_", " ")
    summaries = [f"{indent}{parts[-1]}: {state}"]
    for child_context in subcontext._child_contexts.values():
        # Children that already finished starting are left out of the report
        if child_context._state is not ComponentState.started:
            summaries.extend(_create_status_summaries(child_context))

    return summaries


def _create_stack_summaries(subcontext: ComponentContext) -> list[str]:
    """Return formatted stack summaries for every context in the tree with a pending coroutine."""
    summaries: list[str] = []
    if subcontext._coro is not None:
        stack_summary = _get_coro_stack_summary(subcontext._coro)
        formatted_summary = "".join(stack_summary.format())
        title = f"{subcontext._path} ({qualified_name(subcontext._component)})"
        summaries.append(f"{title}:\n{formatted_summary.rstrip()}")

    for child_context in subcontext._child_contexts.values():
        summaries.extend(_create_stack_summaries(child_context))

    return summaries


async def _watch_component_tree_startup(
    context: ComponentContext,
    timeout: float,
) -> None:
    """
    Fail the component tree startup if it takes longer than ``timeout`` seconds.

    Sleeps for ``timeout`` seconds, then logs the state of every component that has
    not finished starting (plus the stacks of the coroutines they are waiting on) and
    raises :exc:`TimeoutError`. The task running this function is expected to be
    cancelled if the startup finishes before the sleep does.

    :param context: the context of the root component
    :param timeout: seconds to wait before declaring the startup as timed out
    :raises TimeoutError: always, unless cancelled during the sleep

    """
    await sleep(timeout)

    status_summary_sections: list[str] = [
        "Timeout waiting for the component tree to start"
    ]
    status_summaries: list[str] = _create_status_summaries(context)
    title = "Current status of the components still waiting to finish startup"
    status_summary_sections.append(f"{title}\n{'-' * len(title)}")
    status_summary_sections.append("\n".join(status_summaries))

    if stack_summaries := _create_stack_summaries(context):
        title = "Stack summaries of components still waiting to start"
        status_summary_sections.append(f"{title}\n{'-' * len(title)}")
        status_summary_sections.extend(stack_summaries)

    logger.error("%s", "\n\n".join(status_summary_sections))
    raise TimeoutError("timeout starting component tree")


def _get_coro_stack_summary(coro: Coroutine[Any, Any, Any]) -> StackSummary:
    """
    Build a stack summary from a coroutine and the chain of coroutines it awaits.

    :param coro: the outermost coroutine
    :return: a summary with one entry per coroutine frame found, outermost first

    """
    import gc

    frames: list[FrameType] = []
    awaitable: Awaitable[Any] | None = coro
    # Follow the cr_await chain for as long as it leads to another coroutine
    while isinstance(awaitable, Coroutine):
        while awaitable.__class__.__name__ == "async_generator_asend":
            # Hack to get past asend() objects
            awaitable = gc.get_referents(awaitable)[0].ag_await

        if frame := getattr(awaitable, "cr_frame", None):
            frames.append(frame)

        awaitable = getattr(awaitable, "cr_await", None)

    frame_tuples = [(f, f.f_lineno) for f in frames]
    return StackSummary.extract(frame_tuples)