From 491fe3ef850f23ee33ef7a9a19b7d15177eca2f8 Mon Sep 17 00:00:00 2001
From: Tatiana Al-Chueyr
Date: Tue, 24 Sep 2024 10:40:44 +0300
Subject: [PATCH] Add log message to confirm it is reaching the relevant part
 of the code

---
 cosmos/operators/local.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/cosmos/operators/local.py b/cosmos/operators/local.py
index e95907042..58fa5f63b 100644
--- a/cosmos/operators/local.py
+++ b/cosmos/operators/local.py
@@ -480,6 +480,7 @@ def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]
         Until Airflow 2.7, there was not a better interface to associate outlets to a task during execution.
         """
         if AIRFLOW_VERSION < Version("2.10"):
+            logger.info("Assigning inlets/outlets without DatasetAlias")
             with create_session() as session:
                 self.outlets.extend(new_outlets)
                 self.inlets.extend(new_inlets)
@@ -490,6 +491,7 @@ def register_dataset(self, new_inlets: list[Dataset], new_outlets: list[Dataset]
                 DAG.bulk_write_to_db([self.dag], session=session)
                 session.commit()
         else:
+            logger.info("Assigning inlets/outlets with DatasetAlias")
             dataset_alias_name = get_dataset_alias_name(self.dag, self.task_group, self.task_id)
             for outlet in new_outlets:
                 context["outlet_events"][dataset_alias_name].add(outlet)
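
Note for reviewers: below is a minimal, standalone sketch of the version-gated logging pattern this patch instruments, so the intent is visible without reading the surrounding operator code. It assumes packaging.version.Version for the comparison and the standard logging module; AIRFLOW_VERSION is hard-coded and register_datasets_sketch is a hypothetical function name for illustration, not the Cosmos implementation.

# Sketch only: one log line per branch makes it easy to confirm from task
# logs which dataset-registration path ran on a given Airflow version.
import logging

from packaging.version import Version

logger = logging.getLogger(__name__)

# Assumption: in Cosmos this would come from the installed Airflow package;
# hard-coded here so the sketch runs on its own.
AIRFLOW_VERSION = Version("2.9.3")


def register_datasets_sketch(new_inlets: list, new_outlets: list) -> None:
    """Log which registration strategy applies for the running Airflow version."""
    if AIRFLOW_VERSION < Version("2.10"):
        # Pre-2.10 path: inlets/outlets are attached directly to the task
        # and persisted via the session, as in the first hunk above.
        logger.info("Assigning inlets/outlets without DatasetAlias")
    else:
        # 2.10+ path: outlets are emitted through a DatasetAlias via
        # outlet_events, as in the second hunk above.
        logger.info("Assigning inlets/outlets with DatasetAlias")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    register_datasets_sketch(new_inlets=[], new_outlets=[])

Running the sketch with a version below 2.10 prints the "without DatasetAlias" line; bumping AIRFLOW_VERSION to 2.10 or later switches it to the "with DatasetAlias" line, which mirrors how the added log messages confirm which branch executes in production logs.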