diff --git a/cosmos/cache.py b/cosmos/cache.py index 32a21d30d..102186423 100644 --- a/cosmos/cache.py +++ b/cosmos/cache.py @@ -5,12 +5,14 @@ import json import os import shutil +import tempfile import time from collections import defaultdict from datetime import datetime, timedelta, timezone from pathlib import Path import msgpack +import yaml from airflow.models import DagRun, Variable from airflow.models.dag import DAG from airflow.utils.session import provide_session @@ -19,10 +21,21 @@ from sqlalchemy.orm import Session from cosmos import settings -from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME, DEFAULT_PROFILES_FILE_NAME +from cosmos.constants import ( + DBT_MANIFEST_FILE_NAME, + DBT_TARGET_DIR_NAME, + DEFAULT_PROFILES_FILE_NAME, + PACKAGE_LOCKFILE_YML, +) from cosmos.dbt.project import get_partial_parse_path from cosmos.log import get_logger -from cosmos.settings import cache_dir, dbt_profile_cache_dir_name, enable_cache, enable_cache_profile +from cosmos.settings import ( + cache_dir, + dbt_profile_cache_dir_name, + enable_cache, + enable_cache_package_lockfile, + enable_cache_profile, +) logger = get_logger(__name__) VAR_KEY_CACHE_PREFIX = "cosmos_cache__" @@ -400,3 +413,69 @@ def create_cache_profile(version: str, profile_content: str) -> Path: profile_yml_path = profile_yml_dir / DEFAULT_PROFILES_FILE_NAME profile_yml_path.write_text(profile_content) return profile_yml_path + + +def is_cache_package_lockfile_enabled(project_dir: Path) -> bool: + if not enable_cache_package_lockfile: + return False + package_lockfile = project_dir / PACKAGE_LOCKFILE_YML + return package_lockfile.is_file() + + +def _get_sha1_hash(yaml_file: Path) -> str: + """Read package-lock.yml file and return sha1_hash""" + with open(yaml_file) as file: + yaml_content = file.read() + data = yaml.safe_load(yaml_content) + sha1_hash: str = data.get("sha1_hash", "") + return sha1_hash + + +def _get_latest_cached_package_lockfile(project_dir: Path) -> Path | None: + """ + Retrieves the latest cached package-lock.yml for the specified project directory, + or creates and caches it if not already cached and hashes match. + """ + cache_identifier = project_dir.name + package_lockfile = project_dir / PACKAGE_LOCKFILE_YML + cached_package_lockfile = cache_dir / cache_identifier / PACKAGE_LOCKFILE_YML + + if cached_package_lockfile.exists() and cached_package_lockfile.is_file(): + project_sha1_hash = _get_sha1_hash(package_lockfile) + cached_sha1_hash = _get_sha1_hash(cached_package_lockfile) + if project_sha1_hash == cached_sha1_hash: + return cached_package_lockfile + cached_lockfile_dir = cache_dir / cache_identifier + cached_lockfile_dir.mkdir(parents=True, exist_ok=True) + _safe_copy(package_lockfile, cached_package_lockfile) + return cached_package_lockfile + + +def _copy_cached_package_lockfile_to_project(cached_package_lockfile: Path, project_dir: Path) -> None: + """Copy the cached package-lock.yml to tmp project dir""" + package_lockfile = project_dir / PACKAGE_LOCKFILE_YML + _safe_copy(cached_package_lockfile, package_lockfile) + + +# TODO: Move this function to a different location +def _safe_copy(src: Path, dst: Path) -> None: + """ + Safely copies a file from a source path to a destination path. + + This function ensures that the copy operation is atomic by first + copying the file to a temporary file in the same directory as the + destination and then renaming the temporary file to the destination + file. This approach minimizes the risk of file corruption or partial + writes in case of a failure or interruption during the copy process. + + See the blog for atomic file operations: + https://alexwlchan.net/2019/atomic-cross-filesystem-moves-in-python/ + """ + # Create a temporary file in the same directory as the destination + dir_name, base_name = os.path.split(dst) + temp_fd, temp_path = tempfile.mkstemp(dir=dir_name) + + shutil.copyfile(src, temp_path) + + # Rename the temporary file to the destination file + os.rename(temp_path, dst) diff --git a/cosmos/constants.py b/cosmos/constants.py index cc9841ed6..25b33f28a 100644 --- a/cosmos/constants.py +++ b/cosmos/constants.py @@ -20,6 +20,7 @@ DBT_LOG_FILENAME = "dbt.log" DBT_BINARY_NAME = "dbt" DEFAULT_PROFILES_FILE_NAME = "profiles.yml" +PACKAGE_LOCKFILE_YML = "package-lock.yml" DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos" OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/" diff --git a/cosmos/dbt/graph.py b/cosmos/dbt/graph.py index 68105eb21..5240a8ed6 100644 --- a/cosmos/dbt/graph.py +++ b/cosmos/dbt/graph.py @@ -18,6 +18,11 @@ from airflow.models import Variable from cosmos import cache, settings +from cosmos.cache import ( + _copy_cached_package_lockfile_to_project, + _get_latest_cached_package_lockfile, + is_cache_package_lockfile_enabled, +) from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig from cosmos.constants import ( DBT_LOG_DIR_NAME, @@ -487,6 +492,10 @@ def load_via_dbt_ls_without_cache(self) -> None: env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir) if self.render_config.dbt_deps and has_non_empty_dependencies_file(self.project_path): + if is_cache_package_lockfile_enabled(project_path): + latest_package_lockfile = _get_latest_cached_package_lockfile(project_path) + if latest_package_lockfile: + _copy_cached_package_lockfile_to_project(latest_package_lockfile, tmpdir_path) self.run_dbt_deps(dbt_cmd, tmpdir_path, env) nodes = self.run_dbt_ls(dbt_cmd, self.project_path, tmpdir_path, env) diff --git a/cosmos/dbt/project.py b/cosmos/dbt/project.py index 987d10f3a..2c9f9743a 100644 --- a/cosmos/dbt/project.py +++ b/cosmos/dbt/project.py @@ -10,6 +10,7 @@ DBT_LOG_DIR_NAME, DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME, + PACKAGE_LOCKFILE_YML, ) from cosmos.log import get_logger @@ -38,7 +39,7 @@ def has_non_empty_dependencies_file(project_path: Path) -> bool: def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool) -> None: """Helper function to create symlinks to the dbt project files.""" - ignore_paths = [DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "profiles.yml"] + ignore_paths = [DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, PACKAGE_LOCKFILE_YML, "profiles.yml"] if ignore_dbt_packages: # this is linked to dbt deps so if dbt deps is true then ignore existing dbt_packages folder ignore_paths.append("dbt_packages") diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py index 4acaa9453..8a4ea1ba2 100644 --- a/cosmos/operators/local.py +++ b/cosmos/operators/local.py @@ -17,6 +17,11 @@ from attr import define from cosmos import cache +from cosmos.cache import ( + _copy_cached_package_lockfile_to_project, + _get_latest_cached_package_lockfile, + is_cache_package_lockfile_enabled, +) from cosmos.constants import InvocationMode from cosmos.dbt.project import get_partial_parse_path, has_non_empty_dependencies_file from cosmos.exceptions import AirflowCompatibilityError @@ -275,6 +280,13 @@ def run_dbt_runner(self, command: list[str], env: dict[str, str], cwd: str) -> d return result + def _cache_package_lockfile(self, tmp_project_dir: Path) -> None: + project_dir = Path(self.project_dir) + if is_cache_package_lockfile_enabled(project_dir): + latest_package_lockfile = _get_latest_cached_package_lockfile(project_dir) + if latest_package_lockfile: + _copy_cached_package_lockfile_to_project(latest_package_lockfile, tmp_project_dir) + def run_command( self, cmd: list[str], @@ -320,6 +332,7 @@ def run_command( ] if self.install_deps: + self._cache_package_lockfile(tmp_dir_path) deps_command = [self.dbt_executable_path, "deps"] deps_command.extend(flags) self.invoke_dbt( diff --git a/cosmos/settings.py b/cosmos/settings.py index 67d5928d9..71387de6e 100644 --- a/cosmos/settings.py +++ b/cosmos/settings.py @@ -14,6 +14,7 @@ cache_dir = Path(conf.get("cosmos", "cache_dir", fallback=DEFAULT_CACHE_DIR) or DEFAULT_CACHE_DIR) enable_cache = conf.getboolean("cosmos", "enable_cache", fallback=True) enable_cache_partial_parse = conf.getboolean("cosmos", "enable_cache_partial_parse", fallback=True) +enable_cache_package_lockfile = conf.getboolean("cosmos", "enable_cache_package_lockfile", fallback=True) enable_cache_dbt_ls = conf.getboolean("cosmos", "enable_cache_dbt_ls", fallback=True) propagate_logs = conf.getboolean("cosmos", "propagate_logs", fallback=True) dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None) diff --git a/dev/dags/dbt/jaffle_shop/package-lock.yml b/dev/dags/dbt/jaffle_shop/package-lock.yml new file mode 100644 index 000000000..669f9637d --- /dev/null +++ b/dev/dags/dbt/jaffle_shop/package-lock.yml @@ -0,0 +1,4 @@ +packages: + - package: dbt-labs/dbt_utils + version: 1.1.1 +sha1_hash: a158c48c59c2bb7d729d2a4e215aabe5bb4f3353 diff --git a/docs/configuration/cosmos-conf.rst b/docs/configuration/cosmos-conf.rst index 8dc90a5c1..037a43d3b 100644 --- a/docs/configuration/cosmos-conf.rst +++ b/docs/configuration/cosmos-conf.rst @@ -46,6 +46,14 @@ This page lists all available Airflow configurations that affect ``astronomer-co - Default: ``True`` - Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_PARTIAL_PARSE`` +.. _enable_cache_package_lockfile: + +`enable_cache_package_lockfile`_: + Enable or disable caching of dbt project package lockfile. + + - Default: ``True`` + - Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_PACKAGE_LOCKFILE`` + .. _propagate_logs: `propagate_logs`_: diff --git a/tests/dbt/test_graph.py b/tests/dbt/test_graph.py index 7483c768c..0e7b0b05a 100644 --- a/tests/dbt/test_graph.py +++ b/tests/dbt/test_graph.py @@ -1422,9 +1422,9 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir hash_dir, hash_args = version.split(",") assert hash_args == "d41d8cd98f00b204e9800998ecf8427e" if sys.platform == "darwin": - assert hash_dir == "18b97e2bff2684161f71db817f1f50e2" + assert hash_dir == "65595448aded2c2b52878a801c1d9c59" else: - assert hash_dir == "6c662da10b64a8390c469c884af88321" + assert hash_dir == "4c826c84a94b0f1f5508c4e425170677" @pytest.mark.integration diff --git a/tests/test_cache.py b/tests/test_cache.py index cc1d11b21..b9ab087a7 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -16,12 +16,15 @@ _copy_partial_parse_to_project, _create_cache_identifier, _create_folder_version_hash, + _get_latest_cached_package_lockfile, _get_latest_partial_parse, _get_or_create_profile_cache_dir, + _get_sha1_hash, _update_partial_parse_cache, create_cache_profile, delete_unused_dbt_ls_cache, get_cached_profile, + is_cache_package_lockfile_enabled, is_profile_cache_enabled, ) from cosmos.constants import DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME, DEFAULT_PROFILES_FILE_NAME @@ -317,3 +320,89 @@ def test_create_cache_profile(): # Check content of the created file assert expected_path.read_text() == profile_content assert profile_yml_path == expected_path + + +@patch("pathlib.Path.is_file") +def test_cache_package_lockfile_enabled(mock_path_is_file): + # Mocking the return value of Path.is_file() + mock_path_is_file.return_value = True + + # Test case where lockfile exists + project_dir = Path("/path/to/your/project") + result = is_cache_package_lockfile_enabled(project_dir) + assert result is True + + # Test case where lockfile doesn't exist + mock_path_is_file.return_value = False + result = is_cache_package_lockfile_enabled(project_dir) + assert result is False + + +@pytest.fixture +def mock_package_lockfile(): + # Create a temporary YAML file with test data + yaml_data = """ + packages: + - package: dbt-labs/dbt_utils + version: 1.1.1 + sha1_hash: a158c48c59c2bb7d729d2a4e215aabe5bb4f3353 + """ + tmp_file = Path("test_package-lock.yml") + with open(tmp_file, "w") as file: + file.write(yaml_data) + yield tmp_file + # Clean up: delete the temporary file after the test + tmp_file.unlink() + + +def test_get_sha1_hash(): + profile_lock_content = """ + packages: + - package: dbt-labs/dbt_utils + version: 1.1.1 + sha1_hash: a158c48c59c2bb7d729d2a4e215aabe5bb4f3353 + """ + tmp_file = Path("package-lock.yml") + with open(tmp_file, "w") as file: + file.write(profile_lock_content) + + sha1_hash = _get_sha1_hash(tmp_file) + assert sha1_hash == "a158c48c59c2bb7d729d2a4e215aabe5bb4f3353" + tmp_file.unlink() + + +def _test_tmp_dir(dir_name: str): + # Create a temporary directory for cache_dir + with tempfile.TemporaryDirectory() as temp_dir: + cache_dir = Path(temp_dir) / "test_cache" + cache_dir.mkdir() + return cache_dir + + +@patch("cosmos.cache.cache_dir") +@patch("cosmos.cache._get_sha1_hash") +def test_get_latest_cached_package_lockfile_with_cache(mock_get_sha, cache_dir): + # Create a fake cached lockfile + project_dir = _test_tmp_dir("test_project") + cache_dir.return_value = _test_tmp_dir("test_cache") + cache_identifier = project_dir.name + cached_profile_lockfile = cache_dir / cache_identifier / "package-lock.yml" + cached_profile_lockfile.parent.mkdir(parents=True, exist_ok=True) + cached_profile_lockfile.touch() + + # Test case where there is a cached file + result = _get_latest_cached_package_lockfile(project_dir) + assert result == cached_profile_lockfile + assert cached_profile_lockfile.exists() + + +@patch("cosmos.cache._get_sha1_hash") +def test_get_latest_cached_lockfile_with_no_cache(mock_get_sha): + project_dir = _test_tmp_dir("test_project") + project_package_lockfile = project_dir / "package-lock.yml" + project_package_lockfile.parent.mkdir(parents=True, exist_ok=True) + project_package_lockfile.touch() + + # Test case where there is a cached file + result = _get_latest_cached_package_lockfile(project_dir) + assert result.exists()