Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Dag Factory to use active flag #17

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dagfactory/__version__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
"""Module contains the version of dag-factory"""
__version__ = "0.17.1.post11"
__version__ = "0.17.1.post12"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you use post13 if you don't mind? I'm pushing in an emergency fix today that will use post12

1 change: 1 addition & 0 deletions dagfactory/dagbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,7 @@ def build(self) -> Dict[str, Union[str, DAG]]:
dag_kwargs["wait_on_tasks"] = dag_params.get("wait_on_tasks", None)
dag_kwargs["alert_on_start"] = dag_params.get("alert_on_start", None)
dag_kwargs["alert_on_finish"] = dag_params.get("alert_on_finish", None)
dag_kwargs["is_dag_active"] = dag_params.get("is_dag_active", None)

operator_defaults: Optional[Dict] = None
if utils.check_dict_key(dag_params, "operator_defaults"):
Expand Down
16 changes: 15 additions & 1 deletion plugins/dags/peloton_dep_dag.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import logging
import os
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a big deal but it's not really necessary to update this class. dag factory actually imports from the airflow repo at run time.

from typing import List, Optional, Union

from airflow.models import DAG
from airflow.models import DAG, DagModel
from airflow.operators.dummy import DummyOperator
from airflow.models.baseoperator import BaseOperator

Expand Down Expand Up @@ -61,6 +62,8 @@ class PelotonDepDag(DAG):
:param alert_on_finish: whether slack alert is sent on completion of the dag, default is false
:type alert_on_finish: boolean
all other parameters inherited from DAG would also apply
:param is_dag_active: flag indicating DAG being active as controlled by Airflow scheduler
:type is_dag_active: boolean
"""

def __init__(
Expand All @@ -72,6 +75,7 @@ def __init__(
wait_on_tasks: Optional[List[dict]] = None,
alert_on_start: bool = False,
alert_on_finish: bool = False,
is_dag_active: bool = False,
*args,
**kwargs
):
Expand Down Expand Up @@ -99,6 +103,16 @@ def __init__(
self.wait_on_tasks = wait_on_tasks
self.alert_on_start = alert_on_start
self.alert_on_finish = alert_on_finish
self.is_dag_active = is_dag_active

# sync status in Production environment
if os.environ["ENV"] == "prod":
dag = DagModel.get_dagmodel(self.dag_id)
if dag:
if self.is_dag_active:
dag.set_is_paused(False)
else:
dag.set_is_paused(True)

def __exit__(self, _type, _value, _tb):
roots = self.roots
Expand Down
Loading