Skip to content

Commit

Permalink
Cache package-lock.yml file (#1086)
Browse files Browse the repository at this point in the history
This PR aims to cache the package-lock.yml in `cache_dir/dbt_project`

Since dbt version 1.7.0, executing the dbt deps command results in the
generation of a package-lock.yml file. This file pins the dependencies
and their versions for the dbt project. dbt uses this file to install
packages, ensuring predictable and consistent package installations
across environments.

- This feature is enabled only if the user checks in package-lock.yml in
their dbt project. Also, I'm assuming if `package-lock.yml` their
dbt-core version is >= 1.7.0 since this feature is available for only
dbt >= 1.7.0
- package-lock.yml also contains the sha1_hash of the packages. This is
used to check if the cached package-lock.yml is outdated or not in this
PR
- The cached `package-lock.yml` is finally copied from from cached path
to the tmp project and used
- To update dependencies or versions, it is expected that the user will
manually update their package-lock.yml in the dbt project using the dbt
deps command.


closes: #930
  • Loading branch information
pankajastro committed Aug 8, 2024
1 parent 711bb7c commit e847f19
Show file tree
Hide file tree
Showing 10 changed files with 210 additions and 5 deletions.
83 changes: 81 additions & 2 deletions cosmos/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
import json
import os
import shutil
import tempfile
import time
from collections import defaultdict
from datetime import datetime, timedelta, timezone
from pathlib import Path

import msgpack
import yaml
from airflow.models import DagRun, Variable
from airflow.models.dag import DAG
from airflow.utils.session import provide_session
Expand All @@ -19,10 +21,21 @@
from sqlalchemy.orm import Session

from cosmos import settings
from cosmos.constants import DBT_MANIFEST_FILE_NAME, DBT_TARGET_DIR_NAME, DEFAULT_PROFILES_FILE_NAME
from cosmos.constants import (
DBT_MANIFEST_FILE_NAME,
DBT_TARGET_DIR_NAME,
DEFAULT_PROFILES_FILE_NAME,
PACKAGE_LOCKFILE_YML,
)
from cosmos.dbt.project import get_partial_parse_path
from cosmos.log import get_logger
from cosmos.settings import cache_dir, dbt_profile_cache_dir_name, enable_cache, enable_cache_profile
from cosmos.settings import (
cache_dir,
dbt_profile_cache_dir_name,
enable_cache,
enable_cache_package_lockfile,
enable_cache_profile,
)

logger = get_logger(__name__)
VAR_KEY_CACHE_PREFIX = "cosmos_cache__"
Expand Down Expand Up @@ -400,3 +413,69 @@ def create_cache_profile(version: str, profile_content: str) -> Path:
profile_yml_path = profile_yml_dir / DEFAULT_PROFILES_FILE_NAME
profile_yml_path.write_text(profile_content)
return profile_yml_path


def is_cache_package_lockfile_enabled(project_dir: Path) -> bool:
if not enable_cache_package_lockfile:
return False
package_lockfile = project_dir / PACKAGE_LOCKFILE_YML
return package_lockfile.is_file()


def _get_sha1_hash(yaml_file: Path) -> str:
"""Read package-lock.yml file and return sha1_hash"""
with open(yaml_file) as file:
yaml_content = file.read()
data = yaml.safe_load(yaml_content)
sha1_hash: str = data.get("sha1_hash", "")
return sha1_hash


def _get_latest_cached_package_lockfile(project_dir: Path) -> Path | None:
"""
Retrieves the latest cached package-lock.yml for the specified project directory,
or creates and caches it if not already cached and hashes match.
"""
cache_identifier = project_dir.name
package_lockfile = project_dir / PACKAGE_LOCKFILE_YML
cached_package_lockfile = cache_dir / cache_identifier / PACKAGE_LOCKFILE_YML

if cached_package_lockfile.exists() and cached_package_lockfile.is_file():
project_sha1_hash = _get_sha1_hash(package_lockfile)
cached_sha1_hash = _get_sha1_hash(cached_package_lockfile)
if project_sha1_hash == cached_sha1_hash:
return cached_package_lockfile
cached_lockfile_dir = cache_dir / cache_identifier
cached_lockfile_dir.mkdir(parents=True, exist_ok=True)
_safe_copy(package_lockfile, cached_package_lockfile)
return cached_package_lockfile


def _copy_cached_package_lockfile_to_project(cached_package_lockfile: Path, project_dir: Path) -> None:
"""Copy the cached package-lock.yml to tmp project dir"""
package_lockfile = project_dir / PACKAGE_LOCKFILE_YML
_safe_copy(cached_package_lockfile, package_lockfile)


# TODO: Move this function to a different location
def _safe_copy(src: Path, dst: Path) -> None:
"""
Safely copies a file from a source path to a destination path.
This function ensures that the copy operation is atomic by first
copying the file to a temporary file in the same directory as the
destination and then renaming the temporary file to the destination
file. This approach minimizes the risk of file corruption or partial
writes in case of a failure or interruption during the copy process.
See the blog for atomic file operations:
https://alexwlchan.net/2019/atomic-cross-filesystem-moves-in-python/
"""
# Create a temporary file in the same directory as the destination
dir_name, base_name = os.path.split(dst)
temp_fd, temp_path = tempfile.mkstemp(dir=dir_name)

shutil.copyfile(src, temp_path)

# Rename the temporary file to the destination file
os.rename(temp_path, dst)
1 change: 1 addition & 0 deletions cosmos/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
DBT_LOG_FILENAME = "dbt.log"
DBT_BINARY_NAME = "dbt"
DEFAULT_PROFILES_FILE_NAME = "profiles.yml"
PACKAGE_LOCKFILE_YML = "package-lock.yml"

DEFAULT_OPENLINEAGE_NAMESPACE = "cosmos"
OPENLINEAGE_PRODUCER = "https://github.com/astronomer/astronomer-cosmos/"
Expand Down
9 changes: 9 additions & 0 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
from airflow.models import Variable

from cosmos import cache, settings
from cosmos.cache import (
_copy_cached_package_lockfile_to_project,
_get_latest_cached_package_lockfile,
is_cache_package_lockfile_enabled,
)
from cosmos.config import ExecutionConfig, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.constants import (
DBT_LOG_DIR_NAME,
Expand Down Expand Up @@ -487,6 +492,10 @@ def load_via_dbt_ls_without_cache(self) -> None:
env[DBT_TARGET_PATH_ENVVAR] = str(self.target_dir)

if self.render_config.dbt_deps and has_non_empty_dependencies_file(self.project_path):
if is_cache_package_lockfile_enabled(project_path):
latest_package_lockfile = _get_latest_cached_package_lockfile(project_path)
if latest_package_lockfile:
_copy_cached_package_lockfile_to_project(latest_package_lockfile, tmpdir_path)
self.run_dbt_deps(dbt_cmd, tmpdir_path, env)

nodes = self.run_dbt_ls(dbt_cmd, self.project_path, tmpdir_path, env)
Expand Down
3 changes: 2 additions & 1 deletion cosmos/dbt/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
DBT_LOG_DIR_NAME,
DBT_PARTIAL_PARSE_FILE_NAME,
DBT_TARGET_DIR_NAME,
PACKAGE_LOCKFILE_YML,
)
from cosmos.log import get_logger

Expand Down Expand Up @@ -38,7 +39,7 @@ def has_non_empty_dependencies_file(project_path: Path) -> bool:

def create_symlinks(project_path: Path, tmp_dir: Path, ignore_dbt_packages: bool) -> None:
"""Helper function to create symlinks to the dbt project files."""
ignore_paths = [DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, "profiles.yml"]
ignore_paths = [DBT_LOG_DIR_NAME, DBT_TARGET_DIR_NAME, PACKAGE_LOCKFILE_YML, "profiles.yml"]
if ignore_dbt_packages:
# this is linked to dbt deps so if dbt deps is true then ignore existing dbt_packages folder
ignore_paths.append("dbt_packages")
Expand Down
13 changes: 13 additions & 0 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
from attr import define

from cosmos import cache
from cosmos.cache import (
_copy_cached_package_lockfile_to_project,
_get_latest_cached_package_lockfile,
is_cache_package_lockfile_enabled,
)
from cosmos.constants import InvocationMode
from cosmos.dbt.project import get_partial_parse_path, has_non_empty_dependencies_file
from cosmos.exceptions import AirflowCompatibilityError
Expand Down Expand Up @@ -275,6 +280,13 @@ def run_dbt_runner(self, command: list[str], env: dict[str, str], cwd: str) -> d

return result

def _cache_package_lockfile(self, tmp_project_dir: Path) -> None:
project_dir = Path(self.project_dir)
if is_cache_package_lockfile_enabled(project_dir):
latest_package_lockfile = _get_latest_cached_package_lockfile(project_dir)
if latest_package_lockfile:
_copy_cached_package_lockfile_to_project(latest_package_lockfile, tmp_project_dir)

def run_command(
self,
cmd: list[str],
Expand Down Expand Up @@ -320,6 +332,7 @@ def run_command(
]

if self.install_deps:
self._cache_package_lockfile(tmp_dir_path)
deps_command = [self.dbt_executable_path, "deps"]
deps_command.extend(flags)
self.invoke_dbt(
Expand Down
1 change: 1 addition & 0 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
cache_dir = Path(conf.get("cosmos", "cache_dir", fallback=DEFAULT_CACHE_DIR) or DEFAULT_CACHE_DIR)
enable_cache = conf.getboolean("cosmos", "enable_cache", fallback=True)
enable_cache_partial_parse = conf.getboolean("cosmos", "enable_cache_partial_parse", fallback=True)
enable_cache_package_lockfile = conf.getboolean("cosmos", "enable_cache_package_lockfile", fallback=True)
enable_cache_dbt_ls = conf.getboolean("cosmos", "enable_cache_dbt_ls", fallback=True)
propagate_logs = conf.getboolean("cosmos", "propagate_logs", fallback=True)
dbt_docs_dir = conf.get("cosmos", "dbt_docs_dir", fallback=None)
Expand Down
4 changes: 4 additions & 0 deletions dev/dags/dbt/jaffle_shop/package-lock.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
packages:
- package: dbt-labs/dbt_utils
version: 1.1.1
sha1_hash: a158c48c59c2bb7d729d2a4e215aabe5bb4f3353
8 changes: 8 additions & 0 deletions docs/configuration/cosmos-conf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ This page lists all available Airflow configurations that affect ``astronomer-co
- Default: ``True``
- Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_PARTIAL_PARSE``

.. _enable_cache_package_lockfile:

`enable_cache_package_lockfile`_:
Enable or disable caching of dbt project package lockfile.

- Default: ``True``
- Environment Variable: ``AIRFLOW__COSMOS__ENABLE_CACHE_PACKAGE_LOCKFILE``

.. _propagate_logs:

`propagate_logs`_:
Expand Down
4 changes: 2 additions & 2 deletions tests/dbt/test_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1422,9 +1422,9 @@ def test_save_dbt_ls_cache(mock_variable_set, mock_datetime, tmp_dbt_project_dir
hash_dir, hash_args = version.split(",")
assert hash_args == "d41d8cd98f00b204e9800998ecf8427e"
if sys.platform == "darwin":
assert hash_dir == "18b97e2bff2684161f71db817f1f50e2"
assert hash_dir == "65595448aded2c2b52878a801c1d9c59"
else:
assert hash_dir == "6c662da10b64a8390c469c884af88321"
assert hash_dir == "4c826c84a94b0f1f5508c4e425170677"


@pytest.mark.integration
Expand Down
89 changes: 89 additions & 0 deletions tests/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
_copy_partial_parse_to_project,
_create_cache_identifier,
_create_folder_version_hash,
_get_latest_cached_package_lockfile,
_get_latest_partial_parse,
_get_or_create_profile_cache_dir,
_get_sha1_hash,
_update_partial_parse_cache,
create_cache_profile,
delete_unused_dbt_ls_cache,
get_cached_profile,
is_cache_package_lockfile_enabled,
is_profile_cache_enabled,
)
from cosmos.constants import DBT_PARTIAL_PARSE_FILE_NAME, DBT_TARGET_DIR_NAME, DEFAULT_PROFILES_FILE_NAME
Expand Down Expand Up @@ -317,3 +320,89 @@ def test_create_cache_profile():
# Check content of the created file
assert expected_path.read_text() == profile_content
assert profile_yml_path == expected_path


@patch("pathlib.Path.is_file")
def test_cache_package_lockfile_enabled(mock_path_is_file):
# Mocking the return value of Path.is_file()
mock_path_is_file.return_value = True

# Test case where lockfile exists
project_dir = Path("/path/to/your/project")
result = is_cache_package_lockfile_enabled(project_dir)
assert result is True

# Test case where lockfile doesn't exist
mock_path_is_file.return_value = False
result = is_cache_package_lockfile_enabled(project_dir)
assert result is False


@pytest.fixture
def mock_package_lockfile():
# Create a temporary YAML file with test data
yaml_data = """
packages:
- package: dbt-labs/dbt_utils
version: 1.1.1
sha1_hash: a158c48c59c2bb7d729d2a4e215aabe5bb4f3353
"""
tmp_file = Path("test_package-lock.yml")
with open(tmp_file, "w") as file:
file.write(yaml_data)
yield tmp_file
# Clean up: delete the temporary file after the test
tmp_file.unlink()


def test_get_sha1_hash():
profile_lock_content = """
packages:
- package: dbt-labs/dbt_utils
version: 1.1.1
sha1_hash: a158c48c59c2bb7d729d2a4e215aabe5bb4f3353
"""
tmp_file = Path("package-lock.yml")
with open(tmp_file, "w") as file:
file.write(profile_lock_content)

sha1_hash = _get_sha1_hash(tmp_file)
assert sha1_hash == "a158c48c59c2bb7d729d2a4e215aabe5bb4f3353"
tmp_file.unlink()


def _test_tmp_dir(dir_name: str):
# Create a temporary directory for cache_dir
with tempfile.TemporaryDirectory() as temp_dir:
cache_dir = Path(temp_dir) / "test_cache"
cache_dir.mkdir()
return cache_dir


@patch("cosmos.cache.cache_dir")
@patch("cosmos.cache._get_sha1_hash")
def test_get_latest_cached_package_lockfile_with_cache(mock_get_sha, cache_dir):
# Create a fake cached lockfile
project_dir = _test_tmp_dir("test_project")
cache_dir.return_value = _test_tmp_dir("test_cache")
cache_identifier = project_dir.name
cached_profile_lockfile = cache_dir / cache_identifier / "package-lock.yml"
cached_profile_lockfile.parent.mkdir(parents=True, exist_ok=True)
cached_profile_lockfile.touch()

# Test case where there is a cached file
result = _get_latest_cached_package_lockfile(project_dir)
assert result == cached_profile_lockfile
assert cached_profile_lockfile.exists()


@patch("cosmos.cache._get_sha1_hash")
def test_get_latest_cached_lockfile_with_no_cache(mock_get_sha):
project_dir = _test_tmp_dir("test_project")
project_package_lockfile = project_dir / "package-lock.yml"
project_package_lockfile.parent.mkdir(parents=True, exist_ok=True)
project_package_lockfile.touch()

# Test case where there is a cached file
result = _get_latest_cached_package_lockfile(project_dir)
assert result.exists()

0 comments on commit e847f19

Please sign in to comment.