diff --git a/airflow/cli/commands/remote_commands/task_command.py b/airflow/cli/commands/remote_commands/task_command.py
index 28ecb9d7bd798..681b478bfc1a6 100644
--- a/airflow/cli/commands/remote_commands/task_command.py
+++ b/airflow/cli/commands/remote_commands/task_command.py
@@ -173,9 +173,11 @@ def _get_dag_run(
         dag_run = DagRun(
             dag_id=dag.dag_id,
             run_id=logical_date_or_run_id,
+            run_type=DagRunType.MANUAL,
             logical_date=dag_run_logical_date,
             data_interval=dag.timetable.infer_manual_data_interval(run_after=dag_run_logical_date),
             triggered_by=DagRunTriggeredByType.CLI,
+            state=DagRunState.RUNNING,
         )
         return dag_run, True
     elif create_if_necessary == "db":
@@ -186,7 +188,7 @@ def _get_dag_run(
             run_type=DagRunType.MANUAL,
             triggered_by=DagRunTriggeredByType.CLI,
             dag_version=None,
-            state=DagRunState.QUEUED,
+            state=DagRunState.RUNNING,
             session=session,
         )
         return dag_run, True
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index b4931ac16d324..e84994c6d04e7 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -85,6 +85,7 @@
 from airflow.utils.operator_helpers import ExecutionCallableRunner
 from airflow.utils.operator_resources import Resources
 from airflow.utils.session import NEW_SESSION, provide_session
+from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunTriggeredByType
 from airflow.utils.xcom import XCOM_RETURN_KEY
 
@@ -632,6 +633,7 @@ def run(
                 logical_date=info.logical_date,
                 data_interval=info.data_interval,
                 triggered_by=DagRunTriggeredByType.TEST,
+                state=DagRunState.RUNNING,
             )
             ti = TaskInstance(self, run_id=dr.run_id)
             ti.dag_run = dr
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 9a3a85b8253e8..c0f6d20703b61 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -44,7 +44,6 @@
 from jinja2 import TemplateAssertionError, UndefinedError
 from sqlalchemy import (
     Column,
-    DateTime,
     Float,
     ForeignKey,
     ForeignKeyConstraint,
@@ -162,7 +161,6 @@
     from airflow.sdk.definitions._internal.abstractoperator import Operator
     from airflow.sdk.definitions.dag import DAG
     from airflow.sdk.types import OutletEventAccessorsProtocol, RuntimeTaskInstanceProtocol
-    from airflow.timetables.base import DataInterval
     from airflow.typing_compat import Literal, TypeGuard
     from airflow.utils.task_group import TaskGroup
 
@@ -928,6 +926,11 @@ def _get_template_context(
     from airflow import macros
     from airflow.models.abstractoperator import NotMapped
     from airflow.models.baseoperator import BaseOperator
+    from airflow.sdk.api.datamodels._generated import (
+        DagRun as DagRunSDK,
+        PrevSuccessfulDagRunResponse,
+        TIRunContext,
+    )
 
     integrate_macros_plugins()
 
@@ -938,50 +941,34 @@ def _get_template_context(
     assert task.dag
 
     dag_run = task_instance.get_dagrun(session)
-    data_interval = dag.get_run_data_interval(dag_run)
-
     validated_params = process_params(dag, task, dag_run.conf, suppress_exception=ignore_param_exceptions)
 
-    logical_date: DateTime = timezone.coerce_datetime(task_instance.logical_date)
-    ds = logical_date.strftime("%Y-%m-%d")
-    ds_nodash = ds.replace("-", "")
-    ts = logical_date.isoformat()
-    ts_nodash = logical_date.strftime("%Y%m%dT%H%M%S")
-    ts_nodash_with_tz = ts.replace("-", "").replace(":", "")
+    ti_context_from_server = TIRunContext(
+        dag_run=DagRunSDK.model_validate(dag_run, from_attributes=True),
+        max_tries=task_instance.max_tries,
+    )
+    runtime_ti = task_instance.to_runtime_ti(context_from_server=ti_context_from_server)
+
+    context: Context = runtime_ti.get_template_context()
 
     @cache  # Prevent multiple database access.
-    def _get_previous_dagrun_success() -> DagRun | None:
-        return task_instance.get_previous_dagrun(state=DagRunState.SUCCESS, session=session)
-
-    def _get_previous_dagrun_data_interval_success() -> DataInterval | None:
-        dagrun = _get_previous_dagrun_success()
-        if dagrun is None:
-            return None
-        return dag.get_run_data_interval(dagrun)
+    def _get_previous_dagrun_success() -> PrevSuccessfulDagRunResponse:
+        dr_from_db = task_instance.get_previous_dagrun(state=DagRunState.SUCCESS, session=session)
+        if dr_from_db:
+            return PrevSuccessfulDagRunResponse.model_validate(dr_from_db, from_attributes=True)
+        return PrevSuccessfulDagRunResponse()
 
     def get_prev_data_interval_start_success() -> pendulum.DateTime | None:
-        data_interval = _get_previous_dagrun_data_interval_success()
-        if data_interval is None:
-            return None
-        return data_interval.start
+        return timezone.coerce_datetime(_get_previous_dagrun_success().data_interval_start)
 
     def get_prev_data_interval_end_success() -> pendulum.DateTime | None:
-        data_interval = _get_previous_dagrun_data_interval_success()
-        if data_interval is None:
-            return None
-        return data_interval.end
+        return timezone.coerce_datetime(_get_previous_dagrun_success().data_interval_end)
 
     def get_prev_start_date_success() -> pendulum.DateTime | None:
-        dagrun = _get_previous_dagrun_success()
-        if dagrun is None:
-            return None
-        return timezone.coerce_datetime(dagrun.start_date)
+        return timezone.coerce_datetime(_get_previous_dagrun_success().start_date)
 
     def get_prev_end_date_success() -> pendulum.DateTime | None:
-        dagrun = _get_previous_dagrun_success()
-        if dagrun is None:
-            return None
-        return timezone.coerce_datetime(dagrun.end_date)
+        return timezone.coerce_datetime(_get_previous_dagrun_success().end_date)
 
     def get_triggering_events() -> dict[str, list[AssetEvent]]:
         if TYPE_CHECKING:
@@ -1005,41 +992,29 @@ def get_triggering_events() -> dict[str, list[AssetEvent]]:
     # * Context in task_sdk/src/airflow/sdk/definitions/context.py
     # * KNOWN_CONTEXT_KEYS in airflow/utils/context.py
     # * Table in docs/apache-airflow/templates-ref.rst
-    context: Context = {
-        "dag": dag,
-        "dag_run": dag_run,
-        "data_interval_end": timezone.coerce_datetime(data_interval.end),
-        "data_interval_start": timezone.coerce_datetime(data_interval.start),
-        "outlet_events": OutletEventAccessors(),
-        "ds": ds,
-        "ds_nodash": ds_nodash,
-        "inlets": task.inlets,
-        "inlet_events": InletEventsAccessors(task.inlets, session=session),
-        "logical_date": logical_date,
-        "macros": macros,
-        "map_index_template": task.map_index_template,
-        "outlets": task.outlets,
-        "params": validated_params,
-        "prev_data_interval_start_success": get_prev_data_interval_start_success(),
-        "prev_data_interval_end_success": get_prev_data_interval_end_success(),
-        "prev_start_date_success": get_prev_start_date_success(),
-        "prev_end_date_success": get_prev_end_date_success(),
-        "run_id": task_instance.run_id,
-        "task": task,  # type: ignore[typeddict-item]
-        "task_instance": task_instance,
-        "task_instance_key_str": f"{task.dag_id}__{task.task_id}__{ds_nodash}",
-        "test_mode": task_instance.test_mode,
-        "ti": task_instance,
-        "triggering_asset_events": lazy_object_proxy.Proxy(get_triggering_events),
-        "ts": ts,
-        "ts_nodash": ts_nodash,
-        "ts_nodash_with_tz": ts_nodash_with_tz,
-        "var": {
-            "json": VariableAccessor(deserialize_json=True),
-            "value": VariableAccessor(deserialize_json=False),
-        },
-        "conn": ConnectionAccessor(),
-    }
+
+    context.update(
+        {
"outlet_events": OutletEventAccessors(), + "inlet_events": InletEventsAccessors(task.inlets, session=session), + "macros": macros, + "params": validated_params, + "prev_data_interval_start_success": get_prev_data_interval_start_success(), + "prev_data_interval_end_success": get_prev_data_interval_end_success(), + "prev_start_date_success": get_prev_start_date_success(), + "prev_end_date_success": get_prev_end_date_success(), + "test_mode": task_instance.test_mode, + # ti/task_instance are added here for ti.xcom_{push,pull} + "task_instance": task_instance, + "ti": task_instance, + "triggering_asset_events": lazy_object_proxy.Proxy(get_triggering_events), + "var": { + "json": VariableAccessor(deserialize_json=True), + "value": VariableAccessor(deserialize_json=False), + }, + "conn": ConnectionAccessor(), + } + ) try: expanded_ti_count: int | None = BaseOperator.get_mapped_ti_count( @@ -1058,8 +1033,6 @@ def get_triggering_events() -> dict[str, list[AssetEvent]]: except NotMapped: pass - # Mypy doesn't like turning existing dicts in to a TypeDict -- and we "lie" in the type stub to say it - # is one, but in practice it isn't. See https://github.com/python/mypy/issues/8890 return context @@ -1902,6 +1875,24 @@ def from_runtime_ti(cls, runtime_ti: RuntimeTaskInstanceProtocol) -> TaskInstanc assert isinstance(ti, TaskInstance) return ti + def to_runtime_ti(self, context_from_server) -> RuntimeTaskInstanceProtocol: + from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance + + runtime_ti = RuntimeTaskInstance.model_construct( + id=self.id, + task_id=self.task_id, + dag_id=self.dag_id, + run_id=self.run_id, + try_numer=self.try_number, + map_index=self.map_index, + task=self.task, + max_tries=self.max_tries, + hostname=self.hostname, + _ti_context_from_server=context_from_server, + ) + + return runtime_ti + @staticmethod def _command_as_list( ti: TaskInstance, diff --git a/airflow/utils/context.py b/airflow/utils/context.py index 74479a328a1a2..6ed1399fe63f6 100644 --- a/airflow/utils/context.py +++ b/airflow/utils/context.py @@ -52,7 +52,11 @@ AssetUriRef, ) from airflow.sdk.definitions.context import Context -from airflow.sdk.execution_time.context import OutletEventAccessors as OutletEventAccessorsSDK +from airflow.sdk.execution_time.context import ( + ConnectionAccessor as ConnectionAccessorSDK, + OutletEventAccessors as OutletEventAccessorsSDK, + VariableAccessor as VariableAccessorSDK, +) from airflow.utils.db import LazySelectSequence from airflow.utils.session import create_session from airflow.utils.types import NOTSET @@ -107,21 +111,13 @@ } -class VariableAccessor: +class VariableAccessor(VariableAccessorSDK): """Wrapper to access Variable values in template.""" - def __init__(self, *, deserialize_json: bool) -> None: - self._deserialize_json = deserialize_json - self.var: Any = None - def __getattr__(self, key: str) -> Any: from airflow.models.variable import Variable - self.var = Variable.get(key, deserialize_json=self._deserialize_json) - return self.var - - def __repr__(self) -> str: - return str(self.var) + return Variable.get(key, deserialize_json=self._deserialize_json) def get(self, key, default: Any = NOTSET) -> Any: from airflow.models.variable import Variable @@ -131,27 +127,20 @@ def get(self, key, default: Any = NOTSET) -> Any: return Variable.get(key, default, deserialize_json=self._deserialize_json) -class ConnectionAccessor: +class ConnectionAccessor(ConnectionAccessorSDK): """Wrapper to access Connection entries in template.""" - def __init__(self) -> 
-        self.var: Any = None
-
-    def __getattr__(self, key: str) -> Any:
+    def __getattr__(self, conn_id: str) -> Any:
         from airflow.models.connection import Connection
 
-        self.var = Connection.get_connection_from_secrets(key)
-        return self.var
-
-    def __repr__(self) -> str:
-        return str(self.var)
+        return Connection.get_connection_from_secrets(conn_id)
 
-    def get(self, key: str, default_conn: Any = None) -> Any:
+    def get(self, conn_id: str, default_conn: Any = None) -> Any:
         from airflow.exceptions import AirflowNotFoundException
         from airflow.models.connection import Connection
 
         try:
-            return Connection.get_connection_from_secrets(key)
+            return Connection.get_connection_from_secrets(conn_id)
         except AirflowNotFoundException:
             return default_conn
 
diff --git a/scripts/ci/pre_commit/template_context_key_sync.py b/scripts/ci/pre_commit/template_context_key_sync.py
index 501801be55d97..33c694f853a89 100755
--- a/scripts/ci/pre_commit/template_context_key_sync.py
+++ b/scripts/ci/pre_commit/template_context_key_sync.py
@@ -27,32 +27,65 @@
 
 ROOT_DIR = pathlib.Path(__file__).resolve().parents[3]
 
-TASKINSTANCE_PY = ROOT_DIR.joinpath("airflow", "models", "taskinstance.py")
+TASKRUNNER_PY = ROOT_DIR.joinpath("task_sdk", "src", "airflow", "sdk", "execution_time", "task_runner.py")
 CONTEXT_PY = ROOT_DIR.joinpath("airflow", "utils", "context.py")
 CONTEXT_HINT = ROOT_DIR.joinpath("task_sdk", "src", "airflow", "sdk", "definitions", "context.py")
 TEMPLATES_REF_RST = ROOT_DIR.joinpath("docs", "apache-airflow", "templates-ref.rst")
 
 
 def _iter_template_context_keys_from_original_return() -> typing.Iterator[str]:
-    ti_mod = ast.parse(TASKINSTANCE_PY.read_text("utf-8"), str(TASKINSTANCE_PY))
-    fn_get_template_context = next(
+    ti_mod = ast.parse(TASKRUNNER_PY.read_text("utf-8"), str(TASKRUNNER_PY))
+
+    # Locate the RuntimeTaskInstance class definition
+    runtime_task_instance_class = next(
         node
         for node in ast.iter_child_nodes(ti_mod)
-        if isinstance(node, ast.FunctionDef) and node.name == "_get_template_context"
+        if isinstance(node, ast.ClassDef) and node.name == "RuntimeTaskInstance"
     )
-    st_context_value = next(
-        stmt.value
+
+    # Locate the get_template_context method in RuntimeTaskInstance
+    fn_get_template_context = next(
+        node
+        for node in ast.iter_child_nodes(runtime_task_instance_class)
+        if isinstance(node, ast.FunctionDef) and node.name == "get_template_context"
+    )
+
+    # Helper function to extract keys from a dictionary node
+    def extract_keys_from_dict(node: ast.Dict) -> typing.Iterator[str]:
+        for key in node.keys:
+            if not isinstance(key, ast.Constant) or not isinstance(key.value, str):
+                raise ValueError("Key in dictionary is not a string literal")
+            yield key.value
+
+    # Extract keys from the main `context` dictionary assignment
+    context_assignment = next(
+        stmt
         for stmt in fn_get_template_context.body
         if isinstance(stmt, ast.AnnAssign)
         and isinstance(stmt.target, ast.Name)
         and stmt.target.id == "context"
     )
-    if not isinstance(st_context_value, ast.Dict):
-        raise ValueError("'context' is not assigned a dict literal")
-    for expr in st_context_value.keys:
-        if not isinstance(expr, ast.Constant) or not isinstance(expr.value, str):
-            raise ValueError("key in 'context' dict is not a str literal")
-        yield expr.value
+
+    if not isinstance(context_assignment.value, ast.Dict):
+        raise ValueError("'context' is not assigned a dictionary literal")
+    yield from extract_keys_from_dict(context_assignment.value)
+
+    # Handle keys added conditionally in `if self._ti_context_from_server`
+    for stmt in fn_get_template_context.body:
+        if (
+            isinstance(stmt, ast.If)
+            and isinstance(stmt.test, ast.Attribute)
+            and stmt.test.attr == "_ti_context_from_server"
+        ):
+            for sub_stmt in stmt.body:
+                # Get keys from `context_from_server` assignment
+                if (
+                    isinstance(sub_stmt, ast.AnnAssign)
+                    and isinstance(sub_stmt.target, ast.Name)
+                    and isinstance(sub_stmt.value, ast.Dict)
+                    and sub_stmt.target.id == "context_from_server"
+                ):
+                    yield from extract_keys_from_dict(sub_stmt.value)
 
 
 def _iter_template_context_keys_from_declaration() -> typing.Iterator[str]:
@@ -105,6 +138,12 @@ def _compare_keys(retn_keys: set[str], decl_keys: set[str], hint_keys: set[str],
     # Compat shim for task-sdk, not actually designed for user use
     retn_keys.add("expanded_ti_count")
 
+    # TODO: These are the keys that are yet to be ported over to the Task SDK.
+    retn_keys.add("inlet_events")
+    retn_keys.add("params")
+    retn_keys.add("test_mode")
+    retn_keys.add("triggering_asset_events")
+
     # Only present in callbacks. Not listed in templates-ref (that doc is for task execution).
     retn_keys.update(("exception", "reason", "try_number"))
     docs_keys.update(("exception", "reason", "try_number"))
diff --git a/task_sdk/src/airflow/sdk/types.py b/task_sdk/src/airflow/sdk/types.py
index f9ec150ce3ae2..1a886b45ed1c8 100644
--- a/task_sdk/src/airflow/sdk/types.py
+++ b/task_sdk/src/airflow/sdk/types.py
@@ -25,6 +25,7 @@
     from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAliasEvent, BaseAssetUniqueKey
     from airflow.sdk.definitions.baseoperator import BaseOperator
+    from airflow.sdk.definitions.context import Context
     from airflow.sdk.definitions.mappedoperator import MappedOperator
 
     Operator = Union[BaseOperator, MappedOperator]
 
@@ -71,6 +72,8 @@ def xcom_pull(
 
     def xcom_push(self, key: str, value: Any) -> None: ...
 
+    def get_template_context(self) -> Context: ...
+
 
 class OutletEventAccessorProtocol(Protocol):
     """Protocol for managing access to a specific outlet event accessor."""
diff --git a/tests/cli/commands/remote_commands/test_task_command.py b/tests/cli/commands/remote_commands/test_task_command.py
index 06a477e283168..2d6f699c1e788 100644
--- a/tests/cli/commands/remote_commands/test_task_command.py
+++ b/tests/cli/commands/remote_commands/test_task_command.py
@@ -113,7 +113,7 @@ def setup_class(cls):
             else {}
         )
         cls.dag_run = cls.dag.create_dagrun(
-            state=State.NONE,
+            state=State.RUNNING,
             run_id=cls.run_id,
             run_type=DagRunType.MANUAL,
             logical_date=DEFAULT_DATE,
@@ -184,7 +184,7 @@ def test_cli_test_different_path(self, session, tmp_path):
             else {}
         )
         dag.create_dagrun(
-            state=State.NONE,
+            state=State.RUNNING,
             run_id="abc123",
             run_type=DagRunType.MANUAL,
             logical_date=logical_date,
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 96c06d279bc3b..b447ec0fc9999 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -134,10 +134,10 @@ def create_dag_run(
 
         return dag_run
 
-    @pytest.mark.parametrize("state", [DagRunState.QUEUED, DagRunState.RUNNING])
-    def test_clear_task_instances_for_backfill_unfinished_dagrun(self, state, session):
+    def test_clear_task_instances_for_backfill_running_dagrun(self, session):
         now = timezone.utcnow()
-        dag_id = "test_clear_task_instances_for_backfill_dagrun"
+        state = DagRunState.RUNNING
+        dag_id = "test_clear_task_instances_for_backfill_running_dagrun"
         dag = DAG(dag_id=dag_id, schedule=datetime.timedelta(days=1), start_date=now)
         dag_run = self.create_dag_run(dag, logical_date=now, is_backfill=True, state=state, session=session)
 
@@ -156,7 +156,7 @@ def test_clear_task_instances_for_backfill_finished_dagrun(self, state, sessio
     @pytest.mark.parametrize("state", [DagRunState.SUCCESS, DagRunState.FAILED])
     def test_clear_task_instances_for_backfill_finished_dagrun(self, state, session):
         now = timezone.utcnow()
-        dag_id = "test_clear_task_instances_for_backfill_dagrun"
+        dag_id = "test_clear_task_instances_for_backfill_finished_dagrun"
         dag = DAG(dag_id=dag_id, schedule=datetime.timedelta(days=1), start_date=now)
         dag_run = self.create_dag_run(dag, logical_date=now, is_backfill=True, state=state, session=session)
 
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 6494236a6cc20..8ca00e2f8d650 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2301,7 +2301,7 @@ def test_outlet_assets(self, create_task_instance, testing_dag_bundle):
             session.flush()
 
         run_id = str(uuid4())
-        dr = DagRun(dag1.dag_id, run_id=run_id, run_type="anything")
+        dr = DagRun(dag1.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING)
         session.merge(dr)
         task = dag1.get_task("producing_task_1")
         task.bash_command = "echo 1"  # make it go faster
@@ -2360,7 +2360,7 @@ def test_outlet_assets_failed(self, create_task_instance, testing_dag_bundle):
         dagbag.collect_dags(only_if_updated=False, safe_mode=False)
         dagbag.sync_to_db("testing", None, session=session)
         run_id = str(uuid4())
-        dr = DagRun(dag_with_fail_task.dag_id, run_id=run_id, run_type="anything")
+        dr = DagRun(dag_with_fail_task.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING)
         session.merge(dr)
         task = dag_with_fail_task.get_task("fail_task")
         ti = TaskInstance(task, run_id=run_id)
@@ -2419,7 +2419,7 @@ def test_outlet_assets_skipped(self, testing_dag_bundle):
             session.flush()
 
         run_id = str(uuid4())
-        dr = DagRun(dag_with_skip_task.dag_id, run_id=run_id, run_type="anything")
+        dr = DagRun(dag_with_skip_task.dag_id, run_id=run_id, run_type="manual", state=DagRunState.RUNNING)
         session.merge(dr)
         task = dag_with_skip_task.get_task("skip_task")
         ti = TaskInstance(task, run_id=run_id)
@@ -3262,7 +3262,7 @@ def test_context_triggering_asset_events(self, create_dummy_dag, session):
             run_id="test2",
             run_type=DagRunType.ASSET_TRIGGERED,
             logical_date=logical_date,
-            state=None,
+            state=DagRunState.RUNNING,
             session=session,
             data_interval=(logical_date, logical_date),
             **triggered_by_kwargs,
@@ -3522,11 +3522,13 @@ def test_handle_failure(self, create_dummy_dag, session=None):
             run_id="test2",
             run_type=DagRunType.MANUAL,
             logical_date=logical_date,
-            state=None,
+            state=DagRunState.RUNNING,
+            start_date=logical_date - datetime.timedelta(hours=1),
             session=session,
             data_interval=(logical_date, logical_date),
             **triggered_by_kwargs,
         )
+        dr.set_state(DagRunState.FAILED)
 
         ti1 = dr.get_task_instance(task1.task_id, session=session)
         ti1.task = task1
@@ -3669,11 +3671,12 @@ def test_handle_failure_fail_fast(self, create_dummy_dag, session=None):
             run_id="test_ff",
             run_type=DagRunType.MANUAL,
             logical_date=logical_date,
-            state=None,
+            state=DagRunState.RUNNING,
             session=session,
             data_interval=(logical_date, logical_date),
             **triggered_by_kwargs,
         )
+        dr.set_state(DagRunState.SUCCESS)
 
         ti1 = dr.get_task_instance(task1.task_id, session=session)
         ti1.task = task1
diff --git a/tests/sensors/test_external_task_sensor.py b/tests/sensors/test_external_task_sensor.py
index d70c63263b91a..9d1d519ac4a24 100644
--- a/tests/sensors/test_external_task_sensor.py
+++ b/tests/sensors/test_external_task_sensor.py
@@ -1595,6 +1595,7 @@ def test_clear_overlapping_external_task_marker(dag_bag_head_tail, session):
         logical_date = DEFAULT_DATE + timedelta(days=delta)
         dagrun = DagRun(
             dag_id=dag.dag_id,
+            start_date=logical_date,
             state=DagRunState.SUCCESS,
             logical_date=logical_date,
             run_type=DagRunType.MANUAL,
@@ -1607,10 +1608,31 @@ def test_clear_overlapping_external_task_marker(dag_bag_head_tail, session):
             ti.state = TaskInstanceState.SUCCESS
     session.flush()
 
-    # The next two lines are doing the same thing. Clearing the first "head" with "Future"
-    # selected is the same as not selecting "Future". They should take similar amount of
-    # time too because dag.clear() uses visited_external_tis to keep track of visited ExternalTaskMarker.
     assert dag.clear(start_date=DEFAULT_DATE, dag_bag=dag_bag_head_tail, session=session) == 30
+
+
+@provide_session
+def test_clear_overlapping_external_task_marker_with_end_date(dag_bag_head_tail, session):
+    dag: DAG = dag_bag_head_tail.get_dag("head_tail")
+
+    # "Run" 10 times.
+    for delta in range(10):
+        logical_date = DEFAULT_DATE + timedelta(days=delta)
+        dagrun = DagRun(
+            dag_id=dag.dag_id,
+            start_date=logical_date,
+            state=DagRunState.SUCCESS,
+            logical_date=logical_date,
+            run_type=DagRunType.MANUAL,
+            run_id=f"test_{delta}",
+        )
+        session.add(dagrun)
+        for task in dag.tasks:
+            ti = TaskInstance(task=task)
+            dagrun.task_instances.append(ti)
+            ti.state = TaskInstanceState.SUCCESS
+    session.flush()
+
     assert (
         dag.clear(
             start_date=DEFAULT_DATE,
@@ -1678,6 +1700,7 @@ def test_clear_overlapping_external_task_marker_mapped_tasks(dag_bag_head_tail_m
         logical_date = DEFAULT_DATE + timedelta(days=delta)
         dagrun = DagRun(
             dag_id=dag.dag_id,
+            start_date=logical_date,
             state=DagRunState.SUCCESS,
             logical_date=logical_date,
             run_type=DagRunType.MANUAL,