From 3a549abcdfc67b968a8e69f652a262abb5e56f1f Mon Sep 17 00:00:00 2001 From: Jonathan Innis Date: Fri, 9 Feb 2024 00:19:47 -0800 Subject: [PATCH] Drop simulation mode in favor of NopLogger and Recording scheduling results --- pkg/controllers/disruption/controller.go | 11 +- pkg/controllers/disruption/helpers.go | 7 +- pkg/controllers/provisioning/provisioner.go | 14 ++- .../provisioning/scheduling/scheduler.go | 106 +++++++----------- .../scheduling/scheduling_benchmark_test.go | 3 +- pkg/operator/logging/logging.go | 4 + 6 files changed, 63 insertions(+), 82 deletions(-) diff --git a/pkg/controllers/disruption/controller.go b/pkg/controllers/disruption/controller.go index 44a62f3978..b3c33a6c7d 100644 --- a/pkg/controllers/disruption/controller.go +++ b/pkg/controllers/disruption/controller.go @@ -32,6 +32,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" + operatorlogging "sigs.k8s.io/karpenter/pkg/operator/logging" + "sigs.k8s.io/karpenter/pkg/apis/v1beta1" "sigs.k8s.io/karpenter/pkg/cloudprovider" "sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration" @@ -211,14 +213,7 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, // tainted with the Karpenter taint, the provisioning controller will continue // to do scheduling simulations and nominate the pods on the candidate nodes until // the node is cleaned up. - for _, node := range schedulingResults.ExistingNodes { - if len(node.Pods) > 0 { - c.cluster.NominateNodeForPod(ctx, node.ProviderID()) - } - for _, pod := range node.Pods { - c.recorder.Publish(scheduling.NominatePodEvent(pod, node.Node, node.NodeClaim)) - } - } + schedulingResults.Record(logging.WithLogger(ctx, operatorlogging.NopLogger), c.recorder, c.cluster) providerIDs := lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() }) // We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion diff --git a/pkg/controllers/disruption/helpers.go b/pkg/controllers/disruption/helpers.go index c7d466997a..7cb70d32c2 100644 --- a/pkg/controllers/disruption/helpers.go +++ b/pkg/controllers/disruption/helpers.go @@ -41,6 +41,7 @@ import ( "sigs.k8s.io/karpenter/pkg/controllers/state" "sigs.k8s.io/karpenter/pkg/events" "sigs.k8s.io/karpenter/pkg/metrics" + operatorlogging "sigs.k8s.io/karpenter/pkg/operator/logging" "sigs.k8s.io/karpenter/pkg/scheduling" ) @@ -78,9 +79,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster * pods = append(pods, n.reschedulablePods...) } pods = append(pods, deletingNodePods...) - scheduler, err := provisioner.NewScheduler(ctx, pods, stateNodes, pscheduling.SchedulerOptions{ - SimulationMode: true, - }) + scheduler, err := provisioner.NewScheduler(logging.WithLogger(ctx, operatorlogging.NopLogger), pods, stateNodes) if err != nil { return pscheduling.Results{}, fmt.Errorf("creating scheduler, %w", err) } @@ -89,7 +88,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster * return client.ObjectKeyFromObject(p), nil }) - results := scheduler.Solve(ctx, pods) + results := scheduler.Solve(logging.WithLogger(ctx, operatorlogging.NopLogger), pods) results = results.TruncateInstanceTypes(pscheduling.MaxInstanceTypes) for _, n := range results.ExistingNodes { // We consider existing nodes for scheduling. When these nodes are unmanaged, their taint logic should diff --git a/pkg/controllers/provisioning/provisioner.go b/pkg/controllers/provisioning/provisioner.go index 92536a5404..334b6ebe13 100644 --- a/pkg/controllers/provisioning/provisioner.go +++ b/pkg/controllers/provisioning/provisioner.go @@ -192,7 +192,7 @@ func (p *Provisioner) consolidationWarnings(ctx context.Context, po *v1.Pod) { var ErrNodePoolsNotFound = errors.New("no nodepools found") //nolint:gocyclo -func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNodes []*state.StateNode, opts scheduler.SchedulerOptions) (*scheduler.Scheduler, error) { +func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNodes []*state.StateNode) (*scheduler.Scheduler, error) { // Build node templates var nodeClaimTemplates []*scheduler.NodeClaimTemplate instanceTypes := map[string][]*cloudprovider.InstanceType{} @@ -284,11 +284,12 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNod if err != nil { return nil, fmt.Errorf("getting daemon pods, %w", err) } - return scheduler.NewScheduler(ctx, p.kubeClient, nodeClaimTemplates, nodePoolList.Items, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, opts), nil + return scheduler.NewScheduler(ctx, p.kubeClient, nodeClaimTemplates, nodePoolList.Items, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder), nil } func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { defer metrics.Measure(schedulingDuration)() + start := time.Now() // We collect the nodes with their used capacities before we get the list of pending pods. This ensures that // the node capacities we schedule against are always >= what the actual capacity is at any given instance. This @@ -319,7 +320,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { if len(pods) == 0 { return scheduler.Results{}, nil } - s, err := p.NewScheduler(ctx, pods, nodes.Active(), scheduler.SchedulerOptions{}) + s, err := p.NewScheduler(ctx, pods, nodes.Active()) if err != nil { if errors.Is(err, ErrNodePoolsNotFound) { logging.FromContext(ctx).Info(ErrNodePoolsNotFound) @@ -328,7 +329,12 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) { return scheduler.Results{}, fmt.Errorf("creating scheduler, %w", err) } results := s.Solve(ctx, pods) - return results.TruncateInstanceTypes(scheduler.MaxInstanceTypes), nil + results.TruncateInstanceTypes(scheduler.MaxInstanceTypes) + logging.FromContext(ctx).With("pods", pretty.Slice(lo.Map(pods, func(p *v1.Pod, _ int) string { return client.ObjectKeyFromObject(p).String() }), 5)). + With("duration", time.Since(start)). + Infof("found provisionable pod(s)") + results.Record(ctx, p.recorder, p.cluster) + return results, nil } func (p *Provisioner) Create(ctx context.Context, n *scheduler.NodeClaim, opts ...functional.Option[LaunchOptions]) (string, error) { diff --git a/pkg/controllers/provisioning/scheduling/scheduler.go b/pkg/controllers/provisioning/scheduling/scheduler.go index b6b2d5624c..5405fe3389 100644 --- a/pkg/controllers/provisioning/scheduling/scheduler.go +++ b/pkg/controllers/provisioning/scheduling/scheduler.go @@ -21,7 +21,6 @@ import ( "context" "fmt" "sort" - "time" "github.com/samber/lo" "go.uber.org/multierr" @@ -36,20 +35,13 @@ import ( "sigs.k8s.io/karpenter/pkg/metrics" "sigs.k8s.io/karpenter/pkg/scheduling" "sigs.k8s.io/karpenter/pkg/utils/pod" - "sigs.k8s.io/karpenter/pkg/utils/pretty" "sigs.k8s.io/karpenter/pkg/utils/resources" ) -// SchedulerOptions can be used to control the scheduling, these options are currently only used during consolidation. -type SchedulerOptions struct { - // SimulationMode if true will prevent recording of the pod nomination decisions as events - SimulationMode bool -} - func NewScheduler(ctx context.Context, kubeClient client.Client, nodeClaimTemplates []*NodeClaimTemplate, nodePools []v1beta1.NodePool, cluster *state.Cluster, stateNodes []*state.StateNode, topology *Topology, instanceTypes map[string][]*cloudprovider.InstanceType, daemonSetPods []*v1.Pod, - recorder events.Recorder, opts SchedulerOptions) *Scheduler { + recorder events.Recorder) *Scheduler { // if any of the nodePools add a taint with a prefer no schedule effect, we add a toleration for the taint // during preference relaxation @@ -71,7 +63,6 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodeClaimTempla instanceTypes: instanceTypes, daemonOverhead: getDaemonOverhead(nodeClaimTemplates, daemonSetPods), recorder: recorder, - opts: opts, preferences: &Preferences{ToleratePreferNoSchedule: toleratePreferNoSchedule}, remainingResources: map[string]v1.ResourceList{}, } @@ -94,7 +85,6 @@ type Scheduler struct { topology *Topology cluster *state.Cluster recorder events.Recorder - opts SchedulerOptions kubeClient client.Client } @@ -105,6 +95,45 @@ type Results struct { PodErrors map[*v1.Pod]error } +// Record sends eventing and log messages back for the results that were produced from a scheduling run +// It also nominates nodes in the cluster state based on the scheduling run to signal to other components +// leveraging the cluster state that a previous scheduling run that was recorded is relying on these nodes +func (r Results) Record(ctx context.Context, recorder events.Recorder, cluster *state.Cluster) { + // Report failures and nominations + for p, err := range r.PodErrors { + logging.FromContext(ctx).With("pod", client.ObjectKeyFromObject(p)).Errorf("Could not schedule pod, %s", err) + recorder.Publish(PodFailedToScheduleEvent(p, err)) + } + for _, existing := range r.ExistingNodes { + if len(existing.Pods) > 0 { + cluster.NominateNodeForPod(ctx, existing.ProviderID()) + } + for _, p := range existing.Pods { + recorder.Publish(NominatePodEvent(p, existing.Node, existing.NodeClaim)) + } + } + // Report new nodes, or exit to avoid log spam + newCount := 0 + for _, nodeClaim := range r.NewNodeClaims { + newCount += len(nodeClaim.Pods) + } + if newCount == 0 { + return + } + logging.FromContext(ctx).With("nodeclaims", len(r.NewNodeClaims), "pods", newCount).Infof("computed new nodeclaim(s) to fit pod(s)") + // Report in flight newNodes, or exit to avoid log spam + inflightCount := 0 + existingCount := 0 + for _, node := range lo.Filter(r.ExistingNodes, func(node *ExistingNode, _ int) bool { return len(node.Pods) > 0 }) { + inflightCount++ + existingCount += len(node.Pods) + } + if existingCount == 0 { + return + } + logging.FromContext(ctx).Infof("computed %d unready node(s) will fit %d pod(s)", inflightCount, existingCount) +} + // AllNonPendingPodsScheduled returns true if all pods scheduled. // We don't care if a pod was pending before consolidation and will still be pending after. It may be a pod that we can't // schedule at all and don't want it to block consolidation. @@ -172,7 +201,6 @@ func (r Results) TruncateInstanceTypes(maxInstanceTypes int) Results { func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) Results { defer metrics.Measure(schedulingSimulationDuration)() - schedulingStart := time.Now() // We loop trying to schedule unschedulable pods as long as we are making progress. This solves a few // issues including pods with affinity to another pod in the batch. We could topo-sort to solve this, but it wouldn't // solve the problem of scheduling pods where a particular order is needed to prevent a max-skew violation. E.g. if we @@ -205,10 +233,7 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) Results { for _, m := range s.newNodeClaims { m.FinalizeScheduling() } - if !s.opts.SimulationMode { - s.recordSchedulingResults(ctx, pods, q.List(), errors, time.Since(schedulingStart)) - } - // clear any nil errors so we can know that len(PodErrors) == 0 => all pods scheduled + // clear any nil errors, so we can know that len(PodErrors) == 0 => all pods scheduled for k, v := range errors { if v == nil { delete(errors, k) @@ -221,53 +246,6 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) Results { } } -func (s *Scheduler) recordSchedulingResults(ctx context.Context, pods []*v1.Pod, failedToSchedule []*v1.Pod, errors map[*v1.Pod]error, schedulingDuration time.Duration) { - // Report failures and nominations - for _, pod := range failedToSchedule { - logging.FromContext(ctx).With("pod", client.ObjectKeyFromObject(pod)).Errorf("Could not schedule pod, %s", errors[pod]) - s.recorder.Publish(PodFailedToScheduleEvent(pod, errors[pod])) - } - - for _, existing := range s.existingNodes { - if len(existing.Pods) > 0 { - s.cluster.NominateNodeForPod(ctx, existing.ProviderID()) - } - for _, pod := range existing.Pods { - s.recorder.Publish(NominatePodEvent(pod, existing.Node, existing.NodeClaim)) - } - } - - // Report new nodes, or exit to avoid log spam - newCount := 0 - for _, nodeClaim := range s.newNodeClaims { - newCount += len(nodeClaim.Pods) - } - if newCount == 0 { - return - } - var podNames []string - for _, p := range pods { - podNames = append(podNames, fmt.Sprintf("%s/%s", p.Namespace, p.Name)) - } - - logging.FromContext(ctx).With("pods", pretty.Slice(podNames, 5)). - With("duration", schedulingDuration). - Infof("found provisionable pod(s)") - - logging.FromContext(ctx).With("nodeclaims", len(s.newNodeClaims), "pods", newCount).Infof("computed new nodeclaim(s) to fit pod(s)") - // Report in flight newNodes, or exit to avoid log spam - inflightCount := 0 - existingCount := 0 - for _, node := range lo.Filter(s.existingNodes, func(node *ExistingNode, _ int) bool { return len(node.Pods) > 0 }) { - inflightCount++ - existingCount += len(node.Pods) - } - if existingCount == 0 { - return - } - logging.FromContext(ctx).Infof("computed %d unready node(s) will fit %d pod(s)", inflightCount, existingCount) -} - func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error { // first try to schedule against an in-flight real node for _, node := range s.existingNodes { @@ -296,7 +274,7 @@ func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error { if len(instanceTypes) == 0 { errs = multierr.Append(errs, fmt.Errorf("all available instance types exceed limits for nodepool: %q", nodeClaimTemplate.NodePoolName)) continue - } else if len(s.instanceTypes[nodeClaimTemplate.NodePoolName]) != len(instanceTypes) && !s.opts.SimulationMode { + } else if len(s.instanceTypes[nodeClaimTemplate.NodePoolName]) != len(instanceTypes) { logging.FromContext(ctx).With("nodepool", nodeClaimTemplate.NodePoolName).Debugf("%d out of %d instance types were excluded because they would breach limits", len(s.instanceTypes[nodeClaimTemplate.NodePoolName])-len(instanceTypes), len(s.instanceTypes[nodeClaimTemplate.NodePoolName])) } diff --git a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go index f265915cf2..044def37f7 100644 --- a/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go +++ b/pkg/controllers/provisioning/scheduling/scheduling_benchmark_test.go @@ -149,8 +149,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) { scheduler := scheduling.NewScheduler(ctx, client, []*scheduling.NodeClaimTemplate{scheduling.NewNodeClaimTemplate(nodePool)}, nil, cluster, nil, topology, map[string][]*cloudprovider.InstanceType{nodePool.Name: instanceTypes}, nil, - events.NewRecorder(&record.FakeRecorder{}), - scheduling.SchedulerOptions{}) + events.NewRecorder(&record.FakeRecorder{})) b.ResetTimer() // Pack benchmark diff --git a/pkg/operator/logging/logging.go b/pkg/operator/logging/logging.go index 72c0c1b4d9..ef058c6689 100644 --- a/pkg/operator/logging/logging.go +++ b/pkg/operator/logging/logging.go @@ -44,6 +44,10 @@ const ( loggerCfgFilePath = loggerCfgDir + "/zap-logger-config" ) +// NopLogger is used to throw away logs when we don't actually want to log in +// certain portions of the code since logging would be too noisy +var NopLogger = zap.NewNop().Sugar() + func DefaultZapConfig(ctx context.Context, component string) zap.Config { logLevel := lo.Ternary(component != "webhook", zap.NewAtomicLevelAt(zap.InfoLevel), zap.NewAtomicLevelAt(zap.ErrorLevel)) if l := options.FromContext(ctx).LogLevel; l != "" && component != "webhook" {