diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 50a8584e49..a47ab40434 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -59,7 +59,7 @@ PORT_PREFECT, PORT_REDIS, ) -from tests.helpers.utils import get_exposed_port, start_neo4j_container +from tests.helpers.utils import get_exposed_port, start_neo4j_container, start_prefect_server_container ResponseClass = TypeVar("ResponseClass") DEFAULT_TESTING_LOG_LEVEL = "WARNING" @@ -367,23 +367,7 @@ def nats(nats_container: dict[int, int] | None, reload_settings_before_each_modu @pytest.fixture(scope="session") def prefect_container(request: pytest.FixtureRequest, load_settings_before_session) -> Optional[dict[int, int]]: - if not INFRAHUB_USE_TEST_CONTAINERS: - return None - - container = ( - DockerContainer(image="prefecthq/prefect:3.0.11-python3.12") - .with_command("prefect server start --host 0.0.0.0 --ui") - .with_exposed_ports(PORT_PREFECT) - ) - - def cleanup(): - container.stop() - - container.start() - wait_for_logs(container, "Configure Prefect to communicate with the server") - request.addfinalizer(cleanup) - - return {PORT_PREFECT: get_exposed_port(container, PORT_PREFECT)} + return start_prefect_server_container(request) @pytest.fixture(scope="module") diff --git a/backend/tests/helpers/test_app.py b/backend/tests/helpers/test_app.py index c30aa5b8c1..975e7c8053 100644 --- a/backend/tests/helpers/test_app.py +++ b/backend/tests/helpers/test_app.py @@ -67,7 +67,7 @@ def bus_simulator(self, db: InfrahubDatabase) -> Generator[BusSimulator, None, N config.OVERRIDE.message_bus = original @pytest.fixture(scope="class", autouse=True) - async def workflow_local(self) -> AsyncGenerator[WorkflowLocalExecution, None]: + async def workflow_local(self, prefect: Generator[str, None, None]) -> AsyncGenerator[WorkflowLocalExecution, None]: original = config.OVERRIDE.workflow workflow = WorkflowLocalExecution() await setup_task_manager() diff --git a/backend/tests/helpers/test_worker.py b/backend/tests/helpers/test_worker.py index 5c7c76b506..fbc725aa0e 100644 --- a/backend/tests/helpers/test_worker.py +++ b/backend/tests/helpers/test_worker.py @@ -11,8 +11,6 @@ from prefect.client.schemas.filters import WorkPoolFilter, WorkPoolFilterId from prefect.client.schemas.objects import FlowRun, StateType, WorkPool from prefect.workers.base import BaseWorkerResult -from testcontainers.core.container import DockerContainer -from testcontainers.core.waiting_utils import wait_for_logs from infrahub.tasks.dummy import DUMMY_FLOW, DUMMY_FLOW_BROKEN from infrahub.workers.infrahub_async import ( @@ -22,11 +20,10 @@ from infrahub.workflows.initialization import setup_blocks from infrahub.workflows.models import WorkerPoolDefinition from tests.helpers.constants import ( - INFRAHUB_USE_TEST_CONTAINERS, PORT_PREFECT, ) from tests.helpers.test_app import TestInfrahubApp -from tests.helpers.utils import get_exposed_port +from tests.helpers.utils import start_prefect_server_container class TestWorkerInfrahubAsync(TestInfrahubApp): @@ -62,33 +59,17 @@ async def worker_run_flow( ) @pytest.fixture(scope="class") - def prefect_container( + def prefect_container_class( self, request: pytest.FixtureRequest, load_settings_before_session: Any ) -> dict[int, int] | None: - if not INFRAHUB_USE_TEST_CONTAINERS: - return None - - container = ( - DockerContainer(image="prefecthq/prefect:3.0.11-python3.12") - .with_command("prefect server start --host 0.0.0.0 --ui") - .with_exposed_ports(PORT_PREFECT) - ) - - def cleanup() -> None: - container.stop() - - container.start() - wait_for_logs(container, "Configure Prefect to communicate with the server") - request.addfinalizer(cleanup) - - return {PORT_PREFECT: get_exposed_port(container, PORT_PREFECT)} + return start_prefect_server_container(request) @pytest.fixture(scope="class") def prefect_server( - self, prefect_container: dict[int, int] | None, reload_settings_before_each_module: Any + self, prefect_container_class: dict[int, int] | None, reload_settings_before_each_module: Any ) -> Generator[str, None, None]: - if prefect_container: - server_port = prefect_container[PORT_PREFECT] + if prefect_container_class: + server_port = prefect_container_class[PORT_PREFECT] server_api_url = f"http://localhost:{server_port}/api" else: server_api_url = f"http://localhost:{PORT_PREFECT}/api" diff --git a/backend/tests/helpers/utils.py b/backend/tests/helpers/utils.py index 8894b3dd3c..6830f3392a 100644 --- a/backend/tests/helpers/utils.py +++ b/backend/tests/helpers/utils.py @@ -1,11 +1,12 @@ from contextlib import contextmanager from typing import Generator +import pytest from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs from infrahub.services import InfrahubServices, services -from tests.helpers.constants import PORT_BOLT_NEO4J, PORT_HTTP_NEO4J +from tests.helpers.constants import INFRAHUB_USE_TEST_CONTAINERS, PORT_BOLT_NEO4J, PORT_HTTP_NEO4J, PORT_PREFECT def get_exposed_port(container: DockerContainer, port: int) -> int: @@ -35,6 +36,28 @@ def start_neo4j_container(neo4j_image: str) -> DockerContainer: return container +def start_prefect_server_container( + request: pytest.FixtureRequest, +) -> dict[int, int] | None: + if not INFRAHUB_USE_TEST_CONTAINERS: + return None + + container = ( + DockerContainer(image="prefecthq/prefect:3.0.11-python3.12") + .with_command("prefect server start --host 0.0.0.0 --ui") + .with_exposed_ports(PORT_PREFECT) + ) + + def cleanup() -> None: + container.stop() + + container.start() + wait_for_logs(container, "Configure Prefect to communicate with the server") + request.addfinalizer(cleanup) + + return {PORT_PREFECT: get_exposed_port(container, PORT_PREFECT)} + + @contextmanager def init_global_service(service: InfrahubServices) -> Generator: """