diff --git a/.gitignore b/.gitignore index 6f6111510..5991c231c 100644 --- a/.gitignore +++ b/.gitignore @@ -160,6 +160,3 @@ webserver_config.py # VI *.sw[a-z] - -# Ignore possibly created symlink to `dev/dags` for running `airflow dags test` command. -dags diff --git a/cosmos/__init__.py b/cosmos/__init__.py index f76f18b7c..ad4c9ee35 100644 --- a/cosmos/__init__.py +++ b/cosmos/__init__.py @@ -5,7 +5,8 @@ Contains dags, task groups, and operators. """ -__version__ = "1.6.0" + +__version__ = "1.7.0a1" from cosmos.airflow.dag import DbtDag diff --git a/cosmos/airflow/graph.py b/cosmos/airflow/graph.py index 9de21292e..f507b03ac 100644 --- a/cosmos/airflow/graph.py +++ b/cosmos/airflow/graph.py @@ -132,6 +132,7 @@ def create_task_metadata( node: DbtNode, execution_mode: ExecutionMode, args: dict[str, Any], + dbt_dag_task_group_identifier: str, use_task_group: bool = False, source_rendering_behavior: SourceRenderingBehavior = SourceRenderingBehavior.NONE, ) -> TaskMetadata | None: @@ -142,6 +143,7 @@ def create_task_metadata( :param execution_mode: Where Cosmos should run each dbt task (e.g. ExecutionMode.LOCAL, ExecutionMode.KUBERNETES). Default is ExecutionMode.LOCAL. :param args: Arguments to be used to instantiate an Airflow Task + :param dbt_dag_task_group_identifier: Identifier to refer to the DbtDAG or DbtTaskGroup in the DAG. :param use_task_group: It determines whether to use the name as a prefix for the task id or not. If it is False, then use the name as a prefix for the task id, otherwise do not. :returns: The metadata necessary to instantiate the source dbt node as an Airflow task. @@ -156,7 +158,10 @@ def create_task_metadata( args = {**args, **{"models": node.resource_name}} if DbtResourceType(node.resource_type) in DEFAULT_DBT_RESOURCES and node.resource_type in dbt_resource_to_class: - extra_context = {"dbt_node_config": node.context_dict} + extra_context = { + "dbt_node_config": node.context_dict, + "dbt_dag_task_group_identifier": dbt_dag_task_group_identifier, + } if node.resource_type == DbtResourceType.MODEL: task_id = f"{node.name}_run" if use_task_group is True: @@ -226,6 +231,7 @@ def generate_task_or_group( node=node, execution_mode=execution_mode, args=task_args, + dbt_dag_task_group_identifier=_get_dbt_dag_task_group_identifier(dag, task_group), use_task_group=use_task_group, source_rendering_behavior=source_rendering_behavior, ) @@ -268,14 +274,28 @@ def _add_dbt_compile_task( id=DBT_COMPILE_TASK_ID, operator_class="cosmos.operators.airflow_async.DbtCompileAirflowAsyncOperator", arguments=task_args, - extra_context={}, + extra_context={"dbt_dag_task_group_identifier": _get_dbt_dag_task_group_identifier(dag, task_group)}, ) compile_airflow_task = create_airflow_task(compile_task_metadata, dag, task_group=task_group) + + for task_id, task in tasks_map.items(): + if not task.upstream_list: + compile_airflow_task >> task + tasks_map[DBT_COMPILE_TASK_ID] = compile_airflow_task - for node_id, node in nodes.items(): - if not node.depends_on and node_id in tasks_map: - tasks_map[DBT_COMPILE_TASK_ID] >> tasks_map[node_id] + +def _get_dbt_dag_task_group_identifier(dag: DAG, task_group: TaskGroup | None) -> str: + dag_id = dag.dag_id + task_group_id = task_group.group_id if task_group else None + identifiers_list = [] + if dag_id: + identifiers_list.append(dag_id) + if task_group_id: + identifiers_list.append(task_group_id) + dag_task_group_identifier = "__".join(identifiers_list) + + return dag_task_group_identifier def build_airflow_graph( @@ -358,9 +378,8 @@ def 
build_airflow_graph( for leaf_node_id in leaves_ids: tasks_map[leaf_node_id] >> test_task - _add_dbt_compile_task(nodes, dag, execution_mode, task_args, tasks_map, task_group) - create_airflow_task_dependencies(nodes, tasks_map) + _add_dbt_compile_task(nodes, dag, execution_mode, task_args, tasks_map, task_group) def create_airflow_task_dependencies( diff --git a/cosmos/config.py b/cosmos/config.py index 2cebbf3cc..ccda2c432 100644 --- a/cosmos/config.py +++ b/cosmos/config.py @@ -10,6 +10,7 @@ from pathlib import Path from typing import Any, Callable, Iterator +import yaml from airflow.version import version as airflow_version from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled @@ -286,6 +287,21 @@ def validate_profiles_yml(self) -> None: if self.profiles_yml_filepath and not Path(self.profiles_yml_filepath).exists(): raise CosmosValueError(f"The file {self.profiles_yml_filepath} does not exist.") + def get_profile_type(self) -> str: + if isinstance(self.profile_mapping, BaseProfileMapping): + return str(self.profile_mapping.dbt_profile_type) + + profile_path = self._get_profile_path() + + with open(profile_path) as file: + profiles = yaml.safe_load(file) + + profile = profiles[self.profile_name] + target_type = profile["outputs"][self.target_name]["type"] + return str(target_type) + + return "undefined" + def _get_profile_path(self, use_mock_values: bool = False) -> Path: """ Handle the profile caching mechanism. diff --git a/cosmos/core/airflow.py b/cosmos/core/airflow.py index 1bdce9361..6f1064649 100644 --- a/cosmos/core/airflow.py +++ b/cosmos/core/airflow.py @@ -1,6 +1,7 @@ from __future__ import annotations import importlib +from typing import Any from airflow.models import BaseOperator from airflow.models.dag import DAG @@ -27,7 +28,7 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None) module = importlib.import_module(module_name) Operator = getattr(module, class_name) - task_kwargs = {} + task_kwargs: dict[str, Any] = {} if task.owner != "": task_kwargs["owner"] = task.owner diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 1c0237e8f..7a957b2fc 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -467,7 +467,6 @@ def should_use_dbt_ls_cache(self) -> bool: def load_via_dbt_ls_cache(self) -> bool: """(Try to) load dbt ls cache from an Airflow Variable""" - logger.info(f"Trying to parse the dbt project using dbt ls cache {self.dbt_ls_cache_key}...") if self.should_use_dbt_ls_cache(): project_path = self.project_path diff --git a/cosmos/operators/airflow_async.py b/cosmos/operators/airflow_async.py index 05f762702..a7f30a330 100644 --- a/cosmos/operators/airflow_async.py +++ b/cosmos/operators/airflow_async.py @@ -1,67 +1,190 @@ +from __future__ import annotations + +import inspect +from pathlib import Path +from typing import TYPE_CHECKING, Any, Sequence + +from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook +from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator +from airflow.utils.context import Context + +from cosmos import settings +from cosmos.config import ProfileConfig +from cosmos.exceptions import CosmosValueError +from cosmos.operators.base import AbstractDbtBaseOperator from cosmos.operators.local import ( DbtBuildLocalOperator, DbtCompileLocalOperator, - DbtDocsAzureStorageLocalOperator, - DbtDocsGCSLocalOperator, - DbtDocsLocalOperator, - DbtDocsS3LocalOperator, + DbtLocalBaseOperator, DbtLSLocalOperator, - DbtRunLocalOperator, 
DbtRunOperationLocalOperator,
     DbtSeedLocalOperator,
     DbtSnapshotLocalOperator,
     DbtSourceLocalOperator,
     DbtTestLocalOperator,
 )
+from cosmos.settings import remote_target_path, remote_target_path_conn_id

+_SUPPORTED_DATABASES = ["bigquery"]

-class DbtBuildAirflowAsyncOperator(DbtBuildLocalOperator):
-    pass
+from abc import ABCMeta
 
-
-class DbtLSAirflowAsyncOperator(DbtLSLocalOperator):
-    pass
+from airflow.models.baseoperator import BaseOperator


-class DbtSeedAirflowAsyncOperator(DbtSeedLocalOperator):
-    pass
-
-
-class DbtSnapshotAirflowAsyncOperator(DbtSnapshotLocalOperator):
-    pass
-
-
-class DbtSourceAirflowAsyncOperator(DbtSourceLocalOperator):
-    pass
+class DbtBaseAirflowAsyncOperator(BaseOperator, metaclass=ABCMeta):
+    def __init__(self, **kwargs) -> None:  # type: ignore
+        self.location = kwargs.pop("location")
+        self.configuration = kwargs.pop("configuration", {})
+        super().__init__(**kwargs)


-class DbtRunAirflowAsyncOperator(DbtRunLocalOperator):
+class DbtBuildAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtBuildLocalOperator):  # type: ignore
     pass


-class DbtTestAirflowAsyncOperator(DbtTestLocalOperator):
+class DbtLSAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtLSLocalOperator):  # type: ignore
     pass


-class DbtRunOperationAirflowAsyncOperator(DbtRunOperationLocalOperator):
+class DbtSeedAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtSeedLocalOperator):  # type: ignore
     pass


-class DbtDocsAirflowAsyncOperator(DbtDocsLocalOperator):
+class DbtSnapshotAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtSnapshotLocalOperator):  # type: ignore
     pass


-class DbtDocsS3AirflowAsyncOperator(DbtDocsS3LocalOperator):
+class DbtSourceAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtSourceLocalOperator):  # type: ignore
     pass


-class DbtDocsAzureStorageAirflowAsyncOperator(DbtDocsAzureStorageLocalOperator):
+class DbtRunAirflowAsyncOperator(BigQueryInsertJobOperator):  # type: ignore
+
+    template_fields: Sequence[str] = (
+        "full_refresh",
+        "project_dir",
+        "gcp_project",
+        "dataset",
+        "location",
+    )
+
+    def __init__(  # type: ignore
+        self,
+        project_dir: str,
+        profile_config: ProfileConfig,
+        location: str,  # This is a mandatory parameter when using BigQueryInsertJobOperator with deferrable=True
+        full_refresh: bool = False,
+        extra_context: dict[str, object] | None = None,
+        configuration: dict[str, object] | None = None,
+        **kwargs,
+    ) -> None:
+        # dbt task params
+        self.project_dir = project_dir
+        self.extra_context = extra_context or {}
+        self.full_refresh = full_refresh
+        self.profile_config = profile_config
+        if not self.profile_config or not self.profile_config.profile_mapping:
+            raise CosmosValueError("Cosmos async support is only available when using a ProfileMapping.")
+
+        self.profile_type: str = profile_config.get_profile_type()  # type: ignore
+        if self.profile_type not in _SUPPORTED_DATABASES:
+            raise CosmosValueError(f"Async runs are only supported for the following databases: {_SUPPORTED_DATABASES}")
+
+        # Airflow task params
+        self.location = location
+        self.configuration = configuration or {}
+        self.gcp_conn_id = self.profile_config.profile_mapping.conn_id  # type: ignore
+        profile = self.profile_config.profile_mapping.profile
+        self.gcp_project = profile["project"]
+        self.dataset = profile["dataset"]
+
+        # Cosmos attempts to pass many kwargs that BigQueryInsertJobOperator simply does not accept.
+        # We need to pop them.
+        clean_kwargs = {}
+        non_async_args = set(inspect.signature(AbstractDbtBaseOperator.__init__).parameters.keys())
+        non_async_args |= set(inspect.signature(DbtLocalBaseOperator.__init__).parameters.keys())
+        non_async_args -= {"task_id"}
+
+        for arg_key, arg_value in kwargs.items():
+            if arg_key not in non_async_args:
+                clean_kwargs[arg_key] = arg_value
+
+        # The following are the minimum required parameters to run BigQueryInsertJobOperator using the deferrable mode
+        super().__init__(
+            gcp_conn_id=self.gcp_conn_id,
+            configuration=self.configuration,
+            location=self.location,
+            deferrable=True,
+            **clean_kwargs,
+        )
+
+    def get_remote_sql(self) -> str:
+        if not settings.AIRFLOW_IO_AVAILABLE:
+            raise CosmosValueError("Cosmos async support is only available starting in Airflow 2.8.")
+        from airflow.io.path import ObjectStoragePath
+
+        file_path = self.extra_context["dbt_node_config"]["file_path"]  # type: ignore
+        dbt_dag_task_group_identifier = self.extra_context["dbt_dag_task_group_identifier"]
+
+        remote_target_path_str = str(remote_target_path).rstrip("/")
+
+        if TYPE_CHECKING:
+            assert self.project_dir is not None
+
+        project_dir_parent = str(Path(self.project_dir).parent)
+        relative_file_path = str(file_path).replace(project_dir_parent, "").lstrip("/")
+        remote_model_path = f"{remote_target_path_str}/{dbt_dag_task_group_identifier}/compiled/{relative_file_path}"
+
+        object_storage_path = ObjectStoragePath(remote_model_path, conn_id=remote_target_path_conn_id)
+        with object_storage_path.open() as fp:  # type: ignore
+            return fp.read()  # type: ignore
+
+    def drop_table_sql(self) -> None:
+        model_name = self.extra_context["dbt_node_config"]["resource_name"]  # type: ignore
+        sql = f"DROP TABLE IF EXISTS {self.gcp_project}.{self.dataset}.{model_name};"
+
+        hook = BigQueryHook(
+            gcp_conn_id=self.gcp_conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )
+        self.configuration = {
+            "query": {
+                "query": sql,
+                "useLegacySql": False,
+            }
+        }
+        hook.insert_job(configuration=self.configuration, location=self.location, project_id=self.gcp_project)
+
+    def execute(self, context: Context) -> Any | None:
+        if not self.full_refresh:
+            raise CosmosValueError("Async execution is only supported for full_refresh models.")
+        else:
+            # It may be surprising to some, but the dbt-core --full-refresh argument fully drops the table before populating it
+            # https://github.com/dbt-labs/dbt-core/blob/5e9f1b515f37dfe6cdae1ab1aa7d190b92490e24/core/dbt/context/base.py#L662-L666
+            # https://docs.getdbt.com/reference/resource-configs/full_refresh#recommendation
+            # We're emulating this behaviour here
+            self.drop_table_sql()
+            sql = self.get_remote_sql()
+            model_name = self.extra_context["dbt_node_config"]["resource_name"]  # type: ignore
+            # Prefix an explicit CREATE TABLE statement, since the compiled SQL is a plain SELECT
+            sql = f"CREATE TABLE {self.gcp_project}.{self.dataset}.{model_name} AS {sql}"
+            self.configuration = {
+                "query": {
+                    "query": sql,
+                    "useLegacySql": False,
+                }
+            }
+            return super().execute(context)
+
+
+class DbtTestAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtTestLocalOperator):  # type: ignore
     pass


-class DbtDocsGCSAirflowAsyncOperator(DbtDocsGCSLocalOperator):
+class DbtRunOperationAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtRunOperationLocalOperator):  # type: ignore
     pass


-class DbtCompileAirflowAsyncOperator(DbtCompileLocalOperator):
+class DbtCompileAirflowAsyncOperator(DbtBaseAirflowAsyncOperator, DbtCompileLocalOperator):  # type: ignore
     pass
diff --git a/cosmos/operators/local.py
b/cosmos/operators/local.py index db5993609..05fa356f6 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -32,7 +32,7 @@ from cosmos.dataset import get_dataset_alias_name from cosmos.dbt.project import get_partial_parse_path, has_non_empty_dependencies_file from cosmos.exceptions import AirflowCompatibilityError, CosmosValueError -from cosmos.settings import AIRFLOW_IO_AVAILABLE, remote_target_path, remote_target_path_conn_id +from cosmos.settings import remote_target_path, remote_target_path_conn_id try: from airflow.datasets import Dataset @@ -294,7 +294,7 @@ def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]: if remote_conn_id is None: return None, None - if not AIRFLOW_IO_AVAILABLE: + if not settings.AIRFLOW_IO_AVAILABLE: raise CosmosValueError( f"You're trying to specify remote target path {target_path_str}, but the required " f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to " @@ -311,23 +311,16 @@ def _configure_remote_target_path() -> tuple[Path, str] | tuple[None, None]: return _configured_target_path, remote_conn_id def _construct_dest_file_path( - self, dest_target_dir: Path, file_path: str, source_compiled_dir: Path, context: Context + self, + dest_target_dir: Path, + file_path: str, + source_compiled_dir: Path, ) -> str: """ Construct the destination path for the compiled SQL files to be uploaded to the remote store. """ dest_target_dir_str = str(dest_target_dir).rstrip("/") - - task = context["task"] - dag_id = task.dag_id - task_group_id = task.task_group.group_id if task.task_group else None - identifiers_list = [] - if dag_id: - identifiers_list.append(dag_id) - if task_group_id: - identifiers_list.append(task_group_id) - dag_task_group_identifier = "__".join(identifiers_list) - + dag_task_group_identifier = self.extra_context["dbt_dag_task_group_identifier"] rel_path = os.path.relpath(file_path, source_compiled_dir).lstrip("/") return f"{dest_target_dir_str}/{dag_task_group_identifier}/compiled/{rel_path}" @@ -340,6 +333,7 @@ def upload_compiled_sql(self, tmp_project_dir: str, context: Context) -> None: return dest_target_dir, dest_conn_id = self._configure_remote_target_path() + if not dest_target_dir: raise CosmosValueError( "You're trying to upload compiled SQL files, but the remote target path is not configured. " @@ -350,7 +344,7 @@ def upload_compiled_sql(self, tmp_project_dir: str, context: Context) -> None: source_compiled_dir = Path(tmp_project_dir) / "target" / "compiled" files = [str(file) for file in source_compiled_dir.rglob("*") if file.is_file()] for file_path in files: - dest_file_path = self._construct_dest_file_path(dest_target_dir, file_path, source_compiled_dir, context) + dest_file_path = self._construct_dest_file_path(dest_target_dir, file_path, source_compiled_dir) dest_object_storage_path = ObjectStoragePath(dest_file_path, conn_id=dest_conn_id) ObjectStoragePath(file_path).copy(dest_object_storage_path) self.log.debug("Copied %s to %s", file_path, dest_object_storage_path) diff --git a/cosmos/settings.py b/cosmos/settings.py index 2cae79968..7bcf04bb9 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -34,7 +34,6 @@ # This will be merged with the `cache_dir` config parameter in upcoming releases. 
remote_cache_dir = conf.get("cosmos", "remote_cache_dir", fallback=None) remote_cache_dir_conn_id = conf.get("cosmos", "remote_cache_dir_conn_id", fallback=None) - remote_target_path = conf.get("cosmos", "remote_target_path", fallback=None) remote_target_path_conn_id = conf.get("cosmos", "remote_target_path_conn_id", fallback=None) diff --git a/dev/Dockerfile b/dev/Dockerfile index a17bb9943..9fd3df75a 100644 --- a/dev/Dockerfile +++ b/dev/Dockerfile @@ -17,7 +17,7 @@ COPY ./README.rst ${AIRFLOW_HOME}/astronomer_cosmos/ COPY ./cosmos/ ${AIRFLOW_HOME}/astronomer_cosmos/cosmos/ COPY ./dev/requirements.txt ${AIRFLOW_HOME}/requirements.txt # install the package in editable mode -RUN uv pip install --system -e "${AIRFLOW_HOME}/astronomer_cosmos"[dbt-postgres,dbt-databricks] && \ +RUN uv pip install --system -e "${AIRFLOW_HOME}/astronomer_cosmos"[dbt-postgres,dbt-databricks,dbt-bigquery] && \ uv pip install --system -r ${AIRFLOW_HOME}/requirements.txt diff --git a/dev/dags/cosmos_manifest_example.py b/dev/dags/cosmos_manifest_example.py index a96eff971..8e35208b8 100644 --- a/dev/dags/cosmos_manifest_example.py +++ b/dev/dags/cosmos_manifest_example.py @@ -74,7 +74,7 @@ def cosmos_manifest_example() -> None: gcp_gs_example = DbtTaskGroup( group_id="gcp_gs_example", project_config=ProjectConfig( - manifest_path="gs://cosmos-manifest-test/manifest.json", + manifest_path="gs://cosmos_remote_target/manifest.json", manifest_conn_id="gcp_gs_conn", # `manifest_conn_id` is optional. If not provided, the default connection ID `google_cloud_default` is used. project_name="jaffle_shop", diff --git a/dev/dags/dbt/original_jaffle_shop/.gitignore b/dev/dags/dbt/original_jaffle_shop/.gitignore new file mode 100644 index 000000000..45d294b9a --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/.gitignore @@ -0,0 +1,5 @@ + +target/ +dbt_packages/ +logs/ +!target/manifest.json diff --git a/dev/dags/dbt/original_jaffle_shop/LICENSE b/dev/dags/dbt/original_jaffle_shop/LICENSE new file mode 100644 index 000000000..8dada3eda --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/dev/dags/dbt/original_jaffle_shop/README.md b/dev/dags/dbt/original_jaffle_shop/README.md new file mode 100644 index 000000000..d4ce46446 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/README.md @@ -0,0 +1,11 @@ +## `jaffle_shop` + +`jaffle_shop` is a fictional ecommerce store. This dbt project transforms raw data from an app database into a customers and orders model ready for analytics. + +See [dbt's documentation](https://github.com/dbt-labs/jaffle_shop) for more info. + +### Modifications + +This project has been modified from the original to highlight some of the features of Cosmos. 
Namely: + +- tags have been added to the models diff --git a/dev/dags/dbt/original_jaffle_shop/dbt_project.yml b/dev/dags/dbt/original_jaffle_shop/dbt_project.yml new file mode 100644 index 000000000..42767c5ea --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/dbt_project.yml @@ -0,0 +1,26 @@ +name: 'original_jaffle_shop' + +config-version: 2 +version: '0.1' + +profile: 'jaffle_shop' + +model-paths: ["models"] +seed-paths: ["seeds"] +test-paths: ["tests"] +analysis-paths: ["analysis"] +macro-paths: ["macros"] + +target-path: "target" +clean-targets: + - "target" + - "dbt_modules" + - "logs" + +require-dbt-version: [">=1.0.0", "<2.0.0"] + +models: + jaffle_shop: + materialized: table + staging: + materialized: view diff --git a/dev/dags/dbt/original_jaffle_shop/macros/drop_table.sql b/dev/dags/dbt/original_jaffle_shop/macros/drop_table.sql new file mode 100644 index 000000000..37a8b21d7 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/macros/drop_table.sql @@ -0,0 +1,6 @@ +{%- macro drop_table_by_name(table_name) -%} + {%- set drop_query -%} + DROP TABLE IF EXISTS {{ target.schema }}.{{ table_name }} CASCADE + {%- endset -%} + {% do run_query(drop_query) %} +{%- endmacro -%} diff --git a/dev/dags/dbt/original_jaffle_shop/models/customers.sql b/dev/dags/dbt/original_jaffle_shop/models/customers.sql new file mode 100644 index 000000000..016a004fe --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/customers.sql @@ -0,0 +1,69 @@ +with customers as ( + + select * from {{ ref('stg_customers') }} + +), + +orders as ( + + select * from {{ ref('stg_orders') }} + +), + +payments as ( + + select * from {{ ref('stg_payments') }} + +), + +customer_orders as ( + + select + customer_id, + + min(order_date) as first_order, + max(order_date) as most_recent_order, + count(order_id) as number_of_orders + from orders + + group by customer_id + +), + +customer_payments as ( + + select + orders.customer_id, + sum(amount) as total_amount + + from payments + + left join orders on + payments.order_id = orders.order_id + + group by orders.customer_id + +), + +final as ( + + select + customers.customer_id, + customers.first_name, + customers.last_name, + customer_orders.first_order, + customer_orders.most_recent_order, + customer_orders.number_of_orders, + customer_payments.total_amount as customer_lifetime_value + + from customers + + left join customer_orders + on customers.customer_id = customer_orders.customer_id + + left join customer_payments + on customers.customer_id = customer_payments.customer_id + +) + +select * from final diff --git a/dev/dags/dbt/original_jaffle_shop/models/docs.md b/dev/dags/dbt/original_jaffle_shop/models/docs.md new file mode 100644 index 000000000..c6ae93be0 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/docs.md @@ -0,0 +1,14 @@ +{% docs orders_status %} + +Orders can be one of the following statuses: + +| status | description | +|----------------|------------------------------------------------------------------------------------------------------------------------| +| placed | The order has been placed but has not yet left the warehouse | +| shipped | The order has ben shipped to the customer and is currently in transit | +| completed | The order has been received by the customer | +| return_pending | The customer has indicated that they would like to return the order, but it has not yet been received at the warehouse | +| returned | The order has been returned by the customer and received at the warehouse | + + +{% enddocs %} diff --git 
a/dev/dags/dbt/original_jaffle_shop/models/orders.sql b/dev/dags/dbt/original_jaffle_shop/models/orders.sql new file mode 100644 index 000000000..cbb293491 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/orders.sql @@ -0,0 +1,56 @@ +{% set payment_methods = ['credit_card', 'coupon', 'bank_transfer', 'gift_card'] %} + +with orders as ( + + select * from {{ ref('stg_orders') }} + +), + +payments as ( + + select * from {{ ref('stg_payments') }} + +), + +order_payments as ( + + select + order_id, + + {% for payment_method in payment_methods -%} + sum(case when payment_method = '{{ payment_method }}' then amount else 0 end) as {{ payment_method }}_amount, + {% endfor -%} + + sum(amount) as total_amount + + from payments + + group by order_id + +), + +final as ( + + select + orders.order_id, + orders.customer_id, + orders.order_date, + orders.status, + + {% for payment_method in payment_methods -%} + + order_payments.{{ payment_method }}_amount, + + {% endfor -%} + + order_payments.total_amount as amount + + from orders + + + left join order_payments + on orders.order_id = order_payments.order_id + +) + +select * from final diff --git a/dev/dags/dbt/original_jaffle_shop/models/overview.md b/dev/dags/dbt/original_jaffle_shop/models/overview.md new file mode 100644 index 000000000..0544c42b1 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/overview.md @@ -0,0 +1,11 @@ +{% docs __overview__ %} + +## Data Documentation for Jaffle Shop + +`jaffle_shop` is a fictional ecommerce store. + +This [dbt](https://www.getdbt.com/) project is for testing out code. + +The source code can be found [here](https://github.com/clrcrl/jaffle_shop). + +{% enddocs %} diff --git a/dev/dags/dbt/original_jaffle_shop/models/schema.yml b/dev/dags/dbt/original_jaffle_shop/models/schema.yml new file mode 100644 index 000000000..381349cfd --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/schema.yml @@ -0,0 +1,82 @@ +version: 2 + +models: + - name: customers + description: This table has basic information about a customer, as well as some derived facts based on a customer's orders + + columns: + - name: customer_id + description: This is a unique identifier for a customer + tests: + - unique + - not_null + + - name: first_name + description: Customer's first name. PII. + + - name: last_name + description: Customer's last name. PII. 
+ + - name: first_order + description: Date (UTC) of a customer's first order + + - name: most_recent_order + description: Date (UTC) of a customer's most recent order + + - name: number_of_orders + description: Count of the number of orders a customer has placed + + - name: total_order_amount + description: Total value (AUD) of a customer's orders + + - name: orders + description: This table has basic information about orders, as well as some derived facts based on payments + + columns: + - name: order_id + tests: + - unique + - not_null + description: This is a unique identifier for an order + + - name: customer_id + description: Foreign key to the customers table + tests: + - not_null + - relationships: + to: ref('customers') + field: customer_id + + - name: order_date + description: Date (UTC) that the order was placed + + - name: status + description: '{{ doc("orders_status") }}' + tests: + - accepted_values: + values: ['placed', 'shipped', 'completed', 'return_pending', 'returned'] + + - name: amount + description: Total amount (AUD) of the order + tests: + - not_null + + - name: credit_card_amount + description: Amount of the order (AUD) paid for by credit card + tests: + - not_null + + - name: coupon_amount + description: Amount of the order (AUD) paid for by coupon + tests: + - not_null + + - name: bank_transfer_amount + description: Amount of the order (AUD) paid for by bank transfer + tests: + - not_null + + - name: gift_card_amount + description: Amount of the order (AUD) paid for by gift card + tests: + - not_null diff --git a/dev/dags/dbt/original_jaffle_shop/models/staging/schema.yml b/dev/dags/dbt/original_jaffle_shop/models/staging/schema.yml new file mode 100644 index 000000000..c207e4cf5 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/staging/schema.yml @@ -0,0 +1,31 @@ +version: 2 + +models: + - name: stg_customers + columns: + - name: customer_id + tests: + - unique + - not_null + + - name: stg_orders + columns: + - name: order_id + tests: + - unique + - not_null + - name: status + tests: + - accepted_values: + values: ['placed', 'shipped', 'completed', 'return_pending', 'returned'] + + - name: stg_payments + columns: + - name: payment_id + tests: + - unique + - not_null + - name: payment_method + tests: + - accepted_values: + values: ['credit_card', 'coupon', 'bank_transfer', 'gift_card'] diff --git a/dev/dags/dbt/original_jaffle_shop/models/staging/stg_customers.sql b/dev/dags/dbt/original_jaffle_shop/models/staging/stg_customers.sql new file mode 100644 index 000000000..cad047269 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/staging/stg_customers.sql @@ -0,0 +1,22 @@ +with source as ( + + {#- + Normally we would select from the table here, but we are using seeds to load + our data in this project + #} + select * from {{ ref('raw_customers') }} + +), + +renamed as ( + + select + id as customer_id, + first_name, + last_name + + from source + +) + +select * from renamed diff --git a/dev/dags/dbt/original_jaffle_shop/models/staging/stg_orders.sql b/dev/dags/dbt/original_jaffle_shop/models/staging/stg_orders.sql new file mode 100644 index 000000000..a654dcb94 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/staging/stg_orders.sql @@ -0,0 +1,23 @@ +with source as ( + + {#- + Normally we would select from the table here, but we are using seeds to load + our data in this project + #} + select * from {{ ref('raw_orders') }} + +), + +renamed as ( + + select + id as order_id, + user_id as customer_id, + order_date, + status + + from 
source + +) + +select * from renamed diff --git a/dev/dags/dbt/original_jaffle_shop/models/staging/stg_payments.sql b/dev/dags/dbt/original_jaffle_shop/models/staging/stg_payments.sql new file mode 100644 index 000000000..f718596ad --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/models/staging/stg_payments.sql @@ -0,0 +1,25 @@ +with source as ( + + {#- + Normally we would select from the table here, but we are using seeds to load + our data in this project + #} + select * from {{ ref('raw_payments') }} + +), + +renamed as ( + + select + id as payment_id, + order_id, + payment_method, + + -- `amount` is currently stored in cents, so we convert it to dollars + amount / 100 as amount + + from source + +) + +select * from renamed diff --git a/dev/dags/dbt/original_jaffle_shop/seeds/raw_customers.csv b/dev/dags/dbt/original_jaffle_shop/seeds/raw_customers.csv new file mode 100644 index 000000000..b3e6747d6 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/seeds/raw_customers.csv @@ -0,0 +1,101 @@ +id,first_name,last_name +1,Michael,P. +2,Shawn,M. +3,Kathleen,P. +4,Jimmy,C. +5,Katherine,R. +6,Sarah,R. +7,Martin,M. +8,Frank,R. +9,Jennifer,F. +10,Henry,W. +11,Fred,S. +12,Amy,D. +13,Kathleen,M. +14,Steve,F. +15,Teresa,H. +16,Amanda,H. +17,Kimberly,R. +18,Johnny,K. +19,Virginia,F. +20,Anna,A. +21,Willie,H. +22,Sean,H. +23,Mildred,A. +24,David,G. +25,Victor,H. +26,Aaron,R. +27,Benjamin,B. +28,Lisa,W. +29,Benjamin,K. +30,Christina,W. +31,Jane,G. +32,Thomas,O. +33,Katherine,M. +34,Jennifer,S. +35,Sara,T. +36,Harold,O. +37,Shirley,J. +38,Dennis,J. +39,Louise,W. +40,Maria,A. +41,Gloria,C. +42,Diana,S. +43,Kelly,N. +44,Jane,R. +45,Scott,B. +46,Norma,C. +47,Marie,P. +48,Lillian,C. +49,Judy,N. +50,Billy,L. +51,Howard,R. +52,Laura,F. +53,Anne,B. +54,Rose,M. +55,Nicholas,R. +56,Joshua,K. +57,Paul,W. +58,Kathryn,K. +59,Adam,A. +60,Norma,W. +61,Timothy,R. +62,Elizabeth,P. +63,Edward,G. +64,David,C. +65,Brenda,W. +66,Adam,W. +67,Michael,H. +68,Jesse,E. +69,Janet,P. +70,Helen,F. +71,Gerald,C. +72,Kathryn,O. +73,Alan,B. +74,Harry,A. +75,Andrea,H. +76,Barbara,W. +77,Anne,W. +78,Harry,H. +79,Jack,R. +80,Phillip,H. +81,Shirley,H. +82,Arthur,D. +83,Virginia,R. +84,Christina,R. +85,Theresa,M. +86,Jason,C. +87,Phillip,B. +88,Adam,T. +89,Margaret,J. +90,Paul,P. +91,Todd,W. +92,Willie,O. +93,Frances,R. +94,Gregory,H. +95,Lisa,P. +96,Jacqueline,A. +97,Shirley,D. +98,Nicole,M. +99,Mary,G. +100,Jean,M. 
diff --git a/dev/dags/dbt/original_jaffle_shop/seeds/raw_orders.csv b/dev/dags/dbt/original_jaffle_shop/seeds/raw_orders.csv new file mode 100644 index 000000000..c4870621b --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/seeds/raw_orders.csv @@ -0,0 +1,100 @@ +id,user_id,order_date,status +1,1,2018-01-01,returned +2,3,2018-01-02,completed +3,94,2018-01-04,completed +4,50,2018-01-05,completed +5,64,2018-01-05,completed +6,54,2018-01-07,completed +7,88,2018-01-09,completed +8,2,2018-01-11,returned +9,53,2018-01-12,completed +10,7,2018-01-14,completed +11,99,2018-01-14,completed +12,59,2018-01-15,completed +13,84,2018-01-17,completed +14,40,2018-01-17,returned +15,25,2018-01-17,completed +16,39,2018-01-18,completed +17,71,2018-01-18,completed +18,64,2018-01-20,returned +19,54,2018-01-22,completed +20,20,2018-01-23,completed +21,71,2018-01-23,completed +22,86,2018-01-24,completed +23,22,2018-01-26,return_pending +24,3,2018-01-27,completed +25,51,2018-01-28,completed +26,32,2018-01-28,completed +27,94,2018-01-29,completed +28,8,2018-01-29,completed +29,57,2018-01-31,completed +30,69,2018-02-02,completed +31,16,2018-02-02,completed +32,28,2018-02-04,completed +33,42,2018-02-04,completed +34,38,2018-02-06,completed +35,80,2018-02-08,completed +36,85,2018-02-10,completed +37,1,2018-02-10,completed +38,51,2018-02-10,completed +39,26,2018-02-11,completed +40,33,2018-02-13,completed +41,99,2018-02-14,completed +42,92,2018-02-16,completed +43,31,2018-02-17,completed +44,66,2018-02-17,completed +45,22,2018-02-17,completed +46,6,2018-02-19,completed +47,50,2018-02-20,completed +48,27,2018-02-21,completed +49,35,2018-02-21,completed +50,51,2018-02-23,completed +51,71,2018-02-24,completed +52,54,2018-02-25,return_pending +53,34,2018-02-26,completed +54,54,2018-02-26,completed +55,18,2018-02-27,completed +56,79,2018-02-28,completed +57,93,2018-03-01,completed +58,22,2018-03-01,completed +59,30,2018-03-02,completed +60,12,2018-03-03,completed +61,63,2018-03-03,completed +62,57,2018-03-05,completed +63,70,2018-03-06,completed +64,13,2018-03-07,completed +65,26,2018-03-08,completed +66,36,2018-03-10,completed +67,79,2018-03-11,completed +68,53,2018-03-11,completed +69,3,2018-03-11,completed +70,8,2018-03-12,completed +71,42,2018-03-12,shipped +72,30,2018-03-14,shipped +73,19,2018-03-16,completed +74,9,2018-03-17,shipped +75,69,2018-03-18,completed +76,25,2018-03-20,completed +77,35,2018-03-21,shipped +78,90,2018-03-23,shipped +79,52,2018-03-23,shipped +80,11,2018-03-23,shipped +81,76,2018-03-23,shipped +82,46,2018-03-24,shipped +83,54,2018-03-24,shipped +84,70,2018-03-26,placed +85,47,2018-03-26,shipped +86,68,2018-03-26,placed +87,46,2018-03-27,placed +88,91,2018-03-27,shipped +89,21,2018-03-28,placed +90,66,2018-03-30,shipped +91,47,2018-03-31,placed +92,84,2018-04-02,placed +93,66,2018-04-03,placed +94,63,2018-04-03,placed +95,27,2018-04-04,placed +96,90,2018-04-06,placed +97,89,2018-04-07,placed +98,41,2018-04-07,placed +99,85,2018-04-09,placed diff --git a/dev/dags/dbt/original_jaffle_shop/seeds/raw_payments.csv b/dev/dags/dbt/original_jaffle_shop/seeds/raw_payments.csv new file mode 100644 index 000000000..a587baab5 --- /dev/null +++ b/dev/dags/dbt/original_jaffle_shop/seeds/raw_payments.csv @@ -0,0 +1,114 @@ +id,order_id,payment_method,amount +1,1,credit_card,1000 +2,2,credit_card,2000 +3,3,coupon,100 +4,4,coupon,2500 +5,5,bank_transfer,1700 +6,6,credit_card,600 +7,7,credit_card,1600 +8,8,credit_card,2300 +9,9,gift_card,2300 +10,9,bank_transfer,0 +11,10,bank_transfer,2600 +12,11,credit_card,2700 
+13,12,credit_card,100 +14,13,credit_card,500 +15,13,bank_transfer,1400 +16,14,bank_transfer,300 +17,15,coupon,2200 +18,16,credit_card,1000 +19,17,bank_transfer,200 +20,18,credit_card,500 +21,18,credit_card,800 +22,19,gift_card,600 +23,20,bank_transfer,1500 +24,21,credit_card,1200 +25,22,bank_transfer,800 +26,23,gift_card,2300 +27,24,coupon,2600 +28,25,bank_transfer,2000 +29,25,credit_card,2200 +30,25,coupon,1600 +31,26,credit_card,3000 +32,27,credit_card,2300 +33,28,bank_transfer,1900 +34,29,bank_transfer,1200 +35,30,credit_card,1300 +36,31,credit_card,1200 +37,32,credit_card,300 +38,33,credit_card,2200 +39,34,bank_transfer,1500 +40,35,credit_card,2900 +41,36,bank_transfer,900 +42,37,credit_card,2300 +43,38,credit_card,1500 +44,39,bank_transfer,800 +45,40,credit_card,1400 +46,41,credit_card,1700 +47,42,coupon,1700 +48,43,gift_card,1800 +49,44,gift_card,1100 +50,45,bank_transfer,500 +51,46,bank_transfer,800 +52,47,credit_card,2200 +53,48,bank_transfer,300 +54,49,credit_card,600 +55,49,credit_card,900 +56,50,credit_card,2600 +57,51,credit_card,2900 +58,51,credit_card,100 +59,52,bank_transfer,1500 +60,53,credit_card,300 +61,54,credit_card,1800 +62,54,bank_transfer,1100 +63,55,credit_card,2900 +64,56,credit_card,400 +65,57,bank_transfer,200 +66,58,coupon,1800 +67,58,gift_card,600 +68,59,gift_card,2800 +69,60,credit_card,400 +70,61,bank_transfer,1600 +71,62,gift_card,1400 +72,63,credit_card,2900 +73,64,bank_transfer,2600 +74,65,credit_card,0 +75,66,credit_card,2800 +76,67,bank_transfer,400 +77,67,credit_card,1900 +78,68,credit_card,1600 +79,69,credit_card,1900 +80,70,credit_card,2600 +81,71,credit_card,500 +82,72,credit_card,2900 +83,73,bank_transfer,300 +84,74,credit_card,3000 +85,75,credit_card,1900 +86,76,coupon,200 +87,77,credit_card,0 +88,77,bank_transfer,1900 +89,78,bank_transfer,2600 +90,79,credit_card,1800 +91,79,credit_card,900 +92,80,gift_card,300 +93,81,coupon,200 +94,82,credit_card,800 +95,83,credit_card,100 +96,84,bank_transfer,2500 +97,85,bank_transfer,1700 +98,86,coupon,2300 +99,87,gift_card,3000 +100,87,credit_card,2600 +101,88,credit_card,2900 +102,89,bank_transfer,2200 +103,90,bank_transfer,200 +104,91,credit_card,1900 +105,92,bank_transfer,1500 +106,92,coupon,200 +107,93,gift_card,2600 +108,94,coupon,700 +109,95,coupon,2400 +110,96,gift_card,1700 +111,97,bank_transfer,1400 +112,98,bank_transfer,1000 +113,99,credit_card,2400 diff --git a/dev/dags/simple_dag_async.py b/dev/dags/simple_dag_async.py index 787461236..1b2b67651 100644 --- a/dev/dags/simple_dag_async.py +++ b/dev/dags/simple_dag_async.py @@ -2,8 +2,8 @@ from datetime import datetime from pathlib import Path -from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig -from cosmos.profiles import PostgresUserPasswordProfileMapping +from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProfileConfig, ProjectConfig, RenderConfig +from cosmos.profiles import GoogleCloudServiceAccountDictProfileMapping DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt" DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH)) @@ -11,29 +11,32 @@ profile_config = ProfileConfig( profile_name="default", target_name="dev", - profile_mapping=PostgresUserPasswordProfileMapping( - conn_id="example_conn", - profile_args={"schema": "public"}, - disable_event_tracking=True, + profile_mapping=GoogleCloudServiceAccountDictProfileMapping( + conn_id="gcp_gs_conn", profile_args={"dataset": "release_17", "project": "astronomer-dag-authoring"} ), ) + # [START airflow_async_execution_mode_example] 
simple_dag_async = DbtDag(
     # dbt/cosmos-specific parameters
     project_config=ProjectConfig(
-        DBT_ROOT_PATH / "jaffle_shop",
+        DBT_ROOT_PATH / "original_jaffle_shop",
     ),
     profile_config=profile_config,
     execution_config=ExecutionConfig(
         execution_mode=ExecutionMode.AIRFLOW_ASYNC,
     ),
+    render_config=RenderConfig(
+        select=["path:models"],
+        # test_behavior=TestBehavior.NONE
+    ),
     # normal dag parameters
     schedule_interval=None,
     start_date=datetime(2023, 1, 1),
     catchup=False,
     dag_id="simple_dag_async",
     tags=["simple"],
-    operator_args={"install_deps": True},
+    operator_args={"full_refresh": True, "location": "northamerica-northeast1"},
 )
 # [END airflow_async_execution_mode_example]
diff --git a/docs/getting_started/execution-modes.rst b/docs/getting_started/execution-modes.rst
index ec150992d..10f6cce67 100644
--- a/docs/getting_started/execution-modes.rst
+++ b/docs/getting_started/execution-modes.rst
@@ -243,6 +243,7 @@ Each task will create a new Cloud Run Job execution, giving full isolation. The
         },
     )

+
 Airflow Async (experimental)
 ----------------------------

@@ -268,9 +269,20 @@ deferrable operators and supplying to them those compiled SQLs.

 Note that currently, the ``airflow_async`` execution mode has the following limitations and is released as Experimental:

-1. Only supports the ``dbt resource type`` models to be run asynchronously using Airflow deferrable operators. All other resources are executed synchronously using dbt commands as they are in the ``local`` execution mode.
-2. Only supports BigQuery as the target database. If a profile target other than BigQuery is specified, Cosmos will error out saying that the target database is not supported with this execution mode.
-3. Only works for ``full_refresh`` models. There is pending work to support other modes.
+1. This feature only works with Airflow 2.8 and above.
+2. Only ``model`` dbt resources are run asynchronously using Airflow deferrable operators. All other resources are executed synchronously using dbt commands, as they are in the ``local`` execution mode.
+3. Only supports BigQuery as the target database. If a profile target other than BigQuery is specified, Cosmos will error out saying that the target database is not supported with this execution mode.
+4. Only works for ``full_refresh`` models. There is pending work to support other modes.
+5. Only supports the BigQuery profile type.
+6. Users need to provide a ``ProfileMapping`` parameter in ``ProfileConfig``.
+7. It does not support datasets.
+
+You can leverage async operator support by installing an additional dependency:
+
+.. code:: bash
+
+    pip install "astronomer-cosmos[dbt-bigquery, google]"
+
 Example DAG:
@@ -279,6 +291,35 @@ Example DAG:
     :start-after: [START airflow_async_execution_mode_example]
     :end-before: [END airflow_async_execution_mode_example]

+**Known Issue:**
+
+The ``dag test`` command fails with the following error, likely because the trigger does not fully initialize during the ``dag test``, leading to an uninitialized task instance.
+This causes the BigQuery trigger to attempt to access parameters of the Task Instance that are not properly initialized.
+
+..
code:: bash + + [2024-10-01T18:19:09.726+0530] {base_events.py:1738} ERROR - unhandled exception during asyncio.run() shutdown + task: ()> exception=AttributeError("'NoneType' object has no attribute 'dag_id'")> + Traceback (most recent call last): + File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 138, in run + yield TriggerEvent( + asyncio.exceptions.CancelledError + + During handling of the above exception, another exception occurred: + + Traceback (most recent call last): + File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 157, in run + if self.job_id and self.cancel_on_kill and self.safe_to_cancel(): + File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 126, in safe_to_cancel + task_instance = self.get_task_instance() # type: ignore[call-arg] + File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/utils/session.py", line 97, in wrapper + return func(*args, session=session, **kwargs) + File "/Users/pankaj/Documents/astro_code/astronomer-cosmos/devenv/lib/python3.9/site-packages/airflow/providers/google/cloud/triggers/bigquery.py", line 102, in get_task_instance + TaskInstance.dag_id == self.task_instance.dag_id, + AttributeError: 'NoneType' object has no attribute 'dag_id' + + + .. _invocation_modes: Invocation Modes ================ diff --git a/scripts/test/integration-dbt-1-5-4.sh b/scripts/test/integration-dbt-1-5-4.sh index bb936fc21..6992b8f15 100644 --- a/scripts/test/integration-dbt-1-5-4.sh +++ b/scripts/test/integration-dbt-1-5-4.sh @@ -1,5 +1,5 @@ pip uninstall dbt-adapters dbt-common dbt-core dbt-extractor dbt-postgres dbt-semantic-interfaces -y -pip install dbt-postgres==1.5.4 dbt-databricks==1.5.4 +pip install dbt-postgres==1.5.4 dbt-databricks==1.5.4 dbt-bigquery==1.5.4 export SOURCE_RENDERING_BEHAVIOR=all rm -rf airflow.*; \ airflow db init; \ diff --git a/scripts/test/integration-setup.sh b/scripts/test/integration-setup.sh index c6e106fd5..fec9e95eb 100644 --- a/scripts/test/integration-setup.sh +++ b/scripts/test/integration-setup.sh @@ -11,4 +11,4 @@ rm -rf airflow.* pip freeze | grep airflow airflow db reset -y airflow db init -pip install 'dbt-core' 'dbt-databricks' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow' +pip install 'dbt-core' 'dbt-bigquery' 'dbt-databricks' 'dbt-postgres' 'dbt-vertica' 'openlineage-airflow' diff --git a/tests/airflow/test_graph.py b/tests/airflow/test_graph.py index 6fc7cdc0a..1bd8cab35 100644 --- a/tests/airflow/test_graph.py +++ b/tests/airflow/test_graph.py @@ -1,7 +1,7 @@ import os from datetime import datetime from pathlib import Path -from unittest.mock import patch +from unittest.mock import MagicMock, patch import pytest from airflow import __version__ as airflow_version @@ -30,7 +30,7 @@ ) from cosmos.converter import airflow_kwargs from cosmos.dbt.graph import DbtNode -from cosmos.profiles import PostgresUserPasswordProfileMapping +from cosmos.profiles import GoogleCloudServiceAccountFileProfileMapping, PostgresUserPasswordProfileMapping SAMPLE_PROJ_PATH = Path("/home/user/path/dbt-proj/") SOURCE_RENDERING_BEHAVIOR = SourceRenderingBehavior(os.getenv("SOURCE_RENDERING_BEHAVIOR", "none")) @@ -228,19 +228,21 @@ def test_build_airflow_graph_with_after_all(): 
@pytest.mark.integration +@patch("airflow.hooks.base.BaseHook.get_connection", new=MagicMock()) def test_build_airflow_graph_with_dbt_compile_task(): + bigquery_profile_config = ProfileConfig( + profile_name="my-bigquery-db", + target_name="dev", + profile_mapping=GoogleCloudServiceAccountFileProfileMapping( + conn_id="fake_conn", profile_args={"dataset": "release_17"} + ), + ) with DAG("test-id-dbt-compile", start_date=datetime(2022, 1, 1)) as dag: task_args = { "project_dir": SAMPLE_PROJ_PATH, "conn_id": "fake_conn", - "profile_config": ProfileConfig( - profile_name="default", - target_name="default", - profile_mapping=PostgresUserPasswordProfileMapping( - conn_id="fake_conn", - profile_args={"schema": "public"}, - ), - ), + "profile_config": bigquery_profile_config, + "location": "", } render_config = RenderConfig( select=["tag:some"], @@ -318,7 +320,7 @@ def test_create_task_metadata_unsupported(caplog): tags=[], config={}, ) - response = create_task_metadata(child_node, execution_mode="", args={}) + response = create_task_metadata(child_node, execution_mode="", args={}, dbt_dag_task_group_identifier="") assert response is None expected_msg = ( "Unavailable conversion function for (node ). " @@ -337,6 +339,7 @@ def test_create_task_metadata_unsupported(caplog): "cosmos.operators.local.DbtRunLocalOperator", {"models": "my_model"}, { + "dbt_dag_task_group_identifier": "", "dbt_node_config": { "unique_id": "model.my_folder.my_model", "resource_type": "model", @@ -347,7 +350,7 @@ def test_create_task_metadata_unsupported(caplog): "has_test": False, "resource_name": "my_model", "name": "my_model", - } + }, }, ), ( @@ -377,6 +380,7 @@ def test_create_task_metadata_unsupported(caplog): "cosmos.operators.local.DbtSnapshotLocalOperator", {"models": "my_snapshot"}, { + "dbt_dag_task_group_identifier": "", "dbt_node_config": { "unique_id": "snapshot.my_folder.my_snapshot", "resource_type": "snapshot", @@ -411,7 +415,9 @@ def test_create_task_metadata_model( has_freshness=True, ) - metadata = create_task_metadata(child_node, execution_mode=ExecutionMode.LOCAL, args={}) + metadata = create_task_metadata( + child_node, execution_mode=ExecutionMode.LOCAL, args={}, dbt_dag_task_group_identifier="" + ) if metadata: assert metadata.id == expected_id assert metadata.operator_class == expected_operator_class @@ -428,7 +434,9 @@ def test_create_task_metadata_model_with_versions(caplog): tags=[], config={}, ) - metadata = create_task_metadata(child_node, execution_mode=ExecutionMode.LOCAL, args={}) + metadata = create_task_metadata( + child_node, execution_mode=ExecutionMode.LOCAL, args={}, dbt_dag_task_group_identifier="" + ) assert metadata.id == "my_model_v1_run" assert metadata.operator_class == "cosmos.operators.local.DbtRunLocalOperator" assert metadata.arguments == {"models": "my_model.v1"} @@ -443,7 +451,9 @@ def test_create_task_metadata_model_use_task_group(caplog): tags=[], config={}, ) - metadata = create_task_metadata(child_node, execution_mode=ExecutionMode.LOCAL, args={}, use_task_group=True) + metadata = create_task_metadata( + child_node, execution_mode=ExecutionMode.LOCAL, args={}, use_task_group=True, dbt_dag_task_group_identifier="" + ) assert metadata.id == "run" @@ -498,7 +508,11 @@ def test_create_task_metadata_source_with_rendering_options( ) metadata = create_task_metadata( - child_node, execution_mode=ExecutionMode.LOCAL, source_rendering_behavior=source_rendering_behavior, args={} + child_node, + execution_mode=ExecutionMode.LOCAL, + 
+        source_rendering_behavior=source_rendering_behavior,
+        args={},
+        dbt_dag_task_group_identifier="",
     )
     if metadata:
         assert metadata.id == expected_id
@@ -516,12 +530,15 @@ def test_create_task_metadata_seed(caplog, use_task_group):
         config={},
     )
     if use_task_group is None:
-        metadata = create_task_metadata(sample_node, execution_mode=ExecutionMode.DOCKER, args={})
+        metadata = create_task_metadata(
+            sample_node, execution_mode=ExecutionMode.DOCKER, args={}, dbt_dag_task_group_identifier=""
+        )
     else:
         metadata = create_task_metadata(
             sample_node,
             execution_mode=ExecutionMode.DOCKER,
             args={},
+            dbt_dag_task_group_identifier="",
             use_task_group=use_task_group,
         )
 
@@ -543,7 +560,9 @@ def test_create_task_metadata_snapshot(caplog):
         tags=[],
         config={},
     )
-    metadata = create_task_metadata(sample_node, execution_mode=ExecutionMode.KUBERNETES, args={})
+    metadata = create_task_metadata(
+        sample_node, execution_mode=ExecutionMode.KUBERNETES, args={}, dbt_dag_task_group_identifier=""
+    )
     assert metadata.id == "my_snapshot_snapshot"
     assert metadata.operator_class == "cosmos.operators.kubernetes.DbtSnapshotKubernetesOperator"
     assert metadata.arguments == {"models": "my_snapshot"}
diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py
index 4174c9a2d..1c0912042 100644
--- a/tests/dbt/test_graph.py
+++ b/tests/dbt/test_graph.py
@@ -1522,7 +1522,7 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir
     hash_dir, hash_args = version.split(",")
     assert hash_args == "d41d8cd98f00b204e9800998ecf8427e"
     if sys.platform == "darwin":
-        assert hash_dir == "c1e25b0679b5ddcb636bcc30f2f85a06"
+        assert hash_dir == "25beeb54cc4eeabe6198248e286a1cfe"
     else:
         assert hash_dir == "6f63493009733a7be34364a6ea3ffd3c"
 
diff --git a/tests/operators/test_airflow_async.py b/tests/operators/test_airflow_async.py
index fc085c7d0..ec2f5e715 100644
--- a/tests/operators/test_airflow_async.py
+++ b/tests/operators/test_airflow_async.py
@@ -1,10 +1,11 @@
+import pytest
+from airflow import __version__ as airflow_version
+from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
+from packaging import version
+
 from cosmos.operators.airflow_async import (
     DbtBuildAirflowAsyncOperator,
     DbtCompileAirflowAsyncOperator,
-    DbtDocsAirflowAsyncOperator,
-    DbtDocsAzureStorageAirflowAsyncOperator,
-    DbtDocsGCSAirflowAsyncOperator,
-    DbtDocsS3AirflowAsyncOperator,
     DbtLSAirflowAsyncOperator,
     DbtRunAirflowAsyncOperator,
     DbtRunOperationAirflowAsyncOperator,
@@ -16,12 +17,7 @@
 from cosmos.operators.local import (
     DbtBuildLocalOperator,
     DbtCompileLocalOperator,
-    DbtDocsAzureStorageLocalOperator,
-    DbtDocsGCSLocalOperator,
-    DbtDocsLocalOperator,
-    DbtDocsS3LocalOperator,
     DbtLSLocalOperator,
-    DbtRunLocalOperator,
     DbtRunOperationLocalOperator,
     DbtSeedLocalOperator,
     DbtSnapshotLocalOperator,
@@ -50,8 +46,12 @@ def test_dbt_source_airflow_async_operator_inheritance():
     assert issubclass(DbtSourceAirflowAsyncOperator, DbtSourceLocalOperator)
 
 
+@pytest.mark.skipif(
+    version.parse(airflow_version) < version.parse("2.8"),
+    reason="Cosmos Async operators only work with Airflow 2.8 onwards.",
+)
 def test_dbt_run_airflow_async_operator_inheritance():
-    assert issubclass(DbtRunAirflowAsyncOperator, DbtRunLocalOperator)
+    assert issubclass(DbtRunAirflowAsyncOperator, BigQueryInsertJobOperator)
 
 
 def test_dbt_test_airflow_async_operator_inheritance():
@@ -62,21 +62,5 @@ def test_dbt_run_operation_airflow_async_operator_inheritance():
     assert issubclass(DbtRunOperationAirflowAsyncOperator, DbtRunOperationLocalOperator)
 
 
-def test_dbt_docs_airflow_async_operator_inheritance():
-    assert issubclass(DbtDocsAirflowAsyncOperator, DbtDocsLocalOperator)
-
-
-def test_dbt_docs_s3_airflow_async_operator_inheritance():
-    assert issubclass(DbtDocsS3AirflowAsyncOperator, DbtDocsS3LocalOperator)
-
-
-def test_dbt_docs_azure_storage_airflow_async_operator_inheritance():
-    assert issubclass(DbtDocsAzureStorageAirflowAsyncOperator, DbtDocsAzureStorageLocalOperator)
-
-
-def test_dbt_docs_gcs_airflow_async_operator_inheritance():
-    assert issubclass(DbtDocsGCSAirflowAsyncOperator, DbtDocsGCSLocalOperator)
-
-
 def test_dbt_compile_airflow_async_operator_inheritance():
     assert issubclass(DbtCompileAirflowAsyncOperator, DbtCompileLocalOperator)
diff --git a/tests/operators/test_local.py b/tests/operators/test_local.py
index c7615225f..ed954dfdf 100644
--- a/tests/operators/test_local.py
+++ b/tests/operators/test_local.py
@@ -1147,7 +1147,7 @@ def test_dbt_compile_local_operator_initialisation():
 
 
 @patch("cosmos.operators.local.remote_target_path", new="s3://some-bucket/target")
-@patch("cosmos.operators.local.AIRFLOW_IO_AVAILABLE", new=False)
+@patch("cosmos.settings.AIRFLOW_IO_AVAILABLE", new=False)
 def test_configure_remote_target_path_object_storage_unavailable_on_earlier_airflow_versions():
     operator = DbtCompileLocalOperator(
         task_id="fake-task",
@@ -1242,6 +1242,7 @@ def test_upload_compiled_sql_should_upload(mock_configure_remote, mock_object_st
         profile_config=profile_config,
         project_dir="fake-dir",
         dag=DAG("test_dag", start_date=datetime(2024, 4, 16)),
+        extra_context={"dbt_dag_task_group_identifier": "test_dag"},
     )
 
     mock_configure_remote.return_value = ("mock_remote_path", "mock_conn_id")
diff --git a/tests/test_example_dags.py b/tests/test_example_dags.py
index 9aa66432d..6c7e98802 100644
--- a/tests/test_example_dags.py
+++ b/tests/test_example_dags.py
@@ -68,6 +68,10 @@ def get_dag_bag() -> DagBag:
             print(f"Adding {dagfile} to .airflowignore")
             file.writelines([f"{dagfile}\n"])
 
+        # Ignore Async DAG for dbt <=1.5
+        if DBT_VERSION <= Version("1.5.0"):
+            file.writelines(["simple_dag_async.py\n"])
+
         # The dbt sqlite adapter is only available until dbt 1.4
         if DBT_VERSION >= Version("1.5.0"):
             file.writelines(["example_cosmos_sources.py\n"])
@@ -98,4 +102,10 @@ def test_example_dag(session, dag_id: str):
         return
     dag_bag = get_dag_bag()
     dag = dag_bag.get_dag(dag_id)
-    test_utils.run_dag(dag)
+
+    # This feature is available since Airflow 2.5 and we've backported it in Cosmos:
+    # https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html#airflow-2-5-0-2022-12-02
+    if AIRFLOW_VERSION >= Version("2.5"):
+        dag.test()
+    else:
+        test_utils.run_dag(dag)
diff --git a/tests/test_example_dags_no_connections.py b/tests/test_example_dags_no_connections.py
index 0cc560ecc..3a43a644c 100644
--- a/tests/test_example_dags_no_connections.py
+++ b/tests/test_example_dags_no_connections.py
@@ -43,6 +43,10 @@ def get_dag_bag() -> DagBag:
             print(f"Adding {dagfile} to .airflowignore")
             file.writelines([f"{dagfile}\n"])
 
+        # Ignore Async DAG for dbt <=1.5
+        if DBT_VERSION <= Version("1.5.0"):
+            file.writelines(["simple_dag_async.py\n"])
+
         if DBT_VERSION >= Version("1.5.0"):
             file.writelines(["example_cosmos_sources.py\n"])
         if DBT_VERSION < Version("1.6.0"):