Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
return err
}

originalScheduledPods, unschedulablePods, schedulerUnprocessed, err := listPods(podLister, a.BypassedSchedulers)
podsBySchedulability, err := listPods(podLister, a.BypassedSchedulers)
if err != nil {
return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}
Expand Down Expand Up @@ -338,7 +338,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
} else {
metrics.UpdateMaxNodesCount(maxNodesCount)
}
nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(podsBySchedulability.Scheduled, a.ExpendablePodsPriorityCutoff)

if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, draSnapshot); err != nil {
return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
Expand Down Expand Up @@ -439,9 +439,9 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
metrics.UpdateLastTime(metrics.Autoscaling, time.Now())

// SchedulerUnprocessed might be zero here if it was disabled
metrics.UpdateUnschedulablePodsCount(len(unschedulablePods), len(schedulerUnprocessed))
metrics.UpdateUnschedulablePodsCount(len(podsBySchedulability.Unschedulable), len(podsBySchedulability.Unprocessed))
// Treat unknown pods as unschedulable, pod list processor will remove schedulable pods
unschedulablePods = append(unschedulablePods, schedulerUnprocessed...)
podsBySchedulability.Unschedulable = append(podsBySchedulability.Unschedulable, podsBySchedulability.Unprocessed...)
// Upcoming nodes are recently created nodes that haven't registered in the cluster yet, or haven't become ready yet.
upcomingCounts, registeredUpcoming := a.clusterStateRegistry.GetUpcomingNodes()
// For each upcoming node we inject a placeholder node faked to appear ready into the cluster snapshot, so that we can pack unschedulable pods on
Expand Down Expand Up @@ -480,7 +480,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
a.AutoscalingContext.DebuggingSnapshotter.SetClusterNodes(l)
}

unschedulablePodsToHelp, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, unschedulablePods)
unschedulablePodsToHelp, err := a.processors.PodListProcessor.Process(a.AutoscalingContext, podsBySchedulability.Unschedulable)

