Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove core Airflow support for static hybrid executors #47322

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ core:
description: |
The executor class that airflow should use. Choices include
``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``,
``KubernetesExecutor``, ``CeleryKubernetesExecutor``, ``LocalKubernetesExecutor`` or the
full import path to the class when using a custom executor.
``KubernetesExecutor`` or the full import path to the class when using a custom executor.
version_added: ~
type: string
example: ~
Expand Down
4 changes: 0 additions & 4 deletions airflow/executors/executor_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,15 @@ class ConnectorSource(Enum):


LOCAL_EXECUTOR = "LocalExecutor"
LOCAL_KUBERNETES_EXECUTOR = "LocalKubernetesExecutor"
SEQUENTIAL_EXECUTOR = "SequentialExecutor"
CELERY_EXECUTOR = "CeleryExecutor"
CELERY_KUBERNETES_EXECUTOR = "CeleryKubernetesExecutor"
KUBERNETES_EXECUTOR = "KubernetesExecutor"
DEBUG_EXECUTOR = "DebugExecutor"
MOCK_EXECUTOR = "MockExecutor"
CORE_EXECUTOR_NAMES = {
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
KUBERNETES_EXECUTOR,
DEBUG_EXECUTOR,
MOCK_EXECUTOR,
Expand Down
37 changes: 5 additions & 32 deletions airflow/executors/executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
from airflow.exceptions import AirflowConfigException, UnknownExecutorException
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
CORE_EXECUTOR_NAMES,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
ConnectorSource,
)
Expand Down Expand Up @@ -59,12 +57,8 @@ class ExecutorLoader:

executors = {
LOCAL_EXECUTOR: "airflow.executors.local_executor.LocalExecutor",
LOCAL_KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
"executors.local_kubernetes_executor.LocalKubernetesExecutor",
SEQUENTIAL_EXECUTOR: "airflow.executors.sequential_executor.SequentialExecutor",
CELERY_EXECUTOR: "airflow.providers.celery.executors.celery_executor.CeleryExecutor",
CELERY_KUBERNETES_EXECUTOR: "airflow.providers.celery."
"executors.celery_kubernetes_executor.CeleryKubernetesExecutor",
KUBERNETES_EXECUTOR: "airflow.providers.cncf.kubernetes."
"executors.kubernetes_executor.KubernetesExecutor",
DEBUG_EXECUTOR: "airflow.executors.debug_executor.DebugExecutor",
Expand Down Expand Up @@ -265,17 +259,12 @@ def load_executor(cls, executor_name: ExecutorName | str | None) -> BaseExecutor
_executor_name = executor_name

try:
if _executor_name.alias == CELERY_KUBERNETES_EXECUTOR:
executor = cls.__load_celery_kubernetes_executor()
elif _executor_name.alias == LOCAL_KUBERNETES_EXECUTOR:
executor = cls.__load_local_kubernetes_executor()
executor_cls, import_source = cls.import_executor_cls(_executor_name)
log.debug("Loading executor %s from %s", _executor_name, import_source.value)
if _executor_name.team_id:
executor = executor_cls(team_id=_executor_name.team_id)
else:
executor_cls, import_source = cls.import_executor_cls(_executor_name)
log.debug("Loading executor %s from %s", _executor_name, import_source.value)
if _executor_name.team_id:
executor = executor_cls(team_id=_executor_name.team_id)
else:
executor = executor_cls()
executor = executor_cls()

except ImportError as e:
log.error(e)
Expand Down Expand Up @@ -315,19 +304,3 @@ def import_default_executor_cls(cls) -> tuple[type[BaseExecutor], ConnectorSourc
executor_name = cls.get_default_executor_name()
executor, source = cls.import_executor_cls(executor_name)
return executor, source

@classmethod
def __load_celery_kubernetes_executor(cls) -> BaseExecutor:
celery_executor = import_string(cls.executors[CELERY_EXECUTOR])()
kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()

celery_kubernetes_executor_cls = import_string(cls.executors[CELERY_KUBERNETES_EXECUTOR])
return celery_kubernetes_executor_cls(celery_executor, kubernetes_executor)

@classmethod
def __load_local_kubernetes_executor(cls) -> BaseExecutor:
local_executor = import_string(cls.executors[LOCAL_EXECUTOR])()
kubernetes_executor = import_string(cls.executors[KUBERNETES_EXECUTOR])()

local_kubernetes_executor_cls = import_string(cls.executors[LOCAL_KUBERNETES_EXECUTOR])
return local_kubernetes_executor_cls(local_executor, kubernetes_executor)
6 changes: 1 addition & 5 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,11 +687,7 @@ def initialize():
LAZY_LOAD_PROVIDERS: bool = conf.getboolean("core", "lazy_discover_providers", fallback=True)

# Determines if the executor utilizes Kubernetes
IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") in {
executor_constants.KUBERNETES_EXECUTOR,
executor_constants.CELERY_KUBERNETES_EXECUTOR,
executor_constants.LOCAL_KUBERNETES_EXECUTOR,
}
IS_K8S_OR_K8SCELERY_EXECUTOR = conf.get("core", "EXECUTOR") == executor_constants.KUBERNETES_EXECUTOR
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable name probably needs some work. Also probably means this should be a flag on the interface? Not sure what it actually controls on the ui though.


# Executors can set this to true to configure logging correctly for
# containerized executors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def setup_class(cls):
("LocalExecutor", True),
("SequentialExecutor", True),
("KubernetesExecutor", False),
("LocalKubernetesExecutor", True),
],
)
@mock.patch("airflow.cli.commands.local_commands.scheduler_command.SchedulerJobRunner")
Expand Down
6 changes: 0 additions & 6 deletions tests/cli/commands/local_commands/test_standalone_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@
from airflow.executors import executor_loader
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
)

