Skip to content

Commit

Permalink
Remove deprecated and unused methods / properties on DAG (apache#41440)
Browse files Browse the repository at this point in the history
* Remove deprecated and unused methods / properties on DAG

(cherry picked from commit 6bd4f83062151d427dab764bca123ba396eda6c0)

* add newsfragment

* fix test

* fix test

* Fix formatting

---------

Co-authored-by: Tzu-ping Chung <[email protected]>
  • Loading branch information
dstandish and uranusjr authored Aug 14, 2024
1 parent 9221bb5 commit 1fcbde2
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 257 deletions.
233 changes: 0 additions & 233 deletions airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@
from airflow.timetables.trigger import CronTriggerTimetable
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.dates import cron_presets, date_range as utils_date_range
from airflow.utils.decorators import fixup_decorator_warning_stack
from airflow.utils.helpers import at_most_one, exactly_one, validate_instance_args, validate_key
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -944,60 +943,6 @@ def update_old_perm(permission: str):

return updated_access_control

def date_range(
self,
start_date: pendulum.DateTime,
num: int | None = None,
end_date: datetime | None = None,
) -> list[datetime]:
message = "`DAG.date_range()` is deprecated."
if num is not None:
warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2)
with warnings.catch_warnings():
warnings.simplefilter("ignore", RemovedInAirflow3Warning)
return utils_date_range(
start_date=start_date, num=num, delta=self.normalized_schedule_interval
)
message += " Please use `DAG.iter_dagrun_infos_between(..., align=False)` instead."
warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2)
if end_date is None:
coerced_end_date = timezone.utcnow()
else:
coerced_end_date = end_date
it = self.iter_dagrun_infos_between(start_date, pendulum.instance(coerced_end_date), align=False)
return [info.logical_date for info in it]

