Skip to content

Commit

Permalink
Fix emitting DatasetAlias
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana committed Sep 24, 2024
1 parent f2c8cf1 commit 04cc589
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion cosmos/core/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,23 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None)
if task.owner != "":
task_kwargs["owner"] = task.owner

if module_name == "local" and AIRFLOW_VERSION >= Version("2.10"):
if module_name.split(".")[-1] == "local" and AIRFLOW_VERSION >= Version("2.10"):
from airflow.datasets import DatasetAlias

# ignoring the type because older versions of Airflow raise the follow error in MyPU
# error: Incompatible types in assignment (expression has type "list[DatasetAlias]", target has type "str") [assignment] Found 1 error in 1 file (checked 3 source files)
task_kwargs["outlets"] = [DatasetAlias(name=get_dataset_alias_name(dag, task_group, task.id))] # type: ignore

logger.info("HELP ME!!!")
logger.info(module_name)
logger.info(Operator)
logger.info(task.id)
logger.info(dag)
logger.info(task_group)
logger.info(task_kwargs)
logger.info({} if class_name == "EmptyOperator" else {"extra_context": task.extra_context})
logger.info(task.arguments)

airflow_task = Operator(
task_id=task.id,
dag=dag,
Expand Down

0 comments on commit 04cc589

Please sign in to comment.