From 60f3029c1c9cd3a9e085d3144772419e77482f55 Mon Sep 17 00:00:00 2001 From: Nam Pham Date: Mon, 4 Mar 2024 10:08:13 -0500 Subject: [PATCH 1/3] modify peloton dep dag to add active flag --- dagfactory/dagbuilder.py | 1 + plugins/dags/peloton_dep_dag.py | 13 ++++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/dagfactory/dagbuilder.py b/dagfactory/dagbuilder.py index 8272e9ea..2d2ed2cd 100644 --- a/dagfactory/dagbuilder.py +++ b/dagfactory/dagbuilder.py @@ -716,6 +716,7 @@ def build(self) -> Dict[str, Union[str, DAG]]: dag_kwargs["wait_on_tasks"] = dag_params.get("wait_on_tasks", None) dag_kwargs["alert_on_start"] = dag_params.get("alert_on_start", None) dag_kwargs["alert_on_finish"] = dag_params.get("alert_on_finish", None) + dag_kwargs["is_dag_active"] = dag_params.get("is_dag_active", None) operator_defaults: Optional[Dict] = None if utils.check_dict_key(dag_params, "operator_defaults"): diff --git a/plugins/dags/peloton_dep_dag.py b/plugins/dags/peloton_dep_dag.py index f7af6fd3..fa772ec4 100644 --- a/plugins/dags/peloton_dep_dag.py +++ b/plugins/dags/peloton_dep_dag.py @@ -1,7 +1,7 @@ import logging from typing import List, Optional, Union -from airflow.models import DAG +from airflow.models import DAG, DagModel from airflow.operators.dummy import DummyOperator from airflow.models.baseoperator import BaseOperator @@ -61,6 +61,8 @@ class PelotonDepDag(DAG): :param alert_on_finish: whether slack alert is sent on completion of the dag, default is false :type alert_on_finish: boolean all other parameters inherited from DAG would also apply + :param is_dag_active: flag indicating DAG being active as controlled by Airflow scheduler + :type is_dag_active: boolean """ def __init__( @@ -72,6 +74,7 @@ def __init__( wait_on_tasks: Optional[List[dict]] = None, alert_on_start: bool = False, alert_on_finish: bool = False, + is_dag_active: bool = False, *args, **kwargs ): @@ -99,6 +102,14 @@ def __init__( self.wait_on_tasks = wait_on_tasks self.alert_on_start = alert_on_start self.alert_on_finish = alert_on_finish + self.is_dag_active = is_dag_active + + dag = DagModel.get_dagmodel(self.dag_id) + if dag: + if self.is_dag_active: + dag.set_is_paused(False) + else: + dag.set_is_paused(True) def __exit__(self, _type, _value, _tb): roots = self.roots From 60cde928688bdc9666a55d8a4a70877f8e6bcdaf Mon Sep 17 00:00:00 2001 From: Nam Pham Date: Mon, 4 Mar 2024 10:22:06 -0500 Subject: [PATCH 2/3] update version number --- dagfactory/__version__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dagfactory/__version__.py b/dagfactory/__version__.py index 62da8487..38b8fe50 100644 --- a/dagfactory/__version__.py +++ b/dagfactory/__version__.py @@ -1,2 +1,2 @@ """Module contains the version of dag-factory""" -__version__ = "0.17.1.post11" +__version__ = "0.17.1.post11.dev1" From f306ecaf6652259067e3dd3e261a808c50f2e82e Mon Sep 17 00:00:00 2001 From: Nam Pham Date: Tue, 5 Mar 2024 14:17:43 -0500 Subject: [PATCH 3/3] modify to Prod ENV --- dagfactory/__version__.py | 2 +- plugins/dags/peloton_dep_dag.py | 15 +++++++++------ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/dagfactory/__version__.py b/dagfactory/__version__.py index 38b8fe50..05480198 100644 --- a/dagfactory/__version__.py +++ b/dagfactory/__version__.py @@ -1,2 +1,2 @@ """Module contains the version of dag-factory""" -__version__ = "0.17.1.post11.dev1" +__version__ = "0.17.1.post12" diff --git a/plugins/dags/peloton_dep_dag.py b/plugins/dags/peloton_dep_dag.py index fa772ec4..5e09b09f 100644 --- a/plugins/dags/peloton_dep_dag.py +++ b/plugins/dags/peloton_dep_dag.py @@ -1,4 +1,5 @@ import logging +import os from typing import List, Optional, Union from airflow.models import DAG, DagModel @@ -104,12 +105,14 @@ def __init__( self.alert_on_finish = alert_on_finish self.is_dag_active = is_dag_active - dag = DagModel.get_dagmodel(self.dag_id) - if dag: - if self.is_dag_active: - dag.set_is_paused(False) - else: - dag.set_is_paused(True) + # sync status in Production environment + if os.environ["ENV"] == "prod": + dag = DagModel.get_dagmodel(self.dag_id) + if dag: + if self.is_dag_active: + dag.set_is_paused(False) + else: + dag.set_is_paused(True) def __exit__(self, _type, _value, _tb): roots = self.roots