def is_fixed_time_schedule(self):
"""
Figures out if the schedule has a fixed time (e.g. 3 AM every day).
Detection is done by "peeking" the next two cron trigger time; if the
two times have the same minute and hour value, the schedule is fixed,
and we *don't* need to perform the DST fix.
This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).
Do not try to understand what this actually means. It is old logic that
should not be used anywhere.
"""
warnings.warn(
"`DAG.is_fixed_time_schedule()` is deprecated.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)

from airflow.timetables._cron import CronMixin

if not isinstance(self.timetable, CronMixin):
return True

from croniter import croniter

cron = croniter(self.timetable._expression)
next_a = cron.get_next(datetime)
next_b = cron.get_next(datetime)
return next_b.minute == next_a.minute and next_b.hour == next_a.hour

def following_schedule(self, dttm):
"""
Calculate the following schedule for this dag in UTC.
Expand Down Expand Up @@ -1162,21 +1107,6 @@ def next_dagrun_info(
info = None
return info

def next_dagrun_after_date(self, date_last_automated_dagrun: pendulum.DateTime | None):
warnings.warn(
"`DAG.next_dagrun_after_date()` is deprecated. Please use `DAG.next_dagrun_info()` instead.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
if date_last_automated_dagrun is None:
data_interval = None
else:
data_interval = self.infer_automated_data_interval(date_last_automated_dagrun)
info = self.next_dagrun_info(data_interval)
if info is None:
return None
return info.run_after

@functools.cached_property
def _time_restriction(self) -> TimeRestriction:
start_dates = [t.start_date for t in self.tasks if t.start_date]
Expand Down Expand Up @@ -1267,46 +1197,6 @@ def iter_dagrun_infos_between(
)
break

def get_run_dates(self, start_date, end_date=None) -> list:
"""
Return a list of dates between the interval received as parameter using this dag's schedule interval.
Returned dates can be used for execution dates.
:param start_date: The start date of the interval.
:param end_date: The end date of the interval. Defaults to ``timezone.utcnow()``.
:return: A list of dates within the interval following the dag's schedule.
"""
warnings.warn(
"`DAG.get_run_dates()` is deprecated. Please use `DAG.iter_dagrun_infos_between()` instead.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
earliest = timezone.coerce_datetime(start_date)
if end_date is None:
latest = pendulum.now(timezone.utc)
else:
latest = timezone.coerce_datetime(end_date)
return [info.logical_date for info in self.iter_dagrun_infos_between(earliest, latest)]

def normalize_schedule(self, dttm):
warnings.warn(
"`DAG.normalize_schedule()` is deprecated.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
with warnings.catch_warnings():
warnings.simplefilter("ignore", RemovedInAirflow3Warning)
following = self.following_schedule(dttm)
if not following: # in case of @once
return dttm
with warnings.catch_warnings():
warnings.simplefilter("ignore", RemovedInAirflow3Warning)
previous_of_following = self.previous_schedule(following)
if previous_of_following != dttm:
return following
return dttm

@provide_session
def get_last_dagrun(self, session=NEW_SESSION, include_externally_triggered=False):
return get_last_dagrun(
Expand All @@ -1330,43 +1220,6 @@ def dag_id(self) -> str:
def dag_id(self, value: str) -> None:
self._dag_id = value

@property
def full_filepath(self) -> str:
"""
Full file path to the DAG.
:meta private:
"""
warnings.warn(
"DAG.full_filepath is deprecated in favour of fileloc",
RemovedInAirflow3Warning,
stacklevel=2,
)
return self.fileloc

@full_filepath.setter
def full_filepath(self, value) -> None:
warnings.warn(
"DAG.full_filepath is deprecated in favour of fileloc",
RemovedInAirflow3Warning,
stacklevel=2,
)
self.fileloc = value

@property
def concurrency(self) -> int:
# TODO: Remove in Airflow 3.0
warnings.warn(
"The 'DAG.concurrency' attribute is deprecated. Please use 'DAG.max_active_tasks'.",
RemovedInAirflow3Warning,
stacklevel=2,
)
return self._max_active_tasks

@concurrency.setter
def concurrency(self, value: int):
self._max_active_tasks = value

@property
def max_active_tasks(self) -> int:
return self._max_active_tasks
Expand Down Expand Up @@ -1438,20 +1291,6 @@ def tasks_upstream_of_teardowns(self) -> list[Operator]:
def task_group(self) -> TaskGroup:
return self._task_group

@property
def filepath(self) -> str:
"""
Relative file path to the DAG.
:meta private:
"""
warnings.warn(
"filepath is deprecated, use relative_fileloc instead",
RemovedInAirflow3Warning,
stacklevel=2,
)
return str(self.relative_fileloc)

@property
def relative_fileloc(self) -> pathlib.Path:
"""File location of the importable dag 'file' relative to the configured DAGs folder."""
Expand Down Expand Up @@ -1496,16 +1335,6 @@ def get_concurrency_reached(self, session=NEW_SESSION) -> bool:
)
return total_tasks >= self.max_active_tasks

@property
def concurrency_reached(self):
"""Use `airflow.models.DAG.get_concurrency_reached`, this attribute is deprecated."""
warnings.warn(
"This attribute is deprecated. Please use `airflow.models.DAG.get_concurrency_reached` method.",
RemovedInAirflow3Warning,
stacklevel=2,
)
return self.get_concurrency_reached()

@provide_session
def get_is_active(self, session=NEW_SESSION) -> None:
"""Return a boolean indicating whether this DAG is active."""
Expand All @@ -1526,21 +1355,6 @@ def is_paused(self):
)
return self.get_is_paused()

@property
def normalized_schedule_interval(self) -> ScheduleInterval:
warnings.warn(
"DAG.normalized_schedule_interval() is deprecated.",
category=RemovedInAirflow3Warning,
stacklevel=2,
)
if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:
_schedule_interval: ScheduleInterval = cron_presets.get(self.schedule_interval)
elif self.schedule_interval == "@once":
_schedule_interval = None
else:
_schedule_interval = self.schedule_interval
return _schedule_interval

@staticmethod
@internal_api_call
@provide_session
Expand Down Expand Up @@ -1724,16 +1538,6 @@ def get_latest_execution_date(self, session: Session = NEW_SESSION) -> pendulum.
"""Return the latest date for which at least one dag run exists."""
return session.scalar(select(func.max(DagRun.execution_date)).where(DagRun.dag_id == self.dag_id))

@property
def latest_execution_date(self):
"""Use `airflow.models.DAG.get_latest_execution_date`, this attribute is deprecated."""
warnings.warn(
"This attribute is deprecated. Please use `airflow.models.DAG.get_latest_execution_date`.",
RemovedInAirflow3Warning,
stacklevel=2,
)
return self.get_latest_execution_date()

def resolve_template_files(self):
for t in self.tasks:
t.resolve_template_files()
Expand Down Expand Up @@ -2264,28 +2068,6 @@ def nested_topo(group):

return tuple(nested_topo(self.task_group))

@provide_session
def set_dag_runs_state(
self,
state: DagRunState = DagRunState.RUNNING,
session: Session = NEW_SESSION,
start_date: datetime | None = None,
end_date: datetime | None = None,
dag_ids: list[str] | None = None,
) -> None:
warnings.warn(
"This method is deprecated and will be removed in a future version.",
RemovedInAirflow3Warning,
stacklevel=3,
)
dag_ids = dag_ids or [self.dag_id]
query = update(DagRun).where(DagRun.dag_id.in_(dag_ids))
if start_date:
query = query.where(DagRun.execution_date >= start_date)
if end_date:
query = query.where(DagRun.execution_date <= end_date)
session.execute(query.values(state=state).execution_options(synchronize_session="fetch"))

@provide_session
def clear(
self,
Expand Down Expand Up @@ -3055,21 +2837,6 @@ def create_dagrun(
)
return run

@classmethod
@provide_session
def bulk_sync_to_db(
cls,
dags: Collection[DAG],
session=NEW_SESSION,
):
"""Use `airflow.models.DAG.bulk_write_to_db`, this method is deprecated."""
warnings.warn(
"This method is deprecated and will be removed in a future version. Please use bulk_write_to_db",
RemovedInAirflow3Warning,
stacklevel=2,
)
return cls.bulk_write_to_db(dags=dags, session=session)

@classmethod
@provide_session
def bulk_write_to_db(
Expand Down
16 changes: 16 additions & 0 deletions newsfragments/41440.significant.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
Removed unused methods / properties in models/dag.py

Methods removed:
* date_range
* is_fixed_time_schedule
* next_dagrun_after_date
* get_run_dates
* normalize_schedule
* full_filepath
* concurrency
* filepath
* concurrency_reached
* normalized_schedule_interval
* latest_execution_date
* set_dag_runs_state
* bulk_sync_to_db
21 changes: 11 additions & 10 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5284,16 +5284,17 @@ def test_find_zombies(self, load_examples):
self.job_runner._find_zombies()

scheduler_job.executor.callback_sink.send.assert_called_once()
requests = scheduler_job.executor.callback_sink.send.call_args.args
assert 1 == len(requests)
assert requests[0].full_filepath == dag.fileloc
assert requests[0].msg == str(self.job_runner._generate_zombie_message_details(ti))
assert requests[0].is_failure_callback is True
assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
assert ti.dag_id == requests[0].simple_task_instance.dag_id
assert ti.task_id == requests[0].simple_task_instance.task_id
assert ti.run_id == requests[0].simple_task_instance.run_id
assert ti.map_index == requests[0].simple_task_instance.map_index
callback_requests = scheduler_job.executor.callback_sink.send.call_args.args
assert len(callback_requests) == 1
callback_request = callback_requests[0]
assert isinstance(callback_request.simple_task_instance, SimpleTaskInstance)
assert callback_request.full_filepath == dag.fileloc
assert callback_request.msg == str(self.job_runner._generate_zombie_message_details(ti))
assert callback_request.is_failure_callback is True
assert callback_request.simple_task_instance.dag_id == ti.dag_id
assert callback_request.simple_task_instance.task_id == ti.task_id
assert callback_request.simple_task_instance.run_id == ti.run_id
assert callback_request.simple_task_instance.map_index == ti.map_index

with create_session() as session:
session.query(TaskInstance).delete()
Expand Down
14 changes: 0 additions & 14 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2588,20 +2588,6 @@ def test_validate_params_on_trigger_dag(self):
data_interval=(TEST_DATE, TEST_DATE),
)

def test_return_date_range_with_num_method(self):
start_date = TEST_DATE
delta = timedelta(days=1)

dag = DAG("dummy-dag", schedule=delta, start_date=start_date)
with pytest.warns(RemovedInAirflow3Warning, match=r"`DAG.date_range\(\)` is deprecated."):
dag_dates = dag.date_range(start_date=start_date, num=3)

assert dag_dates == [
start_date,
start_date + delta,
start_date + 2 * delta,
]

def test_dag_owner_links(self):
dag = DAG(
"dag",
Expand Down

0 comments on commit 1fcbde2

Please sign in to comment.