Skip to content

Commit

Permalink
Add GCP_CLOUD_RUN_JOB execution mode (#1153)
Browse files Browse the repository at this point in the history
Added new `GCP_CLOUD_RUN_JOB` execution mode that triggers Google Cloud
Platform's Cloud Run Job instance with dbt model in it.

It extends Airflow's `CloudRunExecuteJobOperator` and overrides Cloud
Run Job's container with dbt command generated by cosmos.

Note: `CloudRunExecuteJobOperator` has `container_overrides` parameter
implemented in `apache-airflow-providers-google==10.13.0` which is
supported by `airflow >=2.6.0`.

Resolves #1149 

Co-authored-by: Agata Zalewska <[email protected]>
Co-authored-by: Tatiana Al-Chueyr <[email protected]>
  • Loading branch information
3 people committed Sep 26, 2024
1 parent 243acfd commit 56ff6dd
Show file tree
Hide file tree
Showing 12 changed files with 767 additions and 6 deletions.
42 changes: 42 additions & 0 deletions cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,40 @@
DbtTestAwsEksOperator = MissingPackage("cosmos.operators.azure_container_instance.DbtTestAwsEksOperator", "aws_eks")


try:
from cosmos.operators.gcp_cloud_run_job import (
DbtBuildGcpCloudRunJobOperator,
DbtLSGcpCloudRunJobOperator,
DbtRunGcpCloudRunJobOperator,
DbtRunOperationGcpCloudRunJobOperator,
DbtSeedGcpCloudRunJobOperator,
DbtSnapshotGcpCloudRunJobOperator,
DbtTestGcpCloudRunJobOperator,
)
except (ImportError, AttributeError):
DbtBuildGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtBuildGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtLSGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtLSGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtRunGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtRunGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtRunOperationGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtRunOperationGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtSeedGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtSeedGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtSnapshotGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtSnapshotGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtTestGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtTestGcpCloudRunJobOperator", "gcp-cloud-run-job"
)


__all__ = [
"ProjectConfig",
"ProfileConfig",
Expand Down Expand Up @@ -221,6 +255,14 @@
"DbtSeedAwsEksOperator",
"DbtSnapshotAwsEksOperator",
"DbtTestAwsEksOperator",
# GCP Cloud Run Job Execution Mode
"DbtBuildGcpCloudRunJobOperator",
"DbtLSGcpCloudRunJobOperator",
"DbtRunGcpCloudRunJobOperator",
"DbtRunOperationGcpCloudRunJobOperator",
"DbtSeedGcpCloudRunJobOperator",
"DbtSnapshotGcpCloudRunJobOperator",
"DbtTestGcpCloudRunJobOperator",
]

