diff --git a/cosmos/cache.py b/cosmos/cache.py
index fd1dd53f4..a3b29b0e5 100644
--- a/cosmos/cache.py
+++ b/cosmos/cache.py
@@ -19,9 +19,10 @@
 from sqlalchemy.orm import Session
 
 from cosmos import settings
-from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME
+from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME, DEFAULT_PROFILES_FILE_NAME
 from cosmos.dbt.project import get_partial_parse_path
 from cosmos.log import get_logger
+from cosmos.settings import cache_dir, dbt_profile_cache_dir_name, enable_cache, enable_cache_profile
 
 logger = get_logger(__name__)
 VAR_KEY_CACHE_PREFIX = "cosmos_cache__"
@@ -346,3 +347,52 @@ def delete_unused_dbt_ls_cache(
         f"Deleted {deleted_cosmos_variables}/{total_cosmos_variables} Airflow Variables used to store Cosmos cache. "
     )
     return deleted_cosmos_variables
+
+
+def is_profile_cache_enabled() -> bool:
+    """Return True if both global caching and profile caching are enabled."""
+    return enable_cache and enable_cache_profile
+
+
+def _get_or_create_profile_cache_dir() -> Path:
+    """
+    Get or create the directory used to cache dbt profiles.
+
+    - Constructs the profile cache directory path from ``cache_dir`` and ``dbt_profile_cache_dir_name``
+    - Creates the directory if it does not already exist
+    - Returns the profile cache directory
+    """
+    profile_cache_dir = cache_dir / dbt_profile_cache_dir_name
+    if not profile_cache_dir.exists():
+        profile_cache_dir.mkdir(parents=True, exist_ok=True)
+    return profile_cache_dir
+
+
+def get_cached_profile(version: str) -> Path | None:
+    """
+    Retrieve the path to a cached dbt profiles YAML file for the given version, if it exists.
+
+    - Constructs the profiles YAML path from the version and the profile cache directory
+    - Checks whether the profiles YAML file exists
+    - Returns the profiles YAML path, or None if it is not cached
+    """
+    profile_yml_path = _get_or_create_profile_cache_dir() / version / DEFAULT_PROFILES_FILE_NAME
+    if profile_yml_path.exists() and profile_yml_path.is_file():
+        return profile_yml_path
+    return None
+
+
+def create_cache_profile(version: str, profile_content: str) -> Path:
+    """
+    Create a cached dbt profiles YAML file with the provided content for the given version.
+
+    - Constructs the profiles YAML path from the version and the profile cache directory
+    - Creates the version directory if it does not already exist
+    - Writes the profile content to the profiles YAML file
+    - Returns the profiles YAML path
+    """
+    profile_yml_dir = _get_or_create_profile_cache_dir() / version
+    profile_yml_dir.mkdir(parents=True, exist_ok=True)
+    profile_yml_path = profile_yml_dir / DEFAULT_PROFILES_FILE_NAME
+    profile_yml_path.write_text(profile_content)
+    return profile_yml_path
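Taken together, these helpers store one ``profiles.yml`` per profile version under ``<cache_dir>/<profile_cache_dir_name>/<version>/``. A minimal sketch of the round-trip, assuming ``cosmos.cache.cache_dir`` is pointed at a scratch directory (as the tests further down do via ``patch``):

    from pathlib import Path
    from unittest.mock import patch

    from cosmos.cache import create_cache_profile, get_cached_profile

    version = "0" * 64  # illustrative SHA-256 digest
    with patch("cosmos.cache.cache_dir", Path("/tmp/cosmos_cache")):
        assert get_cached_profile(version) is None  # nothing cached yet
        path = create_cache_profile(version, "default:\n  target: dev\n")
        assert get_cached_profile(version) == path  # cache hit on the next lookup
        # with default settings: /tmp/cosmos_cache/profile/<version>/profiles.yml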
diff --git a/cosmos/config.py b/cosmos/config.py
index 5ca21709d..948d009f7 100644
--- a/cosmos/config.py
+++ b/cosmos/config.py
@@ -10,7 +10,9 @@
 from pathlib import Path
 from typing import Any, Callable, Iterator
 
+from cosmos.cache import create_cache_profile, get_cached_profile, is_profile_cache_enabled
 from cosmos.constants import (
+    DEFAULT_PROFILES_FILE_NAME,
     DbtResourceType,
     ExecutionMode,
     InvocationMode,
@@ -25,8 +27,6 @@
 
 logger = get_logger(__name__)
 
-DEFAULT_PROFILES_FILE_NAME = "profiles.yml"
-
 
 class CosmosConfigException(Exception):
     """
@@ -258,6 +258,27 @@ def validate_profiles_yml(self) -> None:
         if self.profiles_yml_filepath and not Path(self.profiles_yml_filepath).exists():
             raise CosmosValueError(f"The file {self.profiles_yml_filepath} does not exist.")
 
+    def _get_profile_path(self, use_mock_values: bool = False) -> Path:
+        """
+        Handle the profile caching mechanism.
+
+        If a cached profile already exists for the current profile version, reuse it.
+        Otherwise, render the profile for the requested version, store it in the cache and return its path.
+        """
+        assert self.profile_mapping  # To satisfy MyPy
+        current_profile_version = self.profile_mapping.version(self.profile_name, self.target_name, use_mock_values)
+        cached_profile_path = get_cached_profile(current_profile_version)
+        if cached_profile_path:
+            logger.info("Profile found in cache, using profile: %s.", cached_profile_path)
+            return cached_profile_path
+        else:
+            profile_contents = self.profile_mapping.get_profile_file_contents(
+                profile_name=self.profile_name, target_name=self.target_name, use_mock_values=use_mock_values
+            )
+            profile_path = create_cache_profile(current_profile_version, profile_contents)
+            logger.info("Profile not found in cache, storing and using profile: %s.", profile_path)
+            return profile_path
+
     @contextlib.contextmanager
     def ensure_profile(
         self, desired_profile_path: Path | None = None, use_mock_values: bool = False
@@ -268,35 +289,40 @@ def ensure_profile(
             yield Path(self.profiles_yml_filepath), {}
 
         elif self.profile_mapping:
-            profile_contents = self.profile_mapping.get_profile_file_contents(
-                profile_name=self.profile_name, target_name=self.target_name, use_mock_values=use_mock_values
-            )
-
             if use_mock_values:
                 env_vars = {}
             else:
                 env_vars = self.profile_mapping.env_vars
 
-            if desired_profile_path:
-                logger.info(
-                    "Writing profile to %s with the following contents:\n%s",
-                    desired_profile_path,
-                    profile_contents,
-                )
-                # write profile_contents to desired_profile_path using yaml library
-                desired_profile_path.write_text(profile_contents)
-                yield desired_profile_path, env_vars
+            if is_profile_cache_enabled():
+                logger.info("Profile caching is enabled.")
+                cached_profile_path = self._get_profile_path(use_mock_values)
+                yield cached_profile_path, env_vars
             else:
-                with tempfile.TemporaryDirectory() as temp_dir:
-                    temp_file = Path(temp_dir) / DEFAULT_PROFILES_FILE_NAME
-                    logger.info(
-                        "Creating temporary profiles.yml with use_mock_values=%s at %s with the following contents:\n%s",
-                        use_mock_values,
-                        temp_file,
-                        profile_contents,
-                    )
-                    temp_file.write_text(profile_contents)
-                    yield temp_file, env_vars
+                profile_contents = self.profile_mapping.get_profile_file_contents(
+                    profile_name=self.profile_name, target_name=self.target_name, use_mock_values=use_mock_values
+                )
+
+                if desired_profile_path:
+                    logger.info(
+                        "Writing profile to %s with the following contents:\n%s",
+                        desired_profile_path,
+                        profile_contents,
+                    )
+                    # write profile_contents to desired_profile_path using yaml library
+                    desired_profile_path.write_text(profile_contents)
+                    yield desired_profile_path, env_vars
+                else:
+                    with tempfile.TemporaryDirectory() as temp_dir:
+                        temp_file = Path(temp_dir) / DEFAULT_PROFILES_FILE_NAME
+                        logger.info(
+                            "Creating temporary profiles.yml with use_mock_values=%s at %s with the following contents:\n%s",
+                            use_mock_values,
+                            temp_file,
+                            profile_contents,
+                        )
+                        temp_file.write_text(profile_contents)
+                        yield temp_file, env_vars
 
 
 @dataclass
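From the caller's perspective the caching is transparent: ``ensure_profile`` still yields a ``(profiles.yml path, env_vars)`` pair, but with caching enabled the path points at the cached copy instead of a freshly written temporary file. An illustrative sketch (the connection id and profile arguments are placeholders, not part of this change):

    from cosmos.config import ProfileConfig
    from cosmos.profiles import PostgresUserPasswordProfileMapping

    profile_config = ProfileConfig(
        profile_name="default",
        target_name="dev",
        profile_mapping=PostgresUserPasswordProfileMapping(
            conn_id="my_postgres_conn",  # placeholder Airflow connection
            profile_args={"schema": "public"},
        ),
    )

    with profile_config.ensure_profile() as (profile_path, env_vars):
        # First use renders the profile and stores it under
        # <cache_dir>/<profile_cache_dir_name>/<sha256 version>/profiles.yml;
        # later uses with an unchanged connection reuse that file via get_cached_profile().
        print(profile_path, env_vars)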
diff --git a/cosmos/constants.py b/cosmos/constants.py
index 2a1abb20e..956660e01 100644
--- a/cosmos/constants.py
+++ b/cosmos/constants.py
@@ -18,6 +18,7 @@
 DBT_DEPENDENCIES_FILE_NAMES = {"packages.yml", "dependencies.yml"}
 DBT_LOG_FILENAME = "dbt.log"
 DBT_BINARY_NAME = "dbt"
+DEFAULT_PROFILES_FILE_NAME = "profiles.yml"
 
 DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos"
 OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/"
diff --git a/cosmos/profiles/base.py b/cosmos/profiles/base.py
index 7c7b277b1..a81512dbb 100755
--- a/cosmos/profiles/base.py
+++ b/cosmos/profiles/base.py
@@ -5,6 +5,8 @@
 
 from __future__ import annotations
 
+import hashlib
+import json
 import warnings
 from abc import ABC, abstractmethod
 from typing import TYPE_CHECKING, Any, Dict, Literal, Optional
@@ -82,6 +84,23 @@ def __init__(
         self.dbt_config_vars = dbt_config_vars
         self._validate_disable_event_tracking()
 
+    def version(self, profile_name: str, target_name: str, mock_profile: bool = False) -> str:
+        """
+        Generate a SHA-256 hash digest from the profile contents together with the profile and target names.
+
+        :param profile_name: Name of the dbt profile.
+        :param target_name: Name of the dbt target.
+        :param mock_profile: If True, use the mock profile.
+        """
+        if mock_profile:
+            profile = self.mock_profile
+        else:
+            profile = self.profile
+        profile["profile_name"] = profile_name
+        profile["target_name"] = target_name
+        hash_object = hashlib.sha256(json.dumps(profile, sort_keys=True).encode())
+        return hash_object.hexdigest()
+
     def _validate_profile_args(self) -> None:
         """
         Check if profile_args contains keys that should not be overridden from the
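The cache key returned by ``version()`` is therefore just the SHA-256 digest of the JSON-serialised profile plus the profile and target names, so identical inputs always resolve to the same cache directory and any change to the profile produces a new one. A standalone sketch of the idea (not Cosmos code):

    import hashlib
    import json

    def profile_version(profile: dict, profile_name: str, target_name: str) -> str:
        # sort_keys=True makes the digest independent of dict insertion order
        payload = {**profile, "profile_name": profile_name, "target_name": target_name}
        return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()

    v1 = profile_version({"type": "postgres", "host": "localhost"}, "default", "dev")
    v2 = profile_version({"type": "postgres", "host": "localhost"}, "default", "dev")
    assert v1 == v2  # same profile -> same cached profiles.yml
    v3 = profile_version({"type": "postgres", "host": "db.internal"}, "default", "dev")
    assert v3 != v1  # any change invalidates the cache entry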
diff --git a/cosmos/settings.py b/cosmos/settings.py
index 68ed8758f..62d4ee5bd 100644
--- a/cosmos/settings.py
+++ b/cosmos/settings.py
@@ -17,6 +17,8 @@
 dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
 dbt_docs_conn_id = conf.get("cosmos", "dbt_docs_conn_id", fallback=None)
 dbt_docs_index_file_name = conf.get("cosmos", "dbt_docs_index_file_name", fallback="index.html")
+enable_cache_profile = conf.getboolean("cosmos", "enable_cache_profile", fallback=True)
+dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile")
 
 try:
     LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
diff --git a/docs/configuration/caching.rst b/docs/configuration/caching.rst
index b5ec155da..7dba9933f 100644
--- a/docs/configuration/caching.rst
+++ b/docs/configuration/caching.rst
@@ -116,3 +116,17 @@ Users can customize where to store the cache using the setting ``AIRFLOW__COSMOS
 It is possible to switch off this feature by exporting the environment variable ``AIRFLOW__COSMOS__ENABLE_CACHE_PARTIAL_PARSE=0``.
 
 For more information, read the `Cosmos partial parsing documentation <./partial-parsing.html>`_
+
+
+Caching the profiles
+~~~~~~~~~~~~~~~~~~~~
+
+(Introduced in Cosmos 1.5)
+
+Cosmos 1.5 introduced support for profile caching,
+which stores the ``profiles.yml`` rendered from profile mappings in the path given by ``AIRFLOW__COSMOS__CACHE_DIR`` and ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME``.
+This feature avoids re-rendering ``profiles.yml`` from the Airflow connection every time a profile mapping is used.
+
+Users can customize where the cached profiles are stored using the settings ``AIRFLOW__COSMOS__CACHE_DIR`` and ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME``.
+
+To disable this feature, set the environment variable ``AIRFLOW__COSMOS__ENABLE_CACHE_PROFILE=False``.
diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst
index 0c13cbb26..8dc90a5c1 100644
--- a/docs/configuration/cosmos-conf.rst
+++ b/docs/configuration/cosmos-conf.rst
@@ -70,6 +70,23 @@ This page lists all available Airflow configurations that affect ``astronomer-co
     - Default: ``None``
     - Environment Variable: ``AIRFLOW__COSMOS__DBT_DOCS_CONN_ID``
 
+.. _enable_cache_profile:
+
+`enable_cache_profile`_:
+    Enable caching of the dbt profiles (``profiles.yml``) rendered from profile mappings.
+
+    - Default: ``True``
+    - Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_PROFILE``
+
+.. _profile_cache_dir_name:
+
+`profile_cache_dir_name`_:
+    Folder name used to store cached dbt profiles. This will be a sub-folder of ``cache_dir``.
+
+    - Default: ``profile``
+    - Environment Variable: ``AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME``
+
+
 [openlineage]
 ~~~~~~~~~~~~~
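The new options follow the same pattern as the other cache settings; for a single deployment or test run they can also be set through the environment before Airflow imports Cosmos (values below are illustrative only):

    import os

    # Profile caching requires both enable_cache and enable_cache_profile to be truthy
    # (see is_profile_cache_enabled above); flipping either switch turns it off.
    os.environ["AIRFLOW__COSMOS__ENABLE_CACHE_PROFILE"] = "0"
    # Optional: keep cached profiles in a differently named sub-folder of cache_dir.
    os.environ["AIRFLOW__COSMOS__PROFILE_CACHE_DIR_NAME"] = "dbt_profiles"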
diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py
index 9e931ba8c..064d34a13 100644
--- a/tests/dbt/test_graph.py
+++ b/tests/dbt/test_graph.py
@@ -405,10 +405,13 @@ def test_load(
 
 @pytest.mark.integration
+@pytest.mark.parametrize("enable_cache_profile", [True, False])
+@patch("cosmos.config.is_profile_cache_enabled")
 @patch("cosmos.dbt.graph.Popen")
 def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(
-    mock_popen, tmp_dbt_project_dir, postgres_profile_config
+    mock_popen, is_profile_cache_enabled, enable_cache_profile, tmp_dbt_project_dir, postgres_profile_config
 ):
+    is_profile_cache_enabled.return_value = enable_cache_profile
     mock_popen().communicate.return_value = ("", "")
     mock_popen().returncode = 0
     assert not (tmp_dbt_project_dir / "target").exists()
@@ -427,7 +430,7 @@ def test_load_via_dbt_ls_does_not_create_target_logs_in_original_folder(
     assert not (tmp_dbt_project_dir / "target").exists()
     assert not (tmp_dbt_project_dir / "logs").exists()
 
-    used_cwd = Path(mock_popen.call_args[0][0][-5])
+    used_cwd = Path(mock_popen.call_args[0][0][5])
     assert used_cwd != project_config.dbt_project_path
     assert not used_cwd.exists()
 
@@ -638,7 +641,11 @@ def test_load_via_dbt_ls_without_dbt_deps_and_preinstalled_dbt_packages(
 
 @pytest.mark.integration
-def test_load_via_dbt_ls_caching_partial_parsing(tmp_dbt_project_dir, postgres_profile_config, caplog, tmp_path):
+@pytest.mark.parametrize("enable_cache_profile", [True, False])
+@patch("cosmos.config.is_profile_cache_enabled")
+def test_load_via_dbt_ls_caching_partial_parsing(
+    is_profile_cache_enabled, enable_cache_profile, tmp_dbt_project_dir, postgres_profile_config, caplog, tmp_path
+):
     """
     When using RenderConfig.enable_mock_profile=False and defining DbtGraph.cache_dir,
     Cosmos should leverage dbt partial parsing.
@@ -647,6 +654,8 @@ def test_load_via_dbt_ls_caching_partial_parsing(tmp_dbt_project_dir, postgres_p
 
     caplog.set_level(logging.DEBUG)
 
+    is_profile_cache_enabled.return_value = enable_cache_profile
+
     project_config = ProjectConfig(dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME)
     render_config = RenderConfig(
         dbt_project_path=tmp_dbt_project_dir / DBT_PROJECT_NAME, dbt_deps=True, enable_mock_profile=False
diff --git a/tests/profiles/test_base_profile.py b/tests/profiles/test_base_profile.py
index 7fdbdb886..8eeb83537 100644
--- a/tests/profiles/test_base_profile.py
+++ b/tests/profiles/test_base_profile.py
@@ -162,3 +162,9 @@ def test_profile_config_validate_dbt_config_vars_check_values(dbt_config_var: st
         conn_id="fake_conn_id",
         dbt_config_vars=DbtProfileConfigVars(**dbt_config_vars),
     )
+
+
+def test_profile_version_sha_consistency():
+    profile_mapping = TestProfileMapping(conn_id="fake_conn_id")
+    version = profile_mapping.version(profile_name="dev", target_name="dev")
+    assert version == "ea3bf1f70b033405ba9ff9cafe65af873fd7a868cac840cdbfd5e8e9a1da9650"
diff --git a/tests/test_cache.py b/tests/test_cache.py
index 9cd216998..9edf10f90 100644
--- a/tests/test_cache.py
+++ b/tests/test_cache.py
@@ -16,10 +16,15 @@
     _copy_partial_parse_to_project,
     _create_cache_identifier,
     _get_latest_partial_parse,
+    _get_or_create_profile_cache_dir,
     _update_partial_parse_cache,
+    create_cache_profile,
     delete_unused_dbt_ls_cache,
+    get_cached_profile,
+    is_profile_cache_enabled,
 )
-from cosmos.constants import DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME
+from cosmos.constants import DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME, DEFAULT_PROFILES_FILE_NAME
+from cosmos.settings import dbt_profile_cache_dir_name
 
 START_DATE = datetime(2024, 4, 16)
 example_dag = DAG("dag", start_date=START_DATE)
@@ -182,3 +187,100 @@ def test_delete_unused_dbt_ls_cache_deletes_all_cache_five_minutes_ago(vars_sess
     assert not vars_session.query(Variable).filter_by(key="cosmos_cache__dag_a").first()
     assert not vars_session.query(Variable).filter_by(key="cosmos_cache__dag_b").first()
     assert not vars_session.query(Variable).filter_by(key="cosmos_cache__dag_c__task_group_1").first()
+
+
+@pytest.mark.parametrize(
+    "enable_cache, enable_cache_profile, expected_result",
+    [(True, True, True), (True, False, False), (False, True, False), (False, False, False)],
+)
+def test_is_profile_cache_enabled(enable_cache, enable_cache_profile, expected_result):
+    with patch("cosmos.cache.enable_cache", enable_cache), patch(
+        "cosmos.cache.enable_cache_profile", enable_cache_profile
+    ):
+        assert is_profile_cache_enabled() == expected_result
+
+
+def test_get_or_create_profile_cache_dir():
+    # Create a temporary directory for cache_dir
+    with tempfile.TemporaryDirectory() as temp_dir:
+        temp_dir_path = Path(temp_dir)
+
+        # Test case 1: Directory does not exist, should create it
+        with patch("cosmos.cache.cache_dir", temp_dir_path):
+            profile_cache_dir = _get_or_create_profile_cache_dir()
+            expected_dir = temp_dir_path / dbt_profile_cache_dir_name
+            assert profile_cache_dir == expected_dir
+            assert expected_dir.exists()
+
+        # Test case 2: Directory already exists, should return existing path
+        with patch("cosmos.cache.cache_dir", temp_dir_path):
+            profile_cache_dir_again = _get_or_create_profile_cache_dir()
+            expected_dir = temp_dir_path / dbt_profile_cache_dir_name
+            assert profile_cache_dir_again == expected_dir
+            assert expected_dir.exists()
+
+
+def test_get_cached_profile_not_exists():
+    with tempfile.TemporaryDirectory() as temp_dir:
+        temp_dir = Path(temp_dir)
+        # Mock cache_dir to use the temporary directory
+        with patch("cosmos.cache.cache_dir", temp_dir):
+            # No profile has been cached for this version yet
+            version = "592906f650558ce1dadb75fcce84a2ec09e444441e6af6069f19204d59fe428b"
+            result = get_cached_profile(version)
+            assert result is None
+
+
+def test_get_cached_profile():
+    profile_content = """
+    default:
+        target: dev
+        outputs:
+            dev:
+                type: postgres
+                host: localhost
+                user: myuser
+                pass: mypassword
+                dbname: mydatabase
+    """
+    with tempfile.TemporaryDirectory() as temp_dir:
+        temp_dir = Path(temp_dir)
+        with patch("cosmos.cache.cache_dir", temp_dir):
+            # Set up a cached dbt profile
+            version = "592906f650558ce1dadb75fcce84a2ec09e444441e6af6069f19204d59fe428b"
+            create_cache_profile(version, profile_content)
+
+            expected_yml_path = temp_dir / dbt_profile_cache_dir_name / version / DEFAULT_PROFILES_FILE_NAME
+            result = get_cached_profile(version)
+            assert result == expected_yml_path
+
+
+def test_create_cache_profile():
+    version = "592906f650558ce1dadb75fcce84a2ec09e444441e6af6069f19204d59fe428b"
+    profile_content = """
+    default:
+        target: dev
+        outputs:
+            dev:
+                type: postgres
+                host: localhost
+                user: myuser
+                pass: mypassword
+                dbname: mydatabase
+    """
+
+    with tempfile.TemporaryDirectory() as temp_dir:
+        temp_dir = Path(temp_dir)
+        with patch("cosmos.cache.cache_dir", temp_dir):
+            profile_yml_path = create_cache_profile(version, profile_content)
+
+        expected_dir = temp_dir / dbt_profile_cache_dir_name / version
+        expected_path = expected_dir / DEFAULT_PROFILES_FILE_NAME
+
+        # Check if the directory and file were created
+        assert expected_dir.exists()
+        assert expected_path.exists()
+
+        # Check content of the created file
+        assert expected_path.read_text() == profile_content
+        assert profile_yml_path == expected_path
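For completeness, the cached profiles are plain files on disk, so they can be inspected or cleared like any other cache directory. A hypothetical maintenance snippet (paths derived from the settings introduced above; removing the directory simply forces profiles to be re-rendered on the next run):

    import shutil

    from cosmos.settings import cache_dir, dbt_profile_cache_dir_name

    profile_cache_dir = cache_dir / dbt_profile_cache_dir_name
    for cached in sorted(profile_cache_dir.glob("*/profiles.yml")):
        print(cached.parent.name, cached)  # <sha256 version>  <path to profiles.yml>

    shutil.rmtree(profile_cache_dir, ignore_errors=True)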