Skip to content

Commit

Permalink
Create dags_airflow_fede_default_queue.py
Browse files Browse the repository at this point in the history
  • Loading branch information
leggerf authored Jan 29, 2024
1 parent 4010444 commit 34950aa
Showing 1 changed file with 339 additions and 0 deletions.
339 changes: 339 additions & 0 deletions dags_airflow_fede_default_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,339 @@
"""
This is an example dag for using a Kubernetes Executor Configuration.
It starts the following tasks:
1) pod with annotation
2) pod with mounted volume
3) pod with sidecar and shared volume
4) pod with label
5) pod with other namespace
6) pod with image
7) pod with resource limits
"""
from __future__ import annotations

import logging
import os
from pathlib import Path

import pendulum
from airflow.configuration import conf
from airflow.decorators import task
from airflow.example_dags.libs.helper import print_stuff
from airflow.models.dag import DAG

log = logging.getLogger(__name__)

# Check k8s is there
try:
from kubernetes.client import models as k8s
except ImportError:
log.warning(
"This DAG requires the kubernetes provider."
" Please install it with: pip install apache-airflow[cncf.kubernetes]"
)
k8s = None

default_queue = "default"

if k8s:
with DAG(
dag_id="kubernetes_executor",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["fede"],
) as dag:

#############################################################
# Define config for task with pod annotation
#############################################################
executor_config_annotation = {
"pod_override": k8s.V1Pod(
metadata=k8s.V1ObjectMeta(annotations={"test": "fede"})
)
}

#############################################################
# 1) task with pod annotation
#############################################################
@task(
executor_config=executor_config_annotation,
queue=default_queue,
task_id="task_k8s_annotation",
)
def test_annotation():
print_stuff()

annotation_task = test_annotation()

#############################################################
# Define config for task with volume, mount host /tmp/ to /foo/
#############################################################
executor_config_volume_mount = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[
k8s.V1VolumeMount(
mount_path="/foo/", name="test-volume"
)
],
)
],
volumes=[
k8s.V1Volume(
name="test-volume",
host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
)
],
)
),
}

###########################################################
# 2) task with mount volume
###########################################################
@task(
executor_config=executor_config_volume_mount,
queue=default_queue,
task_id="task_k8s_volume_mount",
)
def test_volume_mount():
"""
Tests whether the volume has been mounted.
"""

foo_dir = Path("/foo")
if foo_dir.exists():
log.info("Can open /foo/")
else:
log.error("Cannot open /foo/")

try:
with open("/foo/volume_mount_test.txt", "w+") as foo:
foo.write("Hello")
except Exception:
log.error("Cannot open file /foo/volume_mount_test.txt")

return_code = os.system("cat /foo/volume_mount_test.txt")
if return_code != 0:
raise ValueError(
f"Error when checking volume mount. Return code {return_code}"
)

volume_task = test_volume_mount()

#############################################################
# Define config for task with sidecar and shared volume /shared/
#############################################################
executor_config_sidecar = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[
k8s.V1VolumeMount(
mount_path="/shared/", name="shared-empty-dir"
)
],
),
k8s.V1Container(
name="sidecar",
image="ubuntu",
args=['echo "retrieved from mount" > /shared/test.txt'],
command=["bash", "-cx"],
volume_mounts=[
k8s.V1VolumeMount(
mount_path="/shared/", name="shared-empty-dir"
)
],
),
],
volumes=[
k8s.V1Volume(
name="shared-empty-dir",
empty_dir=k8s.V1EmptyDirVolumeSource(),
),
],
)
),
}

###########################################################
# 3) pod with sidecar and shared volumes
###########################################################
@task(
executor_config=executor_config_sidecar,
queue=default_queue,
task_id="task_k8s_sidecar",
)
def test_sharedvolume_mount():
"""
Tests whether the volume has been mounted.
"""
for i in range(5):
try:
exit_code = os.system("cat /shared/test.txt")
if exit_code != 0:
raise ValueError(
f"Error when checking volume mount. Exit code {exit_code}"
)
except ValueError as e:
if i > 4:
raise e

sidecar_task = test_sharedvolume_mount()

#############################################################
# Define config for task: pod with label
#############################################################
executor_config_label = {
"pod_override": k8s.V1Pod(
metadata=k8s.V1ObjectMeta(labels={"release": "stable"})
)
}

#############################################################
# 4) pod with label
#############################################################
@task(
executor_config=executor_config_label,
queue=default_queue,
task_id="task_k8s_label",
)
def test_label():
print_stuff()

label_task = test_label()

#############################################################
# Define config for task: pod with namespace
#############################################################
executor_config_other_ns = {
"pod_override": k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
namespace="test-namespace", labels={"release": "stable"}
)
)
}

#############################################################
# 5) pod with other namespace
#############################################################
@task(
executor_config=executor_config_other_ns,
queue=default_queue,
task_id="task_k8s_other_ns",
)
def other_namespace_task():
print_stuff()

other_ns_task = other_namespace_task()

#############################################################
# Define config for task: pod with image
#############################################################

# use image from worker node as example
repo = conf.get("kubernetes_executor", "worker_container_repository")
tag = conf.get("kubernetes_executor", "worker_container_tag")

kube_exec_config_image = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
image=f"{repo}:{tag}",
),
]
)
)
}

#############################################################
# 6) pod with image
#############################################################
@task(
executor_config=kube_exec_config_image,
queue=default_queue,
task_id="task_k8s_image",
)
def image_override_task():
print_stuff()

image_task = image_override_task()

#############################################################
# Define config for task: pod with resource limits
#############################################################

# Use k8s_client.V1Affinity to define node affinity
k8s_affinity = k8s.V1Affinity(
pod_anti_affinity=k8s.V1PodAntiAffinity(
required_during_scheduling_ignored_during_execution=[
k8s.V1PodAffinityTerm(
label_selector=k8s.V1LabelSelector(
match_expressions=[
k8s.V1LabelSelectorRequirement(
key="app", operator="In", values=["airflow"]
)
]
),
topology_key="kubernetes.io/hostname",
)
]
)
)

# Use k8s_client.V1Toleration to define node tolerations
k8s_tolerations = [
k8s.V1Toleration(key="dedicated", operator="Equal", value="airflow")
]

# Use k8s_client.V1ResourceRequirements to define resource limits
k8s_resource_requirements = k8s.V1ResourceRequirements(
requests={"memory": "512Mi"}, limits={"memory": "512Mi"}
)

kube_exec_config_resource_limits = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
resources=k8s_resource_requirements,
)
],
affinity=k8s_affinity,
tolerations=k8s_tolerations,
)
)
}

#############################################################
# 7) pod with resource limits
#############################################################
@task(
executor_config=kube_exec_config_resource_limits,
queue=default_queue,
task_id="task_k8s_resource_limits",
)
def task_with_resource_limits():
print_stuff()

resource_task = task_with_resource_limits()

#############################################################
# Define DAG execution
#############################################################
(
annotation_task # first execute this
>> [other_ns_task, sidecar_task] # then these
>> label_task # then this
>> [image_task, resource_task] # then the last two
>> volume_task
)

0 comments on commit 34950aa

Please sign in to comment.