diff --git a/cosmos/__init__.py b/cosmos/__init__.py
index 3422cba62..be499beae 100644
--- a/cosmos/__init__.py
+++ b/cosmos/__init__.py
@@ -40,6 +40,36 @@

 logger = get_logger(__name__)

+try:
+    from cosmos.operators.gcp_cloud_run_job import (
+        DbtLSGcpCloudRunJobOperator,
+        DbtRunGcpCloudRunJobOperator,
+        DbtRunOperationGcpCloudRunJobOperator,
+        DbtSeedGcpCloudRunJobOperator,
+        DbtSnapshotGcpCloudRunJobOperator,
+        DbtTestGcpCloudRunJobOperator,
+    )
+except (ImportError, AttributeError):
+    DbtLSGcpCloudRunJobOperator = MissingPackage(
+        "cosmos.operators.gcp_cloud_run_job.DbtLSGcpCloudRunJobOperator", "gcp-cloud-run-job"
+    )
+    DbtRunGcpCloudRunJobOperator = MissingPackage(
+        "cosmos.operators.gcp_cloud_run_job.DbtRunGcpCloudRunJobOperator", "gcp-cloud-run-job"
+    )
+    DbtRunOperationGcpCloudRunJobOperator = MissingPackage(
+        "cosmos.operators.gcp_cloud_run_job.DbtRunOperationGcpCloudRunJobOperator",
+        "gcp-cloud-run-job",
+    )
+    DbtSeedGcpCloudRunJobOperator = MissingPackage(
+        "cosmos.operators.gcp_cloud_run_job.DbtSeedGcpCloudRunJobOperator", "gcp-cloud-run-job"
+    )
+    DbtSnapshotGcpCloudRunJobOperator = MissingPackage(
+        "cosmos.operators.gcp_cloud_run_job.DbtSnapshotGcpCloudRunJobOperator", "gcp-cloud-run-job"
+    )
+    DbtTestGcpCloudRunJobOperator = MissingPackage(
+        "cosmos.operators.gcp_cloud_run_job.DbtTestGcpCloudRunJobOperator", "gcp-cloud-run-job"
+    )
+
 try:
     from cosmos.operators.docker import (
         DbtLSDockerOperator,
@@ -142,6 +172,12 @@
     "DbtSnapshotLocalOperator",
     "DbtDag",
     "DbtTaskGroup",
+    "DbtLSGcpCloudRunJobOperator",
+    "DbtRunGcpCloudRunJobOperator",
+    "DbtRunOperationGcpCloudRunJobOperator",
+    "DbtSeedGcpCloudRunJobOperator",
+    "DbtSnapshotGcpCloudRunJobOperator",
+    "DbtTestGcpCloudRunJobOperator",
     "DbtLSDockerOperator",
     "DbtRunOperationDockerOperator",
     "DbtRunDockerOperator",
diff --git a/cosmos/constants.py b/cosmos/constants.py
index 6e96551e4..e9d1aaa6b 100644
--- a/cosmos/constants.py
+++ b/cosmos/constants.py
@@ -91,6 +91,7 @@ class ExecutionMode(Enum):
     AWS_EKS = "aws_eks"
     VIRTUALENV = "virtualenv"
     AZURE_CONTAINER_INSTANCE = "azure_container_instance"
+    GCP_CLOUD_RUN_JOB = "gcp_cloud_run_job"


 class InvocationMode(Enum):
diff --git a/cosmos/converter.py b/cosmos/converter.py
index 553b7b49e..466d33a8d 100644
--- a/cosmos/converter.py
+++ b/cosmos/converter.py
@@ -120,6 +120,7 @@ def validate_initial_user_config(
         ExecutionMode.KUBERNETES,
         ExecutionMode.AWS_EKS,
         ExecutionMode.DOCKER,
+        ExecutionMode.GCP_CLOUD_RUN_JOB,
     ):
         raise CosmosValueError(f"The profile_config is mandatory when using {execution_config.execution_mode}")

diff --git a/cosmos/operators/gcp_cloud_run_job.py b/cosmos/operators/gcp_cloud_run_job.py
new file mode 100644
index 000000000..6b4ded49a
--- /dev/null
+++ b/cosmos/operators/gcp_cloud_run_job.py
@@ -0,0 +1,172 @@
+from __future__ import annotations
+
+import inspect
+from typing import Any, Callable, Sequence
+
+from airflow.utils.context import Context
+
+from cosmos.config import ProfileConfig
+from cosmos.log import get_logger
+from cosmos.operators.base import (
+    AbstractDbtBaseOperator,
+    DbtBuildMixin,
+    DbtLSMixin,
+    DbtRunMixin,
+    DbtRunOperationMixin,
+    DbtSeedMixin,
+    DbtSnapshotMixin,
+    DbtTestMixin,
+)
+
+logger = get_logger(__name__)
+
+DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {}
+
+try:
+    from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator
+
+    # The overrides parameter needed to pass the dbt command was added in apache-airflow-providers-google==10.11.0
+    init_signature = inspect.signature(CloudRunExecuteJobOperator.__init__)
+    if "overrides" not in init_signature.parameters:
+        raise AttributeError(
+            "CloudRunExecuteJobOperator does not have `overrides` attribute. Ensure you've installed apache-airflow-providers-google of at least 10.11.0 "
+            "separately or with `pip install astronomer-cosmos[...,gcp-cloud-run-job]`."
+        )
+except ImportError:
+    raise ImportError(
+        "Could not import CloudRunExecuteJobOperator. Ensure you've installed the Google Cloud provider "
+        "separately or with `pip install astronomer-cosmos[...,gcp-cloud-run-job]`."
+    )
+
+
+class DbtGcpCloudRunJobBaseOperator(AbstractDbtBaseOperator, CloudRunExecuteJobOperator):  # type: ignore
+    """
+    Executes a dbt core cli command in a Cloud Run Job instance with dbt installed in it.
+    """
+
+    template_fields: Sequence[str] = tuple(
+        list(AbstractDbtBaseOperator.template_fields) + list(CloudRunExecuteJobOperator.template_fields)
+    )
+
+    intercept_flag = False
+
+    def __init__(
+        self,
+        # arguments required by CloudRunExecuteJobOperator
+        project_id: str,
+        region: str,
+        job_name: str,
+        #
+        profile_config: ProfileConfig | None = None,
+        command: list[str] | None = None,
+        environment_variables: dict[str, Any] | None = None,
+        **kwargs: Any,
+    ) -> None:
+        self.profile_config = profile_config
+        self.command = command
+        self.environment_variables = environment_variables or DEFAULT_ENVIRONMENT_VARIABLES
+        super().__init__(project_id=project_id, region=region, job_name=job_name, **kwargs)
+
+    def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> Any:
+        self.build_command(context, cmd_flags)
+        self.log.info(f"Running command: {self.command}")
+        result = CloudRunExecuteJobOperator.execute(self, context)
+        logger.info(result)
+
+    def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None:
+        # For the first round, we're going to assume that the command is dbt
+        # This means that we don't have openlineage support, but we will create a ticket
+        # to add that in the future
+        self.dbt_executable_path = "dbt"
+        dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags)
+        self.environment_variables = {**env_vars, **self.environment_variables}
+        self.command = dbt_cmd
+        # Override Cloud Run Job default arguments with dbt command
+        self.overrides = {
+            "container_overrides": [
+                {
+                    "args": self.command,
+                    "env": [{"name": key, "value": value} for key, value in self.environment_variables.items()],
+                }
+            ],
+        }
+
+
+class DbtBuildGcpCloudRunJobOperator(DbtBuildMixin, DbtGcpCloudRunJobBaseOperator):
+    """
+    Executes a dbt core build command.
+    """
+
+    template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtBuildMixin.template_fields  # type: ignore[operator]
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+
+class DbtLSGcpCloudRunJobOperator(DbtLSMixin, DbtGcpCloudRunJobBaseOperator):
+    """
+    Executes a dbt core ls command.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+
+class DbtSeedGcpCloudRunJobOperator(DbtSeedMixin, DbtGcpCloudRunJobBaseOperator):
+    """
+    Executes a dbt core seed command.
+
+    :param full_refresh: dbt optional arg - dbt will treat incremental models as table models
+    """
+
+    template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtSeedMixin.template_fields  # type: ignore[operator]
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+
+class DbtSnapshotGcpCloudRunJobOperator(DbtSnapshotMixin, DbtGcpCloudRunJobBaseOperator):
+    """
+    Executes a dbt core snapshot command.
+    """
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+
+class DbtRunGcpCloudRunJobOperator(DbtRunMixin, DbtGcpCloudRunJobBaseOperator):
+    """
+    Executes a dbt core run command.
+    """
+
+    template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtRunMixin.template_fields  # type: ignore[operator]
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+
+class DbtTestGcpCloudRunJobOperator(DbtTestMixin, DbtGcpCloudRunJobBaseOperator):
+    """
+    Executes a dbt core test command.
+    """
+
+    def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: str) -> None:
+        super().__init__(**kwargs)
+        # as of now, on_warning_callback does nothing in the Cloud Run Job operators
+        self.on_warning_callback = on_warning_callback
+
+
+class DbtRunOperationGcpCloudRunJobOperator(DbtRunOperationMixin, DbtGcpCloudRunJobBaseOperator):
+    """
+    Executes a dbt core run-operation command.
+
+    :param macro_name: name of the macro to execute
+    :param args: Supply arguments to the macro. This dictionary will be mapped to the keyword arguments defined in the
+        selected macro.
+    """
+
+    template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtRunOperationMixin.template_fields  # type: ignore[operator]
+
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
diff --git a/docs/getting_started/execution-modes.rst b/docs/getting_started/execution-modes.rst
index df9b0f26d..dfa843fc0 100644
--- a/docs/getting_started/execution-modes.rst
+++ b/docs/getting_started/execution-modes.rst
@@ -11,6 +11,7 @@ Cosmos can run ``dbt`` commands using five different approaches, called ``execut
 4. **kubernetes**: Run ``dbt`` commands from Kubernetes Pods managed by Cosmos (requires a pre-existing Docker image)
 5. **aws_eks**: Run ``dbt`` commands from AWS EKS Pods managed by Cosmos (requires a pre-existing Docker image)
 6. **azure_container_instance**: Run ``dbt`` commands from Azure Container Instances managed by Cosmos (requires a pre-existing Docker image)
+7. **gcp_cloud_run_job**: Run ``dbt`` commands from GCP Cloud Run Job instances managed by Cosmos (requires a pre-existing Docker image)

 The choice of the ``execution mode`` can vary based on each user's needs and concerns. For more details, check each execution mode described below.

@@ -47,6 +48,10 @@ The choice of the ``execution mode`` can vary based on each user's needs and con
      - Slow
      - High
      - No
+   * - GCP Cloud Run Job Instance
+     - Slow
+     - High
+     - No

 Local
 -----
@@ -209,6 +214,27 @@ Each task will create a new container on Azure, giving full isolation. This, how
         },
     )

+GCP Cloud Run Job
+------------------------
+
+The ``gcp_cloud_run_job`` execution mode is particularly useful for users who prefer to run their ``dbt`` commands on Google Cloud infrastructure, taking advantage of Cloud Run's scalability, isolation, and managed service capabilities.
+
+This execution mode allows users to run ``dbt`` core CLI commands in a Google Cloud Run Job instance. This mode leverages the ``CloudRunExecuteJobOperator`` from the Google Cloud Airflow provider to execute commands within a Cloud Run Job instance, where ``dbt`` is already installed. Similarly to the ``Docker`` and ``Kubernetes`` execution modes, a Docker container should be available, containing the up-to-date ``dbt`` pipelines and profiles.
+
+Each task will create a new Cloud Run Job execution, giving full isolation. The separation of tasks adds extra overhead; however, that can be mitigated by using the ``concurrency`` parameter in ``DbtDag``, which will result in parallelized execution of ``dbt`` models.
+
+
+.. code-block:: python
+
+    gcp_cloud_run_job_cosmos_dag = DbtDag(
+        # ...
+        execution_config=ExecutionConfig(execution_mode=ExecutionMode.GCP_CLOUD_RUN_JOB),
+        operator_args={
+            "project_id": "my-gcp-project-id",
+            "region": "europe-west1",
+            "job_name": "my-crj-{{ ti.task_id.replace('.','-').replace('_','-') }}",
+        },
+    )
 .. _invocation_modes:

 Invocation Modes
diff --git a/pyproject.toml b/pyproject.toml
index 2087b3f32..dc6a9671c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -69,7 +69,7 @@ openlineage = ["openlineage-integration-common!=1.15.0", "openlineage-airflow"]
 amazon = [
     "apache-airflow-providers-amazon[s3fs]>=3.0.0",
 ]
-google = ["apache-airflow-providers-google"]
+google = ["apache-airflow-providers-google>=10.11.0"]
 microsoft = ["apache-airflow-providers-microsoft-azure"]
 all = [
     "astronomer-cosmos[dbt-all]",
@@ -111,6 +111,9 @@ aws_eks = [
 azure-container-instance = [
     "apache-airflow-providers-microsoft-azure>=8.4.0",
 ]
+gcp-cloud-run-job = [
+    "apache-airflow-providers-google>=10.11.0",
+]

 [project.entry-points.apache_airflow_provider]
 provider_info = "cosmos:get_provider_info"
@@ -160,7 +163,9 @@ airflow = ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"]

 [tool.hatch.envs.tests.overrides]
 matrix.airflow.dependencies = [
-    { value = "typing_extensions<4.6", if = ["2.6"] }
+    { value = "typing_extensions<4.6", if = ["2.6"] },
+    { value = "apache-airflow-providers-google<10.11.0", if = ["2.4"] },
+    { value = "apache-airflow-providers-google>=10.11.0", if = ["2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] },
 ]

 [tool.hatch.envs.tests.scripts]
@@ -192,7 +197,7 @@ dependencies = [
     "aenum",
     "apache-airflow-providers-amazon[s3fs]>=3.0.0",
     "apache-airflow-providers-cncf-kubernetes>=5.1.1",
-    "apache-airflow-providers-google",
+    "apache-airflow-providers-google>=10.11.0",
     "apache-airflow-providers-microsoft-azure",
     "msgpack",
     "openlineage-airflow",
diff --git a/tests/operators/test_gcp_cloud_run_job.py b/tests/operators/test_gcp_cloud_run_job.py
new file mode 100644
index 000000000..a6f16942a
--- /dev/null
+++ b/tests/operators/test_gcp_cloud_run_job.py
@@ -0,0 +1,247 @@
+import inspect
+from pathlib import Path
+from unittest.mock import MagicMock, patch
+
+import pkg_resources
+import pytest
+from airflow.utils.context import Context
+from pendulum import datetime
+
+try:
+    from cosmos.operators.gcp_cloud_run_job import (
+        DbtGcpCloudRunJobBaseOperator,
+        DbtLSGcpCloudRunJobOperator,
+        DbtRunGcpCloudRunJobOperator,
+        DbtSeedGcpCloudRunJobOperator,
+        DbtTestGcpCloudRunJobOperator,
+        DbtBuildGcpCloudRunJobOperator,
+        DbtSnapshotGcpCloudRunJobOperator,
+        DbtRunOperationGcpCloudRunJobOperator,
+    )
+
+    class ConcreteDbtGcpCloudRunJobOperator(DbtGcpCloudRunJobBaseOperator):
+        base_cmd = ["cmd"]
+
+except (ImportError, AttributeError):
+    DbtGcpCloudRunJobBaseOperator = None
+
+
+BASE_KWARGS = {
+    "task_id": "my-task",
+    "project_id": "my-gcp-project-id",
+    "region": "europe-west1",
+    "job_name": "my-fantastic-dbt-job",
+    "environment_variables": {"FOO": "BAR", "OTHER_FOO": "OTHER_BAR"},
+    "project_dir": "my/dir",
+    "vars": {
+        "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}",
+        "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}",
+    },
+    "no_version_check": True,
+}
+
+
+def skip_on_empty_operator(test_func):
+    """
+    Skip the test if the DbtGcpCloudRunJob operators couldn't be imported.
+    It is required because some tests don't rely on those operators, and in that case we need to avoid raising an exception.
+    """
+    return pytest.mark.skipif(
+        DbtGcpCloudRunJobBaseOperator is None, reason="DbtGcpCloudRunJobBaseOperator could not be imported"
+    )(test_func)
+
+
+def test_overrides_missing():
+    """
+    The overrides parameter needed to pass the dbt command was added in apache-airflow-providers-google==10.11.0.
+    We need to check that the parameter is actually present in the required version.
+    """
+    required_version = "10.11.0"
+    package_name = "apache-airflow-providers-google"
+
+    from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator
+
+    installed_version = pkg_resources.get_distribution(package_name).version
+    init_signature = inspect.signature(CloudRunExecuteJobOperator.__init__)
+
+    if pkg_resources.parse_version(installed_version) < pkg_resources.parse_version(required_version):
+        assert "overrides" not in init_signature.parameters
+    else:
+        assert "overrides" in init_signature.parameters
+
+
+@skip_on_empty_operator
+def test_dbt_gcp_cloud_run_job_operator_add_global_flags() -> None:
+    """
+    Check if global flags are added correctly.
+    """
+    dbt_base_operator = ConcreteDbtGcpCloudRunJobOperator(
+        task_id="my-task",
+        project_id="my-gcp-project-id",
+        region="europe-west1",
+        job_name="my-fantastic-dbt-job",
+        project_dir="my/dir",
+        vars={
+            "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}",
+            "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}",
+        },
+        no_version_check=True,
+    )
+    assert dbt_base_operator.add_global_flags() == [
+        "--vars",
+        "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n"
+        "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
+        "--no-version-check",
+    ]
+
+
+@skip_on_empty_operator
+@patch("cosmos.operators.base.context_to_airflow_vars")
+def test_dbt_gcp_cloud_run_job_operator_get_env(p_context_to_airflow_vars: MagicMock) -> None:
+    """
+    If an end user passes in a variable via the context that is also a global flag, validate that both are kept
+    """
+    dbt_base_operator = ConcreteDbtGcpCloudRunJobOperator(
+        task_id="my-task",
+        project_id="my-gcp-project-id",
+        region="europe-west1",
+        job_name="my-fantastic-dbt-job",
+        project_dir="my/dir",
+    )
+    dbt_base_operator.env = {
+        "start_date": "20220101",
+        "end_date": "20220102",
+        "some_path": Path(__file__),
+        "retries": 3,
+        ("tuple", "key"): "some_value",
+    }
+    p_context_to_airflow_vars.return_value = {"START_DATE": "2023-02-15 12:30:00"}
+    env = dbt_base_operator.get_env(
+        Context(execution_date=datetime(2023, 2, 15, 12, 30)),
+    )
+    expected_env = {
+        "start_date": "20220101",
+        "end_date": "20220102",
+        "some_path": Path(__file__),
+        "START_DATE": "2023-02-15 12:30:00",
+    }
+    assert env == expected_env
+
+
+@skip_on_empty_operator
+@patch("cosmos.operators.base.context_to_airflow_vars")
+def test_dbt_gcp_cloud_run_job_operator_check_environment_variables(
+    p_context_to_airflow_vars: MagicMock,
+) -> None:
+    """
+    If an end user passes in a variable via the context that is also a global flag, validate that both are kept
+    """
+    dbt_base_operator = ConcreteDbtGcpCloudRunJobOperator(
+        task_id="my-task",
+        project_id="my-gcp-project-id",
+        region="europe-west1",
+        job_name="my-fantastic-dbt-job",
+        project_dir="my/dir",
+        environment_variables={"FOO": "BAR"},
+    )
+    dbt_base_operator.env = {
+        "start_date": "20220101",
+        "end_date": "20220102",
+        "some_path": Path(__file__),
+        "retries": 3,
+        "FOO": "foo",
+        ("tuple", "key"): "some_value",
+    }
+    expected_env = {"start_date": "20220101", "end_date": "20220102", "some_path": Path(__file__), "FOO": "BAR"}
+    dbt_base_operator.build_command(context=MagicMock())
+
+    assert dbt_base_operator.environment_variables == expected_env
+
+
+@skip_on_empty_operator
+def test_dbt_gcp_cloud_run_job_build_command():
+    """
+    Check whether the dbt command is built correctly.
+    """
+
+    result_map = {
+        "ls": DbtLSGcpCloudRunJobOperator(**BASE_KWARGS),
+        "run": DbtRunGcpCloudRunJobOperator(**BASE_KWARGS),
+        "test": DbtTestGcpCloudRunJobOperator(**BASE_KWARGS),
+        "seed": DbtSeedGcpCloudRunJobOperator(**BASE_KWARGS),
+        "build": DbtBuildGcpCloudRunJobOperator(**BASE_KWARGS),
+        "snapshot": DbtSnapshotGcpCloudRunJobOperator(**BASE_KWARGS),
+        "run-operation": DbtRunOperationGcpCloudRunJobOperator(macro_name="some-macro", **BASE_KWARGS),
+    }
+
+    for command_name, command_operator in result_map.items():
+        command_operator.build_command(context=MagicMock(), cmd_flags=MagicMock())
+        if command_name != "run-operation":
+            assert command_operator.command == [
+                "dbt",
+                command_name,
+                "--vars",
+                "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n"
+                "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
+                "--no-version-check",
+            ]
+        else:
+            assert command_operator.command == [
+                "dbt",
+                command_name,
+                "some-macro",
+                "--vars",
+                "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n"
+                "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n",
+                "--no-version-check",
+            ]
+
+
+@skip_on_empty_operator
+def test_dbt_gcp_cloud_run_job_overrides_parameter():
+    """
+    Check whether the overrides parameter passed to CloudRunExecuteJobOperator is built correctly.
+    """
+
+    run_operator = DbtRunGcpCloudRunJobOperator(**BASE_KWARGS)
+    run_operator.build_command(context=MagicMock(), cmd_flags=MagicMock())
+
+    actual_overrides = run_operator.overrides
+
+    assert "container_overrides" in actual_overrides
+    actual_container_overrides = actual_overrides["container_overrides"][0]
+
+    assert isinstance(actual_container_overrides["args"], list), "`args` should be of type list"
+
+    assert "env" in actual_container_overrides
+    actual_env = actual_container_overrides["env"]
+
+    expected_env_vars = [{"name": "FOO", "value": "BAR"}, {"name": "OTHER_FOO", "value": "OTHER_BAR"}]
+
+    for expected_env_var in expected_env_vars:
+        assert expected_env_var in actual_env
+
+
+@skip_on_empty_operator
+@patch("cosmos.operators.gcp_cloud_run_job.CloudRunExecuteJobOperator.execute")
+def test_dbt_gcp_cloud_run_job_build_and_run_cmd(mock_execute):
+    """
+    Check that building methods run correctly.
+    """
+
+    dbt_base_operator = ConcreteDbtGcpCloudRunJobOperator(
+        task_id="my-task",
+        project_id="my-gcp-project-id",
+        region="europe-west1",
+        job_name="my-fantastic-dbt-job",
+        project_dir="my/dir",
+        environment_variables={"FOO": "BAR"},
+    )
+    mock_build_command = MagicMock()
+    dbt_base_operator.build_command = mock_build_command
+
+    mock_context = MagicMock()
+    dbt_base_operator.build_and_run_cmd(context=mock_context)
+
+    mock_build_command.assert_called_with(mock_context, None)
+    mock_execute.assert_called_once_with(dbt_base_operator, mock_context)
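
Illustrative usage sketch (not part of the patch above): it follows the documentation snippet added to execution-modes.rst and adds the profile_config that converter.py now requires for this execution mode. The DAG id, dbt project path, profiles.yml path, GCP project, region, job name, and environment variable below are placeholder values, and the referenced Cloud Run Job is assumed to already exist and to run an image with dbt and the project installed.

from datetime import datetime

from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig
from cosmos.constants import ExecutionMode

gcp_cloud_run_job_cosmos_dag = DbtDag(
    dag_id="dbt_gcp_cloud_run_job_example",  # hypothetical DAG id
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    # Placeholder path to the dbt project baked into the Cloud Run Job image.
    project_config=ProjectConfig("/usr/local/airflow/dbt/my_dbt_project"),
    # profile_config is mandatory for gcp_cloud_run_job (see the converter.py change).
    profile_config=ProfileConfig(
        profile_name="default",
        target_name="dev",
        profiles_yml_filepath="/usr/local/airflow/dbt/my_dbt_project/profiles.yml",  # placeholder path
    ),
    execution_config=ExecutionConfig(execution_mode=ExecutionMode.GCP_CLOUD_RUN_JOB),
    # Forwarded to every Dbt*GcpCloudRunJobOperator task: the arguments required by
    # CloudRunExecuteJobOperator plus optional environment variables for the container.
    operator_args={
        "project_id": "my-gcp-project-id",
        "region": "europe-west1",
        "job_name": "my-dbt-cloud-run-job",
        "environment_variables": {"EXAMPLE_VAR": "example-value"},  # placeholder env var
    },
)

As DbtGcpCloudRunJobBaseOperator.build_command shows, each task rewrites the job's container args and env through the overrides payload before triggering a new Cloud Run Job execution.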