Skip to content

Commit

Permalink
Add log message to confirm it is reaching the relevant part of the code
Browse files Browse the repository at this point in the history
  • Loading branch information
tatiana committed Sep 24, 2024
1 parent 64461df commit 491fe3e
Showing 1 changed file with 2 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]
Until Airflow 2.7, there was not a better interface to associate outlets to a task during execution.
"""
if AIRFLOW_VERSION < Version("2.10"):
logger.info("Assigning inlets/outlets without DatasetAlias")
with create_session() as session:
self.outlets.extend(new_outlets)
self.inlets.extend(new_inlets)
Expand All @@ -490,6 +491,7 @@ def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]
DAG.bulk_write_to_db([self.dag], session=session)
session.commit()
else:
logger.info("Assigning inlets/outlets with DatasetAlias")
dataset_alias_name = get_dataset_alias_name(self.dag, self.task_group, self.task_id)
for outlet in new_outlets:
context["outlet_events"][dataset_alias_name].add(outlet)
Expand Down

0 comments on commit 491fe3e

Please sign in to comment.