diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 2cfe0e00d..3f798602b 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -309,6 +309,7 @@ jobs:
           AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }}
           AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
           PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
+          AIRFLOW__COSMOS__ENABLE_CACHE: 0
           COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }}
           DATABRICKS_CLUSTER_ID: mock
           DATABRICKS_HOST: mock
@@ -388,6 +389,7 @@ jobs:
           AIRFLOW_CONN_AZURE_ABFS_CONN: ${{ secrets.AIRFLOW_CONN_AZURE_ABFS_CONN }}
           AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT: 90.0
           PYTHONPATH: /home/runner/work/astronomer-cosmos/astronomer-cosmos/:$PYTHONPATH
+          AIRFLOW__COSMOS__ENABLE_CACHE: 0
           COSMOS_CONN_POSTGRES_PASSWORD: ${{ secrets.COSMOS_CONN_POSTGRES_PASSWORD }}
           DATABRICKS_CLUSTER_ID: mock
           DATABRICKS_HOST: mock
@@ -489,7 +491,7 @@ jobs:
     strategy:
       matrix:
         python-version: [ "3.11" ]
-        airflow-version: [ "2.9" ]
+        airflow-version: [ "2.10" ]
     steps:
       - uses: actions/checkout@v3
         with:
@@ -511,21 +513,14 @@ jobs:
 
       - name: Install packages and dependencies
         run: |
-          python -m venv venv
-          source venv/bin/activate
-          pip install --upgrade pip
-          pip install -e ".[tests]"
-          pip install apache-airflow-providers-cncf-kubernetes
-          pip install dbt-postgres==1.8.2 psycopg2==2.9.3 pytz
-          pip install apache-airflow==${{ matrix.airflow-version }}
-          # hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze
+          python -m pip install uv
+          uv pip install --system hatch
+          hatch -e tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }} run pip freeze
 
       - name: Run kubernetes tests
         run: |
-          source venv/bin/activate
-          sh ./scripts/test/kubernetes-setup.sh
-          cd dev && sh ../scripts/test/integration-kubernetes.sh
-          # hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-kubernetes
+          hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-kubernetes-setup
+          hatch run tests.py${{ matrix.python-version }}-${{ matrix.airflow-version }}:test-kubernetes
        env:
          AIRFLOW_HOME: /home/runner/work/astronomer-cosmos/astronomer-cosmos/
          AIRFLOW_CONN_EXAMPLE_CONN: postgres://postgres:postgres@0.0.0.0:5432/postgres
diff --git a/cosmos/converter.py b/cosmos/converter.py
index d00c867eb..debb5c0bb 100644
--- a/cosmos/converter.py
+++ b/cosmos/converter.py
@@ -210,6 +210,7 @@ def __init__(
         *args: Any,
         **kwargs: Any,
     ) -> None:
+        project_config.validate_project()
 
         execution_config = execution_config or ExecutionConfig()
 
diff --git a/dev/dags/cosmos_manifest_example.py b/dev/dags/cosmos_manifest_example.py
index 8e35208b8..3905fcea8 100644
--- a/dev/dags/cosmos_manifest_example.py
+++ b/dev/dags/cosmos_manifest_example.py
@@ -71,40 +71,42 @@ def cosmos_manifest_example() -> None:
     # [END aws_s3_example]
 
     # [START gcp_gs_example]
-    gcp_gs_example = DbtTaskGroup(
-        group_id="gcp_gs_example",
-        project_config=ProjectConfig(
-            manifest_path="gs://cosmos_remote_target/manifest.json",
-            manifest_conn_id="gcp_gs_conn",
-            # `manifest_conn_id` is optional. If not provided, the default connection ID `google_cloud_default` is used.
-            project_name="jaffle_shop",
-        ),
-        profile_config=profile_config,
-        render_config=render_config,
-        execution_config=execution_config,
-        operator_args={"install_deps": True},
-    )
+    # gcp_gs_example = DbtTaskGroup(
+    #     group_id="gcp_gs_example",
+    #     project_config=ProjectConfig(
+    #         manifest_path="gs://cosmos_remote_target/manifest.json",
+    #         manifest_conn_id="gcp_gs_conn",
+    #         # `manifest_conn_id` is optional. If not provided, the default connection ID `google_cloud_default` is used.
+    #         project_name="jaffle_shop",
+    #     ),
+    #     profile_config=profile_config,
+    #     render_config=render_config,
+    #     execution_config=execution_config,
+    #     operator_args={"install_deps": True},
+    # )
     # [END gcp_gs_example]
 
     # [START azure_abfs_example]
