diff --git a/cosmos/__init__.py b/cosmos/__init__.py index c98a14402..75b56583d 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -32,6 +32,40 @@ logger = get_logger(__name__) +try: + from cosmos.operators.gcp_cloud_run_job import ( + DbtBuildGcpCloudRunJobOperator, + DbtLSGcpCloudRunJobOperator, + DbtRunGcpCloudRunJobOperator, + DbtRunOperationGcpCloudRunJobOperator, + DbtSeedGcpCloudRunJobOperator, + DbtSnapshotGcpCloudRunJobOperator, + DbtTestGcpCloudRunJobOperator, + ) +except (ImportError, AttributeError): + DbtBuildGcpCloudRunJobOperator = MissingPackage( + "cosmos.operators.gcp_cloud_run_job.DbtBuildGcpCloudRunJobOperator", "gcp-cloud-run-job" + ) + DbtLSGcpCloudRunJobOperator = MissingPackage( + "cosmos.operators.gcp_cloud_run_job.DbtLSGcpCloudRunJobOperator", "gcp-cloud-run-job" + ) + DbtRunGcpCloudRunJobOperator = MissingPackage( + "cosmos.operators.gcp_cloud_run_job.DbtRunGcpCloudRunJobOperator", "gcp-cloud-run-job" + ) + DbtRunOperationGcpCloudRunJobOperator = MissingPackage( + "cosmos.operators.gcp_cloud_run_job.DbtRunOperationGcpCloudRunJobOperator", + "gcp-cloud-run-job", + ) + DbtSeedGcpCloudRunJobOperator = MissingPackage( + "cosmos.operators.gcp_cloud_run_job.DbtSeedGcpCloudRunJobOperator", "gcp-cloud-run-job" + ) + DbtSnapshotGcpCloudRunJobOperator = MissingPackage( + "cosmos.operators.gcp_cloud_run_job.DbtSnapshotGcpCloudRunJobOperator", "gcp-cloud-run-job" + ) + DbtTestGcpCloudRunJobOperator = MissingPackage( + "cosmos.operators.gcp_cloud_run_job.DbtTestGcpCloudRunJobOperator", "gcp-cloud-run-job" + ) + try: from cosmos.operators.docker import ( DbtLSDockerOperator, @@ -134,6 +168,13 @@ "DbtSnapshotLocalOperator", "DbtDag", "DbtTaskGroup", + "DbtBuildGcpCloudRunJobOperator", + "DbtLSGcpCloudRunJobOperator", + "DbtRunGcpCloudRunJobOperator", + "DbtRunOperationGcpCloudRunJobOperator", + "DbtSeedGcpCloudRunJobOperator", + "DbtSnapshotGcpCloudRunJobOperator", + "DbtTestGcpCloudRunJobOperator", "DbtLSDockerOperator", "DbtRunOperationDockerOperator", "DbtRunDockerOperator", diff --git a/cosmos/constants.py b/cosmos/constants.py index 6e96551e4..e9d1aaa6b 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -91,6 +91,7 @@ class ExecutionMode(Enum): AWS_EKS = "aws_eks" VIRTUALENV = "virtualenv" AZURE_CONTAINER_INSTANCE = "azure_container_instance" + GCP_CLOUD_RUN_JOB = "gcp_cloud_run_job" class InvocationMode(Enum): diff --git a/cosmos/converter.py b/cosmos/converter.py index 40929ef55..00c109839 100644 --- a/cosmos/converter.py +++ b/cosmos/converter.py @@ -116,10 +116,9 @@ def validate_initial_user_config( :param render_config: Configuration related to how to convert the dbt workflow into an Airflow DAG :param operator_args: Arguments to pass to the underlying operators. 
""" - if profile_config is None and execution_config.execution_mode not in ( - ExecutionMode.KUBERNETES, - ExecutionMode.AWS_EKS, - ExecutionMode.DOCKER, + if profile_config is None and execution_config.execution_mode in ( + ExecutionMode.LOCAL, + ExecutionMode.VIRTUALENV, ): raise CosmosValueError(f"The profile_config is mandatory when using {execution_config.execution_mode}") diff --git a/cosmos/operators/gcp_cloud_run_job.py b/cosmos/operators/gcp_cloud_run_job.py new file mode 100644 index 000000000..6b4ded49a --- /dev/null +++ b/cosmos/operators/gcp_cloud_run_job.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +import inspect +from typing import Any, Callable, Sequence + +from airflow.utils.context import Context + +from cosmos.config import ProfileConfig +from cosmos.log import get_logger +from cosmos.operators.base import ( + AbstractDbtBaseOperator, + DbtBuildMixin, + DbtLSMixin, + DbtRunMixin, + DbtRunOperationMixin, + DbtSeedMixin, + DbtSnapshotMixin, + DbtTestMixin, +) + +logger = get_logger(__name__) + +DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {} + +try: + from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator + + # The overrides parameter needed to pass the dbt command was added in apache-airflow-providers-google==10.13.0 + init_signature = inspect.signature(CloudRunExecuteJobOperator.__init__) + if "overrides" not in init_signature.parameters: + raise AttributeError( + "CloudRunExecuteJobOperator does not have `overrides` attribute. Ensure you've installed apache-airflow-providers-google of at least 10.11.0 " + "separately or with `pip install astronomer-cosmos[...,gcp-cloud-run-job]`." + ) +except ImportError: + raise ImportError( + "Could not import CloudRunExecuteJobOperator. Ensure you've installed the Google Cloud provider " + "separately or with `pip install astronomer-cosmos[...,gcp-cloud-run-job]`." + ) + + +class DbtGcpCloudRunJobBaseOperator(AbstractDbtBaseOperator, CloudRunExecuteJobOperator): # type: ignore + """ + Executes a dbt core cli command in a Cloud Run Job instance with dbt installed in it. 
+ + """ + + template_fields: Sequence[str] = tuple( + list(AbstractDbtBaseOperator.template_fields) + list(CloudRunExecuteJobOperator.template_fields) + ) + + intercept_flag = False + + def __init__( + self, + # arguments required by CloudRunExecuteJobOperator + project_id: str, + region: str, + job_name: str, + # + profile_config: ProfileConfig | None = None, + command: list[str] | None = None, + environment_variables: dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + self.profile_config = profile_config + self.command = command + self.environment_variables = environment_variables or DEFAULT_ENVIRONMENT_VARIABLES + super().__init__(project_id=project_id, region=region, job_name=job_name, **kwargs) + + def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> Any: + self.build_command(context, cmd_flags) + self.log.info(f"Running command: {self.command}") + result = CloudRunExecuteJobOperator.execute(self, context) + logger.info(result) + + def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None: + # For the first round, we're going to assume that the command is dbt + # This means that we don't have openlineage support, but we will create a ticket + # to add that in the future + self.dbt_executable_path = "dbt" + dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags) + self.environment_variables = {**env_vars, **self.environment_variables} + self.command = dbt_cmd + # Override Cloud Run Job default arguments with dbt command + self.overrides = { + "container_overrides": [ + { + "args": self.command, + "env": [{"name": key, "value": value} for key, value in self.environment_variables.items()], + } + ], + } + + +class DbtBuildGcpCloudRunJobOperator(DbtBuildMixin, DbtGcpCloudRunJobBaseOperator): + """ + Executes a dbt core build command. + """ + + template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator] + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + +class DbtLSGcpCloudRunJobOperator(DbtLSMixin, DbtGcpCloudRunJobBaseOperator): + """ + Executes a dbt core ls command. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + +class DbtSeedGcpCloudRunJobOperator(DbtSeedMixin, DbtGcpCloudRunJobBaseOperator): + """ + Executes a dbt core seed command. + + :param full_refresh: dbt optional arg - dbt will treat incremental models as table models + """ + + template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtSeedMixin.template_fields # type: ignore[operator] + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + +class DbtSnapshotGcpCloudRunJobOperator(DbtSnapshotMixin, DbtGcpCloudRunJobBaseOperator): + """ + Executes a dbt core snapshot command. + """ + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + +class DbtRunGcpCloudRunJobOperator(DbtRunMixin, DbtGcpCloudRunJobBaseOperator): + """ + Executes a dbt core run command. + """ + + template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtRunMixin.template_fields # type: ignore[operator] + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + + +class DbtTestGcpCloudRunJobOperator(DbtTestMixin, DbtGcpCloudRunJobBaseOperator): + """ + Executes a dbt core test command. 
+ """ + + def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: str) -> None: + super().__init__(**kwargs) + # as of now, on_warning_callback in docker executor does nothing + self.on_warning_callback = on_warning_callback + + +class DbtRunOperationGcpCloudRunJobOperator(DbtRunOperationMixin, DbtGcpCloudRunJobBaseOperator): + """ + Executes a dbt core run-operation command. + + :param macro_name: name of macro to execute + :param args: Supply arguments to the macro. This dictionary will be mapped to the keyword arguments defined in the + selected macro. + """ + + template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtRunOperationMixin.template_fields # type: ignore[operator] + + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) diff --git a/docs/_static/cosmos_gcp_crj_schematic.png b/docs/_static/cosmos_gcp_crj_schematic.png new file mode 100644 index 000000000..a3abddb99 Binary files /dev/null and b/docs/_static/cosmos_gcp_crj_schematic.png differ diff --git a/docs/_static/jaffle_shop_big_query.png b/docs/_static/jaffle_shop_big_query.png new file mode 100644 index 000000000..28dd0a227 Binary files /dev/null and b/docs/_static/jaffle_shop_big_query.png differ diff --git a/docs/_static/jaffle_shop_gcp_cloud_run_job.png b/docs/_static/jaffle_shop_gcp_cloud_run_job.png new file mode 100644 index 000000000..71d0329a4 Binary files /dev/null and b/docs/_static/jaffle_shop_gcp_cloud_run_job.png differ diff --git a/docs/getting_started/execution-modes.rst b/docs/getting_started/execution-modes.rst index df9b0f26d..f80c3da9d 100644 --- a/docs/getting_started/execution-modes.rst +++ b/docs/getting_started/execution-modes.rst @@ -11,6 +11,7 @@ Cosmos can run ``dbt`` commands using five different approaches, called ``execut 4. **kubernetes**: Run ``dbt`` commands from Kubernetes Pods managed by Cosmos (requires a pre-existing Docker image) 5. **aws_eks**: Run ``dbt`` commands from AWS EKS Pods managed by Cosmos (requires a pre-existing Docker image) 6. **azure_container_instance**: Run ``dbt`` commands from Azure Container Instances managed by Cosmos (requires a pre-existing Docker image) +7. **gcp_cloud_run_job**: Run ``dbt`` commands from GCP Cloud Run Job instances managed by Cosmos (requires a pre-existing Docker image) The choice of the ``execution mode`` can vary based on each user's needs and concerns. For more details, check each execution mode described below. @@ -47,6 +48,10 @@ The choice of the ``execution mode`` can vary based on each user's needs and con - Slow - High - No + * - GCP Cloud Run Job Instance + - Slow + - High + - No Local ----- @@ -209,6 +214,29 @@ Each task will create a new container on Azure, giving full isolation. This, how }, ) +GCP Cloud Run Job +------------------------ +.. versionadded:: 1.7 +The ``gcp_cloud_run_job`` execution mode is particularly useful for users who prefer to run their ``dbt`` commands on Google Cloud infrastructure, taking advantage of Cloud Run's scalability, isolation, and managed service capabilities. + +For the ``gcp_cloud_run_job`` execution mode to work, a Cloud Run Job instance must first be created using a previously built Docker container. This container should include the latest ``dbt`` pipelines and profiles. You can find more details in the `Cloud Run Job creation guide `__ . + +This execution mode allows users to run ``dbt`` core CLI commands in a Google Cloud Run Job instance. 
This mode leverages the ``CloudRunExecuteJobOperator`` from the Google Cloud Airflow provider to execute commands within a Cloud Run Job instance, where ``dbt`` is already installed. As with the ``Docker`` and ``Kubernetes`` execution modes, a Docker container should be available, containing the up-to-date ``dbt`` pipelines and profiles. + +Each task will create a new Cloud Run Job execution, giving full isolation. The separation of tasks adds extra overhead; however, that can be mitigated by using the ``concurrency`` parameter in ``DbtDag``, which will result in parallelized execution of ``dbt`` models. + + +.. code-block:: python + + gcp_cloud_run_job_cosmos_dag = DbtDag( + # ... + execution_config=ExecutionConfig(execution_mode=ExecutionMode.GCP_CLOUD_RUN_JOB), + operator_args={ + "project_id": "my-gcp-project-id", + "region": "europe-west1", + "job_name": "my-crj-{{ ti.task_id.replace('.','-').replace('_','-') }}", + }, + ) .. _invocation_modes: Invocation Modes diff --git a/docs/getting_started/gcp-cloud-run-job.rst b/docs/getting_started/gcp-cloud-run-job.rst new file mode 100644 index 000000000..a9a4b2261 --- /dev/null +++ b/docs/getting_started/gcp-cloud-run-job.rst @@ -0,0 +1,264 @@ +.. _gcp-cloud-run-job: + +GCP Cloud Run Job Execution Mode +======================================= +.. versionadded:: 1.7 +This tutorial will guide you through the steps required to use a Cloud Run Job instance as the execution mode for your dbt code with Astronomer Cosmos, and to build the following architecture: + +.. figure:: https://github.com/astronomer/astronomer-cosmos/raw/main/docs/_static/cosmos_gcp_crj_schematic.png + :width: 600 + +Prerequisites ++++++++++++++ +1. Docker with the Docker daemon (Docker Desktop on macOS). Follow the `Docker installation guide `_. +2. Airflow +3. Google Cloud SDK (install guide here: `gcloud SDK `_) +4. Astronomer-cosmos package containing the dbt Cloud Run Job operators +5. GCP account with: + 1. A GCP project (`setup guide `_) + 2. IAM roles: + * Basic Role: `Owner `_ (control over whole project) or + * Predefined Roles: `Artifact Registry Administrator `_, `Cloud Run Developer `_ (control over specific services) + 3. Enabled service APIs: + * Artifact Registry API + * Cloud Run Admin API + * BigQuery API + 4. A service account with BigQuery roles: `JobUser `_ and `DataEditor `_ +6. Docker image built with the required dbt project and dbt DAG +7. dbt DAG with Cloud Run Job operators in the Airflow DAGs directory to run in Airflow + +.. note:: + + Google Cloud Platform provides a free tier on many resources, as well as a Free Trial with $300 in credit. Learn more `here `_. + +More information on how to achieve 2-6 is detailed below. + + +Step-by-step guide +++++++++++++++++++ + +**Install Airflow and Cosmos** + +Create a Python virtualenv, activate it, upgrade pip to the latest version and install ``apache-airflow`` & ``astronomer-cosmos``: + +.. code-block:: bash + + python3 -m venv venv + source venv/bin/activate + python3 -m pip install --upgrade pip + pip install apache-airflow + pip install "astronomer-cosmos[dbt-bigquery,gcp-cloud-run-job]" +
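+Optionally, you can run a quick sanity check that the extra dependencies are usable: importing the operator module raises an error if ``apache-airflow-providers-google`` is missing or does not support the ``overrides`` parameter.
+
+.. code-block:: python
+
+    # Raises ImportError/AttributeError if the Google provider is absent or too old.
+    from cosmos.operators.gcp_cloud_run_job import DbtRunGcpCloudRunJobOperator
+
+    print(DbtRunGcpCloudRunJobOperator)
+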
+**Setup gcloud and environment variables** + +Set the environment variables that will be used to create the cloud infrastructure. Replace the placeholders with your unique GCP ``project id`` and the ``region`` of the project: + +.. code-block:: bash + + export PROJECT_ID=<<>> + export REGION=<<>> + export REPO_NAME="astronomer-cosmos-dbt" + export IMAGE_NAME="$REGION-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/cosmos-example" + export SERVICE_ACCOUNT_NAME="cloud-run-job-sa" + export DATASET_NAME="astronomer_cosmos_example" + export CLOUD_RUN_JOB_NAME="astronomer-cosmos-example" + +Before we do anything in the GCP project, we first need to authorize gcloud to access the Cloud Platform with Google user credentials: + +.. code-block:: bash + + gcloud auth login + +You'll receive a link to sign into the Google Cloud SDK using a Google Account. + +Next, set the default ``project id`` using the command below: + +.. code-block:: bash + + gcloud config set project $PROJECT_ID + +In case BigQuery has never been used before in the project, run the command below to enable the BigQuery API: + +.. code-block:: bash + + gcloud services enable bigquery.googleapis.com + +**Setup Artifact Registry** + +In order to run a container in a Cloud Run Job, it needs access to the container image. In our setup, we will use an Artifact Registry repository to store images. +To use Artifact Registry, you need to enable the API first: + +.. code-block:: bash + + gcloud services enable artifactregistry.googleapis.com + +To set up an Artifact Registry repository, you can use the following bash command: + +.. code-block:: bash + + gcloud artifacts repositories create $REPO_NAME \ + --repository-format=docker \ + --location=$REGION \ + --project $PROJECT_ID + +**Setup Service Account** + +In order to use dbt and make transformations in BigQuery, the Cloud Run Job needs some BigQuery permissions. One way to achieve that is to set up a separate ``Service Account`` with the needed permissions: + +.. code-block:: bash + + # create a service account + gcloud iam service-accounts create $SERVICE_ACCOUNT_NAME + +.. code-block:: bash + + # grant JobUser role + gcloud projects add-iam-policy-binding $PROJECT_ID \ + --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT_ID.iam.gserviceaccount.com" \ + --role="roles/bigquery.jobUser" + +.. code-block:: bash + + # grant DataEditor role + gcloud projects add-iam-policy-binding $PROJECT_ID \ + --member="serviceAccount:$SERVICE_ACCOUNT_NAME@$PROJECT_ID.iam.gserviceaccount.com" \ + --role="roles/bigquery.dataEditor" + +**Build the dbt Docker image** + +Now, we are going to download an example dbt project and build a Docker image with it. + +.. important:: + + You need to ensure Docker is using the right credentials to push images. For Artifact Registry, this can be done by running the following command: + + .. code-block:: bash + + gcloud auth print-access-token | docker login -u oauth2accesstoken --password-stdin https://$REGION-docker.pkg.dev + + The token is valid for 1 hour. After that, you will need to create a new one if it is still needed. + +Clone the `cosmos-example `_ repo: + +.. code-block:: bash + + git clone https://github.com/astronomer/cosmos-example.git + cd cosmos-example + +Open the `Dockerfile `_ located in the ``gcp_cloud_run_job_example`` folder and change the environment variables ``GCP_PROJECT_ID`` and ``GCP_REGION`` to your GCP project id and project region. + +Build a Docker image using the previously modified ``Dockerfile``, which will be used by the Cloud Run Job: + +.. code-block:: bash + + docker build -t $IMAGE_NAME -f gcp_cloud_run_job_example/Dockerfile.gcp_cloud_run_job . + +.. important:: + + Make sure to stay in the ``cosmos-example`` directory when running the ``docker build`` command. + +After this, the image needs to be pushed to the Artifact Registry: + +.. code-block:: bash + + docker push $IMAGE_NAME + +Read through the Dockerfile to understand what it does so that you can use it as a reference in your project. + + - The dags directory containing the `dbt project jaffle_shop `_ is added to the image + - The `bigquery dbt profile `_ file is added to the image + - The dbt_project.yml is replaced with `postgres_profile_dbt_project.yml `_ which contains the profile key pointing to postgres_profile as profile creation is not handled at the moment for Cloud Run Job operators like in local mode. + +**Create Cloud Run Job instance** + +Once the image is pushed to Artifact Registry, you can finally create the Cloud Run Job with the image and the previously created service account. + +First, enable the Cloud Run Admin API using the command below: + +.. code-block:: bash + + gcloud services enable run.googleapis.com + + +Next, set the default Cloud Run region to your GCP region: + +.. code-block:: bash + + gcloud config set run/region $REGION + +Then, run the command below to create a Cloud Run Job instance: + +.. code-block:: bash + + gcloud run jobs create $CLOUD_RUN_JOB_NAME \ + --image=$IMAGE_NAME \ + --task-timeout=180s \ + --max-retries=0 \ + --cpu=1 \ + --memory=512Mi \ + --service-account=$SERVICE_ACCOUNT_NAME@$PROJECT_ID.iam.gserviceaccount.com + +**Setup Airflow Connections** + +Now that you have the required Google Cloud infrastructure, you still need to configure Airflow so it can use it. You'll need a ``google_cloud_default`` connection in order to work with GCP resources. + +Check out the ``airflow-settings.yml`` file `here `_ for an example. If you are using Astro CLI, filling in the right values here will be enough for this to work. + +**Setup and Trigger the DAG with Airflow** + +Open the `jaffle_shop_gcp_cloud_run_job `_ DAG file and update the ``GCP_PROJECT_ID`` and ``GCP_LOCATION`` constants with your GCP project id and project region. +
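+In that DAG, Cosmos wires the dbt project to the Cloud Run Job created above. As a rough, minimal sketch of what such a DAG can look like (the exact file in ``cosmos-example`` may differ, and the dbt project path below is illustrative):
+
+.. code-block:: python
+
+    from datetime import datetime
+
+    from cosmos import DbtDag, ExecutionConfig, ProjectConfig
+    from cosmos.constants import ExecutionMode
+
+    GCP_PROJECT_ID = "my-gcp-project-id"
+    GCP_LOCATION = "europe-west1"
+
+    jaffle_shop_gcp_cloud_run_job = DbtDag(
+        dag_id="jaffle_shop_gcp_cloud_run_job",
+        schedule=None,
+        start_date=datetime(2024, 1, 1),
+        catchup=False,
+        # path of the dbt project as baked into the Cloud Run Job image (illustrative)
+        project_config=ProjectConfig("/usr/app/dbt/jaffle_shop"),
+        execution_config=ExecutionConfig(execution_mode=ExecutionMode.GCP_CLOUD_RUN_JOB),
+        operator_args={
+            "project_id": GCP_PROJECT_ID,
+            "region": GCP_LOCATION,
+            # Cloud Run Job created earlier in this guide
+            "job_name": "astronomer-cosmos-example",
+        },
+    )
+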
+Once the DAG is configured, copy the ``dags`` directory from the ``cosmos-example`` repo to your Airflow home: + +.. code-block:: bash + + cp -r dags $AIRFLOW_HOME/ + +Run Airflow: + +.. code-block:: bash + + airflow standalone + +.. note:: + + You might need to run ``airflow standalone`` with ``sudo`` if your Airflow user lacks the required permissions. + +Log in to Airflow through a web browser ``http://localhost:8080/``, using the user ``airflow`` and the password described in the ``standalone_admin_password.txt`` file. + +Enable and trigger a run of the `jaffle_shop_gcp_cloud_run_job `_ DAG. You will be able to see the following successful DAG run. + +.. figure:: https://github.com/astronomer/astronomer-cosmos/raw/main/docs/_static/jaffle_shop_gcp_cloud_run_job.png + :width: 800 + + +You can also verify the tables that were created using dbt in BigQuery Studio: + +.. figure:: https://github.com/astronomer/astronomer-cosmos/raw/main/docs/_static/jaffle_shop_big_query.png + :width: 800 + + +**Delete resources** + +After the tests succeed, don't forget to delete the Google Cloud resources to avoid unnecessary costs: + +.. code-block:: bash + + # Delete the Cloud Run Job instance + + gcloud run jobs delete $CLOUD_RUN_JOB_NAME + +.. code-block:: bash + + # Delete the BigQuery main and custom datasets specified in dbt schema.yml, with all tables included + + bq rm -r -f -d $PROJECT_ID:$DATASET_NAME + + bq rm -r -f -d $PROJECT_ID:dbt_dev + +..
code-block:: bash + + # Delete Artifact Registry repository with all images included + + gcloud artifacts repositories delete $REPO_NAME \ + --location=$REGION \ No newline at end of file diff --git a/docs/getting_started/index.rst b/docs/getting_started/index.rst index 247f8378e..ed1952793 100644 --- a/docs/getting_started/index.rst +++ b/docs/getting_started/index.rst @@ -12,6 +12,7 @@ Docker Execution Mode Kubernetes Execution Mode Azure Container Instance Execution Mode + GCP Cloud Run Job Execution Mode dbt and Airflow Similar Concepts @@ -40,6 +41,8 @@ For specific guides, see the following: - `Executing dbt DAGs with Docker Operators `__ - `Executing dbt DAGs with KubernetesPodOperators `__ +- `Executing dbt DAGs with AzureContainerInstancesOperators `__ +- `Executing dbt DAGs with GcpCloudRunExecuteJobOperators `__ Concepts Overview diff --git a/pyproject.toml b/pyproject.toml index 93b97541f..8908867ff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,7 +69,7 @@ openlineage = ["openlineage-integration-common!=1.15.0", "openlineage-airflow"] amazon = [ "apache-airflow-providers-amazon[s3fs]>=3.0.0", ] -google = ["apache-airflow-providers-google"] +google = ["apache-airflow-providers-google>=10.11.0"] microsoft = ["apache-airflow-providers-microsoft-azure"] all = [ "astronomer-cosmos[dbt-all]", @@ -111,6 +111,9 @@ aws_eks = [ azure-container-instance = [ "apache-airflow-providers-microsoft-azure>=8.4.0", ] +gcp-cloud-run-job = [ + "apache-airflow-providers-google>=10.11.0", +] [project.entry-points.apache_airflow_provider] provider_info = "cosmos:get_provider_info" @@ -160,7 +163,9 @@ airflow = ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9"] [tool.hatch.envs.tests.overrides] matrix.airflow.dependencies = [ - { value = "typing_extensions<4.6", if = ["2.6"] } + { value = "typing_extensions<4.6", if = ["2.6"] }, + { value = "apache-airflow-providers-google<10.11.0", if = ["2.4"] }, + { value = "apache-airflow-providers-google>=10.11.0", if = ["2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] }, ] [tool.hatch.envs.tests.scripts] @@ -192,7 +197,7 @@ dependencies = [ "aenum", "apache-airflow-providers-amazon[s3fs]>=3.0.0", "apache-airflow-providers-cncf-kubernetes>=5.1.1", - "apache-airflow-providers-google", + "apache-airflow-providers-google>=10.11.0", "apache-airflow-providers-microsoft-azure", "msgpack", "openlineage-airflow", diff --git a/tests/operators/test_gcp_cloud_run_job.py b/tests/operators/test_gcp_cloud_run_job.py new file mode 100644 index 000000000..a6f16942a --- /dev/null +++ b/tests/operators/test_gcp_cloud_run_job.py @@ -0,0 +1,247 @@ +import inspect +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pkg_resources +import pytest +from airflow.utils.context import Context +from pendulum import datetime + +try: + from cosmos.operators.gcp_cloud_run_job import ( + DbtGcpCloudRunJobBaseOperator, + DbtLSGcpCloudRunJobOperator, + DbtRunGcpCloudRunJobOperator, + DbtSeedGcpCloudRunJobOperator, + DbtTestGcpCloudRunJobOperator, + DbtBuildGcpCloudRunJobOperator, + DbtSnapshotGcpCloudRunJobOperator, + DbtRunOperationGcpCloudRunJobOperator, + ) + + class ConcreteDbtGcpCloudRunJobOperator(DbtGcpCloudRunJobBaseOperator): + base_cmd = ["cmd"] + +except (ImportError, AttributeError): + DbtGcpCloudRunJobBaseOperator = None + + +BASE_KWARGS = { + "task_id": "my-task", + "project_id": "my-gcp-project-id", + "region": "europe-west1", + "job_name": "my-fantastic-dbt-job", + "environment_variables": {"FOO": "BAR", "OTHER_FOO": "OTHER_BAR"}, + "project_dir": "my/dir", + 
"vars": { + "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}", + "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}", + }, + "no_version_check": True, +} + + +def skip_on_empty_operator(test_func): + """ + Skip the test if DbtGcpCloudRunJob operators couldn't be imported. + It is required as some tests don't rely on those operators and in this case we need to avoid throwing an exception. + """ + return pytest.mark.skipif( + DbtGcpCloudRunJobBaseOperator is None, reason="DbtGcpCloudRunJobBaseOperator could not be imported" + )(test_func) + + +def test_overrides_missing(): + """ + The overrides parameter needed to pass the dbt command was added in apache-airflow-providers-google==10.11.0. + We need to check if the parameter is actually present in required version. + """ + required_version = "10.11.0" + package_name = "apache-airflow-providers-google" + + from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator + + installed_version = pkg_resources.get_distribution(package_name).version + init_signature = inspect.signature(CloudRunExecuteJobOperator.__init__) + + if pkg_resources.parse_version(installed_version) < pkg_resources.parse_version(required_version): + assert "overrides" not in init_signature.parameters + else: + assert "overrides" in init_signature.parameters + + +@skip_on_empty_operator +def test_dbt_gcp_cloud_run_job_operator_add_global_flags() -> None: + """ + Check if global flags are added correctly. + """ + dbt_base_operator = ConcreteDbtGcpCloudRunJobOperator( + task_id="my-task", + project_id="my-gcp-project-id", + region="europe-west1", + job_name="my-fantastic-dbt-job", + project_dir="my/dir", + vars={ + "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}", + "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}", + }, + no_version_check=True, + ) + assert dbt_base_operator.add_global_flags() == [ + "--vars", + "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n" + "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", + "--no-version-check", + ] + + +@skip_on_empty_operator +@patch("cosmos.operators.base.context_to_airflow_vars") +def test_dbt_gcp_cloud_run_job_operator_get_env(p_context_to_airflow_vars: MagicMock) -> None: + """ + If an end user passes in a variable via the context that is also a global flag, validate that the both are kept + """ + dbt_base_operator = ConcreteDbtGcpCloudRunJobOperator( + task_id="my-task", + project_id="my-gcp-project-id", + region="europe-west1", + job_name="my-fantastic-dbt-job", + project_dir="my/dir", + ) + dbt_base_operator.env = { + "start_date": "20220101", + "end_date": "20220102", + "some_path": Path(__file__), + "retries": 3, + ("tuple", "key"): "some_value", + } + p_context_to_airflow_vars.return_value = {"START_DATE": "2023-02-15 12:30:00"} + env = dbt_base_operator.get_env( + Context(execution_date=datetime(2023, 2, 15, 12, 30)), + ) + expected_env = { + "start_date": "20220101", + "end_date": "20220102", + "some_path": Path(__file__), + "START_DATE": "2023-02-15 12:30:00", + } + assert env == expected_env + + +@skip_on_empty_operator +@patch("cosmos.operators.base.context_to_airflow_vars") +def test_dbt_gcp_cloud_run_job_operator_check_environment_variables( + p_context_to_airflow_vars: MagicMock, +) -> None: + """ + If an end user passes in a variable via the context that is also a global flag, validate that the both are kept + """ + dbt_base_operator = ConcreteDbtGcpCloudRunJobOperator( + task_id="my-task", + 
project_id="my-gcp-project-id", + region="europe-west1", + job_name="my-fantastic-dbt-job", + project_dir="my/dir", + environment_variables={"FOO": "BAR"}, + ) + dbt_base_operator.env = { + "start_date": "20220101", + "end_date": "20220102", + "some_path": Path(__file__), + "retries": 3, + "FOO": "foo", + ("tuple", "key"): "some_value", + } + expected_env = {"start_date": "20220101", "end_date": "20220102", "some_path": Path(__file__), "FOO": "BAR"} + dbt_base_operator.build_command(context=MagicMock()) + + assert dbt_base_operator.environment_variables == expected_env + + +@skip_on_empty_operator +def test_dbt_gcp_cloud_run_job_build_command(): + """ + Check whether the dbt command is built correctly. + """ + + result_map = { + "ls": DbtLSGcpCloudRunJobOperator(**BASE_KWARGS), + "run": DbtRunGcpCloudRunJobOperator(**BASE_KWARGS), + "test": DbtTestGcpCloudRunJobOperator(**BASE_KWARGS), + "seed": DbtSeedGcpCloudRunJobOperator(**BASE_KWARGS), + "build": DbtBuildGcpCloudRunJobOperator(**BASE_KWARGS), + "snapshot": DbtSnapshotGcpCloudRunJobOperator(**BASE_KWARGS), + "run-operation": DbtRunOperationGcpCloudRunJobOperator(macro_name="some-macro", **BASE_KWARGS), + } + + for command_name, command_operator in result_map.items(): + command_operator.build_command(context=MagicMock(), cmd_flags=MagicMock()) + if command_name != "run-operation": + assert command_operator.command == [ + "dbt", + command_name, + "--vars", + "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n" + "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", + "--no-version-check", + ] + else: + assert command_operator.command == [ + "dbt", + command_name, + "some-macro", + "--vars", + "end_time: '{{ data_interval_end.strftime(''%Y%m%d%H%M%S'') }}'\n" + "start_time: '{{ data_interval_start.strftime(''%Y%m%d%H%M%S'') }}'\n", + "--no-version-check", + ] + + +@skip_on_empty_operator +def test_dbt_gcp_cloud_run_job_overrides_parameter(): + """ + Check whether overrides parameter passed on to CloudRunExecuteJobOperator is built correctly. + """ + + run_operator = DbtRunGcpCloudRunJobOperator(**BASE_KWARGS) + run_operator.build_command(context=MagicMock(), cmd_flags=MagicMock()) + + actual_overrides = run_operator.overrides + + assert "container_overrides" in actual_overrides + actual_container_overrides = actual_overrides["container_overrides"][0] + + assert isinstance(actual_container_overrides["args"], list), "`args` should be of type list" + + assert "env" in actual_container_overrides + actual_env = actual_container_overrides["env"] + + expected_env_vars = [{"name": "FOO", "value": "BAR"}, {"name": "OTHER_FOO", "value": "OTHER_BAR"}] + + for expected_env_var in expected_env_vars: + assert expected_env_var in actual_env + + +@skip_on_empty_operator +@patch("cosmos.operators.gcp_cloud_run_job.CloudRunExecuteJobOperator.execute") +def test_dbt_gcp_cloud_run_job_build_and_run_cmd(mock_execute): + """ + Check that building methods run correctly. + """ + + dbt_base_operator = ConcreteDbtGcpCloudRunJobOperator( + task_id="my-task", + project_id="my-gcp-project-id", + region="europe-west1", + job_name="my-fantastic-dbt-job", + project_dir="my/dir", + environment_variables={"FOO": "BAR"}, + ) + mock_build_command = MagicMock() + dbt_base_operator.build_command = mock_build_command + + mock_context = MagicMock() + dbt_base_operator.build_and_run_cmd(context=mock_context) + + mock_build_command.assert_called_with(mock_context, None) + mock_execute.assert_called_once_with(dbt_base_operator, mock_context)