Commit

tpu-provisioner - Dedup NP creations on job-key and reduce noisy node pool deletion reconciles (#427)

* Dedup NP creations on job-key and reduce noisy node pool deletion reconciles

* Address comments

* Fix comment
nstogner committed Mar 29, 2024
1 parent 97a9cd8 commit 49efb9d
Showing 3 changed files with 45 additions and 11 deletions.
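
The heart of this change is tracking in-flight node pool creations by the jobset.sigs.k8s.io/job-key label as well as by node pool name, so a restarting JobSet cannot kick off a second creation while the first is still running. Below is a minimal, standalone sketch of that deduplication pattern; the creationGuard type and errDuplicateRequest name are illustrative and not the provisioner's actual API, and it uses sync.Map.LoadOrStore to make the check atomic, whereas the diff below uses separate Load and Store calls.

package main

import (
    "errors"
    "fmt"
    "sync"
)

var errDuplicateRequest = errors.New("duplicate request")

// creationGuard deduplicates concurrent node pool creations per key,
// e.g. a node pool name or a jobset.sigs.k8s.io/job-key label value.
type creationGuard struct {
    inProgress sync.Map
}

// start marks key as in flight and returns a release function, or an
// error wrapping errDuplicateRequest if a creation for the same key is
// already running. LoadOrStore makes the check-and-mark step atomic.
func (g *creationGuard) start(key string) (func(), error) {
    if _, loaded := g.inProgress.LoadOrStore(key, struct{}{}); loaded {
        return nil, fmt.Errorf("creation ongoing for %q: %w", key, errDuplicateRequest)
    }
    return func() { g.inProgress.Delete(key) }, nil
}

func main() {
    var guard creationGuard

    release, err := guard.start("job-key-abc123")
    if err != nil {
        return
    }
    defer release()
    fmt.Println("creating node pool for job-key-abc123")

    // A second attempt for the same job-key while the first is still in
    // flight is rejected with a wrapped sentinel error.
    if _, err := guard.start("job-key-abc123"); errors.Is(err, errDuplicateRequest) {
        fmt.Println("deduplicated:", err)
    }
}

Like the in-memory maps added to gke.go, this state is lost when the controller restarts, which is the caveat the new NOTE comment calls out.
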
41 changes: 32 additions & 9 deletions tpu-provisioner/internal/cloud/gke.go
@@ -26,6 +26,7 @@ const (
     GKEAcceleratorNodeSelector = "cloud.google.com/gke-tpu-accelerator"
     GKENodePoolNameLabel = "cloud.google.com/gke-nodepool"
     GKENodePoolNamePrefix = "tpu-provisioner-"
+    jobKeyLabel = "jobset.sigs.k8s.io/job-key"
     V4PodSliceAccelerator = "tpu-v4-podslice"
     V5ePodSliceAccelerator = "tpu-v5-lite-podslice"
     V5pPodSliceAccelerator = "tpu-v5p-slice"
@@ -48,8 +49,9 @@ type GKE struct {

     Recorder record.EventRecorder

-    inProgressDeletes sync.Map
-    inProgressCreates sync.Map
+    inProgressDeletesNPName sync.Map
+    inProgressCreatesNPName sync.Map
+    inProgressCreatesJobKey sync.Map
 }

 func (g *GKE) NodePoolLabelKey() string { return GKENodePoolNameLabel }
@@ -79,11 +81,24 @@ func (g *GKE) EnsureNodePoolForPod(p *corev1.Pod, why string) error {
     // Node Pool will occur at the same time. The result is an error:
     // "do: googleapi: Error 400: Cluster is running incompatible operation ..."
     // To avoid a bunch of failed requests, we deduplicate here.
-    if _, inProgress := g.inProgressCreates.Load(name); inProgress {
-        return ErrDuplicateRequest
+    if _, inProgress := g.inProgressCreatesNPName.Load(name); inProgress {
+        return fmt.Errorf("creation ongoing for node pool name: %v: %w", name, ErrDuplicateRequest)
     }
-    g.inProgressCreates.Store(name, struct{}{})
-    defer g.inProgressCreates.Delete(name)
+    g.inProgressCreatesNPName.Store(name, struct{}{})
+    defer g.inProgressCreatesNPName.Delete(name)
+
+    // A restarting JobSet will trigger a new Node Pool creation.
+    // The current creation attempt might overlap with the previous one,
+    // which could still be ongoing, so we need to deduplicate.
+    // This works because the job-key remains constant across restarts.
+    // NOTE: These checks don't work across controller restarts.
+    if jobKey := p.Labels[jobKeyLabel]; jobKey != "" {
+        if _, inProgress := g.inProgressCreatesJobKey.Load(jobKey); inProgress {
+            return fmt.Errorf("creation ongoing for job-key: %v: %w", jobKey, ErrDuplicateRequest)
+        }
+        g.inProgressCreatesJobKey.Store(jobKey, struct{}{})
+        defer g.inProgressCreatesJobKey.Delete(jobKey)
+    }

     g.Recorder.Eventf(p, corev1.EventTypeNormal, EventNodePoolCreationStarted, "Starting creation of Node Pool %s (size = %v) because %s", name, np.InitialNodeCount, why)
     call := g.Service.Projects.Locations.Clusters.NodePools.Create(g.ClusterContext.ClusterName(), req)
@@ -140,11 +155,11 @@ func (g *GKE) DeleteNodePool(name string, eventObj client.Object, why string) error {
     // Due to concurrent reconciles, multiple deletes for the same
     // Node Pool will occur at the same time. The result is an error:
     // To avoid a bunch of failed requests, we deduplicate here.
-    if _, inProgress := g.inProgressDeletes.Load(name); inProgress {
+    if _, inProgress := g.inProgressDeletesNPName.Load(name); inProgress {
         return ErrDuplicateRequest
     }
-    g.inProgressDeletes.Store(name, struct{}{})
-    defer g.inProgressDeletes.Delete(name)
+    g.inProgressDeletesNPName.Store(name, struct{}{})
+    defer g.inProgressDeletesNPName.Delete(name)

     g.Recorder.Eventf(eventObj, corev1.EventTypeNormal, EventNodePoolDeletionStarted, "Starting deletion of Node Pool %s because %s", name, why)
     op, err := g.Service.Projects.Locations.Clusters.Delete(g.ClusterContext.NodePoolName(name)).Do()
@@ -311,6 +326,14 @@ func sumTPURequests(p *corev1.Pod) (int, error) {
 }

 func podToNodePoolName(p *corev1.Pod, prefix, suffix string) string {
+    // Use the UID of the Pod's owner (falling back to the Pod UID if it has
+    // no owner) as the unique identifier for the node pool.
+    // It is necessary to use something that is unique to the Pod, not the Job/JobSet,
+    // because the scheduler is not guaranteed to place Pods on the same
+    // node pools that were created for them. This commonly happens when
+    // node pools are reused by other Jobs after the original Job has completed
+    // or restarted. Using another identifier like the job-key could result in
+    // deadlocks in this case.
     var uid string
     ref := metav1.GetControllerOf(p)
     if ref != nil {
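
The comment added above podToNodePoolName explains why node pool names are derived from the Pod's controlling owner UID (falling back to the Pod UID) rather than from the job-key. A rough sketch of such a naming scheme follows; the hashing and truncation details are assumptions for illustration, not the repository's exact implementation.

package main

import (
    "crypto/sha256"
    "encoding/hex"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/types"
)

// nodePoolNameForPod derives a node pool name from the UID of the Pod's
// controlling owner, falling back to the Pod's own UID, so the name tracks
// the concrete Job instance rather than the stable job-key. The short hash
// keeps the result within GKE's node pool name length limits. Illustrative
// only; the repository's podToNodePoolName may differ in detail.
func nodePoolNameForPod(p *corev1.Pod, prefix, suffix string) string {
    uid := string(p.UID)
    if ref := metav1.GetControllerOf(p); ref != nil {
        uid = string(ref.UID)
    }
    sum := sha256.Sum256([]byte(uid))
    return prefix + hex.EncodeToString(sum[:])[:12] + suffix
}

func main() {
    isController := true
    pod := &corev1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            UID: types.UID("2f6f9c51-pod-uid-example"),
            OwnerReferences: []metav1.OwnerReference{{
                APIVersion: "batch/v1",
                Kind:       "Job",
                Name:       "example-job",
                UID:        types.UID("a1b2c3d4-job-uid-example"),
                Controller: &isController,
            }},
        },
    }
    // Prints something like "tpu-provisioner-" followed by 12 hex characters.
    fmt.Println(nodePoolNameForPod(pod, "tpu-provisioner-", ""))
}

Because a recreated Job gets a new UID, a restarted workload maps to a fresh node pool name instead of contending with whatever Job has since reused the old pool, which appears to be the deadlock scenario the comment warns about.
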
2 changes: 1 addition & 1 deletion tpu-provisioner/internal/controller/creation_controller.go
@@ -78,7 +78,7 @@ func (r *CreationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
     lg.Info("Ensuring node pool for unschedulable pod")
     if err := r.Provider.EnsureNodePoolForPod(&pod, "pod is currently unschedulable"); err != nil {
         if errors.Is(err, cloud.ErrDuplicateRequest) {
-            lg.Info("Ignoring duplicate request to create node pool")
+            lg.V(3).Info("Ignoring duplicate request to create node pool", "message", err.Error())
         } else if errors.Is(err, cloud.ErrNodePoolStopping) {
             wait := 5 * time.Second
             lg.Info("Attempted to create a node pool that is currently undergoing deletion, retrying soon",
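
The only change in this file lowers the duplicate-request message to verbosity level 3 and attaches the wrapped error as a structured field, so the line disappears at the default log level. Assuming the controllers use controller-runtime's usual zap-backed logger (an assumption, not confirmed by this diff), the verbosity gate works roughly like this sketch:

package main

import (
    "go.uber.org/zap/zapcore"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func main() {
    // zapcore.Level(-3) enables logr verbosity up to V(3); at the default
    // info level, lg.V(3).Info(...) calls are dropped silently.
    ctrl.SetLogger(zap.New(zap.Level(zapcore.Level(-3))))

    lg := ctrl.Log.WithName("creation-controller")
    lg.Info("visible at the default verbosity")
    lg.V(3).Info("Ignoring duplicate request to create node pool",
        "message", "creation ongoing for job-key: abc123: duplicate request")
}

Operators therefore only see the duplicate-request chatter when they raise the logger verbosity.
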
13 changes: 12 additions & 1 deletion tpu-provisioner/internal/controller/deletion_controller.go
@@ -69,6 +69,17 @@ func (r *DeletionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
         return ctrl.Result{}, nil
     }

+    // Avoid noisy reconciliation when nodes are shutting down.
+    for _, c := range node.Status.Conditions {
+        if c.Type == corev1.NodeReady &&
+            c.Status == corev1.ConditionFalse &&
+            c.Reason == "KubeletNotReady" &&
+            c.Message == "node is shutting down" {
+            lg.V(3).Info("Node is shutting down, ignoring")
+            return ctrl.Result{}, nil
+        }
+    }
+
     // Ensure node was not just created to make sure Pods have had time to schedule.
     if since := time.Since(node.GetCreationTimestamp().Time); since < r.NodeCriteria.MinLifetime {
         wait := r.NodeCriteria.MinLifetime - since + time.Second
@@ -144,7 +155,7 @@ func (r *DeletionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
     lg.Info(fmt.Sprintf("Node pool %q passed deletion check twice. Ensuring Node Pool is deleted", nodePoolName))
     if err := r.Provider.DeleteNodePoolForNode(&node, "no user Pods are running on any of the Nodes in this node pool"); err != nil {
         if errors.Is(err, cloud.ErrDuplicateRequest) {
-            lg.Info("Ignoring duplicate request to delete node pool")
+            lg.V(3).Info("Ignoring duplicate request to delete node pool")
             return ctrl.Result{}, nil
         } else {
             return ctrl.Result{}, err
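
Both controllers keep matching on the cloud.ErrDuplicateRequest sentinel with errors.Is, which still works now that the creation path wraps it via fmt.Errorf and %w to carry the node pool name or job-key. A small self-contained illustration of that wrap-and-match pattern (ensureNodePool here is a stand-in, not the provisioner's function):

package main

import (
    "errors"
    "fmt"
)

// ErrDuplicateRequest stands in for cloud.ErrDuplicateRequest.
var ErrDuplicateRequest = errors.New("duplicate request")

func ensureNodePool(jobKey string) error {
    // Wrapping with %w preserves the sentinel for errors.Is while adding
    // context that the controller can log at high verbosity.
    return fmt.Errorf("creation ongoing for job-key: %v: %w", jobKey, ErrDuplicateRequest)
}

func main() {
    err := ensureNodePool("abc123")
    if errors.Is(err, ErrDuplicateRequest) {
        fmt.Println("ignoring duplicate request:", err)
    }
}
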