-    azure_abfs_example = DbtTaskGroup(
-        group_id="azure_abfs_example",
-        project_config=ProjectConfig(
-            manifest_path="abfs://cosmos-manifest-test/manifest.json",
-            manifest_conn_id="azure_abfs_conn",
-            # `manifest_conn_id` is optional. If not provided, the default connection ID `wasb_default` is used.
-            project_name="jaffle_shop",
-        ),
-        profile_config=profile_config,
-        render_config=render_config,
-        execution_config=execution_config,
-        operator_args={"install_deps": True},
-    )
+    # azure_abfs_example = DbtTaskGroup(
+    #     group_id="azure_abfs_example",
+    #     project_config=ProjectConfig(
+    #         manifest_path="abfs://cosmos-manifest-test/manifest.json",
+    #         manifest_conn_id="azure_abfs_conn",
+    #         # `manifest_conn_id` is optional. If not provided, the default connection ID `wasb_default` is used.
+    #         project_name="jaffle_shop",
+    #     ),
+    #     profile_config=profile_config,
+    #     render_config=render_config,
+    #     execution_config=execution_config,
+    #     operator_args={"install_deps": True},
+    # )
     # [END azure_abfs_example]
 
     post_dbt = EmptyOperator(task_id="post_dbt")
 
-    (pre_dbt >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt)
+    (pre_dbt >> local_example >> aws_s3_example >> post_dbt)
+    # TODO: re-enable the following
+    # (pre_dbt >> local_example >> aws_s3_example >> gcp_gs_example >> azure_abfs_example >> post_dbt)
 
 
 cosmos_manifest_example()
