Skip to content

Commit

Permalink
Configure remote_cache_path in ProjectConfig and use that
Browse files Browse the repository at this point in the history
  • Loading branch information
pankajkoti committed Aug 10, 2024
1 parent ffa28d5 commit 0f5e28b
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 38 deletions.
32 changes: 31 additions & 1 deletion cosmos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
from cosmos.profiles import BaseProfileMapping
from cosmos.settings import AIRFLOW_IO_AVAILABLE
from cosmos.settings import AIRFLOW_IO_AVAILABLE, remote_cache_conn_id
from cosmos.settings import remote_cache_path as settings_remote_cache_path

logger = get_logger(__name__)

Expand Down Expand Up @@ -146,6 +147,7 @@ class ProjectConfig:
seeds_path: Path | None = None
snapshots_path: Path | None = None
project_name: str
remote_cache_path: Path | None = None

def __init__(
self,
Expand Down Expand Up @@ -203,6 +205,34 @@ def __init__(
self.env_vars = env_vars
self.dbt_vars = dbt_vars
self.partial_parse = partial_parse
self.remote_cache_path = self._configure_remote_cache_path()

@staticmethod
def _configure_remote_cache_path() -> Path | None:
"""Configure the remote cache path if it is provided."""
cache_path = None

if settings_remote_cache_path and not AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"You're trying to specify dbt_ls_cache_remote_path {settings_remote_cache_path}, but the required "
f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
"Airflow 2.8 or later."
)
elif settings_remote_cache_path:
from airflow.io.path import ObjectStoragePath

remote_conn_id = remote_cache_conn_id or FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(
settings_remote_cache_path.split("://")[0], None
)

cache_path = ObjectStoragePath(settings_remote_cache_path, conn_id=remote_conn_id)
if not cache_path.exists(): # type: ignore[no-untyped-call]
raise CosmosValueError(
f"remote_cache_path `{settings_remote_cache_path}` does not exist or is not accessible using "
f"remote_cache_conn_id `{remote_conn_id}`"
)

return cache_path

def validate_project(self) -> None:
"""
Expand Down
8 changes: 4 additions & 4 deletions cosmos/dbt/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,8 +281,8 @@ def save_dbt_ls_cache(self, dbt_ls_output: str) -> None:
"last_modified": datetime.datetime.now(datetime.timezone.utc).isoformat(),
**self.airflow_metadata,
}
if settings.remote_cache_path:
remote_cache_key_path = settings.remote_cache_path / self.dbt_ls_cache_key / "dbt_ls_cache.json"
if self.project.remote_cache_path:
remote_cache_key_path = self.project.remote_cache_path / self.dbt_ls_cache_key / "dbt_ls_cache.json"
with remote_cache_key_path.open("w") as fp:
json.dump(cache_dict, fp)
else:
Expand All @@ -291,9 +291,9 @@ def save_dbt_ls_cache(self, dbt_ls_output: str) -> None:
def _get_dbt_ls_remote_cache(self) -> dict[str, str]:
"""Loads the remote cache for dbt ls."""
cache_dict: dict[str, str] = {}
if settings.remote_cache_path is None:
if self.project.remote_cache_path is None:
return cache_dict
remote_cache_key_path = settings.remote_cache_path / self.dbt_ls_cache_key / "dbt_ls_cache.json"
remote_cache_key_path = self.project.remote_cache_path / self.dbt_ls_cache_key / "dbt_ls_cache.json"
if remote_cache_key_path.exists():
with remote_cache_key_path.open("r") as fp:
cache_dict = json.load(fp)
Expand Down
36 changes: 3 additions & 33 deletions cosmos/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
from cosmos.constants import (
DEFAULT_COSMOS_CACHE_DIR_NAME,
DEFAULT_OPENLINEAGE_NAMESPACE,
FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP,
)
from cosmos.exceptions import CosmosValueError

# In MacOS users may want to set the envvar `TMPDIR` if they do not want the value of the temp directory to change
DEFAULT_CACHE_DIR = Path(tempfile.gettempdir(), DEFAULT_COSMOS_CACHE_DIR_NAME)
Expand All @@ -30,40 +28,12 @@
enable_cache_profile = conf.getboolean("cosmos", "enable_cache_profile", fallback=True)
dbt_profile_cache_dir_name = conf.get("cosmos", "profile_cache_dir_name", fallback="profile")

remote_cache_path = conf.get("cosmos", "remote_cache_path", fallback=None)
remote_cache_conn_id = conf.get("cosmos", "remote_cache_conn_id", fallback=None)

try:
LINEAGE_NAMESPACE = conf.get("openlineage", "namespace")
except airflow.exceptions.AirflowConfigException:
LINEAGE_NAMESPACE = os.getenv("OPENLINEAGE_NAMESPACE", DEFAULT_OPENLINEAGE_NAMESPACE)

AIRFLOW_IO_AVAILABLE = Version(airflow_version) >= Version("2.8.0")


def _configure_remote_cache_path() -> Path | None:
    """Build the remote cache path from Airflow configuration, or return None.

    Reads ``[cosmos] remote_cache_path`` and ``[cosmos] remote_cache_conn_id``.
    Returns an ``ObjectStoragePath`` for the configured location, or ``None``
    when no remote cache path is set. Raises ``CosmosValueError`` if a path is
    configured but Object Storage support (Airflow >= 2.8) is unavailable, or
    if the path cannot be accessed with the resolved connection id.
    """
    path_str = str(conf.get("cosmos", "remote_cache_path", fallback=""))
    conn_id = str(conf.get("cosmos", "remote_cache_conn_id", fallback=""))

    # Nothing configured: remote caching is disabled.
    if not path_str:
        return None

    if not AIRFLOW_IO_AVAILABLE:
        raise CosmosValueError(
            f"You're trying to specify dbt_ls_cache_remote_path {path_str}, but the required Object "
            f"Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
            f"Airflow 2.8 or later."
        )

    # Imported lazily: airflow.io only exists on Airflow >= 2.8.
    from airflow.io.path import ObjectStoragePath

    # No explicit connection id: pick the default one for the URL scheme.
    if not conn_id:
        conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(path_str.split("://")[0], None)

    storage_path = ObjectStoragePath(path_str, conn_id=conn_id)
    if not storage_path.exists():  # type: ignore[no-untyped-call]
        raise CosmosValueError(
            f"`remote_cache_path` {path_str} does not exist or is not accessible using "
            f"`remote_cache_conn_id` {conn_id}"
        )
    return storage_path


remote_cache_path = _configure_remote_cache_path()

0 comments on commit 0f5e28b

Please sign in to comment.