Show a dataproc link on FlyteConsole (#2333)
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Jan Fiedler <[email protected]>
pingsutw authored and fiedlerNr9 committed Jul 25, 2024
1 parent d7b6a4b commit a731786
Showing 1 changed file with 27 additions and 4 deletions.
plugins/flytekit-airflow/flytekitplugins/airflow/agent.py

```diff
@@ -1,17 +1,17 @@
 import asyncio
 import typing
 from dataclasses import dataclass, field
-from typing import Optional
+from typing import List, Optional

 import cloudpickle
 import jsonpickle
-from flyteidl.core.execution_pb2 import TaskExecution
+from flyteidl.core.execution_pb2 import TaskExecution, TaskLog
 from flytekitplugins.airflow.task import AirflowObj, _get_airflow_instance

 from airflow.exceptions import AirflowException, TaskDeferred
 from airflow.models import BaseOperator
 from airflow.sensors.base import BaseSensorOperator
-from airflow.triggers.base import TriggerEvent
+from airflow.triggers.base import BaseTrigger, TriggerEvent
 from airflow.utils.context import Context
 from flytekit import logger
 from flytekit.exceptions.user import FlyteUserException
@@ -134,10 +134,33 @@ async def get(self, resource_meta: AirflowMetadata, **kwargs) -> Resource:
         else:
             raise FlyteUserException("Only sensor and operator are supported.")

-        return Resource(phase=cur_phase, message=message)
+        return Resource(
+            phase=cur_phase,
+            message=message,
+            log_links=get_log_links(airflow_operator_instance, airflow_trigger_instance),
+        )

     async def delete(self, resource_meta: AirflowMetadata, **kwargs):
         return


+def get_log_links(
+    airflow_operator: BaseOperator, airflow_trigger: Optional[BaseTrigger] = None
+) -> Optional[List[TaskLog]]:
+    log_links: List[TaskLog] = []
+    try:
+        from airflow.providers.google.cloud.operators.dataproc import DataprocJobBaseOperator, DataprocSubmitTrigger
+
+        if isinstance(airflow_operator, DataprocJobBaseOperator):
+            log_link = TaskLog(
+                uri=f"https://console.cloud.google.com/dataproc/jobs/{typing.cast(DataprocSubmitTrigger, airflow_trigger).job_id}/monitoring?region={airflow_operator.region}&project={airflow_operator.project_id}",
+                name="Dataproc Console",
+            )
+            log_links.append(log_link)
+        return log_links
+    except ImportError:
+        ...
+    return log_links
+
+
 AgentRegistry.register(AirflowAgent())
```
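
With this change, the Airflow agent's `get()` response can carry a link back to the Google Cloud console for Dataproc jobs, which FlyteConsole renders as a task log link. The sketch below is illustrative only (not part of the commit): it shows the shape of the `Resource` the agent now returns for a running Dataproc job, assuming flytekit's `Resource` dataclass from `flytekit.extend.backend.base_agent` and placeholder job, region, and project values.

```python
# Illustrative sketch only -- not from the commit. Assumes flytekit is installed;
# the job id, region, and project below are placeholders.
from flyteidl.core.execution_pb2 import TaskExecution, TaskLog
from flytekit.extend.backend.base_agent import Resource

resource = Resource(
    phase=TaskExecution.RUNNING,
    message="Dataproc job is running",
    log_links=[
        TaskLog(
            # Same URI pattern that get_log_links builds from the operator's
            # region/project_id and the trigger's job_id.
            uri=(
                "https://console.cloud.google.com/dataproc/jobs/job-1234/monitoring"
                "?region=us-central1&project=my-project"
            ),
            name="Dataproc Console",
        )
    ],
)
```

Because the Google provider import happens inside a `try/except ImportError`, `get_log_links` returns an empty list when `apache-airflow-providers-google` is not installed, so non-Dataproc Airflow tasks keep working without log links.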