Drop simulation mode in favor of NopLogger and Recording scheduling results
jonathan-innis committed Feb 21, 2024
1 parent 0fea7ce commit 3a549ab
Showing 6 changed files with 63 additions and 82 deletions.
11 changes: 3 additions & 8 deletions pkg/controllers/disruption/controller.go
@@ -32,6 +32,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

operatorlogging "sigs.k8s.io/karpenter/pkg/operator/logging"

"sigs.k8s.io/karpenter/pkg/apis/v1beta1"
"sigs.k8s.io/karpenter/pkg/cloudprovider"
"sigs.k8s.io/karpenter/pkg/controllers/disruption/orchestration"
@@ -211,14 +213,7 @@ func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command,
// tainted with the Karpenter taint, the provisioning controller will continue
// to do scheduling simulations and nominate the pods on the candidate nodes until
// the node is cleaned up.
for _, node := range schedulingResults.ExistingNodes {
if len(node.Pods) > 0 {
c.cluster.NominateNodeForPod(ctx, node.ProviderID())
}
for _, pod := range node.Pods {
c.recorder.Publish(scheduling.NominatePodEvent(pod, node.Node, node.NodeClaim))
}
}
schedulingResults.Record(logging.WithLogger(ctx, operatorlogging.NopLogger), c.recorder, c.cluster)

providerIDs := lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() })
// We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
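The hunk above swaps executeCommand's inline nomination loop for a single `schedulingResults.Record(...)` call made under a throwaway logger: disruption still nominates pods and publishes events, but skips the provisioning log lines. A minimal runnable sketch of that context-scoped silencing pattern, assuming `knative.dev/pkg/logging` (the `logging` import used throughout these files) and `go.uber.org/zap`:

```go
package main

import (
	"context"

	"go.uber.org/zap"
	"knative.dev/pkg/logging"
)

func main() {
	// Normal code paths carry a real logger in the context.
	base, _ := zap.NewDevelopment()
	ctx := logging.WithLogger(context.Background(), base.Sugar())

	// Simulation-style calls get a derived context carrying a no-op logger;
	// zap.NewNop discards every write, mirroring the operatorlogging.NopLogger
	// added in pkg/operator/logging below.
	quiet := logging.WithLogger(ctx, zap.NewNop().Sugar())

	logging.FromContext(quiet).Infof("discarded: the simulation stays silent")
	logging.FromContext(ctx).Infof("still visible: the parent context is untouched")
}
```

The same wrapped-context pattern replaces the old `SimulationMode` flag in `SimulateScheduling` below.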
7 changes: 3 additions & 4 deletions pkg/controllers/disruption/helpers.go
@@ -41,6 +41,7 @@ import (
"sigs.k8s.io/karpenter/pkg/controllers/state"
"sigs.k8s.io/karpenter/pkg/events"
"sigs.k8s.io/karpenter/pkg/metrics"
operatorlogging "sigs.k8s.io/karpenter/pkg/operator/logging"
"sigs.k8s.io/karpenter/pkg/scheduling"
)

@@ -78,9 +79,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
pods = append(pods, n.reschedulablePods...)
}
pods = append(pods, deletingNodePods...)
scheduler, err := provisioner.NewScheduler(ctx, pods, stateNodes, pscheduling.SchedulerOptions{
SimulationMode: true,
})
scheduler, err := provisioner.NewScheduler(logging.WithLogger(ctx, operatorlogging.NopLogger), pods, stateNodes)
if err != nil {
return pscheduling.Results{}, fmt.Errorf("creating scheduler, %w", err)
}
@@ -89,7 +88,7 @@ func SimulateScheduling(ctx context.Context, kubeClient client.Client, cluster *
return client.ObjectKeyFromObject(p), nil
})

results := scheduler.Solve(ctx, pods)
results := scheduler.Solve(logging.WithLogger(ctx, operatorlogging.NopLogger), pods)
results = results.TruncateInstanceTypes(pscheduling.MaxInstanceTypes)
for _, n := range results.ExistingNodes {
// We consider existing nodes for scheduling. When these nodes are unmanaged, their taint logic should
14 changes: 10 additions & 4 deletions pkg/controllers/provisioning/provisioner.go
@@ -192,7 +192,7 @@ func (p *Provisioner) consolidationWarnings(ctx context.Context, po *v1.Pod) {
var ErrNodePoolsNotFound = errors.New("no nodepools found")

//nolint:gocyclo
func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNodes []*state.StateNode, opts scheduler.SchedulerOptions) (*scheduler.Scheduler, error) {
func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNodes []*state.StateNode) (*scheduler.Scheduler, error) {
// Build node templates
var nodeClaimTemplates []*scheduler.NodeClaimTemplate
instanceTypes := map[string][]*cloudprovider.InstanceType{}
@@ -284,11 +284,12 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*v1.Pod, stateNod
if err != nil {
return nil, fmt.Errorf("getting daemon pods, %w", err)
}
return scheduler.NewScheduler(ctx, p.kubeClient, nodeClaimTemplates, nodePoolList.Items, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, opts), nil
return scheduler.NewScheduler(ctx, p.kubeClient, nodeClaimTemplates, nodePoolList.Items, p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder), nil
}

func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
defer metrics.Measure(schedulingDuration)()
start := time.Now()

// We collect the nodes with their used capacities before we get the list of pending pods. This ensures that
// the node capacities we schedule against are always >= what the actual capacity is at any given instance. This
@@ -319,7 +320,7 @@ func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
if len(pods) == 0 {
return scheduler.Results{}, nil
}
s, err := p.NewScheduler(ctx, pods, nodes.Active(), scheduler.SchedulerOptions{})
s, err := p.NewScheduler(ctx, pods, nodes.Active())
if err != nil {
if errors.Is(err, ErrNodePoolsNotFound) {
logging.FromContext(ctx).Info(ErrNodePoolsNotFound)
@@ -328,7 +329,12 @@
return scheduler.Results{}, fmt.Errorf("creating scheduler, %w", err)
}
results := s.Solve(ctx, pods)
return results.TruncateInstanceTypes(scheduler.MaxInstanceTypes), nil
results.TruncateInstanceTypes(scheduler.MaxInstanceTypes)
logging.FromContext(ctx).With("pods", pretty.Slice(lo.Map(pods, func(p *v1.Pod, _ int) string { return client.ObjectKeyFromObject(p).String() }), 5)).
With("duration", time.Since(start)).
Infof("found provisionable pod(s)")
results.Record(ctx, p.recorder, p.cluster)
return results, nil
}

func (p *Provisioner) Create(ctx context.Context, n *scheduler.NodeClaim, opts ...functional.Option[LaunchOptions]) (string, error) {
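`Schedule` now owns the "found provisionable pod(s)" log line, timing it with the added `start := time.Now()` while the existing `defer metrics.Measure(schedulingDuration)()` keeps feeding the metric. A short sketch of that defer idiom, with a hypothetical `measure` helper standing in for karpenter's `metrics` package:

```go
package main

import (
	"fmt"
	"time"
)

// measure captures the start time immediately and returns a closure that
// reports the elapsed time when invoked.
func measure(name string) func() {
	start := time.Now()
	return func() { fmt.Printf("%s took %s\n", name, time.Since(start)) }
}

func schedule() {
	// The trailing () matters: measure(...) runs at this line, and the
	// closure it returns runs when schedule returns.
	defer measure("schedule")()
	time.Sleep(50 * time.Millisecond) // stand-in for the scheduling work
}

func main() { schedule() }
```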
106 changes: 42 additions & 64 deletions pkg/controllers/provisioning/scheduling/scheduler.go
@@ -21,7 +21,6 @@ import (
"context"
"fmt"
"sort"
"time"

"github.com/samber/lo"
"go.uber.org/multierr"
Expand All @@ -36,20 +35,13 @@ import (
"sigs.k8s.io/karpenter/pkg/metrics"
"sigs.k8s.io/karpenter/pkg/scheduling"
"sigs.k8s.io/karpenter/pkg/utils/pod"
"sigs.k8s.io/karpenter/pkg/utils/pretty"
"sigs.k8s.io/karpenter/pkg/utils/resources"
)

// SchedulerOptions can be used to control the scheduling, these options are currently only used during consolidation.
type SchedulerOptions struct {
// SimulationMode if true will prevent recording of the pod nomination decisions as events
SimulationMode bool
}

func NewScheduler(ctx context.Context, kubeClient client.Client, nodeClaimTemplates []*NodeClaimTemplate,
nodePools []v1beta1.NodePool, cluster *state.Cluster, stateNodes []*state.StateNode, topology *Topology,
instanceTypes map[string][]*cloudprovider.InstanceType, daemonSetPods []*v1.Pod,
recorder events.Recorder, opts SchedulerOptions) *Scheduler {
recorder events.Recorder) *Scheduler {

// if any of the nodePools add a taint with a prefer no schedule effect, we add a toleration for the taint
// during preference relaxation
@@ -71,7 +63,6 @@ func NewScheduler(ctx context.Context, kubeClient client.Client, nodeClaimTempla
instanceTypes: instanceTypes,
daemonOverhead: getDaemonOverhead(nodeClaimTemplates, daemonSetPods),
recorder: recorder,
opts: opts,
preferences: &Preferences{ToleratePreferNoSchedule: toleratePreferNoSchedule},
remainingResources: map[string]v1.ResourceList{},
}
@@ -94,7 +85,6 @@ type Scheduler struct {
topology *Topology
cluster *state.Cluster
recorder events.Recorder
opts SchedulerOptions
kubeClient client.Client
}

@@ -105,6 +95,45 @@ type Results struct {
PodErrors map[*v1.Pod]error
}

// Record sends eventing and log messages back for the results that were produced from a scheduling run
// It also nominates nodes in the cluster state based on the scheduling run to signal to other components
// leveraging the cluster state that a previous scheduling run that was recorded is relying on these nodes
func (r Results) Record(ctx context.Context, recorder events.Recorder, cluster *state.Cluster) {
// Report failures and nominations
for p, err := range r.PodErrors {
logging.FromContext(ctx).With("pod", client.ObjectKeyFromObject(p)).Errorf("Could not schedule pod, %s", err)
recorder.Publish(PodFailedToScheduleEvent(p, err))
}
for _, existing := range r.ExistingNodes {
if len(existing.Pods) > 0 {
cluster.NominateNodeForPod(ctx, existing.ProviderID())
}
for _, p := range existing.Pods {
recorder.Publish(NominatePodEvent(p, existing.Node, existing.NodeClaim))
}
}
// Report new nodes, or exit to avoid log spam
newCount := 0
for _, nodeClaim := range r.NewNodeClaims {
newCount += len(nodeClaim.Pods)
}
if newCount == 0 {
return
}
logging.FromContext(ctx).With("nodeclaims", len(r.NewNodeClaims), "pods", newCount).Infof("computed new nodeclaim(s) to fit pod(s)")
// Report in flight newNodes, or exit to avoid log spam
inflightCount := 0
existingCount := 0
for _, node := range lo.Filter(r.ExistingNodes, func(node *ExistingNode, _ int) bool { return len(node.Pods) > 0 }) {
inflightCount++
existingCount += len(node.Pods)
}
if existingCount == 0 {
return
}
logging.FromContext(ctx).Infof("computed %d unready node(s) will fit %d pod(s)", inflightCount, existingCount)
}

// AllNonPendingPodsScheduled returns true if all pods scheduled.
// We don't care if a pod was pending before consolidation and will still be pending after. It may be a pod that we can't
// schedule at all and don't want it to block consolidation.
@@ -172,7 +201,6 @@ func (r Results) TruncateInstanceTypes(maxInstanceTypes int) Results {

func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) Results {
defer metrics.Measure(schedulingSimulationDuration)()
schedulingStart := time.Now()
// We loop trying to schedule unschedulable pods as long as we are making progress. This solves a few
// issues including pods with affinity to another pod in the batch. We could topo-sort to solve this, but it wouldn't
// solve the problem of scheduling pods where a particular order is needed to prevent a max-skew violation. E.g. if we
@@ -205,10 +233,7 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) Results {
for _, m := range s.newNodeClaims {
m.FinalizeScheduling()
}
if !s.opts.SimulationMode {
s.recordSchedulingResults(ctx, pods, q.List(), errors, time.Since(schedulingStart))
}
// clear any nil errors so we can know that len(PodErrors) == 0 => all pods scheduled
// clear any nil errors, so we can know that len(PodErrors) == 0 => all pods scheduled
for k, v := range errors {
if v == nil {
delete(errors, k)
@@ -221,53 +246,6 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*v1.Pod) Results {
}
}

func (s *Scheduler) recordSchedulingResults(ctx context.Context, pods []*v1.Pod, failedToSchedule []*v1.Pod, errors map[*v1.Pod]error, schedulingDuration time.Duration) {
// Report failures and nominations
for _, pod := range failedToSchedule {
logging.FromContext(ctx).With("pod", client.ObjectKeyFromObject(pod)).Errorf("Could not schedule pod, %s", errors[pod])
s.recorder.Publish(PodFailedToScheduleEvent(pod, errors[pod]))
}

for _, existing := range s.existingNodes {
if len(existing.Pods) > 0 {
s.cluster.NominateNodeForPod(ctx, existing.ProviderID())
}
for _, pod := range existing.Pods {
s.recorder.Publish(NominatePodEvent(pod, existing.Node, existing.NodeClaim))
}
}

// Report new nodes, or exit to avoid log spam
newCount := 0
for _, nodeClaim := range s.newNodeClaims {
newCount += len(nodeClaim.Pods)
}
if newCount == 0 {
return
}
var podNames []string
for _, p := range pods {
podNames = append(podNames, fmt.Sprintf("%s/%s", p.Namespace, p.Name))
}

logging.FromContext(ctx).With("pods", pretty.Slice(podNames, 5)).
With("duration", schedulingDuration).
Infof("found provisionable pod(s)")

logging.FromContext(ctx).With("nodeclaims", len(s.newNodeClaims), "pods", newCount).Infof("computed new nodeclaim(s) to fit pod(s)")
// Report in flight newNodes, or exit to avoid log spam
inflightCount := 0
existingCount := 0
for _, node := range lo.Filter(s.existingNodes, func(node *ExistingNode, _ int) bool { return len(node.Pods) > 0 }) {
inflightCount++
existingCount += len(node.Pods)
}
if existingCount == 0 {
return
}
logging.FromContext(ctx).Infof("computed %d unready node(s) will fit %d pod(s)", inflightCount, existingCount)
}

func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error {
// first try to schedule against an in-flight real node
for _, node := range s.existingNodes {
@@ -296,7 +274,7 @@ func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error {
if len(instanceTypes) == 0 {
errs = multierr.Append(errs, fmt.Errorf("all available instance types exceed limits for nodepool: %q", nodeClaimTemplate.NodePoolName))
continue
} else if len(s.instanceTypes[nodeClaimTemplate.NodePoolName]) != len(instanceTypes) && !s.opts.SimulationMode {
} else if len(s.instanceTypes[nodeClaimTemplate.NodePoolName]) != len(instanceTypes) {
logging.FromContext(ctx).With("nodepool", nodeClaimTemplate.NodePoolName).Debugf("%d out of %d instance types were excluded because they would breach limits",
len(s.instanceTypes[nodeClaimTemplate.NodePoolName])-len(instanceTypes), len(s.instanceTypes[nodeClaimTemplate.NodePoolName]))
}
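`Results.Record` guards against log spam by counting before logging and returning early when nothing was placed. A compact equivalent of its in-flight accounting, using the same `samber/lo` helpers already imported in this file (`node` is a toy stand-in for `ExistingNode`):

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
)

type node struct{ pods int }

func main() {
	nodes := []node{{pods: 0}, {pods: 2}, {pods: 3}, {pods: 0}}
	// Only nodes that actually received pods count as in-flight.
	busy := lo.Filter(nodes, func(n node, _ int) bool { return n.pods > 0 })
	total := lo.SumBy(busy, func(n node) int { return n.pods })
	if total == 0 {
		return // exit early to avoid a noisy zero-valued log line
	}
	fmt.Printf("computed %d unready node(s) will fit %d pod(s)\n", len(busy), total)
}
```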
3 changes: 1 addition & 2 deletions pkg/controllers/provisioning/scheduling/scheduler_benchmark_test.go
@@ -149,8 +149,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
scheduler := scheduling.NewScheduler(ctx, client, []*scheduling.NodeClaimTemplate{scheduling.NewNodeClaimTemplate(nodePool)},
nil, cluster, nil, topology,
map[string][]*cloudprovider.InstanceType{nodePool.Name: instanceTypes}, nil,
events.NewRecorder(&record.FakeRecorder{}),
scheduling.SchedulerOptions{})
events.NewRecorder(&record.FakeRecorder{}))

b.ResetTimer()
// Pack benchmark
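The benchmark builds its recorder from a zero-value `&record.FakeRecorder{}`, whose `Events` channel is nil, so the fake silently drops everything it is given — appropriate now that `Solve` publishes through `Results.Record`. Tests that assert on published events use the sized constructor instead; a standalone sketch (the event fields are illustrative, not karpenter's):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/tools/record"
)

func main() {
	// NewFakeRecorder sizes the Events channel; with a zero-value
	// &record.FakeRecorder{} the channel is nil and events are dropped.
	fr := record.NewFakeRecorder(10)
	fr.Eventf(nil, "Normal", "Nominated", "Pod should schedule on %s", "node-a")
	fmt.Println(<-fr.Events) // Normal Nominated Pod should schedule on node-a
}
```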
4 changes: 4 additions & 0 deletions pkg/operator/logging/logging.go
@@ -44,6 +44,10 @@ const (
loggerCfgFilePath = loggerCfgDir + "/zap-logger-config"
)

// NopLogger is used to throw away logs when we don't actually want to log in
// certain portions of the code since logging would be too noisy
var NopLogger = zap.NewNop().Sugar()

func DefaultZapConfig(ctx context.Context, component string) zap.Config {
logLevel := lo.Ternary(component != "webhook", zap.NewAtomicLevelAt(zap.InfoLevel), zap.NewAtomicLevelAt(zap.ErrorLevel))
if l := options.FromContext(ctx).LogLevel; l != "" && component != "webhook" {
