Commit

Merge branch 'main' into optimize-dbt-virtualenv-reuse

tatiana authored Sep 30, 2024
2 parents 3b97cb5 + 7eb9386 commit 823887f

Showing 33 changed files with 1,632 additions and 34 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/test.yml
@@ -176,6 +176,8 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432
AIRFLOW__COSMOS__REMOTE_TARGET_PATH: "s3://cosmos-remote-cache/target_compiled/"
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID: aws_s3_conn

- name: Upload coverage to Github
uses: actions/upload-artifact@v4
@@ -248,6 +250,8 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432
AIRFLOW__COSMOS__REMOTE_TARGET_PATH: "s3://cosmos-remote-cache/target_compiled/"
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID: aws_s3_conn

- name: Upload coverage to Github
uses: actions/upload-artifact@v4
@@ -316,6 +320,8 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432
AIRFLOW__COSMOS__REMOTE_TARGET_PATH: "s3://cosmos-remote-cache/target_compiled/"
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID: aws_s3_conn

- name: Upload coverage to Github
uses: actions/upload-artifact@v4
@@ -393,6 +399,8 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432
AIRFLOW__COSMOS__REMOTE_TARGET_PATH: "s3://cosmos-remote-cache/target_compiled/"
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID: aws_s3_conn

- name: Upload coverage to Github
uses: actions/upload-artifact@v4
@@ -537,6 +545,8 @@ jobs:
POSTGRES_DB: postgres
POSTGRES_SCHEMA: public
POSTGRES_PORT: 5432
AIRFLOW__COSMOS__REMOTE_TARGET_PATH: "s3://cosmos-remote-cache/target_compiled/"
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID: aws_s3_conn

- name: Upload coverage to Github
uses: actions/upload-artifact@v4
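A note on the two new workflow variables: Airflow maps AIRFLOW__COSMOS__<KEY> environment variables onto the [cosmos] section of airflow.cfg, so these set remote_target_path and remote_target_path_conn_id. Here they point the test jobs at an S3 location for compiled dbt artifacts, which the AIRFLOW_ASYNC execution mode introduced below appears to rely on. A minimal sketch of the equivalent programmatic setup (the bucket and connection id are the test fixtures above, not recommendations):

import os

# Same values the workflow exports; AIRFLOW__COSMOS__<KEY> maps to the
# [cosmos] section keys remote_target_path / remote_target_path_conn_id.
os.environ["AIRFLOW__COSMOS__REMOTE_TARGET_PATH"] = "s3://cosmos-remote-cache/target_compiled/"
os.environ["AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID"] = "aws_s3_conn"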
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -54,7 +54,7 @@ repos:
- --py37-plus
- --keep-runtime-typing
- repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.6.7
+    rev: v0.6.8
hooks:
- id: ruff
args:
42 changes: 42 additions & 0 deletions cosmos/__init__.py
@@ -166,6 +166,40 @@
DbtTestAwsEksOperator = MissingPackage("cosmos.operators.azure_container_instance.DbtTestAwsEksOperator", "aws_eks")


try:
from cosmos.operators.gcp_cloud_run_job import (
DbtBuildGcpCloudRunJobOperator,
DbtLSGcpCloudRunJobOperator,
DbtRunGcpCloudRunJobOperator,
DbtRunOperationGcpCloudRunJobOperator,
DbtSeedGcpCloudRunJobOperator,
DbtSnapshotGcpCloudRunJobOperator,
DbtTestGcpCloudRunJobOperator,
)
except (ImportError, AttributeError):
DbtBuildGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtBuildGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtLSGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtLSGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtRunGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtRunGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtRunOperationGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtRunOperationGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtSeedGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtSeedGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtSnapshotGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtSnapshotGcpCloudRunJobOperator", "gcp-cloud-run-job"
)
DbtTestGcpCloudRunJobOperator = MissingPackage(
"cosmos.operators.gcp_cloud_run_job.DbtTestGcpCloudRunJobOperator", "gcp-cloud-run-job"
)


__all__ = [
"ProjectConfig",
"ProfileConfig",
@@ -221,6 +255,14 @@
"DbtSeedAwsEksOperator",
"DbtSnapshotAwsEksOperator",
"DbtTestAwsEksOperator",
# GCP Cloud Run Job Execution Mode
"DbtBuildGcpCloudRunJobOperator",
"DbtLSGcpCloudRunJobOperator",
"DbtRunGcpCloudRunJobOperator",
"DbtRunOperationGcpCloudRunJobOperator",
"DbtSeedGcpCloudRunJobOperator",
"DbtSnapshotGcpCloudRunJobOperator",
"DbtTestGcpCloudRunJobOperator",
]

"""
31 changes: 30 additions & 1 deletion cosmos/airflow/graph.py
@@ -8,6 +8,7 @@

from cosmos.config import RenderConfig
from cosmos.constants import (
DBT_COMPILE_TASK_ID,
DEFAULT_DBT_RESOURCES,
TESTABLE_DBT_RESOURCES,
DbtResourceType,
@@ -252,6 +253,31 @@ def generate_task_or_group(
return task_or_group


def _add_dbt_compile_task(
nodes: dict[str, DbtNode],
dag: DAG,
execution_mode: ExecutionMode,
task_args: dict[str, Any],
tasks_map: dict[str, Any],
task_group: TaskGroup | None,
) -> None:
if execution_mode != ExecutionMode.AIRFLOW_ASYNC:
return

compile_task_metadata = TaskMetadata(
id=DBT_COMPILE_TASK_ID,
operator_class="cosmos.operators.airflow_async.DbtCompileAirflowAsyncOperator",
arguments=task_args,
extra_context={},
)
compile_airflow_task = create_airflow_task(compile_task_metadata, dag, task_group=task_group)
tasks_map[DBT_COMPILE_TASK_ID] = compile_airflow_task

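    # Run the compile task before every root node (nodes with no dbt
    # depends_on), so all downstream tasks execute against the compiled project.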
for node_id, node in nodes.items():
if not node.depends_on and node_id in tasks_map:
tasks_map[DBT_COMPILE_TASK_ID] >> tasks_map[node_id]


def build_airflow_graph(
nodes: dict[str, DbtNode],
dag: DAG, # Airflow-specific - parent DAG where to associate tasks and (optional) task groups
@@ -332,11 +358,14 @@ def build_airflow_graph(
for leaf_node_id in leaves_ids:
tasks_map[leaf_node_id] >> test_task

_add_dbt_compile_task(nodes, dag, execution_mode, task_args, tasks_map, task_group)

create_airflow_task_dependencies(nodes, tasks_map)


def create_airflow_task_dependencies(
-    nodes: dict[str, DbtNode], tasks_map: dict[str, Union[TaskGroup, BaseOperator]]
+    nodes: dict[str, DbtNode],
+    tasks_map: dict[str, Union[TaskGroup, BaseOperator]],
) -> None:
"""
Create the Airflow task dependencies between non-test nodes.
4 changes: 4 additions & 0 deletions cosmos/constants.py
@@ -86,11 +86,13 @@ class ExecutionMode(Enum):
"""

LOCAL = "local"
AIRFLOW_ASYNC = "airflow_async"
DOCKER = "docker"
KUBERNETES = "kubernetes"
AWS_EKS = "aws_eks"
VIRTUALENV = "virtualenv"
AZURE_CONTAINER_INSTANCE = "azure_container_instance"
GCP_CLOUD_RUN_JOB = "gcp_cloud_run_job"


class InvocationMode(Enum):
@@ -146,3 +148,5 @@ def _missing_value_(cls, value): # type: ignore
# It expects that you have already created those resources through the appropriate commands.
# https://docs.getdbt.com/reference/commands/test
TESTABLE_DBT_RESOURCES = {DbtResourceType.MODEL, DbtResourceType.SOURCE, DbtResourceType.SNAPSHOT, DbtResourceType.SEED}

DBT_COMPILE_TASK_ID = "dbt_compile"
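
Taken together with the graph changes above, a minimal sketch of opting into the new mode (dag_id, project path, and profile are placeholders; this assumes the documented DbtDag / ExecutionConfig API):

from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ProfileConfig, ProjectConfig
from cosmos.constants import ExecutionMode

dag = DbtDag(
    dag_id="example_async_dag",  # placeholder
    project_config=ProjectConfig(dbt_project_path=Path("/path/to/dbt_project")),  # placeholder path
    profile_config=ProfileConfig(
        profile_name="default",
        target_name="dev",
        profiles_yml_filepath=Path("/path/to/profiles.yml"),  # placeholder path
    ),
    execution_config=ExecutionConfig(execution_mode=ExecutionMode.AIRFLOW_ASYNC),
)
# With ExecutionMode.AIRFLOW_ASYNC, build_airflow_graph prepends a
# "dbt_compile" task upstream of every root node (see _add_dbt_compile_task).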
7 changes: 3 additions & 4 deletions cosmos/converter.py
@@ -116,10 +116,9 @@ def validate_initial_user_config(
:param render_config: Configuration related to how to convert the dbt workflow into an Airflow DAG
:param operator_args: Arguments to pass to the underlying operators.
"""
-    if profile_config is None and execution_config.execution_mode not in (
-        ExecutionMode.KUBERNETES,
-        ExecutionMode.AWS_EKS,
-        ExecutionMode.DOCKER,
+    if profile_config is None and execution_config.execution_mode in (
+        ExecutionMode.LOCAL,
+        ExecutionMode.VIRTUALENV,
):
raise CosmosValueError(f"The profile_config is mandatory when using {execution_config.execution_mode}")

4 changes: 3 additions & 1 deletion cosmos/core/airflow.py
@@ -1,3 +1,5 @@
from __future__ import annotations

import importlib

from airflow.models import BaseOperator
@@ -10,7 +12,7 @@
logger = get_logger(__name__)


def get_airflow_task(task: Task, dag: DAG, task_group: "TaskGroup | None" = None) -> BaseOperator:
def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None) -> BaseOperator:
"""
Get the Airflow Operator class for a Task.
33 changes: 33 additions & 0 deletions cosmos/dataset.py
@@ -0,0 +1,33 @@
from __future__ import annotations

from airflow import DAG
from airflow.utils.task_group import TaskGroup


def get_dataset_alias_name(dag: DAG | None, task_group: TaskGroup | None, task_id: str) -> str:
"""
Given the Airflow DAG, Airflow TaskGroup and the Airflow Task ID, return the name of the
Airflow DatasetAlias associated with that task.
"""
dag_id = None
task_group_id = None

if task_group:
if task_group.dag_id is not None:
dag_id = task_group.dag_id
if task_group.group_id is not None:
task_group_id = task_group.group_id
task_group_id = task_group_id.replace(".", "__")
elif dag:
dag_id = dag.dag_id

identifiers_list = []

if dag_id:
identifiers_list.append(dag_id)
if task_group_id:
identifiers_list.append(task_group_id)

identifiers_list.append(task_id)

return "__".join(identifiers_list)
67 changes: 67 additions & 0 deletions cosmos/operators/airflow_async.py
@@ -0,0 +1,67 @@
from cosmos.operators.local import (
DbtBuildLocalOperator,
DbtCompileLocalOperator,
DbtDocsAzureStorageLocalOperator,
DbtDocsGCSLocalOperator,
DbtDocsLocalOperator,
DbtDocsS3LocalOperator,
DbtLSLocalOperator,
DbtRunLocalOperator,
DbtRunOperationLocalOperator,
DbtSeedLocalOperator,
DbtSnapshotLocalOperator,
DbtSourceLocalOperator,
DbtTestLocalOperator,
)


class DbtBuildAirflowAsyncOperator(DbtBuildLocalOperator):
pass


class DbtLSAirflowAsyncOperator(DbtLSLocalOperator):
pass


class DbtSeedAirflowAsyncOperator(DbtSeedLocalOperator):
pass


class DbtSnapshotAirflowAsyncOperator(DbtSnapshotLocalOperator):
pass


class DbtSourceAirflowAsyncOperator(DbtSourceLocalOperator):
pass


class DbtRunAirflowAsyncOperator(DbtRunLocalOperator):
pass


class DbtTestAirflowAsyncOperator(DbtTestLocalOperator):
pass


class DbtRunOperationAirflowAsyncOperator(DbtRunOperationLocalOperator):
pass


class DbtDocsAirflowAsyncOperator(DbtDocsLocalOperator):
pass


class DbtDocsS3AirflowAsyncOperator(DbtDocsS3LocalOperator):
pass


class DbtDocsAzureStorageAirflowAsyncOperator(DbtDocsAzureStorageLocalOperator):
pass


class DbtDocsGCSAirflowAsyncOperator(DbtDocsGCSLocalOperator):
pass


class DbtCompileAirflowAsyncOperator(DbtCompileLocalOperator):
pass
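
All of these are, for now, straight aliases of the local operators: the bare subclasses reserve the airflow_async names so the execution mode is selectable today, presumably so that async/deferrable behaviour can be layered in later without changing user-facing imports. A trivial illustration (no new API assumed):

from cosmos.operators.airflow_async import DbtRunAirflowAsyncOperator
from cosmos.operators.local import DbtRunLocalOperator

# The async variant currently inherits all behaviour from the local one.
assert issubclass(DbtRunAirflowAsyncOperator, DbtRunLocalOperator)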
9 changes: 9 additions & 0 deletions cosmos/operators/base.py
@@ -429,3 +429,12 @@ def add_cmd_flags(self) -> list[str]:
flags.append("--args")
flags.append(yaml.dump(self.args))
return flags


class DbtCompileMixin:
"""
Mixin for dbt compile command.
"""

base_cmd = ["compile"]
ui_color = "#877c7c"
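
Like the other command mixins in this module, DbtCompileMixin contributes only the subcommand and a UI colour; a concrete operator is presumably formed by pairing it with an execution-mode base class. A hypothetical sketch (the actual composition in cosmos/operators/local.py may differ):

from cosmos.operators.base import DbtCompileMixin
from cosmos.operators.local import DbtLocalBaseOperator


class MyDbtCompileOperator(DbtCompileMixin, DbtLocalBaseOperator):
    """Runs `dbt compile` using the local execution machinery."""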