"""
Expand Down
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class ExecutionMode(Enum):
AWS_EKS = "aws_eks"
VIRTUALENV = "virtualenv"
AZURE_CONTAINER_INSTANCE = "azure_container_instance"
GCP_CLOUD_RUN_JOB = "gcp_cloud_run_job"


class InvocationMode(Enum):
Expand Down
7 changes: 3 additions & 4 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,9 @@ def validate_initial_user_config(
:param render_config: Configuration related to how to convert the dbt workflow into an Airflow DAG
:param operator_args: Arguments to pass to the underlying operators.
"""
if profile_config is None and execution_config.execution_mode not in (
ExecutionMode.KUBERNETES,
ExecutionMode.AWS_EKS,
ExecutionMode.DOCKER,
if profile_config is None and execution_config.execution_mode in (
ExecutionMode.LOCAL,
ExecutionMode.VIRTUALENV,
):
raise CosmosValueError(f"The profile_config is mandatory when using {execution_config.execution_mode}")

Expand Down
172 changes: 172 additions & 0 deletions cosmos/operators/gcp_cloud_run_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
from __future__ import annotations

import inspect
from typing import Any, Callable, Sequence

from airflow.utils.context import Context

from cosmos.config import ProfileConfig
from cosmos.log import get_logger
from cosmos.operators.base import (
AbstractDbtBaseOperator,
DbtBuildMixin,
DbtLSMixin,
DbtRunMixin,
DbtRunOperationMixin,
DbtSeedMixin,
DbtSnapshotMixin,
DbtTestMixin,
)

logger = get_logger(__name__)

DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {}

try:
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

# The overrides parameter needed to pass the dbt command was added in apache-airflow-providers-google==10.13.0
init_signature = inspect.signature(CloudRunExecuteJobOperator.__init__)
if "overrides" not in init_signature.parameters:
raise AttributeError(
"CloudRunExecuteJobOperator does not have `overrides` attribute. Ensure you've installed apache-airflow-providers-google of at least 10.11.0 "
"separately or with `pip install astronomer-cosmos[...,gcp-cloud-run-job]`."
)
except ImportError:
raise ImportError(
"Could not import CloudRunExecuteJobOperator. Ensure you've installed the Google Cloud provider "
"separately or with `pip install astronomer-cosmos[...,gcp-cloud-run-job]`."
)


class DbtGcpCloudRunJobBaseOperator(AbstractDbtBaseOperator, CloudRunExecuteJobOperator): # type: ignore
"""
Executes a dbt core cli command in a Cloud Run Job instance with dbt installed in it.
"""

template_fields: Sequence[str] = tuple(
list(AbstractDbtBaseOperator.template_fields) + list(CloudRunExecuteJobOperator.template_fields)
)

intercept_flag = False

def __init__(
self,
# arguments required by CloudRunExecuteJobOperator
project_id: str,
region: str,
job_name: str,
#
profile_config: ProfileConfig | None = None,
command: list[str] | None = None,
environment_variables: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
self.profile_config = profile_config
self.command = command
self.environment_variables = environment_variables or DEFAULT_ENVIRONMENT_VARIABLES
super().__init__(project_id=project_id, region=region, job_name=job_name, **kwargs)

def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> Any:
self.build_command(context, cmd_flags)
self.log.info(f"Running command: {self.command}")
result = CloudRunExecuteJobOperator.execute(self, context)
logger.info(result)

def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None:
# For the first round, we're going to assume that the command is dbt
# This means that we don't have openlineage support, but we will create a ticket
# to add that in the future
self.dbt_executable_path = "dbt"
dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags)
self.environment_variables = {**env_vars, **self.environment_variables}
self.command = dbt_cmd
# Override Cloud Run Job default arguments with dbt command
self.overrides = {
"container_overrides": [
{
"args": self.command,
"env": [{"name": key, "value": value} for key, value in self.environment_variables.items()],
}
],
}


class DbtBuildGcpCloudRunJobOperator(DbtBuildMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core build command.
"""

template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator]

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtLSGcpCloudRunJobOperator(DbtLSMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core ls command.
"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtSeedGcpCloudRunJobOperator(DbtSeedMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core seed command.
:param full_refresh: dbt optional arg - dbt will treat incremental models as table models
"""

template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtSeedMixin.template_fields # type: ignore[operator]

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtSnapshotGcpCloudRunJobOperator(DbtSnapshotMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core snapshot command.
"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtRunGcpCloudRunJobOperator(DbtRunMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core run command.
"""

template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtRunMixin.template_fields # type: ignore[operator]

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtTestGcpCloudRunJobOperator(DbtTestMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core test command.
"""

def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: str) -> None:
super().__init__(**kwargs)
# as of now, on_warning_callback in docker executor does nothing
self.on_warning_callback = on_warning_callback


class DbtRunOperationGcpCloudRunJobOperator(DbtRunOperationMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core run-operation command.
:param macro_name: name of macro to execute
:param args: Supply arguments to the macro. This dictionary will be mapped to the keyword arguments defined in the
selected macro.
"""

template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtRunOperationMixin.template_fields # type: ignore[operator]

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
Binary file added docs/_static/cosmos_gcp_crj_schematic.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/_static/jaffle_shop_big_query.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/_static/jaffle_shop_gcp_cloud_run_job.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
28 changes: 28 additions & 0 deletions docs/getting_started/execution-modes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Cosmos can run ``dbt`` commands using five different approaches, called ``execut
4. **kubernetes**: Run ``dbt`` commands from Kubernetes Pods managed by Cosmos (requires a pre-existing Docker image)
5. **aws_eks**: Run ``dbt`` commands from AWS EKS Pods managed by Cosmos (requires a pre-existing Docker image)
6. **azure_container_instance**: Run ``dbt`` commands from Azure Container Instances managed by Cosmos (requires a pre-existing Docker image)
7. **gcp_cloud_run_job**: Run ``dbt`` commands from GCP Cloud Run Job instances managed by Cosmos (requires a pre-existing Docker image)

The choice of the ``execution mode`` can vary based on each user's needs and concerns. For more details, check each execution mode described below.

Expand Down Expand Up @@ -47,6 +48,10 @@ The choice of the ``execution mode`` can vary based on each user's needs and con
- Slow
- High
- No
* - GCP Cloud Run Job Instance
- Slow
- High
- No

Local
-----
Expand Down Expand Up @@ -209,6 +214,29 @@ Each task will create a new container on Azure, giving full isolation. This, how
},
)
GCP Cloud Run Job
------------------------
.. versionadded:: 1.7
The ``gcp_cloud_run_job`` execution mode is particularly useful for users who prefer to run their ``dbt`` commands on Google Cloud infrastructure, taking advantage of Cloud Run's scalability, isolation, and managed service capabilities.

Check warning on line 220 in docs/getting_started/execution-modes.rst

View workflow job for this annotation

GitHub Actions / pages

Explicit markup ends without a blank line; unexpected unindent. [docutils]

For the ``gcp_cloud_run_job`` execution mode to work, a Cloud Run Job instance must first be created using a previously built Docker container. This container should include the latest ``dbt`` pipelines and profiles. You can find more details in the `Cloud Run Job creation guide <https://cloud.google.com/run/docs/create-jobs>`__ .

This execution mode allows users to run ``dbt`` core CLI commands in a Google Cloud Run Job instance. This mode leverages the ``CloudRunExecuteJobOperator`` from the Google Cloud Airflow provider to execute commands within a Cloud Run Job instance, where ``dbt`` is already installed. Similarly to the ``Docker`` and ``Kubernetes`` execution modes, a Docker container should be available, containing the up-to-date ``dbt`` pipelines and profiles.

Each task will create a new Cloud Run Job execution, giving full isolation. The separation of tasks adds extra overhead; however, that can be mitigated by using the ``concurrency`` parameter in ``DbtDag``, which will result in parallelized execution of ``dbt`` models.


.. code-block:: python
gcp_cloud_run_job_cosmos_dag = DbtDag(
# ...
execution_config=ExecutionConfig(execution_mode=ExecutionMode.GCP_CLOUD_RUN_JOB),
operator_args={
"project_id": "my-gcp-project-id",
"region": "europe-west1",
"job_name": "my-crj-{{ ti.task_id.replace('.','-').replace('_','-') }}",
},
)
.. _invocation_modes:
Invocation Modes
Expand Down
Loading

0 comments on commit 56ff6dd

Please sign in to comment.