Skip to content

Commit

Permalink
Add lookout link to Airflow Operator (#207) (#3888)
Browse files Browse the repository at this point in the history
  • Loading branch information
masipauskas authored Aug 23, 2024
1 parent 9b6f770 commit 537852a
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 13 deletions.
31 changes: 31 additions & 0 deletions docs/python_airflow_operator.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ operator needs to be cleaned up, or it will leave ghost processes behind.



#### operator_extra_links(_: Collection[BaseOperatorLink_ _ = (LookoutLink(),_ )

#### _property_ pod_manager(_: KubernetesPodLogManage_ )

#### render_template_fields(context, jinja_env=None)
Expand Down Expand Up @@ -162,6 +164,35 @@ acknowledged by Armada.
:type job_acknowledgement_timeout: int
:param kwargs: Additional keyword arguments to pass to the BaseOperator.


### _class_ armada.operators.armada.LookoutLink()
Bases: `BaseOperatorLink`


#### get_link(operator, \*, ti_key)
Link to external system.

Note: The old signature of this function was `(self, operator, dttm: datetime)`. That is still
supported at runtime but is deprecated.


* **Parameters**


* **operator** (*BaseOperator*) – The Airflow operator object this link is associated to.


* **ti_key** (*TaskInstanceKey*) – TaskInstance ID to return link for.



* **Returns**

link to external system



#### name(_ = 'Lookout_ )
## armada.triggers.armada module

## armada.auth module
Expand Down
10 changes: 10 additions & 0 deletions third_party/airflow/armada/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,13 @@

_extra_allowed.add("armada.model.RunningJobContext")
_extra_allowed.add("armada.model.GrpcChannelArgs")


def get_provider_info():
return {
"package-name": "armada-airflow",
"name": "Armada Airflow Operator",
"description": "Armada Airflow Operator.",
"extra-links": ["armada.operators.armada.LookoutLink"],
"versions": ["1.0.0"],
}
16 changes: 15 additions & 1 deletion third_party/airflow/armada/operators/armada.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import jinja2
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.models import BaseOperator, BaseOperatorLink, XCom
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.serialization.serde import deserialize
from airflow.utils.context import Context
from airflow.utils.log.logging_mixin import LoggingMixin
Expand All @@ -44,6 +45,17 @@
from ..utils import log_exceptions


class LookoutLink(BaseOperatorLink):
name = "Lookout"

def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey):
task_state = XCom.get_value(ti_key=ti_key)
if not task_state:
return ""

return task_state.get("armada_lookout_url", "")


class ArmadaOperator(BaseOperator, LoggingMixin):
"""
An Airflow operator that manages Job submission to Armada.
Expand All @@ -52,6 +64,8 @@ class ArmadaOperator(BaseOperator, LoggingMixin):
and handles job cancellation if the Airflow task is killed.
"""

operator_extra_links = (LookoutLink(),)

template_fields: Sequence[str] = ("job_request", "job_set_prefix")
template_fields_renderers: Dict[str, str] = {"job_request": "py"}

Expand Down
10 changes: 10 additions & 0 deletions third_party/airflow/armada/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from airflow.plugins_manager import AirflowPlugin

from .armada.operators.armada import LookoutLink


class AirflowExtraLinkPlugin(AirflowPlugin):
name = "extra_link_plugin"
operator_extra_links = [
LookoutLink(),
]
5 changes: 2 additions & 3 deletions third_party/airflow/examples/bad_armada.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
from armada.operators.armada import ArmadaOperator
from armada_client.armada import submit_pb2
from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1
from armada_client.k8s.io.apimachinery.pkg.api.resource import (
generated_pb2 as api_resource,
)
from armada_client.k8s.io.apimachinery.pkg.api.resource import \
generated_pb2 as api_resource


def submit_sleep_container(image: str):
Expand Down
5 changes: 2 additions & 3 deletions third_party/airflow/examples/big_armada.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
from armada.operators.armada import ArmadaOperator
from armada_client.armada import submit_pb2
from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1
from armada_client.k8s.io.apimachinery.pkg.api.resource import (
generated_pb2 as api_resource,
)
from armada_client.k8s.io.apimachinery.pkg.api.resource import \
generated_pb2 as api_resource


def submit_sleep_job():
Expand Down
5 changes: 2 additions & 3 deletions third_party/airflow/examples/hello_armada.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
from armada.operators.armada import ArmadaOperator
from armada_client.armada import submit_pb2
from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1
from armada_client.k8s.io.apimachinery.pkg.api.resource import (
generated_pb2 as api_resource,
)
from armada_client.k8s.io.apimachinery.pkg.api.resource import \
generated_pb2 as api_resource


def submit_sleep_job():
Expand Down
5 changes: 2 additions & 3 deletions third_party/airflow/examples/hello_armada_deferrable.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
from armada.operators.armada import ArmadaOperator
from armada_client.armada import submit_pb2
from armada_client.k8s.io.api.core.v1 import generated_pb2 as core_v1
from armada_client.k8s.io.apimachinery.pkg.api.resource import (
generated_pb2 as api_resource,
)
from armada_client.k8s.io.apimachinery.pkg.api.resource import \
generated_pb2 as api_resource


def submit_sleep_job():
Expand Down
3 changes: 3 additions & 0 deletions third_party/airflow/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ test = ["pytest==7.3.1", "coverage==7.3.2", "pytest-asyncio==0.21.1",
# note(JayF): sphinx-jekyll-builder was broken by sphinx-markdown-builder 0.6 -- so pin to 0.5.5
docs = ["sphinx==7.1.2", "sphinx-jekyll-builder==0.3.0", "sphinx-toolbox==3.2.0b1", "sphinx-markdown-builder==0.5.5"]

[project.entry-points.apache_airflow_provider]
provider_info = "armada.__init__:get_provider_info"

[project.urls]
repository='https://github.com/armadaproject/armada'

Expand Down

0 comments on commit 537852a

Please sign in to comment.