diff --git a/dev/dags/jaffle_shop_kubernetes.py b/dev/dags/jaffle_shop_kubernetes.py
index aaca09ad9..2ed423ca2 100644
--- a/dev/dags/jaffle_shop_kubernetes.py
+++ b/dev/dags/jaffle_shop_kubernetes.py
@@ -10,6 +10,9 @@
 """
 
+import os
+from pathlib import Path
+
 from airflow import DAG
 from airflow.providers.cncf.kubernetes.secret import Secret
 from pendulum import datetime
@@ -21,9 +24,17 @@
     ExecutionMode,
     ProfileConfig,
     ProjectConfig,
+    RenderConfig,
 )
 from cosmos.profiles import PostgresUserPasswordProfileMapping
 
+DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
+
+DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
+AIRFLOW_PROJECT_DIR = DBT_ROOT_PATH / "jaffle_shop"
+
+K8S_PROJECT_DIR = "dags/dbt/jaffle_shop"
+
 DBT_IMAGE = "dbt-jaffle-shop:1.0.0"
 
 project_seeds = [{"project": "jaffle_shop", "seeds": ["raw_customers", "raw_payments", "raw_orders"]}]
@@ -51,7 +62,7 @@
     # [START kubernetes_seed_example]
     load_seeds = DbtSeedKubernetesOperator(
         task_id="load_seeds",
-        project_dir="dags/dbt/jaffle_shop",
+        project_dir=K8S_PROJECT_DIR,
         get_logs=True,
         schema="public",
         image=DBT_IMAGE,
@@ -72,6 +83,7 @@
 
     # [START kubernetes_tg_example]
     run_models = DbtTaskGroup(
+        project_config=ProjectConfig(),
         profile_config=ProfileConfig(
             profile_name="postgres_profile",
             target_name="dev",
@@ -82,10 +94,8 @@
                 },
             ),
         ),
-        project_config=ProjectConfig(dbt_project_path="dags/dbt/jaffle_shop"),
-        execution_config=ExecutionConfig(
-            execution_mode=ExecutionMode.KUBERNETES,
-        ),
+        render_config=RenderConfig(dbt_project_path=AIRFLOW_PROJECT_DIR),
+        execution_config=ExecutionConfig(execution_mode=ExecutionMode.KUBERNETES, dbt_project_path=K8S_PROJECT_DIR),
         operator_args={
             "image": DBT_IMAGE,
             "get_logs": True,
diff --git a/pyproject.toml b/pyproject.toml
index 128af27d9..416dd0f19 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -85,20 +85,6 @@ docs = [
     "sphinx-autoapi",
     "apache-airflow-providers-cncf-kubernetes>=5.1.1",
 ]
-tests = [
-    "packaging>=22.0",
-    "pytest>=6.0",
-    "pytest-split",
-    "pytest-dotenv",
-    "requests-mock",
-    "pytest-cov",
-    "pytest-describe",
-    "sqlalchemy-stubs", # Change when sqlalchemy is upgraded https://docs.sqlalchemy.org/en/14/orm/extensions/mypy.html
-    "types-pytz",
-    "types-requests",
-    "sqlalchemy-stubs", # Change when sqlalchemy is upgraded https://docs.sqlalchemy.org/en/14/orm/extensions/mypy.html
-    "pre-commit",
-]
 docker = [
     "apache-airflow-providers-docker>=3.5.0",
 ]
@@ -141,19 +127,23 @@ packages = ["/cosmos"]
 
 [tool.hatch.envs.tests]
 dependencies = [
-    "astronomer-cosmos[tests]",
-    "apache-airflow-providers-cncf-kubernetes>=5.1.1",
-    "apache-airflow-providers-amazon[s3fs]>=3.0.0",
-    "apache-airflow-providers-docker>=3.5.0",
-    "apache-airflow-providers-google",
-    "apache-airflow-providers-microsoft-azure",
-    "apache-airflow-providers-postgres",
+    # do not install Airflow dependencies here!
+    # we need to use constraints, and we're doing this in the `pre-install-commands`
+    "packaging>=22.0",
+    "pytest>=6.0",
+    "pytest-split",
+    "pytest-dotenv",
+    "requests-mock",
+    "pytest-cov",
+    "pytest-describe",
+    "sqlalchemy-stubs", # Change when sqlalchemy is upgraded https://docs.sqlalchemy.org/en/14/orm/extensions/mypy.html
+    "pre-commit",
     "types-PyYAML",
     "types-attrs",
+    "types-pytz",
     "types-requests",
     "types-python-dateutil",
     "Werkzeug<3.0.0",
-    "apache-airflow~={matrix:airflow}.0,!=2.9.0,!=2.9.1", # https://github.com/apache/airflow/pull/39670
 ]
 pre-install-commands = ["sh scripts/test/pre-install-airflow.sh {matrix:airflow} {matrix:python}"]
 
@@ -164,8 +154,6 @@ airflow = ["2.4", "2.5", "2.6", "2.7", "2.8", "2.9", "2.10"]
 
 [tool.hatch.envs.tests.overrides]
 matrix.airflow.dependencies = [
     { value = "typing_extensions<4.6", if = ["2.6"] },
-    { value = "apache-airflow-providers-google<10.11.0", if = ["2.4"] },
-    { value = "apache-airflow-providers-google>=10.11.0", if = ["2.5", "2.6", "2.7", "2.8", "2.9", "2.10"] },
 ]
 
 [tool.hatch.envs.tests.scripts]
@@ -174,6 +162,7 @@ test = 'sh scripts/test/unit.sh'
 test-cov = 'sh scripts/test/unit-cov.sh'
 test-integration = 'sh scripts/test/integration.sh'
 test-kubernetes = "sh scripts/test/integration-kubernetes.sh"
+test-kubernetes-setup = "sh scripts/test/kubernetes-setup.sh"
 test-integration-dbt-1-5-4 = 'sh scripts/test/integration-dbt-1-5-4.sh'
 test-integration-expensive = 'sh scripts/test/integration-expensive.sh'
 test-integration-setup = 'sh scripts/test/integration-setup.sh'
@@ -181,7 +170,7 @@ test-integration-sqlite = 'sh scripts/test/integration-sqlite.sh'
 test-integration-sqlite-setup = 'sh scripts/test/integration-sqlite-setup.sh'
 test-performance = 'sh scripts/test/performance.sh'
 test-performance-setup = 'sh scripts/test/performance-setup.sh'
-type-check = " pre-commit run mypy --files cosmos/**/*"
+type-check = "pre-commit run mypy --files cosmos/**/*"
 
 [tool.pytest.ini_options]
 filterwarnings = ["ignore::DeprecationWarning"]
diff --git a/scripts/test/integration-kubernetes.sh b/scripts/test/integration-kubernetes.sh
index 1cc305c62..402da1437 100644
--- a/scripts/test/integration-kubernetes.sh
+++ b/scripts/test/integration-kubernetes.sh
@@ -13,4 +13,4 @@ pytest -vv \
     --cov-report=xml \
     --durations=0 \
     -m integration \
-    ../tests/test_example_k8s_dags.py
+    tests/test_example_k8s_dags.py
diff --git a/scripts/test/kubernetes-setup.sh b/scripts/test/kubernetes-setup.sh
index d40aaa187..90021f544 100644
--- a/scripts/test/kubernetes-setup.sh
+++ b/scripts/test/kubernetes-setup.sh
@@ -5,6 +5,9 @@
 set -x
 set -e
 
+
+pip install dbt-postgres==1.8.2 psycopg2==2.9.3 pytz
+
 # Create a Kubernetes secret named 'postgres-secrets' with the specified literals for host and password
 kubectl create secret generic postgres-secrets \
     --from-literal=host=postgres-postgresql.default.svc.cluster.local \
diff --git a/scripts/test/pre-install-airflow.sh b/scripts/test/pre-install-airflow.sh
index 29ec01038..e54a32706 100755
--- a/scripts/test/pre-install-airflow.sh
+++ b/scripts/test/pre-install-airflow.sh
@@ -1,5 +1,8 @@
 #!/bin/bash
 
+set -x
+set -v
+
 AIRFLOW_VERSION="$1"
 PYTHON_VERSION="$2"
 
@@ -18,19 +21,40 @@ curl -sSL $CONSTRAINT_URL -o /tmp/constraint.txt
 # Workaround to remove PyYAML constraint that will work on both Linux and MacOS
 sed '/PyYAML==/d' /tmp/constraint.txt > /tmp/constraint.txt.tmp
 mv /tmp/constraint.txt.tmp /tmp/constraint.txt
+
 # Install Airflow with constraints
 pip install uv
-uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt
-uv pip install pydantic --constraint /tmp/constraint.txt
+uv pip install pip --upgrade
+uv pip install "apache-airflow==$AIRFLOW_VERSION" --constraint /tmp/constraint.txt
+uv pip install apache-airflow-providers-docker --constraint /tmp/constraint.txt
+uv pip install apache-airflow-providers-postgres --constraint /tmp/constraint.txt
 
-if [ "$AIRFLOW_VERSION" = "2.7" ]; then
+if [ "$AIRFLOW_VERSION" = "2.4" ] || [ "$AIRFLOW_VERSION" = "2.5" ] || [ "$AIRFLOW_VERSION" = "2.6" ] ; then
+    uv pip install "apache-airflow-providers-amazon" "apache-airflow==$AIRFLOW_VERSION" "urllib3<2"
+    uv pip install "apache-airflow-providers-cncf-kubernetes" "apache-airflow==$AIRFLOW_VERSION"
+    uv pip install "apache-airflow-providers-google<10.11" "apache-airflow==$AIRFLOW_VERSION"
+    uv pip install "apache-airflow-providers-microsoft-azure" "apache-airflow==$AIRFLOW_VERSION"
+    uv pip install pyopenssl --upgrade
+elif [ "$AIRFLOW_VERSION" = "2.7" ] ; then
+    uv pip install "apache-airflow-providers-amazon" --constraint /tmp/constraint.txt
+    uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt
+    uv pip install "apache-airflow-providers-google>10.11" "apache-airflow==$AIRFLOW_VERSION"
+    uv pip install apache-airflow-providers-microsoft-azure --constraint /tmp/constraint.txt
+else
     uv pip install "apache-airflow-providers-amazon[s3fs]" --constraint /tmp/constraint.txt
     uv pip install "apache-airflow-providers-cncf-kubernetes" --constraint /tmp/constraint.txt
-    uv pip install "apache-airflow-providers-docker" --constraint /tmp/constraint.txt
-    uv pip install "apache-airflow-providers-google" --constraint /tmp/constraint.txt
-    uv pip install "apache-airflow-providers-microsoft-azure" --constraint /tmp/constraint.txt
-    uv pip install "apache-airflow-providers-postgres" --constraint /tmp/constraint.txt
+    uv pip install "apache-airflow-providers-google>=10.11.0" --constraint /tmp/constraint.txt
+    uv pip install apache-airflow-providers-microsoft-azure --constraint /tmp/constraint.txt
 fi
 
 rm /tmp/constraint.txt
+
+actual_version=$(airflow version | cut -d. -f1,2)
+
+if [ "$actual_version" = "$AIRFLOW_VERSION" ]; then
+    echo "Version is as expected: $AIRFLOW_VERSION"
+else
+    echo "Version does not match. Expected: $AIRFLOW_VERSION, but got: $actual_version"
+    exit 1
+fi
diff --git a/tests/operators/test_kubernetes.py b/tests/operators/test_kubernetes.py
index 8e0dda9c0..51375f66b 100644
--- a/tests/operators/test_kubernetes.py
+++ b/tests/operators/test_kubernetes.py
@@ -2,8 +2,10 @@
 from unittest.mock import MagicMock, patch
 
 import pytest
+from airflow import __version__ as airflow_version
 from airflow.models import TaskInstance
 from airflow.utils.context import Context, context_merge
+from packaging import version
 from pendulum import datetime
 
 from cosmos.operators.kubernetes import (
@@ -115,6 +117,12 @@ def test_dbt_kubernetes_operator_get_env(p_context_to_airflow_vars: MagicMock, b
     "no_version_check": True,
 }
 
+
+# `airflow_version` is a full release string (e.g. "2.4.3"), so compare major.minor only.
+if version.parse(airflow_version).release[:2] == (2, 4):
+    base_kwargs["name"] = "some-pod-name"
+
+
 result_map = {
     "ls": DbtLSKubernetesOperator(**base_kwargs),
     "run": DbtRunKubernetesOperator(**base_kwargs),