From d9bbc02bde6b65ce0badfd30db135982cb18f227 Mon Sep 17 00:00:00 2001 From: Tatiana Al-Chueyr Date: Tue, 24 Sep 2024 13:20:50 +0300 Subject: [PATCH] Fix emitting DatasetAlias --- cosmos/core/airflow.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/cosmos/core/airflow.py b/cosmos/core/airflow.py index a72000ac1..458b8fa56 100644 --- a/cosmos/core/airflow.py +++ b/cosmos/core/airflow.py @@ -35,13 +35,23 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None) if task.owner != "": task_kwargs["owner"] = task.owner - if module_name == "local" and AIRFLOW_VERSION >= Version("2.10"): + if module_name.split(".")[-1] == "local" and AIRFLOW_VERSION >= Version("2.10"): from airflow.datasets import DatasetAlias - # ignoring the type because older versions of Airflow raise the follow error in MyPU + # ignoring the type because older versions of Airflow raise the follow error in mypy # error: Incompatible types in assignment (expression has type "list[DatasetAlias]", target has type "str") [assignment] Found 1 error in 1 file (checked 3 source files) task_kwargs["outlets"] = [DatasetAlias(name=get_dataset_alias_name(dag, task_group, task.id))] # type: ignore + logger.info("HELP ME!!!") + logger.info(module_name) + logger.info(Operator) + logger.info(task.id) + logger.info(dag) + logger.info(task_group) + logger.info(task_kwargs) + logger.info({} if class_name == "EmptyOperator" else {"extra_context": task.extra_context}) + logger.info(task.arguments) + airflow_task = Operator( task_id=task.id, dag=dag,