if err != nil {
klog.Warningf("Failed to process unschedulable pods: %v", err)
Expand Down Expand Up @@ -1156,22 +1156,18 @@ func nodeNames(ns []*apiv1.Node) []string {
return names
}

func listPods(podLister kube_util.PodLister, bypassedSchedulers map[string]bool) (scheduled, unschedulable, unprocessed []*apiv1.Pod, err error) {
func listPods(podLister kube_util.PodLister, bypassedSchedulers map[string]bool) (podsBySchedulability kube_util.PodsBySchedulability, err error) {
pods, err := podLister.List()
if err != nil {
klog.Errorf("Failed to list pods: %v", err)
return nil, nil, nil, err
}
scheduled = kube_util.ScheduledPods(pods)
unschedulable = kube_util.UnschedulablePods(pods)
if len(bypassedSchedulers) > 0 {
unprocessed = kube_util.SchedulerUnprocessedPods(pods, bypassedSchedulers)
return podsBySchedulability, err
}
podsBySchedulability = kube_util.ArrangePodsBySchedulability(pods, bypassedSchedulers)
// Skip logging in case of the boring scenario, when all pods are scheduled.
if len(pods) != len(scheduled) {
ignored := len(pods) - len(scheduled) - len(unschedulable) - len(unprocessed)
if len(pods) != len(podsBySchedulability.Scheduled) {
ignored := len(pods) - len(podsBySchedulability.Scheduled) - len(podsBySchedulability.Unschedulable) - len(podsBySchedulability.Unprocessed)
klog.Infof("Found %d pods in the cluster: %d scheduled, %d unschedulable, %d unprocessed by scheduler, %d ignored (most likely using custom scheduler)",
len(pods), len(scheduled), len(unschedulable), len(unprocessed), ignored)
len(pods), len(podsBySchedulability.Scheduled), len(podsBySchedulability.Unschedulable), len(podsBySchedulability.Unprocessed), ignored)
}
return
}
68 changes: 26 additions & 42 deletions cluster-autoscaler/utils/kubernetes/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ type listerRegistryImpl struct {
statefulSetLister v1appslister.StatefulSetLister
}

// PodsBySchedulability arranges pods by their schedulability
type PodsBySchedulability struct {
Scheduled []*apiv1.Pod
Unschedulable []*apiv1.Pod
Unprocessed []*apiv1.Pod
}

// NewListerRegistry returns a registry providing various listers to list pods or nodes matching conditions
func NewListerRegistry(allNode NodeLister, readyNode NodeLister, allPodLister PodLister, podDisruptionBudgetLister PodDisruptionBudgetLister,
daemonSetLister v1appslister.DaemonSetLister, replicationControllerLister v1lister.ReplicationControllerLister,
Expand Down Expand Up @@ -139,7 +146,7 @@ func (r listerRegistryImpl) StatefulSetLister() v1appslister.StatefulSetLister {
}

// PodLister lists all pods.
// To filter out the scheduled or unschedulable pods the helper methods ScheduledPods and UnschedulablePods should be used.
// To filter out scheduled, unschedulable, or unprocessed pods the helper method ArrangePodsBySchedulability should be used.
type PodLister interface {
List() ([]*apiv1.Pod, error)
}
Expand All @@ -156,19 +163,6 @@ func isDeleted(pod *apiv1.Pod) bool {
return pod.GetDeletionTimestamp() != nil
}

// isUnschedulable checks whether a pod is unschedulable or not
// This method doesn't check for nil ptr, it's the responsibility of the caller
func isUnschedulable(pod *apiv1.Pod) bool {
if isScheduled(pod) || isDeleted(pod) {
return false
}
_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
if condition == nil || condition.Status != apiv1.ConditionFalse || condition.Reason != apiv1.PodReasonUnschedulable {
return false
}
return true
}

// ScheduledPods is a helper method that returns all scheduled pods from given pod list.
func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod {
var scheduledPods []*apiv1.Pod
Expand All @@ -181,27 +175,29 @@ func ScheduledPods(allPods []*apiv1.Pod) []*apiv1.Pod {
return scheduledPods
}

// SchedulerUnprocessedPods is a helper method that returns all pods which are not yet processed by the specified bypassed schedulers
func SchedulerUnprocessedPods(allPods []*apiv1.Pod, bypassedSchedulers map[string]bool) []*apiv1.Pod {
var unprocessedPods []*apiv1.Pod

// ArrangePodsBySchedulability is a helper method that arranges pods by schedulability:
// scheduled, unschedulable, and unprocessed by any any bypassed schedulers.
func ArrangePodsBySchedulability(allPods []*apiv1.Pod, bypassedSchedulers map[string]bool) (podsBySchedulability PodsBySchedulability) {
for _, pod := range allPods {
if canBypass := bypassedSchedulers[pod.Spec.SchedulerName]; !canBypass {
if isScheduled(pod) {
podsBySchedulability.Scheduled = append(podsBySchedulability.Scheduled, pod)
continue
}
// Make sure it's not scheduled or deleted
if isScheduled(pod) || isDeleted(pod) || isUnschedulable(pod) {
} else if isDeleted(pod) {
continue
}
// Make sure that if it's not scheduled it's either
// Not processed (condition is nil)
// Or Reason is empty (not schedulerError, terminated, ...etc)
_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
if condition == nil || (condition.Status == apiv1.ConditionFalse && condition.Reason == "") {
unprocessedPods = append(unprocessedPods, pod)
} else {
_, condition := podv1.GetPodCondition(&pod.Status, apiv1.PodScheduled)
if condition != nil && condition.Status == apiv1.ConditionFalse && condition.Reason == apiv1.PodReasonUnschedulable {
podsBySchedulability.Unschedulable = append(podsBySchedulability.Unschedulable, pod)
} else {
if canBypass := bypassedSchedulers[pod.Spec.SchedulerName]; canBypass {
if condition == nil || (condition.Status == apiv1.ConditionFalse && condition.Reason == "") {
podsBySchedulability.Unprocessed = append(podsBySchedulability.Unprocessed, pod)
}
}
}
}
}
return unprocessedPods
return
}

// SchedulingGatedPods is a helper method that returns all pods which has scheduling gate
Expand All @@ -228,18 +224,6 @@ func isSchedulingGated(pod *apiv1.Pod) bool {
return false
}

// UnschedulablePods is a helper method that returns all unschedulable pods from given pod list.
func UnschedulablePods(allPods []*apiv1.Pod) []*apiv1.Pod {
var unschedulablePods []*apiv1.Pod
for _, pod := range allPods {
if !isUnschedulable(pod) {
continue
}
unschedulablePods = append(unschedulablePods, pod)
}
return unschedulablePods
}

// AllPodLister lists all pods.
type AllPodLister struct {
podLister v1lister.PodLister
Expand Down
Loading