Skip to content

Commit

Permalink
Ensure filtering with manifest loading works with single model (#576)
Browse files Browse the repository at this point in the history
## Description

<!-- Add a brief but complete description of the change. -->

This PR fixes an issue with Cosmos' custom filtering methods.
Previously, when using the manifest loading method (and thus Cosmos'
custom filtering), a user could not filter by path to a specific model
(only a directory). This PR fixes that issue and makes some minor
refactors to the selecting logic to make it more intuitive and
performant.

## Related Issue(s)

<!-- If this PR closes an issue, you can use a keyword to auto-close.
-->
<!-- i.e. "closes #0000" -->

## Breaking Change?

<!-- If this introduces a breaking change, specify that here. -->

No breaking changes.

## Checklist

- [ ] I have made corresponding changes to the documentation (if
required)
- [x] I have added tests that prove my fix is effective or that my
feature works

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Harel Shein <[email protected]>
  • Loading branch information
3 people authored Oct 6, 2023
1 parent 6609f76 commit 2e3e4fd
Show file tree
Hide file tree
Showing 7 changed files with 25,838 additions and 18 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
# these files get autogenerated
docs/profiles/*

# dbt_packages is a directory that gets created when you run dbt deps
dbt_packages/

# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
Expand Down
63 changes: 46 additions & 17 deletions cosmos/dbt/selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from typing import TYPE_CHECKING

from cosmos.constants import DbtResourceType
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger

Expand Down Expand Up @@ -76,6 +77,9 @@ def load_from_statement(self, statement: str) -> None:
self.other.append(item)
logger.warning("Unsupported select statement: %s", item)

def __repr__(self) -> str:
return f"SelectorConfig(paths={self.paths}, tags={self.tags}, config={self.config}, other={self.other})"


def select_nodes_ids_by_intersection(nodes: dict[str, DbtNode], config: SelectorConfig) -> set[str]:
"""
Expand All @@ -88,30 +92,55 @@ def select_nodes_ids_by_intersection(nodes: dict[str, DbtNode], config: Selector
https://docs.getdbt.com/reference/node-selection/syntax
https://docs.getdbt.com/reference/node-selection/yaml-selectors
"""
if config.is_empty:
return set(nodes.keys())

selected_nodes = set()
visited_nodes = set()

if not config.is_empty:
for node_id, node in nodes.items():
if config.tags and not (sorted(node.tags) == sorted(config.tags)):
continue
def should_include_node(node_id: str, node: DbtNode) -> bool:
"Checks if a single node should be included. Only runs once per node with caching."
if node_id in visited_nodes:
return node_id in selected_nodes

supported_node_config = {key: value for key, value in node.config.items() if key in SUPPORTED_CONFIG}
config_tag = config.config.get("tags")
if config.config:
if config_tag and config_tag not in supported_node_config.get("tags", []):
continue
visited_nodes.add(node_id)

# Remove 'tags' as they've already been filtered for
config_copy = copy.deepcopy(config.config)
config_copy.pop("tags", None)
supported_node_config.pop("tags", None)
if config.tags:
if not (set(config.tags) == set(node.tags)):
return False

if not (config_copy.items() <= supported_node_config.items()):
continue
node_config = {key: value for key, value in node.config.items() if key in SUPPORTED_CONFIG}
config_tags = config.config.get("tags")
if config_tags and config_tags not in node_config.get("tags", []):
return False

if config.paths and not (set(config.paths).issubset(set(node.file_path.parents))):
continue
# Remove 'tags' as they've already been filtered for
config_copy = copy.deepcopy(config.config)
config_copy.pop("tags", None)
node_config.pop("tags", None)

if not (config_copy.items() <= node_config.items()):
return False

if config.paths:
for filter_path in config.paths:
if filter_path in node.file_path.parents or filter_path == node.file_path:
return True

# if it's a test coming from a schema.yml file, check the model's file_path
if node.resource_type == DbtResourceType.TEST and node.file_path.name == "schema.yml":
# try to get the corresponding model from node.depends_on
if len(node.depends_on) == 1:
model_node = nodes.get(node.depends_on[0])
if model_node:
return should_include_node(node.depends_on[0], model_node)

return False

return True

for node_id, node in nodes.items():
if should_include_node(node_id, node):
selected_nodes.add(node_id)

return selected_nodes
Expand Down
40 changes: 40 additions & 0 deletions dev/dags/cosmos_manifest_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""
An example DAG that uses Cosmos to render a dbt project.
"""

import os
from datetime import datetime
from pathlib import Path

from cosmos import DbtDag, ProjectConfig, ProfileConfig, RenderConfig, LoadMode
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

profile_config = ProfileConfig(
profile_name="default",
target_name="dev",
profile_mapping=PostgresUserPasswordProfileMapping(
conn_id="airflow_db",
profile_args={"schema": "public"},
),
)

# [START local_example]
cosmos_manifest_example = DbtDag(
# dbt/cosmos-specific parameters
project_config=ProjectConfig(
DBT_ROOT_PATH / "jaffle_shop",
manifest_path=DBT_ROOT_PATH / "jaffle_shop" / "target" / "manifest.json",
),
profile_config=profile_config,
render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST, select=["path:models/customers.sql"]),
operator_args={"install_deps": True},
# normal dag parameters
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
catchup=False,
dag_id="cosmos_manifest_example",
)
# [END local_example]
Loading

0 comments on commit 2e3e4fd

Please sign in to comment.