Skip to content

Commit d75c631

Browse files
authored
Remove some top-level imports to airflow.sdk from Core airflow. (#47785)
This removes anything from the import path in the Execution API, which was failing to load when building the sdk's datamodels/_generated.py. It would be nice to add a ruff rule to enforce this, but while `airflow/` is still in the top level (and not in a subproject) it's a lot of work to do this without needint go exclude it from _every_ providers rules. So I'm doing this right now with the knowledge that this is a game of whack-a-mole.
1 parent b6a41a4 commit d75c631

File tree

5 files changed

+40
-17
lines changed

5 files changed

+40
-17
lines changed

airflow/exceptions.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@
2727
from http import HTTPStatus
2828
from typing import TYPE_CHECKING, Any, NamedTuple
2929

30-
from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef
3130
from airflow.utils.trigger_rule import TriggerRule
3231

3332
if TYPE_CHECKING:
3433
from collections.abc import Sized
3534

3635
from airflow.models import DagRun
36+
from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef
3737

3838

3939
class AirflowException(Exception):
@@ -121,6 +121,8 @@ def __init__(self, inactive_asset_keys: Collection[AssetUniqueKey | AssetNameRef
121121

122122
@staticmethod
123123
def _render_asset_key(key: AssetUniqueKey | AssetNameRef | AssetUriRef) -> str:
124+
from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef
125+
124126
if isinstance(key, AssetUniqueKey):
125127
return f"Asset(name={key.name!r}, uri={key.uri!r})"
126128
elif isinstance(key, AssetNameRef):

airflow/models/asset.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
from sqlalchemy.orm import relationship
3737

3838
from airflow.models.base import Base, StringID
39-
from airflow.sdk.definitions.asset import Asset, AssetAlias
4039
from airflow.settings import json
4140
from airflow.utils import timezone
4241
from airflow.utils.sqlalchemy import UtcDateTime
@@ -47,6 +46,8 @@
4746

4847
from sqlalchemy.orm import Session
4948

49+
from airflow.sdk.definitions.asset import Asset, AssetAlias
50+
5051

5152
def fetch_active_assets_by_name(names: Iterable[str], session: Session) -> dict[str, Asset]:
5253
return {
@@ -187,12 +188,16 @@ def __hash__(self):
187188
return hash(self.name)
188189

189190
def __eq__(self, other):
191+
from airflow.sdk.definitions.asset import AssetAlias
192+
190193
if isinstance(other, (self.__class__, AssetAlias)):
191194
return self.name == other.name
192195
else:
193196
return NotImplemented
194197

195198
def to_public(self) -> AssetAlias:
199+
from airflow.sdk.definitions.asset import AssetAlias
200+
196201
return AssetAlias(name=self.name)
197202

198203

@@ -280,6 +285,8 @@ def __init__(self, name: str = "", uri: str = "", **kwargs):
280285
super().__init__(name=name, uri=uri, **kwargs)
281286

282287
def __eq__(self, other):
288+
from airflow.sdk.definitions.asset import Asset
289+
283290
if isinstance(other, (self.__class__, Asset)):
284291
return self.name == other.name and self.uri == other.uri
285292
return NotImplemented
@@ -291,6 +298,8 @@ def __repr__(self):
291298
return f"{self.__class__.__name__}(name={self.name!r}, uri={self.uri!r}, extra={self.extra!r})"
292299

293300
def to_public(self) -> Asset:
301+
from airflow.sdk.definitions.asset import Asset
302+
294303
return Asset(name=self.name, uri=self.uri, group=self.group, extra=self.extra)
295304

296305

airflow/models/dag.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@
8585
from airflow.models.dag_version import DagVersion
8686
from airflow.models.dagrun import RUN_ID_REGEX, DagRun
8787
from airflow.models.taskinstance import (
88-
Context,
8988
TaskInstance,
9089
TaskInstanceKey,
9190
clear_task_instances,
@@ -105,6 +104,7 @@
105104
OnceTimetable,
106105
)
107106
from airflow.utils import timezone
107+
from airflow.utils.context import Context
108108
from airflow.utils.dag_cycle_tester import check_cycle
109109
from airflow.utils.log.logging_mixin import LoggingMixin
110110
from airflow.utils.session import NEW_SESSION, provide_session

airflow/models/taskinstance.py

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -104,26 +104,13 @@
104104
from airflow.models.taskreschedule import TaskReschedule
105105
from airflow.models.xcom import LazyXComSelectSequence, XCom
106106
from airflow.plugins_manager import integrate_macros_plugins
107-
from airflow.sdk.definitions._internal.templater import SandboxedEnvironment
108-
from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetNameRef, AssetUniqueKey, AssetUriRef
109-
from airflow.sdk.definitions.param import process_params
110-
from airflow.sdk.definitions.taskgroup import MappedTaskGroup
111-
from airflow.sdk.execution_time.context import InletEventsAccessors
112107
from airflow.sentry import Sentry
113108
from airflow.settings import task_instance_mutation_hook
114109
from airflow.stats import Stats
115110
from airflow.ti_deps.dep_context import DepContext
116111
from airflow.ti_deps.dependencies_deps import REQUEUEABLE_DEPS, RUNNING_DEPS
117112
from airflow.traces.tracer import Trace
118113
from airflow.utils import timezone
119-
from airflow.utils.context import (
120-
ConnectionAccessor,
121-
Context,
122-
OutletEventAccessors,
123-
VariableAccessor,
124-
context_get_outlet_events,
125-
context_merge,
126-
)
127114
from airflow.utils.email import send_email
128115
from airflow.utils.helpers import prune_dict, render_template_to_string
129116
from airflow.utils.log.logging_mixin import LoggingMixin
@@ -160,9 +147,12 @@
160147
from airflow.models.dagrun import DagRun
161148
from airflow.sdk.api.datamodels._generated import AssetProfile
162149
from airflow.sdk.definitions._internal.abstractoperator import Operator
150+
from airflow.sdk.definitions.asset import AssetNameRef, AssetUniqueKey, AssetUriRef
163151
from airflow.sdk.definitions.dag import DAG
152+
from airflow.sdk.definitions.taskgroup import MappedTaskGroup
164153
from airflow.sdk.types import RuntimeTaskInstanceProtocol
165154
from airflow.typing_compat import Literal
155+
from airflow.utils.context import Context
166156
from airflow.utils.task_group import TaskGroup
167157

168158

@@ -261,6 +251,8 @@ def _run_raw_task(
261251

262252
try:
263253
if ti.task:
254+
from airflow.sdk.definitions.asset import Asset
255+
264256
inlets = [asset.asprofile() for asset in ti.task.inlets if isinstance(asset, Asset)]
265257
outlets = [asset.asprofile() for asset in ti.task.outlets if isinstance(asset, Asset)]
266258
TaskInstance.validate_inlet_outlet_assets_activeness(inlets, outlets, session=session)
@@ -678,6 +670,8 @@ def _execute_task(task_instance: TaskInstance, context: Context, task_orig: Oper
678670
)
679671

680672
def _execute_callable(context: Context, **execute_callable_kwargs):
673+
from airflow.utils.context import context_get_outlet_events
674+
681675
try:
682676
# Print a marker for log grouping of details before task execution
683677
log.info("::endgroup::")
@@ -903,6 +897,13 @@ def _get_template_context(
903897
PrevSuccessfulDagRunResponse,
904898
TIRunContext,
905899
)
900+
from airflow.sdk.definitions.param import process_params
901+
from airflow.sdk.execution_time.context import InletEventsAccessors
902+
from airflow.utils.context import (
903+
ConnectionAccessor,
904+
OutletEventAccessors,
905+
VariableAccessor,
906+
)
906907

907908
integrate_macros_plugins()
908909

@@ -1347,6 +1348,9 @@ def _get_email_subject_content(
13471348
html_content_err = jinja_env.from_string(default_html_content_err).render(**default_context)
13481349

13491350
else:
1351+
from airflow.sdk.definitions._internal.templater import SandboxedEnvironment
1352+
from airflow.utils.context import context_merge
1353+
13501354
if TYPE_CHECKING:
13511355
assert task_instance.task
13521356

@@ -2736,6 +2740,8 @@ def register_asset_changes_in_db(
27362740
outlet_events: list[dict[str, Any]],
27372741
session: Session = NEW_SESSION,
27382742
) -> None:
2743+
from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetNameRef, AssetUniqueKey, AssetUriRef
2744+
27392745
asset_keys = {
27402746
AssetUniqueKey(o.name, o.uri)
27412747
for o in task_outlets
@@ -3682,6 +3688,8 @@ def duration_expression_update(
36823688
def validate_inlet_outlet_assets_activeness(
36833689
inlets: list[AssetProfile], outlets: list[AssetProfile], session: Session
36843690
) -> None:
3691+
from airflow.sdk.definitions.asset import AssetUniqueKey
3692+
36853693
if not (inlets or outlets):
36863694
return
36873695

@@ -3699,6 +3707,8 @@ def validate_inlet_outlet_assets_activeness(
36993707
def _get_inactive_asset_unique_keys(
37003708
asset_unique_keys: set[AssetUniqueKey], session: Session
37013709
) -> set[AssetUniqueKey]:
3710+
from airflow.sdk.definitions.asset import AssetUniqueKey
3711+
37023712
active_asset_unique_keys = {
37033713
AssetUniqueKey(name, uri)
37043714
for name, uri in session.execute(
@@ -3724,6 +3734,7 @@ def _find_common_ancestor_mapped_group(node1: Operator, node2: Operator) -> Mapp
37243734
def _is_further_mapped_inside(operator: Operator, container: TaskGroup) -> bool:
37253735
"""Whether given operator is *further* mapped inside a task group."""
37263736
from airflow.sdk.definitions.mappedoperator import MappedOperator
3737+
from airflow.sdk.definitions.taskgroup import MappedTaskGroup
37273738

37283739
if isinstance(operator, MappedOperator):
37293740
return True

airflow/utils/operator_helpers.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
from typing import TYPE_CHECKING, Any, Callable, Protocol, TypeVar
2525

2626
from airflow import settings
27-
from airflow.sdk.definitions.asset.metadata import Metadata
2827
from airflow.typing_compat import ParamSpec
2928
from airflow.utils.types import NOTSET
3029

@@ -257,6 +256,8 @@ def ExecutionCallableRunner(
257256
class _ExecutionCallableRunnerImpl:
258257
@staticmethod
259258
def run(*args: P.args, **kwargs: P.kwargs) -> R:
259+
from airflow.sdk.definitions.asset.metadata import Metadata
260+
260261
if not inspect.isgeneratorfunction(func):
261262
return func(*args, **kwargs)
262263

0 commit comments

Comments
 (0)