Skip to content

Commit

Permalink
Fix emitting DatasetAlias
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana committed Sep 24, 2024
1 parent 491fe3e commit d9bbc02
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions cosmos/core/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,23 @@ def get_airflow_task(task: Task, dag: DAG, task_group: TaskGroup | None = None)
if task.owner != "":
task_kwargs["owner"] = task.owner

if module_name == "local" and AIRFLOW_VERSION >= Version("2.10"):
if module_name.split(".")[-1] == "local" and AIRFLOW_VERSION >= Version("2.10"):
from airflow.datasets import DatasetAlias

# ignoring the type because older versions of Airflow raise the follow error in MyPU
# ignoring the type because older versions of Airflow raise the follow error in mypy
# error: Incompatible types in assignment (expression has type "list[DatasetAlias]", target has type "str") [assignment] Found 1 error in 1 file (checked 3 source files)
task_kwargs["outlets"] = [DatasetAlias(name=get_dataset_alias_name(dag, task_group, task.id))] # type: ignore

logger.info("HELP ME!!!")
logger.info(module_name)
logger.info(Operator)
logger.info(task.id)
logger.info(dag)
logger.info(task_group)
logger.info(task_kwargs)
logger.info({} if class_name == "EmptyOperator" else {"extra_context": task.extra_context})
logger.info(task.arguments)

airflow_task = Operator(
task_id=task.id,
dag=dag,
Expand Down

0 comments on commit d9bbc02

Please sign in to comment.