Create worker pods through Deployments #730

Merged
5 commits merged on Jun 23, 2023
56 changes: 35 additions & 21 deletions dask_kubernetes/operator/controller/controller.py
@@ -1,5 +1,4 @@
import asyncio
import copy
from collections import defaultdict
import time
from contextlib import suppress
@@ -111,10 +110,9 @@ def build_scheduler_service_spec(cluster_name, spec, annotations, labels):
}


def build_worker_pod_spec(
worker_group_name, namespace, cluster_name, uuid, spec, annotations, labels
def build_worker_deployment_spec(
worker_group_name, namespace, cluster_name, uuid, pod_spec, annotations, labels
):
spec = copy.deepcopy(spec)
labels.update(
**{
"dask.org/cluster-name": cluster_name,
@@ -124,14 +122,24 @@ def build_worker_pod_spec(
}
)
worker_name = f"{worker_group_name}-worker-{uuid}"
pod_spec = {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": worker_name,
"labels": labels,
"annotations": annotations,
},
metadata = {
"name": worker_name,
"labels": labels,
"annotations": annotations,
}
spec = {}
spec["replicas"] = 1 # make_worker_spec returns dict with a replicas key?
spec["selector"] = {
"matchLabels": labels,
}
spec["template"] = {
"metadata": metadata,
"spec": pod_spec,
}
deployment_spec = {
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": metadata,
"spec": spec,
}
env = [
@@ -144,12 +152,14 @@
"value": f"tcp://{cluster_name}-scheduler.{namespace}.svc.cluster.local:8786",
},
]
for i in range(len(pod_spec["spec"]["containers"])):
if "env" in pod_spec["spec"]["containers"][i]:
pod_spec["spec"]["containers"][i]["env"].extend(env)
for i in range(len(deployment_spec["spec"]["template"]["spec"]["containers"])):
if "env" in deployment_spec["spec"]["template"]["spec"]["containers"][i]:
deployment_spec["spec"]["template"]["spec"]["containers"][i]["env"].extend(
env
)
else:
pod_spec["spec"]["containers"][i]["env"] = env
return pod_spec
deployment_spec["spec"]["template"]["spec"]["containers"][i]["env"] = env
return deployment_spec


def get_job_runner_pod_name(job_name):
@@ -632,18 +642,20 @@ async def daskworkergroup_replica_update(
labels.update(**worker_spec["metadata"]["labels"])
if workers_needed > 0:
for _ in range(workers_needed):
data = build_worker_pod_spec(
data = build_worker_deployment_spec(
worker_group_name=name,
namespace=namespace,
cluster_name=cluster_name,
uuid=uuid4().hex[:10],
spec=worker_spec["spec"],
pod_spec=worker_spec["spec"],
annotations=annotations,
labels=labels,
)
kopf.adopt(data, owner=body)
kopf.label(data, labels=cluster_labels)
await corev1api.create_namespaced_pod(
await kubernetes.client.AppsV1Api(
api_client
).create_namespaced_deployment(
namespace=namespace,
body=data,
)
@@ -660,7 +672,9 @@
)
logger.info(f"Workers to close: {worker_ids}")
for wid in worker_ids:
await corev1api.delete_namespaced_pod(
await kubernetes.client.AppsV1Api(
api_client
).delete_namespaced_deployment(
name=wid,
namespace=namespace,
)
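For orientation, here is a minimal sketch of the manifest that build_worker_deployment_spec now produces, reconstructed from the diff above. The cluster name, uuid, namespace, image, container name, and env entry are illustrative placeholders (only the scheduler address value appears in the diff); the key point is that each worker gets its own single-replica Deployment, so Kubernetes, rather than the operator, recreates the worker Pod if it is deleted.

# Hypothetical example of the Deployment built for one worker of a cluster
# named "simple" (names, uuid, image, namespace and pod spec are placeholders).
labels = {
    "dask.org/cluster-name": "simple",
    "dask.org/component": "worker",  # same label selector the tests below use
}
metadata = {
    "name": "simple-default-worker-0123456789",  # f"{worker_group_name}-worker-{uuid}"
    "labels": labels,
    "annotations": {},
}
worker_deployment = {
    "apiVersion": "apps/v1",
    "kind": "Deployment",
    "metadata": metadata,
    "spec": {
        "replicas": 1,  # one Deployment per worker, each with a single replica
        "selector": {"matchLabels": labels},
        "template": {
            "metadata": metadata,  # reused for the pod template
            "spec": {
                # The pod spec comes from the DaskWorkerGroup; the controller
                # appends the scheduler address to each container's env.
                "containers": [
                    {
                        "name": "worker",
                        "image": "ghcr.io/dask/dask:latest",
                        "env": [
                            {
                                "name": "DASK_SCHEDULER_ADDRESS",
                                "value": "tcp://simple-scheduler.default.svc.cluster.local:8786",
                            },
                        ],
                    },
                ],
            },
        },
    },
}

With this shape, scaling a DaskWorkerGroup up or down becomes a matter of creating or deleting whole Deployments through AppsV1Api, as daskworkergroup_replica_update does above.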
40 changes: 40 additions & 0 deletions dask_kubernetes/operator/controller/tests/test_controller.py
@@ -374,6 +374,46 @@ async def test_recreate_scheduler_pod(k8s_cluster, kopf_runner, gen_cluster):
)


@pytest.mark.asyncio
async def test_recreate_worker_pods(k8s_cluster, kopf_runner, gen_cluster):
with kopf_runner as runner:
async with gen_cluster() as (cluster_name, ns):
scheduler_deployment_name = "simple-scheduler"
worker_deployment_name = "simple-default-worker"
service_name = "simple-scheduler"
while scheduler_deployment_name not in k8s_cluster.kubectl(
"get", "pods", "-n", ns
):
await asyncio.sleep(0.1)
while service_name not in k8s_cluster.kubectl("get", "svc", "-n", ns):
await asyncio.sleep(0.1)
while worker_deployment_name not in k8s_cluster.kubectl(
"get", "pods", "-n", ns
):
await asyncio.sleep(0.1)
k8s_cluster.kubectl(
"delete",
"pods",
"-l",
"dask.org/cluster-name=simple,dask.org/component=worker",
"-n",
ns,
)
k8s_cluster.kubectl(
"wait",
"--for=condition=Ready",
"-l",
"dask.org/cluster-name=simple,dask.org/component=worker",
"pod",
"-n",
ns,
"--timeout=60s",
)
assert worker_deployment_name in k8s_cluster.kubectl(
"get", "pods", "-n", ns
)
A maintainer commented on lines +379 to +414:

I spent some time tinkering with this test. I wrote this implementation, which uses kr8s and checks more thoroughly that the Pods get created, become Ready, get deleted, get recreated, and get back to Ready.

This has also inspired a few tweaks to kr8s, so I may modify this a little further after it is merged.

Suggested change (replacing the body of test_recreate_worker_pods above with a kr8s-based version):
api = await kr8s.asyncio.api()
with kopf_runner as runner:
async with gen_cluster() as (cluster_name, ns):
# Wait for worker Pods to be created
while True:
pods = await api.get(
"pods",
namespace=ns,
label_selector=f"dask.org/cluster-name={cluster_name},dask.org/component=worker",
)
if not pods:
await asyncio.sleep(0.1)
continue
break
# Store number of workers
n_pods = len(pods)
# Wait for worker Pods to be ready
await asyncio.gather(
*[pod.wait(conditions="condition=Ready", timeout=60) for pod in pods]
)
# Delete a worker Pod
await pods[0].delete()
# Wait for Pods to be recreated
while True:
pods = await api.get(
"pods",
namespace=ns,
label_selector=f"dask.org/cluster-name={cluster_name},dask.org/component=worker",
)
if len(pods) < n_pods:
await asyncio.sleep(0.1)
continue
break
# Wait for worker Pods to be ready
await asyncio.gather(
*[pod.wait(conditions="condition=Ready", timeout=60) for pod in pods]
)

Follow-up comment:

I think I'm going to merge this and then update things in a follow up.

Follow up is #743



def _get_job_status(k8s_cluster, ns):
return json.loads(
k8s_cluster.kubectl(