Add node pool garbage collector and improve events
nstogner committed Mar 7, 2024
1 parent 4fe6ac9 commit 5381442
Showing 10 changed files with 233 additions and 51 deletions.
25 changes: 22 additions & 3 deletions tpu-provisioner/cmd/main.go
@@ -22,6 +22,7 @@ import (
"net/http"
"os"
"strings"
"sync"
"time"

"k8s.io/apimachinery/pkg/runtime/schema"
@@ -198,6 +199,7 @@ func main() {
NodeSecondaryDisk: cfg.GCPNodeSecondaryDisk,
NodeTags: cfg.GCPNodeTags,
},
Recorder: mgr.GetEventRecorderFor("tpu-provisioner"),
}
case "mock":
provider = &cloud.Mock{}
@@ -209,7 +211,7 @@
if err := (&controller.CreationReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("tpu-provisioner-creator"),
Recorder: mgr.GetEventRecorderFor("tpu-provisioner"),
Provider: provider,
PodCriteria: controller.PodCriteria{
ResourceType: cfg.PodResourceType,
@@ -222,7 +224,7 @@
if err := (&controller.DeletionReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("tpu-provisioner-deleter"),
Recorder: mgr.GetEventRecorderFor("tpu-provisioner"),
Provider: provider,
NodeCriteria: controller.NodeCriteria{
MinLifetime: cfg.NodeMinLifespan,
@@ -241,10 +243,27 @@
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}
ctx := ctrl.SetupSignalHandler()

gc := &controller.NodePoolGarbageCollector{
Interval: time.Minute,
Client: mgr.GetClient(),
Provider: provider,
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
gc.Run(ctx)
wg.Done()
}()

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
if err := mgr.Start(ctx); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}

setupLog.Info("waiting for all goroutines to finish")
wg.Wait()
setupLog.Info("exiting")
}
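
The NodePoolGarbageCollector wired up above is defined in one of the changed files not shown in this excerpt. As a rough sketch only, inferred from the fields set in main.go and the Provider methods added below (the import path and the exact orphan check are assumptions, not the commit's actual implementation), such a loop could look roughly like this:

// Hypothetical sketch of the garbage collector referenced in main.go; not the commit's file.
package controller

import (
    "context"
    "time"

    corev1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "sigs.k8s.io/controller-runtime/pkg/client"
    ctrllog "sigs.k8s.io/controller-runtime/pkg/log"

    "example.com/tpu-provisioner/internal/cloud" // placeholder module path
)

// NodePoolGarbageCollector periodically deletes node pools whose
// originating Pod (recorded via the LabelPodName/LabelPodNamespace
// node pool labels) no longer exists.
type NodePoolGarbageCollector struct {
    Interval time.Duration
    Client   client.Client
    Provider cloud.Provider
}

func (g *NodePoolGarbageCollector) Run(ctx context.Context) {
    lg := ctrllog.Log.WithName("nodepool-gc")
    t := time.NewTicker(g.Interval)
    defer t.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-t.C:
        }

        refs, err := g.Provider.ListNodePools()
        if err != nil {
            lg.Error(err, "listing node pools")
            continue
        }

        for _, ref := range refs {
            // Only consider node pools that record the Pod they were created for.
            if ref.CreatedForPod.Name == "" || ref.CreatedForPod.Namespace == "" {
                continue
            }

            var pod corev1.Pod
            err := g.Client.Get(ctx, ref.CreatedForPod, &pod)
            if err == nil || !apierrors.IsNotFound(err) {
                // Pod still exists (or the lookup failed); leave the node pool alone.
                continue
            }

            // Pass a stub Pod carrying the recorded name/namespace so the
            // provider still has an object to attach events to.
            stub := &corev1.Pod{}
            stub.Name = ref.CreatedForPod.Name
            stub.Namespace = ref.CreatedForPod.Namespace

            if err := g.Provider.DeleteNodePool(ref.Name, stub, "the Pod it was created for no longer exists"); err != nil {
                lg.Error(err, "garbage collecting node pool", "nodePool", ref.Name)
            }
        }
    }
}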
25 changes: 25 additions & 0 deletions tpu-provisioner/internal/cloud/constants.go
@@ -0,0 +1,25 @@
package cloud

const (
keyPrefix = "google.com/"

LabelNodepoolManager = keyPrefix + "nodepool-manager"
LabelNodepoolManagerTPUPodinator = "tpu-provisioner"

LabelParentKind = keyPrefix + "tpu-provisioner-parent-kind"
LabelParentName = keyPrefix + "tpu-provisioner-parent-name"
LabelParentNamespace = keyPrefix + "tpu-provisioner-parent-namespace"

LabelPodName = keyPrefix + "tpu-provisioner-pod-name"
LabelPodNamespace = keyPrefix + "tpu-provisioner-pod-namespace"

EventNodePoolCreationStarted = "NodePoolCreationStarted"
EventNodePoolCreationSucceeded = "NodePoolCreationSucceeded"
EventNodePoolCreationFailed = "NodePoolCreationFailed"

EventNodePoolDeletionStarted = "NodePoolDeletionStarted"
EventNodePoolDeletionSucceeded = "NodePoolDeletionSucceeded"
EventNodePoolDeletionFailed = "NodePoolDeletionFailed"

EventNodePoolNotFound = "NodePoolNotFound"
)
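
These reason strings surface as ordinary Kubernetes Events attached to the Pods and Nodes the provisioner acts on. As a small illustration (not part of this commit; the kubeconfig path and the chosen reason are examples only), they can be filtered programmatically with client-go:

// Illustrative only: list events whose reason matches one of the constants above.
package main

import (
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Build a client from the default kubeconfig (example setup).
    cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        panic(err)
    }
    cs := kubernetes.NewForConfigOrDie(cfg)

    // "NodePoolCreationFailed" is the string value of EventNodePoolCreationFailed.
    events, err := cs.CoreV1().Events(corev1.NamespaceAll).List(context.Background(), metav1.ListOptions{
        FieldSelector: "reason=NodePoolCreationFailed",
    })
    if err != nil {
        panic(err)
    }
    for _, e := range events.Items {
        fmt.Printf("%s %s/%s: %s\n", e.LastTimestamp, e.InvolvedObject.Namespace, e.InvolvedObject.Name, e.Message)
    }
}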
68 changes: 62 additions & 6 deletions tpu-provisioner/internal/cloud/gke.go
@@ -13,6 +13,9 @@ import (
"google.golang.org/api/googleapi"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
)

@@ -37,17 +40,21 @@
maxPodsPerNode = 15
)

var _ Provider = &GKE{}

type GKE struct {
Service *containerv1beta1.Service
ClusterContext GKEContext

Recorder record.EventRecorder

inProgressDeletes sync.Map
inProgressCreates sync.Map
}

func (g *GKE) NodePoolLabelKey() string { return GKENodePoolNameLabel }

func (g *GKE) EnsureNodePoolForPod(p *corev1.Pod) error {
func (g *GKE) EnsureNodePoolForPod(p *corev1.Pod, why string) error {
name := podToNodePoolName(p, GKENodePoolNamePrefix, "")

exists, err := g.nodePoolExists(name)
@@ -63,8 +70,6 @@ func (g *GKE) EnsureNodePoolForPod(p *corev1.Pod) error {
return fmt.Errorf("determining node pool for pod: %w", err)
}

log.Info("creating node pool", "name", name, "nodeCount", np.InitialNodeCount)

req := &containerv1beta1.CreateNodePoolRequest{
NodePool: np,
Parent: g.ClusterContext.ClusterName(),
@@ -80,20 +85,58 @@ func (g *GKE) EnsureNodePoolForPod(p *corev1.Pod) error {
g.inProgressCreates.Store(name, struct{}{})
defer g.inProgressCreates.Delete(name)

g.Recorder.Eventf(p, corev1.EventTypeNormal, EventNodePoolCreationStarted, "Starting creation of Node Pool %s (size = %v) because %s", name, np.InitialNodeCount, why)
call := g.Service.Projects.Locations.Clusters.NodePools.Create(g.ClusterContext.ClusterName(), req)
op, err := call.Do()
if err != nil {
g.Recorder.Eventf(p, corev1.EventTypeWarning, EventNodePoolCreationFailed, "Request to create Node Pool %s failed: %v.", name, err)
return fmt.Errorf("do: %w", err)
}

return waitForGkeOp(g.Service, g.ClusterContext, op)
if err := waitForGkeOp(g.Service, g.ClusterContext, op); err != nil {
g.Recorder.Eventf(p, corev1.EventTypeWarning, EventNodePoolCreationFailed, "Operation to create Node Pool %s failed: %v.", name, err)
return fmt.Errorf("waiting for operation: %w", err)
}

g.Recorder.Eventf(p, corev1.EventTypeNormal, EventNodePoolCreationSucceeded, "Successfully created Node Pool %s.", name)

return nil
}

func (g *GKE) DeleteNodePoolForNode(node *corev1.Node) error {
func (g *GKE) ListNodePools() ([]NodePoolRef, error) {
var names []NodePoolRef

resp, err := g.Service.Projects.Locations.Clusters.NodePools.List(g.ClusterContext.ClusterName()).Do()
if err != nil {
return nil, fmt.Errorf("listing node pools: %w", err)

}

for _, np := range resp.NodePools {
names = append(names, NodePoolRef{
Name: np.Name,
Error: np.Status == "ERROR",
ErrorMsg: np.StatusMessage,
CreatedForPod: types.NamespacedName{
Name: np.Config.Labels[LabelPodName],
Namespace: np.Config.Labels[LabelPodNamespace],
},
})
}

return names, nil
}

func (g *GKE) DeleteNodePoolForNode(node *corev1.Node, why string) error {
name, ok := node.GetLabels()[g.NodePoolLabelKey()]
if !ok {
return fmt.Errorf("node %q does not have node pool label", node.Name)
}

return g.DeleteNodePool(name, node, why)
}

func (g *GKE) DeleteNodePool(name string, eventObj client.Object, why string) error {
// Due to concurrent reconciles, multiple deletes for the same
// Node Pool will occur at the same time, which results in an error.
// To avoid a bunch of failed requests, we deduplicate here.
@@ -103,15 +146,25 @@ func (g *GKE) DeleteNodePoolForNode(node *corev1.Node) error {
g.inProgressDeletes.Store(name, struct{}{})
defer g.inProgressDeletes.Delete(name)

g.Recorder.Eventf(eventObj, corev1.EventTypeNormal, EventNodePoolDeletionStarted, "Starting deletion of Node Pool %s because %s", name, why)
op, err := g.Service.Projects.Locations.Clusters.Delete(g.ClusterContext.NodePoolName(name)).Do()
if err != nil {
if gerr, ok := err.(*googleapi.Error); ok && gerr.Code == http.StatusNotFound {
g.Recorder.Eventf(eventObj, corev1.EventTypeNormal, EventNodePoolNotFound, "Node Pool %s not found - ignoring deletion attempt.", name)
return nil
}
g.Recorder.Eventf(eventObj, corev1.EventTypeWarning, EventNodePoolDeletionFailed, "Request to delete Node Pool %s failed: %v.", name, err)
return fmt.Errorf("deleting node pool %q: %w", name, err)
}

return waitForGkeOp(g.Service, g.ClusterContext, op)
if err := waitForGkeOp(g.Service, g.ClusterContext, op); err != nil {
g.Recorder.Eventf(eventObj, corev1.EventTypeWarning, EventNodePoolDeletionFailed, "Operation to delete Node Pool %s failed: %v.", name, err)
return err
}

g.Recorder.Eventf(eventObj, corev1.EventTypeNormal, EventNodePoolDeletionSucceeded, "Successfully deleted Node Pool %s.", name)

return nil
}

var ErrNodePoolStopping = errors.New("node pool stopping")
@@ -148,6 +201,9 @@ func (g *GKE) nodePoolForPod(name string, p *corev1.Pod) (*containerv1beta1.Node
LabelParentName: strings.ToLower(ref.Name),
// Assuming a Namespaced parent here...
LabelParentNamespace: strings.ToLower(p.Namespace),

LabelPodName: p.Name,
LabelPodNamespace: p.Namespace,
}

for k, v := range p.Spec.NodeSelector {
20 changes: 18 additions & 2 deletions tpu-provisioner/internal/cloud/interface.go
@@ -2,14 +2,30 @@ package cloud

import (
"errors"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type Provider interface {
NodePoolLabelKey() string
EnsureNodePoolForPod(*corev1.Pod) error
DeleteNodePoolForNode(*corev1.Node) error
EnsureNodePoolForPod(*corev1.Pod, string) error
DeleteNodePoolForNode(*corev1.Node, string) error
DeleteNodePool(string, client.Object, string) error
ListNodePools() ([]NodePoolRef, error)
}

var ErrDuplicateRequest = errors.New("duplicate request")

type NodePoolRef struct {
Name string

CreationTime time.Time

CreatedForPod types.NamespacedName

Error bool
ErrorMsg string
}
12 changes: 0 additions & 12 deletions tpu-provisioner/internal/cloud/labels.go

This file was deleted.

15 changes: 11 additions & 4 deletions tpu-provisioner/internal/cloud/mock.go
@@ -1,12 +1,19 @@
package cloud

import corev1 "k8s.io/api/core/v1"
import (
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ Provider = &Mock{}

// Mock is useful for local development or debugging purposes to understand what
// the controller would do without it doing anything.
type Mock struct{}

// TODO: Find a better mock node pool label key.
func (m *Mock) NodePoolLabelKey() string { return "kubernetes.io/os" }
func (m *Mock) EnsureNodePoolForPod(*corev1.Pod) error { return nil }
func (m *Mock) DeleteNodePoolForNode(*corev1.Node) error { return nil }
func (m *Mock) NodePoolLabelKey() string { return "kubernetes.io/os" }
func (m *Mock) EnsureNodePoolForPod(*corev1.Pod, string) error { return nil }
func (m *Mock) DeleteNodePoolForNode(*corev1.Node, string) error { return nil }
func (m *Mock) DeleteNodePool(string, client.Object, string) error { return nil }
func (m *Mock) ListNodePools() ([]NodePoolRef, error) { return nil, nil }
15 changes: 7 additions & 8 deletions tpu-provisioner/internal/controller/creation_controller.go
@@ -30,7 +30,7 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
)

// CreationReconciler watches Pods and creates Node Pools.
@@ -53,7 +53,7 @@ type PodCriteria struct {
//+kubebuilder:rbac:groups="",resources=pods/finalizers,verbs=update

func (r *CreationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
lg := log.FromContext(ctx)
lg := ctrllog.FromContext(ctx)

lg.V(3).Info("Reconciling Pod")

@@ -67,14 +67,16 @@ func (r *CreationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

// Return early if Pod should not trigger a scale up.
if !isPending(&pod) || !isUnschedulable(&pod) || !doesRequestResource(&pod, r.PodCriteria.ResourceType) || !hasNodeSelectors(&pod, cloud.GKETPUNodeSelector) {
if !isPending(&pod) || !isUnschedulable(&pod) ||
!doesRequestResource(&pod, r.PodCriteria.ResourceType) ||
!hasNodeSelectors(&pod, cloud.GKETPUNodeSelector) ||
pod.DeletionTimestamp != nil {
lg.V(3).Info("Ignoring pod")
return ctrl.Result{}, nil
}

lg.Info("Ensuring node pool for unschedulable pod")
r.Recorder.Eventf(&pod, corev1.EventTypeNormal, EventEnsuringNodePool, "Ensuring Node Pool, triggered by Pod %s/%s.", pod.Namespace, pod.Name)
if err := r.Provider.EnsureNodePoolForPod(&pod); err != nil {
if err := r.Provider.EnsureNodePoolForPod(&pod, "pod is currently unschedulable"); err != nil {
if errors.Is(err, cloud.ErrDuplicateRequest) {
lg.Info("Ignoring duplicate request to create node pool")
} else if errors.Is(err, cloud.ErrNodePoolStopping) {
@@ -83,11 +85,8 @@ func (r *CreationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
"wait", wait)
return ctrl.Result{RequeueAfter: wait}, nil
} else {
r.Recorder.Event(&pod, corev1.EventTypeWarning, EventFailedEnsuringNodePool, "Failed to ensure existance of Node Pool: "+err.Error())
return ctrl.Result{}, err
}
} else {
r.Recorder.Event(&pod, corev1.EventTypeNormal, EventNodePoolEnsured, "Node Pool Ensured.")
}

return ctrl.Result{}, nil
9 changes: 3 additions & 6 deletions tpu-provisioner/internal/controller/deletion_controller.go
@@ -18,7 +18,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
@@ -50,7 +50,7 @@ type NodeCriteria struct {
//+kubebuilder:rbac:groups="",resources=nodes/finalizers,verbs=update

func (r *DeletionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
lg := log.FromContext(ctx)
lg := ctrllog.FromContext(ctx)

lg.V(3).Info("Reconciling Node")

@@ -142,17 +142,14 @@ func (r *DeletionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
// If this point is reached, the node pool has passed the deletion check twice
// and can be deleted.
lg.Info(fmt.Sprintf("Node pool %q passed deletion check twice. Ensuring Node Pool is deleted", nodePoolName))
r.Recorder.Event(&node, corev1.EventTypeNormal, EventDeletingNodePool, DeletingNodePoolEventMessage)
if err := r.Provider.DeleteNodePoolForNode(&node); err != nil {
if err := r.Provider.DeleteNodePoolForNode(&node, "no user Pods are running on any of the Nodes in this node pool"); err != nil {
if errors.Is(err, cloud.ErrDuplicateRequest) {
lg.Info("Ignoring duplicate request to delete node pool")
return ctrl.Result{}, nil
} else {
r.Recorder.Event(&node, corev1.EventTypeWarning, EventFailedDeletingNodePool, "Failed to delete Node Pool: "+err.Error())
return ctrl.Result{}, err
}
}
r.Recorder.Event(&node, corev1.EventTypeNormal, EventNodePoolDeleted, DeletedNodePoolEventMessage)

// Remove node pool from the map tracking node pools marked for deletion, in case the JobSet
// is rerun in the future, as this will result in node pools with the same name being recreated,
