Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Airflow Agent lazy module loading #2098

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions plugins/flytekit-airflow/flytekitplugins/airflow/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,18 @@
from flyteidl.core.execution_pb2 import TaskExecution
from flytekitplugins.airflow.task import AirflowObj, _get_airflow_instance

from airflow.exceptions import AirflowException, TaskDeferred
from airflow.models import BaseOperator
from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import TriggerEvent
from airflow.utils.context import Context
from flytekit import logger
from flytekit import lazy_module, logger
from flytekit.exceptions.user import FlyteUserException
from flytekit.extend.backend.base_agent import AgentRegistry, AsyncAgentBase, Resource, ResourceMeta
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate

airflow_exceptions = lazy_module("airflow.exceptions")
airflow_models = lazy_module("airflow.models")
airflow_sensors = lazy_module("airflow.sensors.base")
airflow_triggers = lazy_module("airflow.triggers.base")
airflow_utils_context = lazy_module("airflow.utils.context")


@dataclass
class AirflowMetadata(ResourceMeta):
Expand Down Expand Up @@ -72,11 +73,13 @@
airflow_instance = _get_airflow_instance(airflow_obj)
resource_meta = AirflowMetadata(airflow_operator=airflow_obj)

if isinstance(airflow_instance, BaseOperator) and not isinstance(airflow_instance, BaseSensorOperator):
if isinstance(airflow_instance, airflow_models.BaseOperator) and not isinstance(
airflow_instance, airflow_sensors.BaseSensorOperator
):
try:
resource_meta = AirflowMetadata(airflow_operator=airflow_obj)
airflow_instance.execute(context=Context())
except TaskDeferred as td:
airflow_instance.execute(context=airflow_utils_context.Context())
except airflow_exceptions.TaskDeferred as td:

Check warning on line 82 in plugins/flytekit-airflow/flytekitplugins/airflow/agent.py

View check run for this annotation

Codecov / codecov/patch

plugins/flytekit-airflow/flytekitplugins/airflow/agent.py#L82

Added line #L82 was not covered by tests
parameters = td.trigger.__dict__.copy()
# Remove parameters that are in the base class
parameters.pop("task_instance", None)
Expand All @@ -94,14 +97,14 @@
airflow_trigger_instance = (
_get_airflow_instance(resource_meta.airflow_trigger) if resource_meta.airflow_trigger else None
)
airflow_ctx = Context()
airflow_ctx = airflow_utils_context.Context()
message = None
cur_phase = TaskExecution.RUNNING

if isinstance(airflow_operator_instance, BaseSensorOperator):
if isinstance(airflow_operator_instance, airflow_sensors.BaseSensorOperator):
ok = airflow_operator_instance.poke(context=airflow_ctx)
cur_phase = TaskExecution.SUCCEEDED if ok else TaskExecution.RUNNING
elif isinstance(airflow_operator_instance, BaseOperator):
elif isinstance(airflow_operator_instance, airflow_models.BaseOperator):
if airflow_trigger_instance:
try:
# Airflow trigger returns immediately when
Expand All @@ -114,14 +117,16 @@
try:
# Trigger callback will check the status of the task in the payload, and raise AirflowException if failed.
trigger_callback = getattr(airflow_operator_instance, resource_meta.airflow_trigger_callback)
trigger_callback(context=airflow_ctx, event=typing.cast(TriggerEvent, event).payload)
trigger_callback(

Check warning on line 120 in plugins/flytekit-airflow/flytekitplugins/airflow/agent.py

View check run for this annotation

Codecov / codecov/patch

plugins/flytekit-airflow/flytekitplugins/airflow/agent.py#L120

Added line #L120 was not covered by tests
context=airflow_ctx, event=typing.cast(airflow_triggers.TriggerEvent, event).payload
)
cur_phase = TaskExecution.SUCCEEDED
except AirflowException as e:
except airflow_exceptions.AirflowException as e:

Check warning on line 124 in plugins/flytekit-airflow/flytekitplugins/airflow/agent.py

View check run for this annotation

Codecov / codecov/patch

plugins/flytekit-airflow/flytekitplugins/airflow/agent.py#L124

Added line #L124 was not covered by tests
cur_phase = TaskExecution.FAILED
message = e.__str__()
except asyncio.TimeoutError:
logger.debug("No event received from airflow trigger")
except AirflowException as e:
except airflow_exceptions.AirflowException as e:

Check warning on line 129 in plugins/flytekit-airflow/flytekitplugins/airflow/agent.py

View check run for this annotation

Codecov / codecov/patch

plugins/flytekit-airflow/flytekitplugins/airflow/agent.py#L129

Added line #L129 was not covered by tests
cur_phase = TaskExecution.FAILED
message = e.__str__()
else:
Expand Down
Loading