Expand All @@ -40,17 +38,13 @@ class TestStandaloneCommand:
"conf_executor_name, conf_sql_alchemy_conn, expected_standalone_executor",
[
(LOCAL_EXECUTOR, "sqlite_conn_string", LOCAL_EXECUTOR),
(LOCAL_KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(SEQUENTIAL_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(CELERY_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(CELERY_KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(DEBUG_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
(LOCAL_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
(LOCAL_KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
(SEQUENTIAL_EXECUTOR, "other_db_conn_string", SEQUENTIAL_EXECUTOR),
(CELERY_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
(CELERY_KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
(KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
(DEBUG_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
],
Expand Down
10 changes: 2 additions & 8 deletions tests/cli/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@

from airflow.executors import local_executor
from airflow.models.dagbag import DagBag
from airflow.providers.celery.executors import celery_executor, celery_kubernetes_executor
from airflow.providers.cncf.kubernetes.executors import kubernetes_executor, local_kubernetes_executor
from airflow.providers.celery.executors import celery_executor
from airflow.providers.cncf.kubernetes.executors import kubernetes_executor

from tests_common.test_utils.config import conf_vars

Expand All @@ -33,15 +33,9 @@
custom_executor_module.CustomCeleryExecutor = type( # type: ignore
"CustomCeleryExecutor", (celery_executor.CeleryExecutor,), {}
)
custom_executor_module.CustomCeleryKubernetesExecutor = type( # type: ignore
"CustomCeleryKubernetesExecutor", (celery_kubernetes_executor.CeleryKubernetesExecutor,), {}
)
custom_executor_module.CustomLocalExecutor = type( # type: ignore
"CustomLocalExecutor", (local_executor.LocalExecutor,), {}
)
custom_executor_module.CustomLocalKubernetesExecutor = type( # type: ignore
"CustomLocalKubernetesExecutor", (local_kubernetes_executor.LocalKubernetesExecutor,), {}
)
custom_executor_module.CustomKubernetesExecutor = type( # type: ignore
"CustomKubernetesExecutor", (kubernetes_executor.KubernetesExecutor,), {}
)
Expand Down
4 changes: 0 additions & 4 deletions tests/cli/test_cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,16 +365,12 @@ def test_executor_specific_commands_not_accessible(self, command):
"executor,expected_args",
[
("CeleryExecutor", ["celery"]),
("CeleryKubernetesExecutor", ["celery", "kubernetes"]),
("KubernetesExecutor", ["kubernetes"]),
("LocalExecutor", []),
("LocalKubernetesExecutor", ["kubernetes"]),
("SequentialExecutor", []),
# custom executors are mapped to the regular ones in `conftest.py`
("custom_executor.CustomLocalExecutor", []),
("custom_executor.CustomLocalKubernetesExecutor", ["kubernetes"]),
("custom_executor.CustomCeleryExecutor", ["celery"]),
("custom_executor.CustomCeleryKubernetesExecutor", ["celery", "kubernetes"]),
("custom_executor.CustomKubernetesExecutor", ["kubernetes"]),
],
)
Expand Down
2 changes: 0 additions & 2 deletions tests/executors/test_executor_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ def test_no_executor_configured(self):
"executor_name",
[
"CeleryExecutor",
"CeleryKubernetesExecutor",
"DebugExecutor",
"KubernetesExecutor",
"LocalExecutor",
Expand Down Expand Up @@ -287,7 +286,6 @@ def test_get_hybrid_executors_from_config_core_executors_bad_config_format(self,
("executor_config", "expected_value"),
[
("CeleryExecutor", "CeleryExecutor"),
("CeleryKubernetesExecutor", "CeleryKubernetesExecutor"),
("DebugExecutor", "DebugExecutor"),
("KubernetesExecutor", "KubernetesExecutor"),
("LocalExecutor", "LocalExecutor"),
Expand Down
8 changes: 0 additions & 8 deletions tests/sensors/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,9 @@
from airflow.executors.debug_executor import DebugExecutor
from airflow.executors.executor_constants import (
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
)
from airflow.executors.local_executor import LocalExecutor
Expand All @@ -48,9 +46,7 @@
from airflow.models.trigger import TriggerFailureReason
from airflow.models.xcom import XCom
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor import LocalKubernetesExecutor
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.sensors.base import BaseSensorOperator, PokeReturnValue, poke_mode_only
from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep
Expand Down Expand Up @@ -1306,20 +1302,16 @@ def test_sensor_with_xcom_fails(self, make_sensor):
"executor_cls_mode",
[
(CeleryExecutor, "poke"),
(CeleryKubernetesExecutor, "poke"),
(DebugExecutor, "reschedule"),
(KubernetesExecutor, "poke"),
(LocalExecutor, "poke"),
(LocalKubernetesExecutor, "poke"),
(SequentialExecutor, "poke"),
],
ids=[
CELERY_EXECUTOR,
CELERY_KUBERNETES_EXECUTOR,
DEBUG_EXECUTOR,
KUBERNETES_EXECUTOR,
LOCAL_EXECUTOR,
LOCAL_KUBERNETES_EXECUTOR,
SEQUENTIAL_EXECUTOR,
],
)
Expand Down
4 changes: 0 additions & 4 deletions tests/utils/test_log_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,6 @@ def task_callable(ti):
@pytest.mark.parametrize(
"executor_name",
[
(executor_constants.LOCAL_KUBERNETES_EXECUTOR),
(executor_constants.CELERY_KUBERNETES_EXECUTOR),
(executor_constants.KUBERNETES_EXECUTOR),
(None),
],
Expand All @@ -209,8 +207,6 @@ def task_callable(ti):
{
("core", "EXECUTOR"): ",".join(
[
executor_constants.LOCAL_KUBERNETES_EXECUTOR,
executor_constants.CELERY_KUBERNETES_EXECUTOR,
executor_constants.KUBERNETES_EXECUTOR,
]
),
Expand Down
Loading