Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GCP_CLOUD_RUN_JOB execution mode #1153

Merged
merged 3 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions cosmos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,40 @@
DbtTestAwsEksOperator = MissingPackage("cosmos.operators.azure_container_instance.DbtTestAwsEksOperator", "aws_eks")


try:
from cosmos.operators.gcp_cloud_run_job import (
DbtBuildGcpCloudRunJobOperator,
DbtLSGcpCloudRunJobOperator,
DbtRunGcpCloudRunJobOperator,
DbtRunOperationGcpCloudRunJobOperator,
DbtSeedGcpCloudRunJobOperator,
DbtSnapshotGcpCloudRunJobOperator,
DbtTestGcpCloudRunJobOperator,
)
except (ImportError, AttributeError):
DbtBuildGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtBuildGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtLSGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtLSGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtRunGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtRunGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtRunOperationGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtRunOperationGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtSeedGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtSeedGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtSnapshotGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtSnapshotGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtTestGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtTestGcpCloudRunJobOperator", "gcp-cloud-run-job"
)


__all__ = [
"ProjectConfig",
"ProfileConfig",
Expand Down Expand Up @@ -221,6 +255,14 @@
"DbtSeedAwsEksOperator",
"DbtSnapshotAwsEksOperator",
"DbtTestAwsEksOperator",
# GCP Cloud Run Job Execution Mode
"DbtBuildGcpCloudRunJobOperator",
"DbtLSGcpCloudRunJobOperator",
"DbtRunGcpCloudRunJobOperator",
"DbtRunOperationGcpCloudRunJobOperator",
"DbtSeedGcpCloudRunJobOperator",
"DbtSnapshotGcpCloudRunJobOperator",
"DbtTestGcpCloudRunJobOperator",
]

