Skip to content

Commit

Permalink
Support to cache profiles created via ProfileMapping (#1046)
Browse files Browse the repository at this point in the history
Add dbt profile caching mechanism.

1. Introduced env `enable_cache_profile` to enable or disable profile
caching. This will be enabled only if global `enable_cache` is enabled.
2. Users can set the env `profile_cache_dir_name`. This will be the name
of a sub-dir inside `cache_dir` where cached profiles will be stored.
This is optional, and the default name is `profile`
3. Example Path for versioned profile:
`{cache_dir}/{profile_cache_dir}/592906f650558ce1dadb75fcce84a2ec09e444441e6af6069f19204d59fe428b/profiles.yml`
4. Implemented profile mapping hashing: the profile_name and target_name are
added to the profile, the result is serialized to JSON with sorted keys, and
the serialized data is hashed using the SHA-256 algorithm

**Perf test result:**
In local dev env with command
```
AIRFLOW_HOME=`pwd` AIRFLOW_CONN_EXAMPLE_CONN="postgres://postgres:[email protected]:5432/postgres"  AIRFLOW_HOME=`pwd` AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=20000 AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=20000 hatch run tests.py3.10-2.8:test-performance
```

NUM_MODELS=100
- TIME=167.45248413085938 (with profile cache enabled)
- TIME=173.94845390319824 (with profile cache disabled)

NUM_MODELS=200
- TIME=376.2585120201111 (with profile cache enabled)
- TIME=418.14210200309753 (with profile cache disabled)

Closes: #925
Closes: #647
  • Loading branch information
pankajastro committed Jun 27, 2024
1 parent 3bc46b7 commit 2d2e7af
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 27 deletions.
52 changes: 51 additions & 1 deletion cosmos/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
from sqlalchemy.orm import Session

from cosmos import settings
from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME
from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME, DEFAULT_PROFILES_FILE_NAME
from cosmos.dbt.project import get_partial_parse_path
from cosmos.log import get_logger
from cosmos.settings import cache_dir, dbt_profile_cache_dir_name, enable_cache, enable_cache_profile

logger = get_logger(__name__)
VAR_KEY_CACHE_PREFIX = "cosmos_cache__"
Expand Down Expand Up @@ -346,3 +347,52 @@ def delete_unused_dbt_ls_cache(
f"Deleted {deleted_cosmos_variables}/{total_cosmos_variables} Airflow Variables used to store Cosmos cache. "
)
return deleted_cosmos_variables


def is_profile_cache_enabled() -> bool:
    """Return True only when both the global cache and the profile cache are enabled."""
    # Profile caching is subordinate to the global switch: when the global
    # cache is off, its (falsy) value is the answer.
    if enable_cache:
        return enable_cache_profile
    return enable_cache


def _get_or_create_profile_cache_dir() -> Path:
    """
    Return the directory used to cache dbt profiles, creating it when missing.

    The directory is ``cache_dir / dbt_profile_cache_dir_name``. When it does not
    exist yet it is created together with any missing parent directories.
    """
    cache_root = cache_dir / dbt_profile_cache_dir_name
    if cache_root.exists():
        return cache_root
    cache_root.mkdir(parents=True, exist_ok=True)
    return cache_root


def get_cached_profile(version: str) -> Path | None:
    """
    Look up the cached dbt profiles file for the given version.

    The candidate path is ``<profile cache dir>/<version>/<DEFAULT_PROFILES_FILE_NAME>``.
    Note that resolving the profile cache directory creates it if it is missing.

    :param version: Identifier of the cached profile (name of its sub-directory).
    :returns: Path to the cached profiles file when it exists and is a regular file,
        otherwise None.
    """
    versioned_dir = _get_or_create_profile_cache_dir() / version
    candidate = versioned_dir / DEFAULT_PROFILES_FILE_NAME
    return candidate if candidate.exists() and candidate.is_file() else None


def create_cache_profile(version: str, profile_content: str) -> Path:
    """
    Store the given profile content in the profile cache under the given version.

    The file is written to ``<profile cache dir>/<version>/<DEFAULT_PROFILES_FILE_NAME>``;
    the version directory is created first when it does not exist, and an existing
    file at that location is overwritten.

    :param version: Identifier of the cached profile (name of its sub-directory).
    :param profile_content: Text to write into the profiles file.
    :returns: Path to the written profiles file.
    """
    cached_file = _get_or_create_profile_cache_dir() / version / DEFAULT_PROFILES_FILE_NAME
    # The parent of the file is the per-version directory.
    cached_file.parent.mkdir(parents=True, exist_ok=True)
    cached_file.write_text(profile_content)
    return cached_file
70 changes: 48 additions & 22 deletions cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
from pathlib import Path
from typing import Any, Callable, Iterator

from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled
from cosmos.constants import (
DEFAULT_PROFILES_FILE_NAME,
DbtResourceType,
ExecutionMode,
InvocationMode,
Expand All @@ -25,8 +27,6 @@

logger = get_logger(__name__)

DEFAULT_PROFILES_FILE_NAME = "profiles.yml"


class CosmosConfigException(Exception):
"""
Expand Down Expand Up @@ -258,6 +258,27 @@ def validate_profiles_yml(self) -> None:
if self.profiles_yml_filepath and not Path(self.profiles_yml_filepath).exists():
raise CosmosValueError(f"The file {self.profiles_yml_filepath} does not exist.")

def _get_profile_path(self, use_mock_values: bool = False) -> Path:
    """
    Return the path of the cached profiles file for the current profile mapping.

    The profile mapping's version for the configured profile and target names is
    used as the cache key. When a cached file exists for that version it is reused;
    otherwise the profile contents are rendered, stored in the cache, and the path
    of the newly written file is returned.

    :param use_mock_values: Forwarded to the profile mapping, both for computing the
        version and for rendering the profile contents.
    :returns: Path to the cached profiles file.
    """
    assert self.profile_mapping  # To satisfy MyPy
    mapping = self.profile_mapping
    profile_version = mapping.version(self.profile_name, self.target_name, use_mock_values)

    existing_path = get_cached_profile(profile_version)
    if existing_path:
        logger.info("Profile found in cache using profile: %s.", existing_path)
        return existing_path

    # Cache miss: render the profile and persist it under its version.
    rendered_profile = mapping.get_profile_file_contents(
        profile_name=self.profile_name, target_name=self.target_name, use_mock_values=use_mock_values
    )
    new_path = create_cache_profile(profile_version, rendered_profile)
    logger.info("Profile not found in cache storing and using profile: %s.", new_path)
    return new_path

@contextlib.contextmanager
def ensure_profile(
self, desired_profile_path: Path | None = None, use_mock_values: bool = False
Expand All @@ -268,35 +289,40 @@ def ensure_profile(
yield Path(self.profiles_yml_filepath), {}

elif self.profile_mapping:
profile_contents = self.profile_mapping.get_profile_file_contents(
profile_name=self.profile_name, target_name=self.target_name, use_mock_values=use_mock_values
)

if use_mock_values:
env_vars = {}
else:
env_vars = self.profile_mapping.env_vars

if desired_profile_path:
logger.info(
"Writing profile to %s with the following contents:\n%s",
desired_profile_path,
profile_contents,
)
# write profile_contents to desired_profile_path using yaml library
desired_profile_path.write_text(profile_contents)
yield desired_profile_path, env_vars
if is_profile_cache_enabled():
logger.info("Profile caching is enable.")
cached_profile_path = self._get_profile_path(use_mock_values)
yield cached_profile_path, env_vars
else:
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = Path(temp_dir) / DEFAULT_PROFILES_FILE_NAME
profile_contents = self.profile_mapping.get_profile_file_contents(
profile_name=self.profile_name, target_name=self.target_name, use_mock_values=use_mock_values
)

if desired_profile_path:
logger.info(
"Creating temporary profiles.yml with use_mock_values=%s at %s with the following contents:\n%s",
use_mock_values,
temp_file,
"Writing profile to %s with the following contents:\n%s",
desired_profile_path,
profile_contents,
)
temp_file.write_text(profile_contents)
yield temp_file, env_vars
# write profile_contents to desired_profile_path using yaml library
desired_profile_path.write_text(profile_contents)
yield desired_profile_path, env_vars
else:
with tempfile.TemporaryDirectory() as temp_dir:
temp_file = Path(temp_dir) / DEFAULT_PROFILES_FILE_NAME
logger.info(
"Creating temporary profiles.yml with use_mock_values=%s at %s with the following contents:\n%s",
use_mock_values,
temp_file,
profile_contents,
)
temp_file.write_text(profile_contents)
yield temp_file, env_vars


@dataclass
Expand Down
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
DBT_DEPENDENCIES_FILE_NAMES = {"packages.yml", "dependencies.yml"}
DBT_LOG_FILENAME = "dbt.log"
DBT_BINARY_NAME = "dbt"
DEFAULT_PROFILES_FILE_NAME = "profiles.yml"

DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos"
OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/"
Expand Down
19 changes: 19 additions & 0 deletions cosmos/profiles/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

from __future__ import annotations

import hashlib
import json
import warnings
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional
Expand Down Expand Up @@ -82,6 +84,23 @@ def __init__(
self.dbt_config_vars = dbt_config_vars
self._validate_disable_event_tracking()

def version(self, profile_name: str, target_name: str, mock_profile: bool = False) -> str:
    """
    Generate a SHA-256 hex digest identifying the profile for the given profile and target names.

    The profile dictionary (``self.mock_profile`` when ``mock_profile`` is True, otherwise
    ``self.profile``) is combined with the profile and target names, serialized to JSON
    with sorted keys, and hashed.

    :param profile_name: Name of the DBT profile.
    :param target_name: Name of the DBT target.
    :param mock_profile: If True, use a mock profile.
    :returns: Hexadecimal SHA-256 digest of the serialized profile.
    """
    source_profile = self.mock_profile if mock_profile else self.profile
    # Build a new dictionary instead of assigning into the one returned by the
    # property, so the keys added for hashing never leak into the profile itself.
    profile = {**source_profile, "profile_name": profile_name, "target_name": target_name}
    serialized_profile = json.dumps(profile, sort_keys=True)
    return hashlib.sha256(serialized_profile.encode()).hexdigest()

def _validate_profile_args(self) -> None:
"""
Check if profile_args contains keys that should not be overridden from the
Expand Down
2 changes: 2 additions & 0 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
dbt_docs_conn_id = conf.get("cosmos", "dbt_docs_conn_id", fallback=None)
dbt_docs_index_file_name = conf.get("cosmos", "dbt_docs_index_file_name", fallback="index.html")
# Whether profile caching is enabled; read from ``[cosmos] enable_cache_profile`` and on by default.
enable_cache_profile = conf.getboolean("cosmos", "enable_cache_profile", fallback=True)
# Name of the directory holding cached profiles; read from ``[cosmos] profile_cache_dir_name``, default "profile".
dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile")

try:
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
Expand Down
14 changes: 14 additions & 0 deletions docs/configuration/caching.rst
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,17 @@ Users can customize where to store the cache using the setting ``AIRFLOW__COSMOS
It is possible to switch off this feature by exporting the environment variable ``AIRFLOW__COSMOS__ENABLE_CACHE_PARTIAL_PARSE=0``.

For more information, read the `Cosmos partial parsing documentation <./partial-parsing.html>`_


Caching the profiles
~~~~~~~~~~~~~~~~~~~~~~~~

(Introduced in Cosmos 1.5)

Cosmos 1.5 introduced `support for profile caching <https://github.com/astronomer/astronomer-cosmos/pull/1046>`_,
which caches the ``profiles.yml`` generated from a profile mapping in the path specified by the settings ``AIRFLOW__COSMOS__CACHE_DIR`` and ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME``.
This feature facilitates the reuse of Airflow connections and ``profiles.yml``.

Users have the flexibility to customize the cache storage location using the settings ``AIRFLOW__COSMOS__CACHE_DIR`` and ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME``.

To disable this feature, users can set the environment variable ``AIRFLOW__COSMOS__ENABLE_CACHE_PROFILE=False``.
17 changes: 17 additions & 0 deletions docs/configuration/cosmos-conf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,23 @@ This page lists all available Airflow configurations that affect ``astronomer-co
- Default: ``None``
- Environment Variable: ``AIRFLOW__COSMOS__DBT_DOCS_CONN_ID``

.. _enable_cache_profile:

`enable_cache_profile`_:
Enable caching for the DBT profile.

- Default: ``True``
- Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_PROFILE``

.. _profile_cache_dir_name:

`profile_cache_dir_name`_:
Folder name to store the DBT cached profiles. This will be a sub-folder of ``cache_dir``

- Default: ``profile``
- Environment Variable: ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME``


[openlineage]
~~~~~~~~~~~~~

Expand Down
15 changes: 12 additions & 3 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,13 @@ def test_load(


@pytest.mark.integration
@pytest.mark.parametrize("enable_cache_profile", [True, False])
@patch("cosmos.config.is_profile_cache_enabled")
@patch("cosmos.dbt.graph.Popen")
def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(
mock_popen, tmp_dbt_project_dir, postgres_profile_config
mock_popen, is_profile_cache_enabled, enable_cache_profile, tmp_dbt_project_dir, postgres_profile_config
):
is_profile_cache_enabled.return_value = enable_cache_profile
mock_popen().communicate.return_value = ("", "")
mock_popen().returncode = 0
assert not (tmp_dbt_project_dir / "target").exists()
Expand All @@ -427,7 +430,7 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(
assert not (tmp_dbt_project_dir / "target").exists()
assert not (tmp_dbt_project_dir / "logs").exists()

used_cwd = Path(mock_popen.call_args[0][0][-5])
used_cwd = Path(mock_popen.call_args[0][0][5])
assert used_cwd != project_config.dbt_project_path
assert not used_cwd.exists()

Expand Down Expand Up @@ -638,7 +641,11 @@ def test_load_via_dbt_ls_without_dbt_deps_and_preinstalled_dbt_packages(


@pytest.mark.integration
def test_load_via_dbt_ls_caching_partial_parsing(tmp_dbt_project_dir, postgres_profile_config, caplog, tmp_path):
@pytest.mark.parametrize("enable_cache_profile", [True, False])
@patch("cosmos.config.is_profile_cache_enabled")
def test_load_via_dbt_ls_caching_partial_parsing(
is_profile_cache_enabled, enable_cache_profile, tmp_dbt_project_dir, postgres_profile_config, caplog, tmp_path
):
"""
When using RenderConfig.enable_mock_profile=False and defining DbtGraph.cache_dir,
Cosmos should leverage dbt partial parsing.
Expand All @@ -647,6 +654,8 @@ def test_load_via_dbt_ls_caching_partial_parsing(tmp_dbt_project_dir, postgres_p

caplog.set_level(logging.DEBUG)

is_profile_cache_enabled.return_value = enable_cache_profile

project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME)
render_config = RenderConfig(
dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=True, enable_mock_profile=False
Expand Down
6 changes: 6 additions & 0 deletions tests/profiles/test_base_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,9 @@ def test_profile_config_validate_dbt_config_vars_check_values(dbt_config_var: st
conn_id="fake_conn_id",
dbt_config_vars=DbtProfileConfigVars(**dbt_config_vars),
)


def test_profile_version_sha_consistency():
    """The version digest for a fixed profile mapping, profile name and target name is stable."""
    expected_digest = "ea3bf1f70b033405ba9ff9cafe65af873fd7a868cac840cdbfd5e8e9a1da9650"
    mapping = TestProfileMapping(conn_id="fake_conn_id")
    assert mapping.version(profile_name="dev", target_name="dev") == expected_digest
Loading

0 comments on commit 2d2e7af

Please sign in to comment.