
Merge branch 'main' of https://github.com/ags-de/astronomer-cosmos into gcpcloudrun-executor
ags-de committed Sep 24, 2024
2 parents 5dcec92 + aa26fc3 commit 85c4c0a
Showing 26 changed files with 1,062 additions and 175 deletions.
24 changes: 18 additions & 6 deletions .github/workflows/test.yml
@@ -32,14 +32,14 @@ jobs:
architecture: "x64"

- run: pip3 install hatch
- run: hatch run tests.py3.9-2.7:type-check
- run: hatch run tests.py3.9-2.8:type-check

Run-Unit-Tests:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9"]
airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"]
exclude:
- python-version: "3.11"
airflow-version: "2.4"
@@ -59,6 +59,12 @@ jobs:
airflow-version: "2.7"
- python-version: "3.12"
airflow-version: "2.8"
# It's been observed that dependency resolution for Apache Airflow 2.7 errors out with deep
# resolution chains. This is a temporary exclusion until the issue is resolved.
- python-version: "3.8"
airflow-version: "2.7"
- python-version: "3.9"
airflow-version: "2.7"
steps:
- uses: actions/checkout@v3
with:
@@ -98,12 +104,18 @@ jobs:
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]
airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9"]
airflow-version: ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"]
exclude:
- python-version: "3.11"
airflow-version: "2.4"
- python-version: "3.11"
airflow-version: "2.5"
# It's been observed that dependency resolution for Apache Airflow 2.7 errors out with deep
# resolution chains. This is a temporary exclusion until the issue is resolved.
- python-version: "3.8"
airflow-version: "2.7"
- python-version: "3.9"
airflow-version: "2.7"
services:
postgres:
image: postgres
@@ -254,7 +266,7 @@ jobs:
strategy:
matrix:
python-version: ["3.11"]
airflow-version: ["2.7"]
airflow-version: ["2.8"]

steps:
- uses: actions/checkout@v3
@@ -319,7 +331,7 @@ jobs:
strategy:
matrix:
python-version: [ "3.11" ]
airflow-version: [ "2.7" ]
airflow-version: [ "2.8" ]
services:
postgres:
image: postgres
@@ -395,7 +407,7 @@ jobs:
strategy:
matrix:
python-version: ["3.11"]
airflow-version: ["2.7"]
airflow-version: ["2.8"]
num-models: [1, 10, 50, 100]
services:
postgres:
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -54,7 +54,7 @@ repos:
- --py37-plus
- --keep-runtime-typing
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.5.7
rev: v0.6.2
hooks:
- id: ruff
args:
@@ -71,7 +71,7 @@ repos:
alias: black
additional_dependencies: [black>=22.10.0]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: "v1.11.1"
rev: "v1.11.2"

hooks:
- id: mypy
42 changes: 42 additions & 0 deletions CHANGELOG.rst
@@ -1,6 +1,48 @@
Changelog
=========

1.6.0 (2024-08-20)
--------------------

New Features

* Add support for loading manifest from cloud stores using Airflow Object Storage by @pankajkoti in #1109
* Cache ``package-lock.yml`` file by @pankajastro in #1086
* Support persisting the ``LoadMode.VIRTUALENV`` directory by @LennartKloppenburg and @tatiana in #1079 and #611
* Add support to store and fetch ``dbt ls`` cache in remote stores by @pankajkoti in #1147
* Add default source nodes rendering by @arojasb3 in #1107
* Add Teradata ``ProfileMapping`` by @sc250072 in #1077

Enhancements

* Add ``DatabricksOauthProfileMapping`` profile by @CorsettiS in #1091
* Use ``dbt ls`` as the default parser when ``profile_config`` is provided by @pankajastro in #1101
* Add task owner to dbt operators by @wornjs in #1082
* Extend Cosmos custom selector to support + when using paths and tags by @mvictoria in #1150
* Simplify logging by @dwreeves in #1108

Bug fixes

* Fix Teradata ``ProfileMapping`` target invalid issue by @sc250072 in #1088
* Fix empty tag in case of custom parser by @pankajastro in #1100
* Fix ``dbt deps`` of ``LoadMode.DBT_LS`` should use ``ProjectConfig.dbt_vars`` by @tatiana in #1114
* Fix import handling by lazy loading hooks introduced in PR #1109 by @dwreeves in #1132
* Fix Airflow 2.10 regression and add Airflow 2.10 in test matrix by @pankajastro in #1162

Docs

* Fix typo in azure-container-instance docs by @pankajastro in #1106
* Use Airflow trademark as it has been registered by @pankajastro in #1105

Others

* Run some example DAGs in Kubernetes execution mode in CI by @pankajastro in #1127
* Install requirements.txt by default during dev env spin up by @CorsettiS in #1099
* Remove ``DbtGraph.current_version`` dead code by @tatiana in #1111
* Disable test for Airflow-2.5 and Python-3.11 combination in CI by @pankajastro in #1124
* Pre-commit hook updates in #1074, #1113, #1125, #1144, #1154, #1167


1.5.1 (2024-07-17)
------------------

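Two of the 1.6.0 render-side features listed above, the extended custom selector (#1150) and default source node rendering (#1107), can be combined in a single ``RenderConfig``. A minimal sketch; the ``source_rendering_behavior`` parameter name and the ``ALL`` member are assumptions inferred from the ``SourceRenderingBehavior`` export added in ``cosmos/__init__.py`` below, not verified against the docs:

```python
from cosmos import RenderConfig, SourceRenderingBehavior

render_config = RenderConfig(
    # `+` graph operators now compose with path: and tag: selectors (#1150).
    select=["path:models/staging+", "tag:nightly"],
    # Assumed parameter/member: render dbt source nodes alongside models (#1107).
    source_rendering_behavior=SourceRenderingBehavior.ALL,
)
```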
16 changes: 14 additions & 2 deletions cosmos/__init__.py
@@ -5,7 +5,7 @@
Contains dags, task groups, and operators.
"""
__version__ = "1.5.1"
__version__ = "1.6.0"


from cosmos.airflow.dag import DbtDag
@@ -16,7 +16,15 @@
ProjectConfig,
RenderConfig,
)
from cosmos.constants import ExecutionMode, LoadMode, TestBehavior
from cosmos.constants import (
DbtResourceType,
ExecutionMode,
InvocationMode,
LoadMode,
SourceRenderingBehavior,
TestBehavior,
TestIndirectSelection,
)
from cosmos.log import get_logger
from cosmos.operators.lazy_load import MissingPackage
from cosmos.operators.local import (
@@ -198,6 +206,10 @@
"ExecutionMode",
"LoadMode",
"TestBehavior",
"InvocationMode",
"TestIndirectSelection",
"SourceRenderingBehavior",
"DbtResourceType",
]

"""
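With the widened ``__all__``, these enums no longer need deep imports from ``cosmos.constants``. A minimal sketch, assuming ``InvocationMode.DBT_RUNNER`` and ``TestIndirectSelection.CASCADE`` are valid members (hedged guesses based on dbt's own invocation and indirect-selection modes):

```python
from cosmos import ExecutionConfig, InvocationMode, TestIndirectSelection

execution_config = ExecutionConfig(
    # Assumed member: run dbt in-process via dbtRunner instead of a subprocess.
    invocation_mode=InvocationMode.DBT_RUNNER,
    # Assumed member: mirror dbt's "cascade" indirect test selection.
    test_indirect_selection=TestIndirectSelection.CASCADE,
)
```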
103 changes: 103 additions & 0 deletions cosmos/cache.py
@@ -17,6 +17,7 @@
from airflow.models.dag import DAG
from airflow.utils.session import provide_session
from airflow.utils.task_group import TaskGroup
from airflow.version import version as airflow_version
from sqlalchemy import select
from sqlalchemy.orm import Session

@@ -25,22 +26,66 @@
DBT_MANIFEST_FILE_NAME,
DBT_TARGET_DIR_NAME,
DEFAULT_PROFILES_FILE_NAME,
FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP,
PACKAGE_LOCKFILE_YML,
)
from cosmos.dbt.project import get_partial_parse_path
from cosmos.exceptions import CosmosValueError
from cosmos.log import get_logger
from cosmos.settings import (
AIRFLOW_IO_AVAILABLE,
cache_dir,
dbt_profile_cache_dir_name,
enable_cache,
enable_cache_package_lockfile,
enable_cache_profile,
remote_cache_dir_conn_id,
)
from cosmos.settings import remote_cache_dir as settings_remote_cache_dir

logger = get_logger(__name__)
VAR_KEY_CACHE_PREFIX = "cosmos_cache__"


def _configure_remote_cache_dir() -> Path | None:
"""Configure the remote cache dir if it is provided."""
if not settings_remote_cache_dir:
return None

_configured_cache_dir = None

cache_dir_str = str(settings_remote_cache_dir)

remote_cache_conn_id = remote_cache_dir_conn_id
if not remote_cache_conn_id:
cache_dir_schema = cache_dir_str.split("://")[0]
remote_cache_conn_id = FILE_SCHEME_AIRFLOW_DEFAULT_CONN_ID_MAP.get(cache_dir_schema, None) # type: ignore[assignment]
if remote_cache_conn_id is None:
return _configured_cache_dir

if not AIRFLOW_IO_AVAILABLE:
raise CosmosValueError(
f"You're trying to specify remote cache_dir {cache_dir_str}, but the required "
f"Object Storage feature is unavailable in Airflow version {airflow_version}. Please upgrade to "
"Airflow 2.8 or later."
)

from airflow.io.path import ObjectStoragePath

_configured_cache_dir = ObjectStoragePath(cache_dir_str, conn_id=remote_cache_conn_id)

if not _configured_cache_dir.exists(): # type: ignore[no-untyped-call]
# TODO: Check if we should raise an error instead in case the provided path does not exist.
_configured_cache_dir.mkdir(parents=True, exist_ok=True)

# raise CosmosValueError(
# f"remote_cache_path `{cache_dir_str}` does not exist or is not accessible using "
# f"remote_cache_conn_id `{remote_cache_conn_id}`"
# )

return _configured_cache_dir


def _get_airflow_metadata(dag: DAG, task_group: TaskGroup | None) -> dict[str, str | None]:
dag_id = None
task_group_id = None
@@ -366,6 +411,64 @@ def delete_unused_dbt_ls_cache(
return deleted_cosmos_variables


# TODO: Add integration tests once remote cache is supported in the CI pipeline
@provide_session
def delete_unused_dbt_ls_remote_cache_files( # pragma: no cover
max_age_last_usage: timedelta = timedelta(days=30), session: Session | None = None
) -> int:
"""
Delete Cosmos cache stored in remote storage based on the last execution of their associated DAGs.
"""
if session is None:
return 0

logger.info(f"Delete the Cosmos cache stored remotely that hasn't been used for {max_age_last_usage}")
cosmos_dags_ids_remote_cache_files = defaultdict(list)

configured_remote_cache_dir = _configure_remote_cache_dir()
if not configured_remote_cache_dir:
logger.info(
"No remote cache directory configured. Skipping the deletion of the dbt ls cache files in remote storage."
)
return 0

dirs = [obj for obj in configured_remote_cache_dir.iterdir() if obj.is_dir()]
files = [f for label in dirs for f in label.iterdir() if f.is_file()]

total_cosmos_remote_cache_files = 0
for file in files:
prefix_path = (configured_remote_cache_dir / VAR_KEY_CACHE_PREFIX).as_uri()
if file.as_uri().startswith(prefix_path):
with file.open("r") as fp:
cache_dict = json.load(fp)
cosmos_dags_ids_remote_cache_files[cache_dict["dag_id"]].append(file)
total_cosmos_remote_cache_files += 1

deleted_cosmos_remote_cache_files = 0

for dag_id, files in cosmos_dags_ids_remote_cache_files.items():
last_dag_run = (
session.query(DagRun)
.filter(
DagRun.dag_id == dag_id,
)
.order_by(DagRun.execution_date.desc())
.first()
)
if last_dag_run and last_dag_run.execution_date < (datetime.now(timezone.utc) - max_age_last_usage):
for file in files:
logger.info(f"Removing the dbt ls cache remote file {file}")
file.unlink()
deleted_cosmos_remote_cache_files += 1
logger.info(
"Deleted %s/%s dbt ls cache files in remote storage.",
deleted_cosmos_remote_cache_files,
total_cosmos_remote_cache_files,
)

return deleted_cosmos_remote_cache_files


def is_profile_cache_enabled() -> bool:
"""Return True if global and profile cache is enable"""
return enable_cache and enable_cache_profile
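``delete_unused_dbt_ls_remote_cache_files`` is decorated with ``@provide_session``, so Airflow injects a database session when the caller omits one. A minimal maintenance-DAG sketch that invokes it (the DAG id and schedule are hypothetical):

```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task


@dag(schedule="@weekly", start_date=datetime(2024, 9, 1), catchup=False)
def cosmos_remote_cache_cleanup():
    @task
    def purge_stale_cache() -> int:
        from cosmos.cache import delete_unused_dbt_ls_remote_cache_files

        # Deletes remote dbt ls cache files whose DAGs have not run in 30 days;
        # returns the number of files removed.
        return delete_unused_dbt_ls_remote_cache_files(max_age_last_usage=timedelta(days=30))

    purge_stale_cache()


cosmos_remote_cache_cleanup()
```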
4 changes: 4 additions & 0 deletions cosmos/config.py
@@ -359,6 +359,8 @@ class ExecutionConfig:
:param test_indirect_selection: The mode to configure the test behavior when performing indirect selection.
:param dbt_executable_path: The path to the dbt executable for runtime execution. Defaults to dbt if available on the path.
:param dbt_project_path: Configures the DBT project location accessible at runtime for dag execution. This is the project path in a docker container for ExecutionMode.DOCKER or ExecutionMode.KUBERNETES. Mutually Exclusive with ProjectConfig.dbt_project_path
:param virtualenv_dir: Directory path to locate the (cached) virtual env that
should be used for execution when execution mode is set to `ExecutionMode.VIRTUALENV`
"""

execution_mode: ExecutionMode = ExecutionMode.LOCAL
@@ -367,6 +369,8 @@ class ExecutionConfig:
dbt_executable_path: str | Path = field(default_factory=get_system_dbt)

dbt_project_path: InitVar[str | Path | None] = None
virtualenv_dir: str | Path | None = None

project_path: Path | None = field(init=False)

def __post_init__(self, dbt_project_path: str | Path | None) -> None:
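The new ``virtualenv_dir`` field pairs with ``ExecutionMode.VIRTUALENV``; per the ``converter.py`` change below, it is forwarded to the operators as a task argument only in that mode and merely logs a warning otherwise. A minimal sketch, with a hypothetical directory path:

```python
from cosmos import ExecutionConfig, ExecutionMode

execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.VIRTUALENV,
    # Hypothetical path: any writable directory that survives between task runs
    # lets Cosmos reuse the cached virtualenv instead of rebuilding it per task.
    virtualenv_dir="/opt/airflow/dbt_venv_cache",
)
```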
22 changes: 19 additions & 3 deletions cosmos/converter.py
@@ -219,12 +219,26 @@ def __init__(
validate_initial_user_config(execution_config, profile_config, project_config, render_config, operator_args)

if project_config.dbt_project_path:
execution_config, render_config = migrate_to_new_interface(execution_config, project_config, render_config)
# We copy the configuration so the change does not affect other DAGs or TaskGroups
# that may reuse the same original configuration
render_config = copy.deepcopy(render_config)
execution_config = copy.deepcopy(execution_config)
render_config.project_path = project_config.dbt_project_path
execution_config.project_path = project_config.dbt_project_path

validate_adapted_user_config(execution_config, project_config, render_config)

env_vars = project_config.env_vars or operator_args.get("env")
dbt_vars = project_config.dbt_vars or operator_args.get("vars")
env_vars = copy.deepcopy(project_config.env_vars or operator_args.get("env"))
dbt_vars = copy.deepcopy(project_config.dbt_vars or operator_args.get("vars"))

if execution_config.execution_mode != ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:
logger.warning(
"`ExecutionConfig.virtualenv_dir` is only supported when \
ExecutionConfig.execution_mode is set to ExecutionMode.VIRTUALENV."
)

if not operator_args:
operator_args = {}

cache_dir = None
cache_identifier = None
@@ -274,6 +288,8 @@ def __init__(
task_args,
execution_mode=execution_config.execution_mode,
)
if execution_config.execution_mode == ExecutionMode.VIRTUALENV and execution_config.virtualenv_dir is not None:
task_args["virtualenv_dir"] = execution_config.virtualenv_dir

build_airflow_graph(
nodes=self.dbt_graph.filtered_nodes,
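The ``deepcopy`` of ``render_config`` and ``execution_config`` guards against a shared-mutable-config bug: two DAGs reusing one ``RenderConfig`` would previously see ``project_path`` overwritten by whichever converter ran last. A sketch of the pattern the copy protects (DAG ids, profile names, and paths are hypothetical):

```python
from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig

profile = ProfileConfig(
    profile_name="jaffle_shop",  # hypothetical profile/target names and paths
    target_name="dev",
    profiles_yml_filepath="/dags/dbt/profiles.yml",
)
shared_render = RenderConfig()  # deliberately reused across both DAGs

dag_a = DbtDag(
    dag_id="jaffle_shop",
    project_config=ProjectConfig("/dags/dbt/jaffle_shop"),
    profile_config=profile,
    render_config=shared_render,
)
dag_b = DbtDag(
    dag_id="metrics",
    project_config=ProjectConfig("/dags/dbt/metrics"),
    profile_config=profile,
    render_config=shared_render,  # no longer inherits jaffle_shop's project_path
)
```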
(diffs for the remaining 19 changed files not shown)
