diff --git a/airflow/models/dag.py b/airflow/models/dag.py index 7b762aa18dbf..50e2222bd018 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -131,7 +131,6 @@ from airflow.timetables.trigger import CronTriggerTimetable from airflow.utils import timezone from airflow.utils.dag_cycle_tester import check_cycle -from airflow.utils.dates import cron_presets, date_range as utils_date_range from airflow.utils.decorators import fixup_decorator_warning_stack from airflow.utils.helpers import at_most_one, exactly_one, validate_instance_args, validate_key from airflow.utils.log.logging_mixin import LoggingMixin @@ -944,60 +943,6 @@ def update_old_perm(permission: str): return updated_access_control - def date_range( - self, - start_date: pendulum.DateTime, - num: int | None = None, - end_date: datetime | None = None, - ) -> list[datetime]: - message = "`DAG.date_range()` is deprecated." - if num is not None: - warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2) - with warnings.catch_warnings(): - warnings.simplefilter("ignore", RemovedInAirflow3Warning) - return utils_date_range( - start_date=start_date, num=num, delta=self.normalized_schedule_interval - ) - message += " Please use `DAG.iter_dagrun_infos_between(..., align=False)` instead." - warnings.warn(message, category=RemovedInAirflow3Warning, stacklevel=2) - if end_date is None: - coerced_end_date = timezone.utcnow() - else: - coerced_end_date = end_date - it = self.iter_dagrun_infos_between(start_date, pendulum.instance(coerced_end_date), align=False) - return [info.logical_date for info in it] - - def is_fixed_time_schedule(self): - """ - Figures out if the schedule has a fixed time (e.g. 3 AM every day). - - Detection is done by "peeking" the next two cron trigger time; if the - two times have the same minute and hour value, the schedule is fixed, - and we *don't* need to perform the DST fix. - - This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00). - - Do not try to understand what this actually means. It is old logic that - should not be used anywhere. - """ - warnings.warn( - "`DAG.is_fixed_time_schedule()` is deprecated.", - category=RemovedInAirflow3Warning, - stacklevel=2, - ) - - from airflow.timetables._cron import CronMixin - - if not isinstance(self.timetable, CronMixin): - return True - - from croniter import croniter - - cron = croniter(self.timetable._expression) - next_a = cron.get_next(datetime) - next_b = cron.get_next(datetime) - return next_b.minute == next_a.minute and next_b.hour == next_a.hour - def following_schedule(self, dttm): """ Calculate the following schedule for this dag in UTC. @@ -1162,21 +1107,6 @@ def next_dagrun_info( info = None return info - def next_dagrun_after_date(self, date_last_automated_dagrun: pendulum.DateTime | None): - warnings.warn( - "`DAG.next_dagrun_after_date()` is deprecated. Please use `DAG.next_dagrun_info()` instead.", - category=RemovedInAirflow3Warning, - stacklevel=2, - ) - if date_last_automated_dagrun is None: - data_interval = None - else: - data_interval = self.infer_automated_data_interval(date_last_automated_dagrun) - info = self.next_dagrun_info(data_interval) - if info is None: - return None - return info.run_after - @functools.cached_property def _time_restriction(self) -> TimeRestriction: start_dates = [t.start_date for t in self.tasks if t.start_date] @@ -1267,46 +1197,6 @@ def iter_dagrun_infos_between( ) break - def get_run_dates(self, start_date, end_date=None) -> list: - """ - Return a list of dates between the interval received as parameter using this dag's schedule interval. - - Returned dates can be used for execution dates. - - :param start_date: The start date of the interval. - :param end_date: The end date of the interval. Defaults to ``timezone.utcnow()``. - :return: A list of dates within the interval following the dag's schedule. - """ - warnings.warn( - "`DAG.get_run_dates()` is deprecated. Please use `DAG.iter_dagrun_infos_between()` instead.", - category=RemovedInAirflow3Warning, - stacklevel=2, - ) - earliest = timezone.coerce_datetime(start_date) - if end_date is None: - latest = pendulum.now(timezone.utc) - else: - latest = timezone.coerce_datetime(end_date) - return [info.logical_date for info in self.iter_dagrun_infos_between(earliest, latest)] - - def normalize_schedule(self, dttm): - warnings.warn( - "`DAG.normalize_schedule()` is deprecated.", - category=RemovedInAirflow3Warning, - stacklevel=2, - ) - with warnings.catch_warnings(): - warnings.simplefilter("ignore", RemovedInAirflow3Warning) - following = self.following_schedule(dttm) - if not following: # in case of @once - return dttm - with warnings.catch_warnings(): - warnings.simplefilter("ignore", RemovedInAirflow3Warning) - previous_of_following = self.previous_schedule(following) - if previous_of_following != dttm: - return following - return dttm - @provide_session def get_last_dagrun(self, session=NEW_SESSION, include_externally_triggered=False): return get_last_dagrun( @@ -1330,43 +1220,6 @@ def dag_id(self) -> str: def dag_id(self, value: str) -> None: self._dag_id = value - @property - def full_filepath(self) -> str: - """ - Full file path to the DAG. - - :meta private: - """ - warnings.warn( - "DAG.full_filepath is deprecated in favour of fileloc", - RemovedInAirflow3Warning, - stacklevel=2, - ) - return self.fileloc - - @full_filepath.setter - def full_filepath(self, value) -> None: - warnings.warn( - "DAG.full_filepath is deprecated in favour of fileloc", - RemovedInAirflow3Warning, - stacklevel=2, - ) - self.fileloc = value - - @property - def concurrency(self) -> int: - # TODO: Remove in Airflow 3.0 - warnings.warn( - "The 'DAG.concurrency' attribute is deprecated. Please use 'DAG.max_active_tasks'.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - return self._max_active_tasks - - @concurrency.setter - def concurrency(self, value: int): - self._max_active_tasks = value - @property def max_active_tasks(self) -> int: return self._max_active_tasks @@ -1438,20 +1291,6 @@ def tasks_upstream_of_teardowns(self) -> list[Operator]: def task_group(self) -> TaskGroup: return self._task_group - @property - def filepath(self) -> str: - """ - Relative file path to the DAG. - - :meta private: - """ - warnings.warn( - "filepath is deprecated, use relative_fileloc instead", - RemovedInAirflow3Warning, - stacklevel=2, - ) - return str(self.relative_fileloc) - @property def relative_fileloc(self) -> pathlib.Path: """File location of the importable dag 'file' relative to the configured DAGs folder.""" @@ -1496,16 +1335,6 @@ def get_concurrency_reached(self, session=NEW_SESSION) -> bool: ) return total_tasks >= self.max_active_tasks - @property - def concurrency_reached(self): - """Use `airflow.models.DAG.get_concurrency_reached`, this attribute is deprecated.""" - warnings.warn( - "This attribute is deprecated. Please use `airflow.models.DAG.get_concurrency_reached` method.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - return self.get_concurrency_reached() - @provide_session def get_is_active(self, session=NEW_SESSION) -> None: """Return a boolean indicating whether this DAG is active.""" @@ -1526,21 +1355,6 @@ def is_paused(self): ) return self.get_is_paused() - @property - def normalized_schedule_interval(self) -> ScheduleInterval: - warnings.warn( - "DAG.normalized_schedule_interval() is deprecated.", - category=RemovedInAirflow3Warning, - stacklevel=2, - ) - if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets: - _schedule_interval: ScheduleInterval = cron_presets.get(self.schedule_interval) - elif self.schedule_interval == "@once": - _schedule_interval = None - else: - _schedule_interval = self.schedule_interval - return _schedule_interval - @staticmethod @internal_api_call @provide_session @@ -1724,16 +1538,6 @@ def get_latest_execution_date(self, session: Session = NEW_SESSION) -> pendulum. """Return the latest date for which at least one dag run exists.""" return session.scalar(select(func.max(DagRun.execution_date)).where(DagRun.dag_id == self.dag_id)) - @property - def latest_execution_date(self): - """Use `airflow.models.DAG.get_latest_execution_date`, this attribute is deprecated.""" - warnings.warn( - "This attribute is deprecated. Please use `airflow.models.DAG.get_latest_execution_date`.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - return self.get_latest_execution_date() - def resolve_template_files(self): for t in self.tasks: t.resolve_template_files() @@ -2264,28 +2068,6 @@ def nested_topo(group): return tuple(nested_topo(self.task_group)) - @provide_session - def set_dag_runs_state( - self, - state: DagRunState = DagRunState.RUNNING, - session: Session = NEW_SESSION, - start_date: datetime | None = None, - end_date: datetime | None = None, - dag_ids: list[str] | None = None, - ) -> None: - warnings.warn( - "This method is deprecated and will be removed in a future version.", - RemovedInAirflow3Warning, - stacklevel=3, - ) - dag_ids = dag_ids or [self.dag_id] - query = update(DagRun).where(DagRun.dag_id.in_(dag_ids)) - if start_date: - query = query.where(DagRun.execution_date >= start_date) - if end_date: - query = query.where(DagRun.execution_date <= end_date) - session.execute(query.values(state=state).execution_options(synchronize_session="fetch")) - @provide_session def clear( self, @@ -3055,21 +2837,6 @@ def create_dagrun( ) return run - @classmethod - @provide_session - def bulk_sync_to_db( - cls, - dags: Collection[DAG], - session=NEW_SESSION, - ): - """Use `airflow.models.DAG.bulk_write_to_db`, this method is deprecated.""" - warnings.warn( - "This method is deprecated and will be removed in a future version. Please use bulk_write_to_db", - RemovedInAirflow3Warning, - stacklevel=2, - ) - return cls.bulk_write_to_db(dags=dags, session=session) - @classmethod @provide_session def bulk_write_to_db( diff --git a/newsfragments/41440.significant.rst b/newsfragments/41440.significant.rst new file mode 100644 index 000000000000..4f819bb4d8f9 --- /dev/null +++ b/newsfragments/41440.significant.rst @@ -0,0 +1,16 @@ +Removed unused methods / properties in models/dag.py + +Methods removed: + * date_range + * is_fixed_time_schedule + * next_dagrun_after_date + * get_run_dates + * normalize_schedule + * full_filepath + * concurrency + * filepath + * concurrency_reached + * normalized_schedule_interval + * latest_execution_date + * set_dag_runs_state + * bulk_sync_to_db diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index 8fdbf4826db7..8475e6c4cbb9 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -5284,16 +5284,17 @@ def test_find_zombies(self, load_examples): self.job_runner._find_zombies() scheduler_job.executor.callback_sink.send.assert_called_once() - requests = scheduler_job.executor.callback_sink.send.call_args.args - assert 1 == len(requests) - assert requests[0].full_filepath == dag.fileloc - assert requests[0].msg == str(self.job_runner._generate_zombie_message_details(ti)) - assert requests[0].is_failure_callback is True - assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance) - assert ti.dag_id == requests[0].simple_task_instance.dag_id - assert ti.task_id == requests[0].simple_task_instance.task_id - assert ti.run_id == requests[0].simple_task_instance.run_id - assert ti.map_index == requests[0].simple_task_instance.map_index + callback_requests = scheduler_job.executor.callback_sink.send.call_args.args + assert len(callback_requests) == 1 + callback_request = callback_requests[0] + assert isinstance(callback_request.simple_task_instance, SimpleTaskInstance) + assert callback_request.full_filepath == dag.fileloc + assert callback_request.msg == str(self.job_runner._generate_zombie_message_details(ti)) + assert callback_request.is_failure_callback is True + assert callback_request.simple_task_instance.dag_id == ti.dag_id + assert callback_request.simple_task_instance.task_id == ti.task_id + assert callback_request.simple_task_instance.run_id == ti.run_id + assert callback_request.simple_task_instance.map_index == ti.map_index with create_session() as session: session.query(TaskInstance).delete() diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 2b3961f1e6c6..95ec87d72bf1 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -2588,20 +2588,6 @@ def test_validate_params_on_trigger_dag(self): data_interval=(TEST_DATE, TEST_DATE), ) - def test_return_date_range_with_num_method(self): - start_date = TEST_DATE - delta = timedelta(days=1) - - dag = DAG("dummy-dag", schedule=delta, start_date=start_date) - with pytest.warns(RemovedInAirflow3Warning, match=r"`DAG.date_range\(\)` is deprecated."): - dag_dates = dag.date_range(start_date=start_date, num=3) - - assert dag_dates == [ - start_date, - start_date + delta, - start_date + 2 * delta, - ] - def test_dag_owner_links(self): dag = DAG( "dag",