diff --git a/docs/python_airflow_operator.md b/docs/python_airflow_operator.md index 32fa081fe83..b51e362d804 100644 --- a/docs/python_airflow_operator.md +++ b/docs/python_airflow_operator.md @@ -91,6 +91,8 @@ operator needs to be cleaned up, or it will leave ghost processes behind. +#### operator_extra_links(_: Collection[BaseOperatorLink_ _ = (LookoutLink(),_ ) + #### _property_ pod_manager(_: KubernetesPodLogManage_ ) #### render_template_fields(context, jinja_env=None) @@ -162,6 +164,35 @@ acknowledged by Armada. :type job_acknowledgement_timeout: int :param kwargs: Additional keyword arguments to pass to the BaseOperator. + +### _class_ armada.operators.armada.LookoutLink() +Bases: `BaseOperatorLink` + + +#### get_link(operator, \*, ti_key) +Link to external system. + +Note: The old signature of this function was `(self, operator, dttm: datetime)`. That is still +supported at runtime but is deprecated. + + +* **Parameters** + + + * **operator** (*BaseOperator*) – The Airflow operator object this link is associated to. + + + * **ti_key** (*TaskInstanceKey*) – TaskInstance ID to return link for. + + + +* **Returns** + + link to external system + + + +#### name(_ = 'Lookout_ ) ## armada.triggers.armada module ## armada.auth module diff --git a/third_party/airflow/armada/__init__.py b/third_party/airflow/armada/__init__.py index 807a199de85..a0f32fe1618 100644 --- a/third_party/airflow/armada/__init__.py +++ b/third_party/airflow/armada/__init__.py @@ -2,3 +2,13 @@ _extra_allowed.add("armada.model.RunningJobContext") _extra_allowed.add("armada.model.GrpcChannelArgs") + + +def get_provider_info(): + return { + "package-name": "armada-airflow", + "name": "Armada Airflow Operator", + "description": "Armada Airflow Operator.", + "extra-links": ["armada.operators.armada.LookoutLink"], + "versions": ["1.0.0"], + } diff --git a/third_party/airflow/armada/operators/armada.py b/third_party/airflow/armada/operators/armada.py index aa07227b80e..1d9e285f8f4 100644 --- a/third_party/airflow/armada/operators/armada.py +++ b/third_party/airflow/armada/operators/armada.py @@ -26,7 +26,8 @@ import jinja2 from airflow.configuration import conf from airflow.exceptions import AirflowException -from airflow.models import BaseOperator +from airflow.models import BaseOperator, BaseOperatorLink, XCom +from airflow.models.taskinstancekey import TaskInstanceKey from airflow.serialization.serde import deserialize from airflow.utils.context import Context from airflow.utils.log.logging_mixin import LoggingMixin @@ -44,6 +45,17 @@ from ..utils import log_exceptions +class LookoutLink(BaseOperatorLink): + name = "Lookout" + + def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey): + task_state = XCom.get_value(ti_key=ti_key) + if not task_state: + return "" + + return task_state.get("armada_lookout_url", "") + + class ArmadaOperator(BaseOperator, LoggingMixin): """ An Airflow operator that manages Job submission to Armada. @@ -52,6 +64,8 @@ class ArmadaOperator(BaseOperator, LoggingMixin): and handles job cancellation if the Airflow task is killed. """ + operator_extra_links = (LookoutLink(),) + template_fields: Sequence[str] = ("job_request", "job_set_prefix") template_fields_renderers: Dict[str, str] = {"job_request": "py"} diff --git a/third_party/airflow/armada/plugin.py b/third_party/airflow/armada/plugin.py new file mode 100644 index 00000000000..c7694566914 --- /dev/null +++ b/third_party/airflow/armada/plugin.py @@ -0,0 +1,10 @@ +from airflow.plugins_manager import AirflowPlugin + +from .armada.operators.armada import LookoutLink + + +class AirflowExtraLinkPlugin(AirflowPlugin): + name = "extra_link_plugin" + operator_extra_links = [ + LookoutLink(), + ] diff --git a/third_party/airflow/examples/bad_armada.py b/third_party/airflow/examples/bad_armada.py index 809648e625a..137f4730791 100644 --- a/third_party/airflow/examples/bad_armada.py +++ b/third_party/airflow/examples/bad_armada.py @@ -5,9 +5,8 @@ from armada.operators.armada import ArmadaOperator from armada_client.armada import submit_pb2 from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1 -from armada_client.k8s.io.apimachinery.pkg.api.resource import ( - generated_pb2 as api_resource, -) +from armada_client.k8s.io.apimachinery.pkg.api.resource import \ + generated_pb2 as api_resource def submit_sleep_container(image: str): diff --git a/third_party/airflow/examples/big_armada.py b/third_party/airflow/examples/big_armada.py index 868a31516e9..ebd84d723ce 100644 --- a/third_party/airflow/examples/big_armada.py +++ b/third_party/airflow/examples/big_armada.py @@ -5,9 +5,8 @@ from armada.operators.armada import ArmadaOperator from armada_client.armada import submit_pb2 from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1 -from armada_client.k8s.io.apimachinery.pkg.api.resource import ( - generated_pb2 as api_resource, -) +from armada_client.k8s.io.apimachinery.pkg.api.resource import \ + generated_pb2 as api_resource def submit_sleep_job(): diff --git a/third_party/airflow/examples/hello_armada.py b/third_party/airflow/examples/hello_armada.py index bdd773ce8fe..d3120bdf5f6 100644 --- a/third_party/airflow/examples/hello_armada.py +++ b/third_party/airflow/examples/hello_armada.py @@ -5,9 +5,8 @@ from armada.operators.armada import ArmadaOperator from armada_client.armada import submit_pb2 from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1 -from armada_client.k8s.io.apimachinery.pkg.api.resource import ( - generated_pb2 as api_resource, -) +from armada_client.k8s.io.apimachinery.pkg.api.resource import \ + generated_pb2 as api_resource def submit_sleep_job(): diff --git a/third_party/airflow/examples/hello_armada_deferrable.py b/third_party/airflow/examples/hello_armada_deferrable.py index beac8fa9adb..f3e661875d0 100644 --- a/third_party/airflow/examples/hello_armada_deferrable.py +++ b/third_party/airflow/examples/hello_armada_deferrable.py @@ -5,9 +5,8 @@ from armada.operators.armada import ArmadaOperator from armada_client.armada import submit_pb2 from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1 -from armada_client.k8s.io.apimachinery.pkg.api.resource import ( - generated_pb2 as api_resource, -) +from armada_client.k8s.io.apimachinery.pkg.api.resource import \ + generated_pb2 as api_resource def submit_sleep_job(): diff --git a/third_party/airflow/pyproject.toml b/third_party/airflow/pyproject.toml index 39040c3f3d8..ab78fcd676d 100644 --- a/third_party/airflow/pyproject.toml +++ b/third_party/airflow/pyproject.toml @@ -31,6 +31,9 @@ test = ["pytest==7.3.1", "coverage==7.3.2", "pytest-asyncio==0.21.1", # note(JayF): sphinx-jekyll-builder was broken by sphinx-markdown-builder 0.6 -- so pin to 0.5.5 docs = ["sphinx==7.1.2", "sphinx-jekyll-builder==0.3.0", "sphinx-toolbox==3.2.0b1", "sphinx-markdown-builder==0.5.5"] +[project.entry-points.apache_airflow_provider] +provider_info = "armada.__init__:get_provider_info" + [project.urls] repository='https://github.com/armadaproject/armada'