Skip to content

Commit

Permalink
Add Pod deletion logic to webhook
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanaoleary committed Mar 5, 2024
1 parent 12b0fdc commit aa3315f
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 24 deletions.
Binary file modified applications/ray/kuberay-tpu-webhook/bin/kuberay-tpu-webhook
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ webhooks:
namespace: default
path: /validate
rules:
- operations: ["DELETE"]
apiGroups: [""]
apiVersions: ["v1"]
resources: ["pods"]
- operations: ["CREATE"]
apiGroups: ["ray.io"]
apiVersions: ["*"]
Expand Down
99 changes: 75 additions & 24 deletions applications/ray/kuberay-tpu-webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type slice struct {
type worker struct {
workerIndex int // TPU_WORKER_ID
replicaIndex int // index of replica worker belongs to
running bool // true = pod is running, false = pod deleted or hasn't been created
isRunning bool // true = pod is running, false = pod deleted or hasn't been created
}

// JSON patch describing mutate operation(s) for incoming object
Expand Down Expand Up @@ -253,26 +253,15 @@ func validateRayCluster(admissionReview *admissionv1.AdmissionReview) (*admissio
return admissionResponse, nil
}

func hasWorkerID(container corev1.Container) bool {
func getEnvironmentVariable(varName string, container corev1.Container) string {
if container.Env != nil && len(container.Env) > 0 {
for _, envVar := range container.Env {
if envVar.Name == "TPU_WORKER_ID" {
return true
if envVar.Name == varName {
return envVar.Value
}
}
}
return false
}

func hasWorkerName(container corev1.Container) bool {
if container.Env != nil && len(container.Env) > 0 {
for _, envVar := range container.Env {
if envVar.Name == "TPU_NAME" {
return true
}
}
}
return false
return ""
}

// get next replica ID to assign a pod to
Expand All @@ -284,7 +273,7 @@ func getReplicaIndex() int {
for slice, workerList := range sliceToWorkers {
runningPods := 0
for _, worker := range workerList {
if worker.running {
if worker.isRunning {
runningPods++
}
}
Expand All @@ -308,17 +297,17 @@ func getNextWorkerID(podSlice slice, replicaIndex int) int {
replace_pod := false
// iterate through existing workers and check if any have been deleted
for _, worker := range sliceToWorkers[podSlice] {
if worker.running == false && worker.workerIndex < next_lowest_id {
if worker.isRunning == false && worker.workerIndex < next_lowest_id {
replace_pod = true
next_lowest_id = worker.workerIndex
}
}
// reassign next lowest TPU_WORKER_ID if pod has been deleted
if replace_pod == true {
for _, worker := range sliceToWorkers[podSlice] {
// set worker.running to true now that pod is being re-created
// set worker.isRunning to true now that pod is being re-created
if worker.workerIndex == next_lowest_id {
worker.running = true
worker.isRunning = true
}
}
} else {
Expand Down Expand Up @@ -388,7 +377,6 @@ func mutatePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis

// inject pod affinity/anti-affinity for scheduling


// inject all environment variables into the container requesting TPUs
for i := 0; i < len(containers); i++ {
container := containers[i]
Expand All @@ -397,7 +385,7 @@ func mutatePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis
// inject TPU_WORKER_HOSTNAMES set during RayCluster interception
injectHostnames(sliceToHostnames[podSlice], path, container, &patches)
// inject TPU_WORKER_ID
if !hasWorkerID(container) {
if getEnvironmentVariable("TPU_WORKER_ID", container) == "" {
workerID := corev1.EnvVar{
Name: "TPU_WORKER_ID",
Value: fmt.Sprint(tpuWorkerID),
Expand All @@ -414,7 +402,7 @@ func mutatePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis
patches = append(patches, idPatch)
}
// inject TPU_NAME
if !hasWorkerName(container) {
if getEnvironmentVariable("TPU_WORKER_NAME", container) == "" {
tpuName := corev1.EnvVar{
Name: "TPU_NAME",
Value: fmt.Sprint(groupName),
Expand Down Expand Up @@ -451,6 +439,62 @@ func mutatePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.Admis
return admissionResponse, nil
}

// update sliceToWorkers map on pod deletion
func deletePod(admissionReview *admissionv1.AdmissionReview) (*admissionv1.AdmissionResponse, error) {
pod, err := extractPod(admissionReview)
if err != nil {
klog.Fatalf("Pod extraction failed: %s", err)
}

multiHostReplicaLabel := pod.Labels["multiHostReplica"]

if multiHostReplicaLabel != "" {
multiHostReplicaLabelValues := strings.Split(multiHostReplicaLabel, "-")
groupName := multiHostReplicaLabelValues[0]
replicaIndex, _ := strconv.Atoi(multiHostReplicaLabelValues[1]) // ignore error here since must be set

containers := pod.Spec.Containers
if containers == nil {
return nil, errors.New("Pod spec missing containers")
}
tpuWorkerID := -1
for _, container := range pod.Spec.Containers {
if containerRequestingTPUs(container) {
tpuWorkerID, err = strconv.Atoi(getEnvironmentVariable("TPU_WORKER_ID", container))
if err != nil {
return nil, errors.New("Unable to extract TPU_WORKER_ID")
}
}
}
// pod belongs to a multi-host replica -> update the map
for slice, _ := range sliceToWorkers {
if slice.groupName == groupName && slice.replicaIndex == replicaIndex {
// set the pod state to indicate it is not running
for _, worker := range sliceToWorkers[slice] {
if worker.workerIndex == tpuWorkerID {
worker.isRunning = false
break
}
}
break
}
}
}


// Create AdmissionResponse - we never deny the deletion request
admissionResponse := &admissionv1.AdmissionResponse{
UID: admissionReview.Request.UID,
Allowed: true,
Result: &metav1.Status{
Status: "Success",
Message: "",
},
}
return admissionResponse, nil
}


func writeCertfile(filename string, encodedData string) error {
data, err := base64.StdEncoding.DecodeString(encodedData)
if err != nil {
Expand Down Expand Up @@ -487,7 +531,14 @@ func main() {

if admissionReview.Request.Kind.Kind == "Pod" {
klog.Info("Received review for Pod")
response, err := mutatePod(admissionReview)
var response *admissionv1.AdmissionResponse
var err error
if admissionReview.Request.Operation == "CREATE" {
response, err = mutatePod(admissionReview)
}
if admissionReview.Request.Operation == "DELETE" {
response, err = deletePod(admissionReview)
}
if err != nil {
klog.Errorf("Failed to mutate pod: %s", err)
return
Expand Down

0 comments on commit aa3315f

Please sign in to comment.