From 49efb9d2419dbdccccf014e526c968022afda474 Mon Sep 17 00:00:00 2001 From: Nick Stogner Date: Fri, 29 Mar 2024 17:01:42 -0400 Subject: [PATCH] tpu-provisioner - Dedup NP creations on job-key and reduce noisy node pool deletion reconciles (#427) * Dedup NP creations on job-key and reduce noisy node pool deletion reconciles * Address comments * Fix comment --- tpu-provisioner/internal/cloud/gke.go | 41 +++++++++++++++---- .../controller/creation_controller.go | 2 +- .../controller/deletion_controller.go | 13 +++++- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/tpu-provisioner/internal/cloud/gke.go b/tpu-provisioner/internal/cloud/gke.go index f1f7fab79..099e15729 100644 --- a/tpu-provisioner/internal/cloud/gke.go +++ b/tpu-provisioner/internal/cloud/gke.go @@ -26,6 +26,7 @@ const ( GKEAcceleratorNodeSelector = "cloud.google.com/gke-tpu-accelerator" GKENodePoolNameLabel = "cloud.google.com/gke-nodepool" GKENodePoolNamePrefix = "tpu-provisioner-" + jobKeyLabel = "jobset.sigs.k8s.io/job-key" V4PodSliceAccelerator = "tpu-v4-podslice" V5ePodSliceAccelerator = "tpu-v5-lite-podslice" V5pPodSliceAccelerator = "tpu-v5p-slice" @@ -48,8 +49,9 @@ type GKE struct { Recorder record.EventRecorder - inProgressDeletes sync.Map - inProgressCreates sync.Map + inProgressDeletesNPName sync.Map + inProgressCreatesNPName sync.Map + inProgressCreatesJobKey sync.Map } func (g *GKE) NodePoolLabelKey() string { return GKENodePoolNameLabel } @@ -79,11 +81,24 @@ func (g *GKE) EnsureNodePoolForPod(p *corev1.Pod, why string) error { // Node Pool will occur at the same time. The result is an error: // "do: googleapi: Error 400: Cluster is running incompatible operation ..." // To avoid a bunch of failed requests, we dedeuplicate here. 
- if _, inProgress := g.inProgressCreates.Load(name); inProgress { - return ErrDuplicateRequest + if _, inProgress := g.inProgressCreatesNPName.Load(name); inProgress { + return fmt.Errorf("creation ongoing for node pool name: %v: %w", name, ErrDuplicateRequest) + } + g.inProgressCreatesNPName.Store(name, struct{}{}) + defer g.inProgressCreatesNPName.Delete(name) + + // A restarting JobSet will trigger a new Node Pool creation. + // The current creation attempt might overlap with the previous one, + // which could still be ongoing, so we need to deduplicate. + // This works because job-key remains constant across restarts. + // NOTE: These checks don't work across controller restarts. + if jobKey := p.Labels[jobKeyLabel]; jobKey != "" { + if _, inProgress := g.inProgressCreatesJobKey.Load(jobKey); inProgress { + return fmt.Errorf("creation ongoing for job-key: %v: %w", jobKey, ErrDuplicateRequest) + } + g.inProgressCreatesJobKey.Store(jobKey, struct{}{}) + defer g.inProgressCreatesJobKey.Delete(jobKey) + } - g.inProgressCreates.Store(name, struct{}{}) - defer g.inProgressCreates.Delete(name) g.Recorder.Eventf(p, corev1.EventTypeNormal, EventNodePoolCreationStarted, "Starting creation of Node Pool %s (size = %v) because %s", name, np.InitialNodeCount, why) call := g.Service.Projects.Locations.Clusters.NodePools.Create(g.ClusterContext.ClusterName(), req) @@ -140,11 +155,11 @@ func (g *GKE) DeleteNodePool(name string, eventObj client.Object, why string) er // Due to concurrent reconciles, multiple deletes for the same // Node Pool will occur at the same time. The result is an error: // To avoid a bunch of failed requests, we dedeuplicate here. 
- if _, inProgress := g.inProgressDeletes.Load(name); inProgress { + if _, inProgress := g.inProgressDeletesNPName.Load(name); inProgress { return ErrDuplicateRequest } - g.inProgressDeletes.Store(name, struct{}{}) - defer g.inProgressDeletes.Delete(name) + g.inProgressDeletesNPName.Store(name, struct{}{}) + defer g.inProgressDeletesNPName.Delete(name) g.Recorder.Eventf(eventObj, corev1.EventTypeNormal, EventNodePoolDeletionStarted, "Starting deletion of Node Pool %s because %s", name, why) op, err := g.Service.Projects.Locations.Clusters.Delete(g.ClusterContext.NodePoolName(name)).Do() @@ -311,6 +326,14 @@ func sumTPURequests(p *corev1.Pod) (int, error) { } func podToNodePoolName(p *corev1.Pod, prefix, suffix string) string { + // Use the UID of the Pod's owner (falling back to the Pod UID if it has + // no owner) as the unique identifier for the node pool. + // It is necessary to use something that is unique to the Pod, not the Job/JobSet + // because the scheduler is not guaranteed to place Pods on the same + // node pools that were created for them. This commonly happens when + // node pools are reused by other Jobs after the original Job has completed + // or restarted. Using another identifier like the job-key could result in + // deadlocks in this case. 
var uid string ref := metav1.GetControllerOf(p) if ref != nil { diff --git a/tpu-provisioner/internal/controller/creation_controller.go b/tpu-provisioner/internal/controller/creation_controller.go index a4eca0eb3..9df607ffe 100644 --- a/tpu-provisioner/internal/controller/creation_controller.go +++ b/tpu-provisioner/internal/controller/creation_controller.go @@ -78,7 +78,7 @@ func (r *CreationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c lg.Info("Ensuring node pool for unschedulable pod") if err := r.Provider.EnsureNodePoolForPod(&pod, "pod is currently unschedulable"); err != nil { if errors.Is(err, cloud.ErrDuplicateRequest) { - lg.Info("Ignoring duplicate request to create node pool") + lg.V(3).Info("Ignoring duplicate request to create node pool", "message", err.Error()) } else if errors.Is(err, cloud.ErrNodePoolStopping) { wait := 5 * time.Second lg.Info("Attempted to create a node pool that is currently undergoing deletion, retrying soon", diff --git a/tpu-provisioner/internal/controller/deletion_controller.go b/tpu-provisioner/internal/controller/deletion_controller.go index 8b2ed9e4b..5813f1318 100644 --- a/tpu-provisioner/internal/controller/deletion_controller.go +++ b/tpu-provisioner/internal/controller/deletion_controller.go @@ -69,6 +69,17 @@ func (r *DeletionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, nil } + // Avoid noisy reconciliation when nodes are shutting down. + for _, c := range node.Status.Conditions { + if c.Type == corev1.NodeReady && + c.Status == corev1.ConditionFalse && + c.Reason == "KubeletNotReady" && + c.Message == "node is shutting down" { + lg.V(3).Info("Node is shutting down, ignoring") + return ctrl.Result{}, nil + } + } + // Ensure node was not just created to make sure Pods have had time to schedule. 
if since := time.Since(node.GetCreationTimestamp().Time); since < r.NodeCriteria.MinLifetime { wait := r.NodeCriteria.MinLifetime - since + time.Second @@ -144,7 +155,7 @@ func (r *DeletionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c lg.Info(fmt.Sprintf("Node pool %q passed deletion check twice. Ensuring Node Pool is deleted", nodePoolName)) if err := r.Provider.DeleteNodePoolForNode(&node, "no user Pods are running on any of the Nodes in this node pool"); err != nil { if errors.Is(err, cloud.ErrDuplicateRequest) { - lg.Info("Ignoring duplicate request to delete node pool") + lg.V(3).Info("Ignoring duplicate request to delete node pool") return ctrl.Result{}, nil } else { return ctrl.Result{}, err