2 changes: 2 additions & 0 deletions cluster-autoscaler/config/autoscaling_options.go
@@ -314,6 +314,8 @@ type AutoscalingOptions struct {
ForceDeleteFailedNodes bool
// DynamicResourceAllocationEnabled configures whether logic for handling DRA objects is enabled.
DynamicResourceAllocationEnabled bool
// CSINodeAwareSchedulingEnabled configures whether logic for handling CSINode objects is enabled.
CSINodeAwareSchedulingEnabled bool
// ClusterSnapshotParallelism is the maximum parallelism of cluster snapshot creation.
ClusterSnapshotParallelism int
// PredicateParallelism is the number of goroutines to use for running scheduler predicates.
2 changes: 2 additions & 0 deletions cluster-autoscaler/config/flags/flags.go
@@ -226,6 +226,7 @@ var (
forceDeleteLongUnregisteredNodes = flag.Bool("force-delete-unregistered-nodes", false, "Whether to enable force deletion of long unregistered nodes, regardless of the min size of the node group they belong to.")
forceDeleteFailedNodes = flag.Bool("force-delete-failed-nodes", false, "Whether to enable force deletion of failed nodes, regardless of the min size of the node group they belong to.")
enableDynamicResourceAllocation = flag.Bool("enable-dynamic-resource-allocation", false, "Whether logic for handling DRA (Dynamic Resource Allocation) objects is enabled.")
enableCSINodeAwareScheduling = flag.Bool("enable-csi-node-aware-scheduling", false, "Whether logic for handling CSINode objects is enabled.")
clusterSnapshotParallelism = flag.Int("cluster-snapshot-parallelism", 16, "Maximum parallelism of cluster snapshot creation.")
predicateParallelism = flag.Int("predicate-parallelism", 4, "Maximum parallelism of scheduler predicate checking.")
checkCapacityProcessorInstance = flag.String("check-capacity-processor-instance", "", "Name of the processor instance. Only ProvisioningRequests that define this name in their parameters with the key \"processorInstance\" will be processed by this CA instance. It only refers to check capacity ProvisioningRequests, but if not empty, best-effort atomic ProvisioningRequests processing is disabled in this instance. Not recommended: Until CA 1.35, ProvisioningRequests with this name as prefix in their class will be also processed.")
@@ -404,6 +405,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
ForceDeleteLongUnregisteredNodes: *forceDeleteLongUnregisteredNodes,
ForceDeleteFailedNodes: *forceDeleteFailedNodes,
DynamicResourceAllocationEnabled: *enableDynamicResourceAllocation,
CSINodeAwareSchedulingEnabled: *enableCSINodeAwareScheduling,
ClusterSnapshotParallelism: *clusterSnapshotParallelism,
PredicateParallelism: *predicateParallelism,
CheckCapacityProcessorInstance: *checkCapacityProcessorInstance,
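For illustration only, here is a minimal, self-contained sketch of how a flag such as the new --enable-csi-node-aware-scheduling ends up in the options struct. The flag name and default come from the diff above; the standalone program below is a simplified stand-in for the real createAutoscalingOptions wiring, not part of this change.

```go
package main

import (
	"flag"
	"fmt"
)

// autoscalingOptions is a trimmed-down stand-in for config.AutoscalingOptions.
type autoscalingOptions struct {
	CSINodeAwareSchedulingEnabled bool
}

func main() {
	// Flag name and default mirror the diff; the description is shortened.
	enableCSINodeAwareScheduling := flag.Bool("enable-csi-node-aware-scheduling", false,
		"Whether logic for handling CSINode objects is enabled.")
	flag.Parse()

	opts := autoscalingOptions{CSINodeAwareSchedulingEnabled: *enableCSINodeAwareScheduling}
	fmt.Printf("CSINode-aware scheduling enabled: %v\n", opts.CSINodeAwareSchedulingEnabled)
}
```

Running such a binary with --enable-csi-node-aware-scheduling=true mirrors how the real flag feeds CSINodeAwareSchedulingEnabled through createAutoscalingOptions.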
5 changes: 5 additions & 0 deletions cluster-autoscaler/context/autoscaling_context.go
@@ -27,6 +27,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/expander"
processor_callbacks "k8s.io/autoscaler/cluster-autoscaler/processors/callbacks"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
csinodeprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/provider"
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
@@ -65,6 +66,8 @@ type AutoscalingContext struct {
ProvisioningRequestScaleUpMode bool
// DraProvider is the provider for dynamic resources allocation.
DraProvider *draprovider.Provider
// CsiProvider is the provider for CSI node aware scheduling.
CsiProvider *csinodeprovider.Provider
}

// AutoscalingKubeClients contains all Kubernetes API clients,
@@ -112,6 +115,7 @@ func NewAutoscalingContext(
remainingPdbTracker pdb.RemainingPdbTracker,
clusterStateRegistry *clusterstate.ClusterStateRegistry,
draProvider *draprovider.Provider,
csiProvider *csinodeprovider.Provider,
) *AutoscalingContext {
return &AutoscalingContext{
AutoscalingOptions: options,
@@ -125,6 +129,7 @@
RemainingPdbTracker: remainingPdbTracker,
ClusterStateRegistry: clusterStateRegistry,
DraProvider: draProvider,
CsiProvider: csiProvider,
}
}

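The csinodeprovider package referenced above is new in this change and its internals are not shown here. As a rough, hypothetical sketch, a provider of this kind could simply wrap a CSINode lister. Only the Provider name and the Snapshot() method used elsewhere in this diff are taken from the change; the package layout, field names, and Snapshot contents below are assumptions.

```go
// Hypothetical sketch of a CSINode provider; not the actual implementation.
package provider // imported elsewhere in this diff as csinodeprovider

import (
	storagev1 "k8s.io/api/storage/v1"
	"k8s.io/apimachinery/pkg/labels"
	storagelisters "k8s.io/client-go/listers/storage/v1"
)

// Snapshot is assumed to hold CSINode objects keyed by node name. In the real
// change the snapshot type lives in simulator/csi/snapshot.
type Snapshot struct {
	CSINodes map[string]*storagev1.CSINode
}

// Provider captures the cluster's CSINode objects for use in simulations.
type Provider struct {
	csiNodeLister storagelisters.CSINodeLister
}

// NewProvider returns a Provider backed by the given lister.
func NewProvider(lister storagelisters.CSINodeLister) *Provider {
	return &Provider{csiNodeLister: lister}
}

// Snapshot lists all CSINode objects and indexes them by node name.
func (p *Provider) Snapshot() (*Snapshot, error) {
	csiNodes, err := p.csiNodeLister.List(labels.Everything())
	if err != nil {
		return nil, err
	}
	byName := make(map[string]*storagev1.CSINode, len(csiNodes))
	for _, csiNode := range csiNodes {
		byName[csiNode.Name] = csiNode
	}
	return &Snapshot{CSINodes: byName}, nil
}
```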
5 changes: 3 additions & 2 deletions cluster-autoscaler/core/autoscaler.go
@@ -76,6 +76,7 @@ func NewAutoscaler(opts coreoptions.AutoscalerOptions, informerFactory informers
opts.DeleteOptions,
opts.DrainabilityRules,
opts.DraProvider,
opts.CsiProvider,
), nil
}

@@ -91,14 +92,14 @@ func initializeDefaultOptions(opts *coreoptions.AutoscalerOptions, informerFacto
opts.AutoscalingKubeClients = ca_context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.InformerFactory)
}
if opts.FrameworkHandle == nil {
fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled)
fwHandle, err := framework.NewHandle(opts.InformerFactory, opts.SchedulerConfig, opts.DynamicResourceAllocationEnabled, opts.CSINodeAwareSchedulingEnabled)
if err != nil {
return err
}
opts.FrameworkHandle = fwHandle
}
if opts.ClusterSnapshot == nil {
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled, opts.PredicateParallelism)
opts.ClusterSnapshot = predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), opts.FrameworkHandle, opts.DynamicResourceAllocationEnabled, opts.PredicateParallelism, opts.CSINodeAwareSchedulingEnabled)
}
if opts.RemainingPdbTracker == nil {
opts.RemainingPdbTracker = pdb.NewBasicRemainingPdbTracker()
2 changes: 2 additions & 0 deletions cluster-autoscaler/core/options/autoscaler.go
@@ -28,6 +28,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/observers/loopstart"
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
csinodeprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/provider"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
@@ -57,4 +58,5 @@ type AutoscalerOptions struct {
DeleteOptions options.NodeDeleteOptions
DrainabilityRules rules.Rules
DraProvider *draprovider.Provider
CsiProvider *csinodeprovider.Provider
}
13 changes: 11 additions & 2 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -36,6 +36,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/predicate"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot/store"
csisnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/options"
@@ -397,7 +398,7 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
}

func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.autoscalingCtx.FrameworkHandle, a.autoscalingCtx.DynamicResourceAllocationEnabled, a.autoscalingCtx.PredicateParallelism)
snapshot := predicate.NewPredicateSnapshot(store.NewBasicSnapshotStore(), a.autoscalingCtx.FrameworkHandle, a.autoscalingCtx.DynamicResourceAllocationEnabled, a.autoscalingCtx.PredicateParallelism, a.autoscalingCtx.CSINodeAwareSchedulingEnabled)
pods, err := a.autoscalingCtx.AllPodLister().List()
if err != nil {
return nil, err
@@ -414,7 +415,15 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
}
}

err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, draSnapshot)
var csiSnapshot *csisnapshot.Snapshot
if a.autoscalingCtx.CSINodeAwareSchedulingEnabled {
csiSnapshot, err = a.autoscalingCtx.CsiProvider.Snapshot()
if err != nil {
return nil, err
}
}

err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods, draSnapshot, csiSnapshot)
if err != nil {
return nil, err
}
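Note that the two call sites guard the new snapshot differently: createSnapshot above checks the CSINodeAwareSchedulingEnabled flag, while RunOnce in static_autoscaler.go below checks CsiProvider for nil. A combined guard along the following lines would cover both cases. This helper is a sketch and not part of the diff, though the import paths and the Snapshot() signature are taken from it.

```go
package example

import (
	csinodeprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/provider"
	csisnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/snapshot"
)

// buildCSISnapshot returns a CSI snapshot only when the feature is enabled and
// a provider has been wired in; otherwise it returns nil, which the extended
// SetClusterState signature is expected to tolerate (RunOnce already passes a
// nil snapshot when no provider is configured).
func buildCSISnapshot(enabled bool, provider *csinodeprovider.Provider) (*csisnapshot.Snapshot, error) {
	if !enabled || provider == nil {
		return nil, nil
	}
	return provider.Snapshot()
}
```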
7 changes: 6 additions & 1 deletion cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -107,7 +107,7 @@ func (o *ScaleUpOrchestrator) ScaleUp(
if aErr != nil {
return status.UpdateScaleUpError(&status.ScaleUpStatus{}, aErr.AddPrefix("could not get upcoming nodes: "))
}
klog.V(4).Infof("Upcoming %d nodes", len(upcomingNodes))
klog.V(4).Infof("hemant Upcoming %d nodes", len(upcomingNodes))

nodeGroups := o.autoscalingCtx.CloudProvider.NodeGroups()
if o.processors != nil && o.processors.NodeGroupListProcessor != nil {
@@ -135,11 +135,14 @@
for nodegroupID := range skippedNodeGroups {
o.processors.BinpackingLimiter.MarkProcessed(o.autoscalingCtx, nodegroupID)
}
klog.V(4).Infof("hemant validNodeGroups %d", len(validNodeGroups))

// Calculate expansion options
schedulablePodGroups := map[string][]estimator.PodEquivalenceGroup{}
var options []expander.Option

// Run a scheduling simulation to determine which pods can be scheduled on each node group.
// TODO: Fix the bug where the CSINode object is not added to the simulation.
for _, nodeGroup := range validNodeGroups {
schedulablePodGroups[nodeGroup.Id()] = o.SchedulablePodGroups(podEquivalenceGroups, nodeGroup, nodeInfos[nodeGroup.Id()])
}
@@ -150,6 +153,7 @@

if len(option.Pods) == 0 || option.NodeCount == 0 {
klog.V(4).Infof("No pod can fit to %s", nodeGroup.Id())
klog.Infof("hemant no pod can fit to %s", nodeGroup.Id())
} else if allOrNothing && len(option.Pods) < len(unschedulablePods) {
klog.V(4).Infof("Some pods can't fit to %s, giving up due to all-or-nothing scale-up strategy", nodeGroup.Id())
} else {
@@ -486,6 +490,7 @@ func (o *ScaleUpOrchestrator) ComputeExpansionOption(
o.autoscalingCtx.ClusterSnapshot,
estimator.NewEstimationContext(o.autoscalingCtx.MaxNodesTotal, option.SimilarNodeGroups, currentNodeCount),
)
klog.Infof("hemant about to run estimater for node group %s", nodeGroup.Id())
option.NodeCount, option.Pods = expansionEstimator.Estimate(podGroups, nodeInfo, nodeGroup)
metrics.UpdateDurationFromStart(metrics.Estimate, estimateStart)

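For readers unfamiliar with the scale-up flow touched above: each valid node group is simulated to see which pending pods could schedule on it, and only groups where at least one pod fits become expansion options. A stripped-down sketch of that loop follows, with stand-in types instead of the real expander.Option and estimator interfaces.

```go
package example

// option is a stand-in for expander.Option.
type option struct {
	nodeGroupID string
	pods        []string
	nodeCount   int
}

// computeOptions mirrors the shape of the loop above: skip groups where no pod
// fits, otherwise ask an estimator how many nodes are needed and which pods
// they would hold.
func computeOptions(
	nodeGroups []string,
	schedulable map[string][]string,
	estimate func(groupID string, pods []string) (nodeCount int, fitting []string),
) []option {
	var options []option
	for _, ng := range nodeGroups {
		pods := schedulable[ng]
		if len(pods) == 0 {
			// Corresponds to the "No pod can fit to <group>" branch above.
			continue
		}
		nodeCount, fitting := estimate(ng, pods)
		if nodeCount == 0 || len(fitting) == 0 {
			continue
		}
		options = append(options, option{nodeGroupID: ng, pods: fitting, nodeCount: nodeCount})
	}
	return options
}
```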
19 changes: 16 additions & 3 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -45,6 +45,8 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/processors/status"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
csinodeprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/provider"
csisnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/csi/snapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/drainability/rules"
draprovider "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/provider"
drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
@@ -141,7 +143,8 @@ func NewStaticAutoscaler(
scaleUpOrchestrator scaleup.Orchestrator,
deleteOptions options.NodeDeleteOptions,
drainabilityRules rules.Rules,
draProvider *draprovider.Provider) *StaticAutoscaler {
draProvider *draprovider.Provider,
csiProvider *csinodeprovider.Provider) *StaticAutoscaler {

klog.V(4).Infof("Creating new static autoscaler with opts: %v", opts)

@@ -162,7 +165,8 @@
debuggingSnapshotter,
remainingPdbTracker,
clusterStateRegistry,
draProvider)
draProvider,
csiProvider)

taintConfig := taints.NewTaintConfig(opts)
processors.ScaleDownCandidatesNotifier.Register(clusterStateRegistry)
@@ -280,6 +284,15 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
}
}

var csiSnapshot *csisnapshot.Snapshot
if a.AutoscalingContext.CsiProvider != nil {
var err error
csiSnapshot, err = a.AutoscalingContext.CsiProvider.Snapshot()
if err != nil {
return caerrors.ToAutoscalerError(caerrors.ApiCallError, err)
}
}

// Get nodes and pods currently living on cluster
allNodes, readyNodes, typedErr := a.obtainNodeLists(draSnapshot)
if typedErr != nil {
@@ -340,7 +353,7 @@
}
nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)

if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, draSnapshot); err != nil {
if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods, draSnapshot, csiSnapshot); err != nil {
return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
}
// Initialize Pod Disruption Budget tracking
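Presumably the point of snapshotting CSINode objects in RunOnce is that scheduler volume-limit checks read the per-driver attach limits published there (CSINode.Spec.Drivers[].Allocatable.Count). For reference, this is the shape of such an object; the driver name, node ID, and limit below are made-up values.

```go
package example

import (
	storagev1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// exampleCSINode builds a CSINode with a single driver that allows at most 25
// attached volumes on the node. All concrete values are illustrative.
func exampleCSINode() *storagev1.CSINode {
	limit := int32(25)
	return &storagev1.CSINode{
		ObjectMeta: metav1.ObjectMeta{Name: "node-1"},
		Spec: storagev1.CSINodeSpec{
			Drivers: []storagev1.CSINodeDriver{{
				Name:   "ebs.csi.example.com",
				NodeID: "i-0123456789abcdef0",
				// Allocatable.Count is what volume-limit scheduling checks consume.
				Allocatable: &storagev1.VolumeNodeResources{Count: &limit},
			}},
		},
	}
}
```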
12 changes: 12 additions & 0 deletions cluster-autoscaler/estimator/binpacking_estimator.go
@@ -48,6 +48,7 @@ type estimationState struct {
newNodeNameIndex int
lastNodeName string
newNodeNames map[string]bool
// newNodesWithPods is the set of new node names that have at least one pod scheduled on them.
newNodesWithPods map[string]bool
}

@@ -142,18 +143,21 @@ func (e *BinpackingNodeEstimator) tryToScheduleOnExistingNodes(
pods []*apiv1.Pod,
) ([]*apiv1.Pod, error) {
var index int
klog.Infof("hemant about to try to schedule on existing nodes %d", len(pods))
for index = 0; index < len(pods); index++ {
pod := pods[index]

// Try to schedule the pod on all nodes created during simulation
nodeName, err := e.clusterSnapshot.SchedulePodOnAnyNodeMatching(pod, func(nodeInfo *framework.NodeInfo) bool {
return estimationState.newNodeNames[nodeInfo.Node().Name]
})
klog.Infof("hemant trying scheduling of pod %s on node %s", pod.Name, nodeName)
if err != nil && err.Type() == clustersnapshot.SchedulingInternalError {
// Unexpected error.
return nil, err
} else if err != nil {
// The pod couldn't be scheduled on any Node because of scheduling predicates.
klog.Infof("hemant failed to schedule pod %s on existing node %s", pod.Name, nodeName)
break
}
// The pod was scheduled on nodeName.
@@ -174,6 +178,7 @@ func (e *BinpackingNodeEstimator) tryToScheduleOnNewNodes(

if estimationState.lastNodeName != "" {
// Try to schedule the pod only on the most recently created node.
klog.Infof("hemant - Trying to schedule pod %s on last node %s", pod.Name, estimationState.lastNodeName)
err := e.clusterSnapshot.SchedulePod(pod, estimationState.lastNodeName)
if err == nil {
// The pod was scheduled on the newly created node.
Expand Down Expand Up @@ -222,6 +227,7 @@ func (e *BinpackingNodeEstimator) tryToScheduleOnNewNodes(
return false, nil
}

klog.Infof("hemant - Adding new node %s", estimationState.lastNodeName)
// Add new node
if err := e.addNewNodeToSnapshot(estimationState, nodeTemplate); err != nil {
return false, fmt.Errorf("Error while adding new node for template to ClusterSnapshot; %w", err)
@@ -249,10 +255,16 @@ func (e *BinpackingNodeEstimator) addNewNodeToSnapshot(
estimationState *estimationState,
template *framework.NodeInfo,
) error {
klog.Infof("hemant adding new node with template %s to snapshot during estimation", template.Node().Name)
newNodeInfo, err := core_utils.SanitizedNodeInfo(template, fmt.Sprintf("e-%d", estimationState.newNodeNameIndex))
if err != nil {
return err
}

if template.CSINode != nil {
newNodeInfo.AddCSINode(core_utils.CreateSanitizedCSINode(template.CSINode, newNodeInfo))
}

if err := e.clusterSnapshot.AddNodeInfo(newNodeInfo); err != nil {
return err
}
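core_utils.CreateSanitizedCSINode is called above but not shown in this diff. A plausible sketch of what such a helper needs to do is to deep-copy the template's CSINode and rename it after the sanitized node, so the simulated node keeps the template's per-driver attach limits; the real helper takes the new NodeInfo rather than a plain name, and its actual behavior may differ.

```go
package example

import (
	storagev1 "k8s.io/api/storage/v1"
)

// createSanitizedCSINode is a guess at the helper's behavior: clone the
// template CSINode and rename it to match the freshly generated simulated
// node, since a CSINode shares its name with the Node it describes.
func createSanitizedCSINode(template *storagev1.CSINode, newNodeName string) *storagev1.CSINode {
	if template == nil {
		return nil
	}
	sanitized := template.DeepCopy()
	sanitized.Name = newNodeName
	return sanitized
}
```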