"""
Expand Down
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class ExecutionMode(Enum):
AWS_EKS = "aws_eks"
VIRTUALENV = "virtualenv"
AZURE_CONTAINER_INSTANCE = "azure_container_instance"
GCP_CLOUD_RUN_JOB = "gcp_cloud_run_job"


class InvocationMode(Enum):
Expand Down
7 changes: 3 additions & 4 deletions cosmos/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,9 @@ def validate_initial_user_config(
:param render_config: Configuration related to how to convert the dbt workflow into an Airflow DAG
:param operator_args: Arguments to pass to the underlying operators.
"""
if profile_config is None and execution_config.execution_mode not in (
ExecutionMode.KUBERNETES,
ExecutionMode.AWS_EKS,
ExecutionMode.DOCKER,
if profile_config is None and execution_config.execution_mode in (
ExecutionMode.LOCAL,
ExecutionMode.VIRTUALENV,
):
raise CosmosValueError(f"The profile_config is mandatory when using {execution_config.execution_mode}")

Expand Down
172 changes: 172 additions & 0 deletions cosmos/operators/gcp_cloud_run_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
from __future__ import annotations

import inspect
from typing import Any, Callable, Sequence

from airflow.utils.context import Context

from cosmos.config import ProfileConfig
from cosmos.log import get_logger
from cosmos.operators.base import (
AbstractDbtBaseOperator,
DbtBuildMixin,
DbtLSMixin,
DbtRunMixin,
DbtRunOperationMixin,
DbtSeedMixin,
DbtSnapshotMixin,
DbtTestMixin,
)

logger = get_logger(__name__)

DEFAULT_ENVIRONMENT_VARIABLES: dict[str, str] = {}

try:
from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator

# The overrides parameter needed to pass the dbt command was added in apache-airflow-providers-google==10.13.0
init_signature = inspect.signature(CloudRunExecuteJobOperator.__init__)
if "overrides" not in init_signature.parameters:
raise AttributeError(
"CloudRunExecuteJobOperator does not have `overrides` attribute. Ensure you've installed apache-airflow-providers-google of at least 10.11.0 "
"separately or with `pip install astronomer-cosmos[...,gcp-cloud-run-job]`."
)
except ImportError:
raise ImportError(
"Could not import CloudRunExecuteJobOperator. Ensure you've installed the Google Cloud provider "
"separately or with `pip install astronomer-cosmos[...,gcp-cloud-run-job]`."
)


class DbtGcpCloudRunJobBaseOperator(AbstractDbtBaseOperator, CloudRunExecuteJobOperator): # type: ignore
"""
Executes a dbt core cli command in a Cloud Run Job instance with dbt installed in it.

"""

template_fields: Sequence[str] = tuple(
list(AbstractDbtBaseOperator.template_fields) + list(CloudRunExecuteJobOperator.template_fields)
)

intercept_flag = False

def __init__(
self,
# arguments required by CloudRunExecuteJobOperator
project_id: str,
region: str,
job_name: str,
#
profile_config: ProfileConfig | None = None,
command: list[str] | None = None,
environment_variables: dict[str, Any] | None = None,
**kwargs: Any,
) -> None:
self.profile_config = profile_config
self.command = command
self.environment_variables = environment_variables or DEFAULT_ENVIRONMENT_VARIABLES
super().__init__(project_id=project_id, region=region, job_name=job_name, **kwargs)

def build_and_run_cmd(self, context: Context, cmd_flags: list[str] | None = None) -> Any:
self.build_command(context, cmd_flags)
self.log.info(f"Running command: {self.command}")
result = CloudRunExecuteJobOperator.execute(self, context)
logger.info(result)

def build_command(self, context: Context, cmd_flags: list[str] | None = None) -> None:
# For the first round, we're going to assume that the command is dbt
# This means that we don't have openlineage support, but we will create a ticket
# to add that in the future
self.dbt_executable_path = "dbt"
dbt_cmd, env_vars = self.build_cmd(context=context, cmd_flags=cmd_flags)
self.environment_variables = {**env_vars, **self.environment_variables}
self.command = dbt_cmd
# Override Cloud Run Job default arguments with dbt command
self.overrides = {
"container_overrides": [
{
"args": self.command,
"env": [{"name": key, "value": value} for key, value in self.environment_variables.items()],
}
],
}


class DbtBuildGcpCloudRunJobOperator(DbtBuildMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core build command.
"""

template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtBuildMixin.template_fields # type: ignore[operator]

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtLSGcpCloudRunJobOperator(DbtLSMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core ls command.
"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtSeedGcpCloudRunJobOperator(DbtSeedMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core seed command.

:param full_refresh: dbt optional arg - dbt will treat incremental models as table models
"""

template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtSeedMixin.template_fields # type: ignore[operator]

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtSnapshotGcpCloudRunJobOperator(DbtSnapshotMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core snapshot command.
"""

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtRunGcpCloudRunJobOperator(DbtRunMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core run command.
"""

template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtRunMixin.template_fields # type: ignore[operator]

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)


class DbtTestGcpCloudRunJobOperator(DbtTestMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core test command.
"""

def __init__(self, on_warning_callback: Callable[..., Any] | None = None, **kwargs: str) -> None:
super().__init__(**kwargs)
# as of now, on_warning_callback in docker executor does nothing
self.on_warning_callback = on_warning_callback


class DbtRunOperationGcpCloudRunJobOperator(DbtRunOperationMixin, DbtGcpCloudRunJobBaseOperator):
"""
Executes a dbt core run-operation command.

:param macro_name: name of macro to execute
:param args: Supply arguments to the macro. This dictionary will be mapped to the keyword arguments defined in the
selected macro.
"""

template_fields: Sequence[str] = DbtGcpCloudRunJobBaseOperator.template_fields + DbtRunOperationMixin.template_fields # type: ignore[operator]

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
Binary file added docs/_static/cosmos_gcp_crj_schematic.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/_static/jaffle_shop_big_query.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/_static/jaffle_shop_gcp_cloud_run_job.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
28 changes: 28 additions & 0 deletions docs/getting_started/execution-modes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Cosmos can run ``dbt`` commands using five different approaches, called ``execut
4. **kubernetes**: Run ``dbt`` commands from Kubernetes Pods managed by Cosmos (requires a pre-existing Docker image)
5. **aws_eks**: Run ``dbt`` commands from AWS EKS Pods managed by Cosmos (requires a pre-existing Docker image)
6. **azure_container_instance**: Run ``dbt`` commands from Azure Container Instances managed by Cosmos (requires a pre-existing Docker image)
7. **gcp_cloud_run_job**: Run ``dbt`` commands from GCP Cloud Run Job instances managed by Cosmos (requires a pre-existing Docker image)

The choice of the ``execution mode`` can vary based on each user's needs and concerns. For more details, check each execution mode described below.

Expand Down Expand Up @@ -47,6 +48,10 @@ The choice of the ``execution mode`` can vary based on each user's needs and con
- Slow
- High
- No
* - GCP Cloud Run Job Instance
- Slow
- High
- No

Local
-----
Expand Down Expand Up @@ -209,6 +214,29 @@ Each task will create a new container on Azure, giving full isolation. This, how
},
)

GCP Cloud Run Job
ags-de marked this conversation as resolved.
Show resolved Hide resolved
------------------------
.. versionadded:: 1.7
The ``gcp_cloud_run_job`` execution mode is particularly useful for users who prefer to run their ``dbt`` commands on Google Cloud infrastructure, taking advantage of Cloud Run's scalability, isolation, and managed service capabilities.

For the ``gcp_cloud_run_job`` execution mode to work, a Cloud Run Job instance must first be created using a previously built Docker container. This container should include the latest ``dbt`` pipelines and profiles. You can find more details in the `Cloud Run Job creation guide <https://cloud.google.com/run/docs/create-jobs>`__ .

This execution mode allows users to run ``dbt`` core CLI commands in a Google Cloud Run Job instance. This mode leverages the ``CloudRunExecuteJobOperator`` from the Google Cloud Airflow provider to execute commands within a Cloud Run Job instance, where ``dbt`` is already installed. Similarly to the ``Docker`` and ``Kubernetes`` execution modes, a Docker container should be available, containing the up-to-date ``dbt`` pipelines and profiles.

Each task will create a new Cloud Run Job execution, giving full isolation. The separation of tasks adds extra overhead; however, that can be mitigated by using the ``concurrency`` parameter in ``DbtDag``, which will result in parallelized execution of ``dbt`` models.


.. code-block:: python

gcp_cloud_run_job_cosmos_dag = DbtDag(
# ...
execution_config=ExecutionConfig(execution_mode=ExecutionMode.GCP_CLOUD_RUN_JOB),
operator_args={
"project_id": "my-gcp-project-id",
"region": "europe-west1",
"job_name": "my-crj-{{ ti.task_id.replace('.','-').replace('_','-') }}",
},
)

.. _invocation_modes:
Invocation Modes
Expand Down
Loading
Loading