From b475c87b73b8f765804562f2e38266833cf67882 Mon Sep 17 00:00:00 2001
From: Kevin Su
Date: Fri, 5 Apr 2024 12:25:24 -0700
Subject: [PATCH] Show a dataproc link on FlyteConsole (#2333)

Signed-off-by: Kevin Su
---
 .../flytekitplugins/airflow/agent.py          | 45 +++++++++++++++++--
 1 file changed, 41 insertions(+), 4 deletions(-)

diff --git a/plugins/flytekit-airflow/flytekitplugins/airflow/agent.py b/plugins/flytekit-airflow/flytekitplugins/airflow/agent.py
index 2ff0d0e9a5..76fdf9abd8 100644
--- a/plugins/flytekit-airflow/flytekitplugins/airflow/agent.py
+++ b/plugins/flytekit-airflow/flytekitplugins/airflow/agent.py
@@ -1,17 +1,17 @@
 import asyncio
 import typing
 from dataclasses import dataclass, field
-from typing import Optional
+from typing import List, Optional
 
 import cloudpickle
 import jsonpickle
-from flyteidl.core.execution_pb2 import TaskExecution
+from flyteidl.core.execution_pb2 import TaskExecution, TaskLog
 from flytekitplugins.airflow.task import AirflowObj, _get_airflow_instance
 
 from airflow.exceptions import AirflowException, TaskDeferred
 from airflow.models import BaseOperator
 from airflow.sensors.base import BaseSensorOperator
-from airflow.triggers.base import TriggerEvent
+from airflow.triggers.base import BaseTrigger, TriggerEvent
 from airflow.utils.context import Context
 from flytekit import logger
 from flytekit.exceptions.user import FlyteUserException
@@ -134,10 +134,47 @@ async def get(self, resource_meta: AirflowMetadata, **kwargs) -> Resource:
         else:
             raise FlyteUserException("Only sensor and operator are supported.")
 
-        return Resource(phase=cur_phase, message=message)
+        return Resource(
+            phase=cur_phase,
+            message=message,
+            log_links=get_log_links(airflow_operator_instance, airflow_trigger_instance),
+        )
 
     async def delete(self, resource_meta: AirflowMetadata, **kwargs):
         return
 
 
+def get_log_links(
+    airflow_operator: BaseOperator, airflow_trigger: Optional[BaseTrigger] = None
+) -> Optional[List[TaskLog]]:
+    """
+    Build the external console links reported for an Airflow task.
+
+    Only Dataproc job operators produce a link; any other operator yields an empty list.
+
+    :param airflow_operator: operator instance whose type selects the link to build.
+    :param airflow_trigger: trigger instance that carries the Dataproc ``job_id``, if any.
+    :return: a list with the Dataproc console link, or an empty list when no link applies.
+    """
+    try:
+        from airflow.providers.google.cloud.operators.dataproc import DataprocJobBaseOperator
+    except ImportError:
+        # The Google provider is optional; without it no Dataproc link can be built.
+        return []
+
+    if not isinstance(airflow_operator, DataprocJobBaseOperator):
+        return []
+
+    # airflow_trigger defaults to None, so read job_id defensively rather than raising AttributeError.
+    job_id = getattr(airflow_trigger, "job_id", None)
+    if not job_id:
+        return []
+
+    uri = (
+        "https://console.cloud.google.com/dataproc/jobs/"
+        f"{job_id}/monitoring?region={airflow_operator.region}&project={airflow_operator.project_id}"
+    )
+    return [TaskLog(uri=uri, name="Dataproc Console")]
+
+
 AgentRegistry.register(AirflowAgent())