diff --git a/cluster-autoscaler/cloudprovider/test/test_cloud_provider.go b/cluster-autoscaler/cloudprovider/test/test_cloud_provider.go
index 0dfea4c69150..30f86dd79da1 100644
--- a/cluster-autoscaler/cloudprovider/test/test_cloud_provider.go
+++ b/cluster-autoscaler/cloudprovider/test/test_cloud_provider.go
@@ -399,7 +399,11 @@ func (tng *TestNodeGroup) IncreaseSize(delta int) error {
 
 // AtomicIncreaseSize is not implemented.
 func (tng *TestNodeGroup) AtomicIncreaseSize(delta int) error {
-	return cloudprovider.ErrNotImplemented
+	tng.Lock()
+	tng.targetSize += delta
+	tng.Unlock()
+
+	return tng.cloudProvider.onScaleUp(tng.id, delta)
 }
 
 // Exist checks if the node group really exists on the cloud provider side. Allows to tell the
diff --git a/cluster-autoscaler/provisioningrequest/besteffortatomic/provisioning_class.go b/cluster-autoscaler/provisioningrequest/besteffortatomic/provisioning_class.go
new file mode 100644
index 000000000000..0c7091c92e56
--- /dev/null
+++ b/cluster-autoscaler/provisioningrequest/besteffortatomic/provisioning_class.go
@@ -0,0 +1,156 @@
+/*
+Copyright 2024 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package besteffortatomic
+
+import (
+	appsv1 "k8s.io/api/apps/v1"
+	apiv1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/klog/v2"
+
+	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
+	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
+	"k8s.io/autoscaler/cluster-autoscaler/estimator"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/scheduling"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
+
+	ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
+	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
+)
+
+// Best effort atomic provisioning class requests scale-up only if it's possible
+// to atomically request enough resources for all pods specified in a
+// ProvisioningRequest. It's "best effort" as it admits workload immediately
+// after successful request, without waiting to verify that resources started.
+type bestEffortAtomicProvClass struct {
+	context             *context.AutoscalingContext
+	client              *provreqclient.ProvisioningRequestClient
+	injector            *scheduling.HintingSimulator
+	scaleUpOrchestrator scaleup.Orchestrator
+}
+
+// New creates a best effort atomic provisioning class supporting create capacity scale-up mode.
+func New(
+	client *provreqclient.ProvisioningRequestClient,
+) *bestEffortAtomicProvClass {
+	return &bestEffortAtomicProvClass{client: client, scaleUpOrchestrator: orchestrator.New()}
+}
+
+func (o *bestEffortAtomicProvClass) Initialize(
+	autoscalingContext *context.AutoscalingContext,
+	processors *ca_processors.AutoscalingProcessors,
+	clusterStateRegistry *clusterstate.ClusterStateRegistry,
+	estimatorBuilder estimator.EstimatorBuilder,
+	taintConfig taints.TaintConfig,
+	injector *scheduling.HintingSimulator,
+) {
+	o.context = autoscalingContext
+	o.injector = injector
+	o.scaleUpOrchestrator.Initialize(autoscalingContext, processors, clusterStateRegistry, estimatorBuilder, taintConfig)
+}
+
+// Provision returns success if there is, or has just been requested, sufficient capacity in the cluster for pods from ProvisioningRequest.
+func (o *bestEffortAtomicProvClass) Provision(
+	unschedulablePods []*apiv1.Pod,
+	nodes []*apiv1.Node,
+	daemonSets []*appsv1.DaemonSet,
+	nodeInfos map[string]*schedulerframework.NodeInfo,
+) (*status.ScaleUpStatus, errors.AutoscalerError) {
+	if len(unschedulablePods) == 0 {
+		return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
+	}
+	pr, err := provreqclient.ProvisioningRequestForPods(o.client, unschedulablePods)
+	if err != nil {
+		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, err.Error()))
+	}
+	if pr.Spec.ProvisioningClassName != v1beta1.ProvisioningClassBestEffortAtomicScaleUp {
+		return &status.ScaleUpStatus{Result: status.ScaleUpNotTried}, nil
+	}
+
+	o.context.ClusterSnapshot.Fork()
+	defer o.context.ClusterSnapshot.Revert()
+
+	// For provisioning requests, unschedulablePods are actually all injected pods. Some may even be schedulable!
+	actuallyUnschedulablePods, err := o.filterOutSchedulable(unschedulablePods)
+	if err != nil {
+		conditions.AddOrUpdateCondition(pr, v1beta1.Provisioned, metav1.ConditionFalse, conditions.FailedToCheckCapacityReason, conditions.FailedToCheckCapacityMsg, metav1.Now())
+		if _, updateErr := o.client.UpdateProvisioningRequest(pr.ProvisioningRequest); updateErr != nil {
+			klog.Errorf("failed to add Provisioned=false condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, updateErr)
+		}
+		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "error during ScaleUp: %s", err.Error()))
+	}
+
+	if len(actuallyUnschedulablePods) == 0 {
+		// Nothing to do here - everything fits without scale-up.
+		conditions.AddOrUpdateCondition(pr, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsFoundReason, conditions.CapacityIsFoundMsg, metav1.Now())
+		if _, updateErr := o.client.UpdateProvisioningRequest(pr.ProvisioningRequest); updateErr != nil {
+			klog.Errorf("failed to add Provisioned=true condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, updateErr)
+			return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "capacity available, but failed to admit workload: %s", updateErr.Error()))
+		}
+		return &status.ScaleUpStatus{Result: status.ScaleUpNotNeeded}, nil
+	}
+
+	st, err := o.scaleUpOrchestrator.ScaleUp(actuallyUnschedulablePods, nodes, daemonSets, nodeInfos, true)
+	if err == nil && st.Result == status.ScaleUpSuccessful {
+		// Happy path - all is well.
+		conditions.AddOrUpdateCondition(pr, v1beta1.Provisioned, metav1.ConditionTrue, conditions.CapacityIsProvisionedReason, conditions.CapacityIsProvisionedMsg, metav1.Now())
+		if _, updateErr := o.client.UpdateProvisioningRequest(pr.ProvisioningRequest); updateErr != nil {
+			klog.Errorf("failed to add Provisioned=true condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, updateErr)
+			return st, errors.NewAutoscalerError(errors.InternalError, "scale up requested, but failed to admit workload: %s", updateErr.Error())
+		}
+		return st, nil
+	}
+
+	// We are not happy with the results.
+	conditions.AddOrUpdateCondition(pr, v1beta1.Provisioned, metav1.ConditionFalse, conditions.CapacityIsNotFoundReason, "Capacity is not found, CA will try to find it later.", metav1.Now())
+	if _, updateErr := o.client.UpdateProvisioningRequest(pr.ProvisioningRequest); updateErr != nil {
+		klog.Errorf("failed to add Provisioned=false condition to ProvReq %s/%s, err: %v", pr.Namespace, pr.Name, updateErr)
+	}
+	if err != nil {
+		return status.UpdateScaleUpError(&status.ScaleUpStatus{}, errors.NewAutoscalerError(errors.InternalError, "error during ScaleUp: %s", err.Error()))
+	}
+	return st, nil
+}
+
+func (o *bestEffortAtomicProvClass) filterOutSchedulable(pods []*apiv1.Pod) ([]*apiv1.Pod, error) {
+	statuses, _, err := o.injector.TrySchedulePods(o.context.ClusterSnapshot, pods, scheduling.ScheduleAnywhere, false)
+	if err != nil {
+		return nil, err
+	}
+
+	scheduledPods := make(map[types.UID]bool)
+	for _, status := range statuses {
+		scheduledPods[status.Pod.UID] = true
+	}
+
+	var unschedulablePods []*apiv1.Pod
+	for _, pod := range pods {
+		if !scheduledPods[pod.UID] {
+			unschedulablePods = append(unschedulablePods, pod)
+		}
+	}
+	return unschedulablePods, nil
+
+}
diff --git a/cluster-autoscaler/provisioningrequest/conditions/conditions.go b/cluster-autoscaler/provisioningrequest/conditions/conditions.go
index e08feb56797e..33ba73c95d87 100644
--- a/cluster-autoscaler/provisioningrequest/conditions/conditions.go
+++ b/cluster-autoscaler/provisioningrequest/conditions/conditions.go
@@ -25,33 +25,41 @@ import (
 )
 
 const (
-	//AcceptedReason is added when ProvisioningRequest is accepted by ClusterAutoscaler
+	// AcceptedReason is added when ProvisioningRequest is accepted by ClusterAutoscaler
 	AcceptedReason = "Accepted"
-	//AcceptedMsg is added when ProvisioningRequest is accepted by ClusterAutoscaler
+	// AcceptedMsg is added when ProvisioningRequest is accepted by ClusterAutoscaler
 	AcceptedMsg = "ProvisioningRequest is accepted by ClusterAutoscaler"
-	//CapacityIsNotFoundReason is added when capacity was not found in the cluster.
+	// CapacityIsNotFoundReason is added when capacity was not found in the cluster.
 	CapacityIsNotFoundReason = "CapacityIsNotFound"
-	//CapacityIsFoundReason is added when capacity was found in the cluster.
+	// CapacityIsFoundReason is added when capacity was found in the cluster.
 	CapacityIsFoundReason = "CapacityIsFound"
 	// CapacityIsFoundMsg is added when capacity was found in the cluster.
 	CapacityIsFoundMsg = "Capacity is found in the cluster"
-	//FailedToCreatePodsReason is added when CA failed to create pods for ProvisioningRequest.
+	// CapacityIsProvisionedReason is added when capacity was requested successfully.
+	CapacityIsProvisionedReason = "CapacityIsProvisioned"
+	// CapacityIsProvisionedMsg is added when capacity was requested successfully.
+	CapacityIsProvisionedMsg = "Capacity is found in the cluster"
+	// FailedToCheckCapacityReason is added when CA failed to check pre-existing capacity.
+	FailedToCheckCapacityReason = "FailedToCheckCapacity"
+	// FailedToCheckCapacityMsg is added when CA failed to check pre-existing capacity.
+	FailedToCheckCapacityMsg = "Failed to check pre-existing capacity in the cluster"
+	// FailedToCreatePodsReason is added when CA failed to create pods for ProvisioningRequest.
 	FailedToCreatePodsReason = "FailedToCreatePods"
-	//FailedToBookCapacityReason is added when Cluster Autoscaler failed to book capacity in the cluster.
+	// FailedToBookCapacityReason is added when Cluster Autoscaler failed to book capacity in the cluster.
 	FailedToBookCapacityReason = "FailedToBookCapacity"
-	//CapacityReservationTimeExpiredReason is added whed capacity reservation time is expired.
+	// CapacityReservationTimeExpiredReason is added when capacity reservation time is expired.
 	CapacityReservationTimeExpiredReason = "CapacityReservationTimeExpired"
-	//CapacityReservationTimeExpiredMsg is added if capacity reservation time is expired.
+	// CapacityReservationTimeExpiredMsg is added if capacity reservation time is expired.
 	CapacityReservationTimeExpiredMsg = "Capacity reservation time is expired"
-	//ExpiredReason is added if ProvisioningRequest is expired.
+	// ExpiredReason is added if ProvisioningRequest is expired.
 	ExpiredReason = "Expired"
-	//ExpiredMsg is added if ProvisioningRequest is expired.
+	// ExpiredMsg is added if ProvisioningRequest is expired.
 	ExpiredMsg = "ProvisioningRequest is expired"
 )
 
 // ShouldCapacityBeBooked returns whether capacity should be booked.
 func ShouldCapacityBeBooked(pr *provreqwrapper.ProvisioningRequest) bool {
-	if pr.Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity {
+	if pr.Spec.ProvisioningClassName != v1beta1.ProvisioningClassCheckCapacity && pr.Spec.ProvisioningClassName != v1beta1.ProvisioningClassBestEffortAtomicScaleUp {
 		return false
 	}
 	conditions := pr.Status.Conditions
diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
index 6738ab5ea006..ab86f7193cf7 100644
--- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
+++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator.go
@@ -27,6 +27,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/besteffortatomic"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/conditions"
 	provreq_pods "k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
@@ -64,7 +65,13 @@ func New(kubeConfig *rest.Config) (*provReqOrchestrator, error) {
 		return nil, err
 	}
 
-	return &provReqOrchestrator{client: client, provisioningClasses: []provisioningClass{checkcapacity.New(client)}}, nil
+	return &provReqOrchestrator{
+		client: client,
+		provisioningClasses: []provisioningClass{
+			checkcapacity.New(client),
+			besteffortatomic.New(client),
+		},
+	}, nil
 }
 
 // Initialize initialize orchestrator.
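Note (illustration, not part of the diff above): the new class wires into the same Provisioned-condition machinery that ShouldCapacityBeBooked consults, which now accepts both supported classes. The sketch below condenses the condition handling implemented in bestEffortAtomicProvClass.Provision into one self-contained Go function; the outcome struct and function names are made up for illustration, while the reason strings are the constants added in conditions.go.

package main

import "fmt"

// outcome captures the intermediate results of Provision, in the order they are checked.
type outcome struct {
	checkFailed        bool // filterOutSchedulable returned an error
	fitsWithoutScaleUp bool // no actually-unschedulable pods remained after the simulation
	scaleUpRequested   bool // the scale-up orchestrator reported ScaleUpSuccessful
}

// provisionedCondition returns the status and reason that Provision sets on the
// ProvisioningRequest's Provisioned condition for a given outcome.
func provisionedCondition(o outcome) (condStatus, reason string) {
	switch {
	case o.checkFailed:
		return "False", "FailedToCheckCapacity"
	case o.fitsWithoutScaleUp:
		return "True", "CapacityIsFound"
	case o.scaleUpRequested:
		return "True", "CapacityIsProvisioned"
	default:
		return "False", "CapacityIsNotFound"
	}
}

func main() {
	fmt.Println(provisionedCondition(outcome{scaleUpRequested: true})) // True CapacityIsProvisioned
}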
diff --git a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
index ece104808f27..fa2fd55576c3 100644
--- a/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
+++ b/cluster-autoscaler/provisioningrequest/orchestrator/orchestrator_test.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	v1 "k8s.io/api/apps/v1"
@@ -27,33 +28,43 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/autoscaler/cluster-autoscaler/apis/provisioningrequest/autoscaling.x-k8s.io/v1beta1"
 	testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
+	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
 	"k8s.io/autoscaler/cluster-autoscaler/config"
 	. "k8s.io/autoscaler/cluster-autoscaler/core/test"
+	"k8s.io/autoscaler/cluster-autoscaler/estimator"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
 	"k8s.io/autoscaler/cluster-autoscaler/processors/status"
+	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/besteffortatomic"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/checkcapacity"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/pods"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqclient"
 	"k8s.io/autoscaler/cluster-autoscaler/provisioningrequest/provreqwrapper"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+	kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
 	. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/units"
 	"k8s.io/client-go/kubernetes/fake"
-	"k8s.io/kubernetes/pkg/scheduler/framework"
+	schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
 )
 
 func TestScaleUp(t *testing.T) {
 	// Set up a cluster with 200 nodes:
-	// - 100 nodes with high cpu, low memory
-	// - 100 nodes with high memory, low cpu
+	// - 100 nodes with high cpu, low memory in autoscaled group with max 150
+	// - 100 nodes with high memory, low cpu not in autoscaled group
+	now := time.Now()
 	allNodes := []*apiv1.Node{}
 	for i := 0; i < 100; i++ {
 		name := fmt.Sprintf("test-cpu-node-%d", i)
 		node := BuildTestNode(name, 100, 10)
+		SetNodeReadyState(node, true, now.Add(-2*time.Minute))
 		allNodes = append(allNodes, node)
 	}
 	for i := 0; i < 100; i++ {
 		name := fmt.Sprintf("test-mem-node-%d", i)
 		node := BuildTestNode(name, 1, 1000)
+		SetNodeReadyState(node, true, now.Add(-2*time.Minute))
 		allNodes = append(allNodes, node)
 	}
@@ -76,17 +87,50 @@ func TestScaleUp(t *testing.T) {
 			Class: v1beta1.ProvisioningClassCheckCapacity,
 		})
 
-	// Active atomic scale up request.
+	// Active atomic scale up requests.
 	atomicScaleUpProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
 		provreqwrapper.TestProvReqOptions{
 			Name: "atomicScaleUpProvReq",
-			CPU: "1",
+			CPU: "5m",
+			Memory: "5",
+			PodCount: int32(5),
+			Class: v1beta1.ProvisioningClassBestEffortAtomicScaleUp,
+		})
+	largeAtomicScaleUpProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
+		provreqwrapper.TestProvReqOptions{
+			Name: "largeAtomicScaleUpProvReq",
+			CPU: "1m",
+			Memory: "100",
+			PodCount: int32(100),
+			Class: v1beta1.ProvisioningClassBestEffortAtomicScaleUp,
+		})
+	impossibleAtomicScaleUpReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
+		provreqwrapper.TestProvReqOptions{
+			Name: "impossibleAtomicScaleUpRequest",
+			CPU: "1m",
 			Memory: "1",
+			PodCount: int32(5001),
+			Class: v1beta1.ProvisioningClassBestEffortAtomicScaleUp,
+		})
+	possibleAtomicScaleUpReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
+		provreqwrapper.TestProvReqOptions{
+			Name: "possibleAtomicScaleUpReq",
+			CPU: "100m",
+			Memory: "1",
+			PodCount: int32(120),
+			Class: v1beta1.ProvisioningClassBestEffortAtomicScaleUp,
+		})
+	autoprovisioningAtomicScaleUpReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
+		provreqwrapper.TestProvReqOptions{
+			Name: "autoprovisioningAtomicScaleUpReq",
+			CPU: "100m",
+			Memory: "100",
 			PodCount: int32(5),
 			Class: v1beta1.ProvisioningClassBestEffortAtomicScaleUp,
 		})
 
 	// Already provisioned provisioning request - capacity should be booked before processing a new request.
+	// Books 20 out of 100 high-memory nodes.
 	bookedCapacityProvReq := provreqwrapper.BuildValidTestProvisioningRequestFromOptions(
 		provreqwrapper.TestProvReqOptions{
 			Name: "bookedCapacityProvReq",
@@ -123,6 +167,7 @@ func TestScaleUp(t *testing.T) {
 		provReqs         []*provreqwrapper.ProvisioningRequest
 		provReqToScaleUp *provreqwrapper.ProvisioningRequest
 		scaleUpResult    status.ScaleUpResult
+		autoprovisioning bool
 		err              bool
 	}{
 		{
@@ -136,6 +181,12 @@ func TestScaleUp(t *testing.T) {
 			provReqToScaleUp: newCheckCapacityCpuProvReq,
 			scaleUpResult: status.ScaleUpSuccessful,
 		},
+		{
+			name: "one ProvisioningRequest of atomic scale up class",
+			provReqs: []*provreqwrapper.ProvisioningRequest{atomicScaleUpProvReq},
+			provReqToScaleUp: atomicScaleUpProvReq,
+			scaleUpResult: status.ScaleUpNotNeeded,
+		},
 		{
 			name: "capacity in the cluster is booked",
 			provReqs: []*provreqwrapper.ProvisioningRequest{newCheckCapacityMemProvReq, bookedCapacityProvReq},
 			provReqToScaleUp: newCheckCapacityMemProvReq,
@@ -154,29 +205,69 @@ func TestScaleUp(t *testing.T) {
 			provReqToScaleUp: newCheckCapacityCpuProvReq,
 			scaleUpResult: status.ScaleUpSuccessful,
 		},
+		{
+			name: "some capacity is pre-booked, atomic scale-up not needed",
+			provReqs: []*provreqwrapper.ProvisioningRequest{bookedCapacityProvReq, atomicScaleUpProvReq},
+			provReqToScaleUp: atomicScaleUpProvReq,
+			scaleUpResult: status.ScaleUpNotNeeded,
+		},
+		{
+			name: "some capacity is pre-booked, large atomic scale-up request doesn't fit",
+			provReqs: []*provreqwrapper.ProvisioningRequest{bookedCapacityProvReq, largeAtomicScaleUpProvReq},
+			provReqToScaleUp: largeAtomicScaleUpProvReq,
+			scaleUpResult: status.ScaleUpNoOptionsAvailable,
+		},
+		{
+			name: "capacity is there, large atomic scale-up request doesn't require scale-up",
+			provReqs: []*provreqwrapper.ProvisioningRequest{largeAtomicScaleUpProvReq},
+			provReqToScaleUp: largeAtomicScaleUpProvReq,
+			scaleUpResult: status.ScaleUpNotNeeded,
+		},
+		{
+			name: "impossible atomic scale-up request doesn't trigger scale-up",
+			provReqs: []*provreqwrapper.ProvisioningRequest{impossibleAtomicScaleUpReq},
+			provReqToScaleUp: impossibleAtomicScaleUpReq,
+			scaleUpResult: status.ScaleUpNoOptionsAvailable,
+		},
+		{
+			name: "possible atomic scale-up request triggers scale-up",
+			provReqs: []*provreqwrapper.ProvisioningRequest{possibleAtomicScaleUpReq},
+			provReqToScaleUp: possibleAtomicScaleUpReq,
+			scaleUpResult: status.ScaleUpSuccessful,
+		},
+		{
+			name: "autoprovisioning atomic scale-up request triggers scale-up",
+			provReqs: []*provreqwrapper.ProvisioningRequest{autoprovisioningAtomicScaleUpReq},
+			provReqToScaleUp: autoprovisioningAtomicScaleUpReq,
+			autoprovisioning: true,
+			scaleUpResult: status.ScaleUpSuccessful,
+		},
 	}
 	for _, tc := range testCases {
 		tc := tc
 		allNodes := allNodes
 		t.Run(tc.name, func(t *testing.T) {
 			t.Parallel()
-			provider := testprovider.NewTestCloudProvider(nil, nil)
-			autoscalingContext, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, nil, provider, nil, nil)
-			assert.NoError(t, err)
-			clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingContext.ClusterSnapshot, allNodes, nil)
 			prPods, err := pods.PodsForProvisioningRequest(tc.provReqToScaleUp)
 			assert.NoError(t, err)
-			client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, tc.provReqs...)
-			orchestrator := &provReqOrchestrator{
-				client: client,
-				provisioningClasses: []provisioningClass{checkcapacity.New(client)},
+			onScaleUpFunc := func(name string, n int) error {
+				if tc.scaleUpResult == status.ScaleUpSuccessful {
+					return nil
+				}
+				return fmt.Errorf("unexpected scale-up of %s by %d", name, n)
 			}
-			orchestrator.Initialize(&autoscalingContext, nil, nil, nil, taints.TaintConfig{})
-			st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, map[string]*framework.NodeInfo{}, false)
+			orchestrator, nodeInfos := setupTest(t, allNodes, tc.provReqs, onScaleUpFunc, tc.autoprovisioning)
+
+			st, err := orchestrator.ScaleUp(prPods, []*apiv1.Node{}, []*v1.DaemonSet{}, nodeInfos, false)
 			if !tc.err {
 				assert.NoError(t, err)
+				if tc.scaleUpResult != st.Result && len(st.PodsRemainUnschedulable) > 0 {
+					// We expected all pods to be scheduled, but some remain unschedulable.
+					// Report the reasons why node groups were rejected; this is useful for debugging.
+ t.Errorf("noScaleUpInfo: %#v", st.PodsRemainUnschedulable[0].RejectedNodeGroups) + } assert.Equal(t, tc.scaleUpResult, st.Result) } else { assert.Error(t, err) @@ -184,3 +275,67 @@ func TestScaleUp(t *testing.T) { }) } } + +func setupTest(t *testing.T, nodes []*apiv1.Node, prs []*provreqwrapper.ProvisioningRequest, onScaleUpFunc func(string, int) error, autoprovisioning bool) (*provReqOrchestrator, map[string]*schedulerframework.NodeInfo) { + provider := testprovider.NewTestCloudProvider(onScaleUpFunc, nil) + if autoprovisioning { + machineTypes := []string{"large-machine"} + template := BuildTestNode("large-node-template", 100, 100) + SetNodeReadyState(template, true, time.Now()) + nodeInfoTemplate := schedulerframework.NewNodeInfo() + nodeInfoTemplate.SetNode(template) + machineTemplates := map[string]*schedulerframework.NodeInfo{ + "large-machine": nodeInfoTemplate, + } + onNodeGroupCreateFunc := func(name string) error { return nil } + provider = testprovider.NewTestAutoprovisioningCloudProvider(onScaleUpFunc, nil, onNodeGroupCreateFunc, nil, machineTypes, machineTemplates) + } + + provider.AddNodeGroup("test-cpu", 50, 150, 100) + for _, n := range nodes[:100] { + provider.AddNode("test-cpu", n) + } + + podLister := kube_util.NewTestPodLister(nil) + listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil) + autoscalingContext, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, &fake.Clientset{}, listers, provider, nil, nil) + assert.NoError(t, err) + + clustersnapshot.InitializeClusterSnapshotOrDie(t, autoscalingContext.ClusterSnapshot, nodes, nil) + client := provreqclient.NewFakeProvisioningRequestClient(context.Background(), t, prs...) + processors := NewTestProcessors(&autoscalingContext) + if autoprovisioning { + processors.NodeGroupListProcessor = &MockAutoprovisioningNodeGroupListProcessor{T: t} + processors.NodeGroupManager = &MockAutoprovisioningNodeGroupManager{T: t, ExtraGroups: 2} + } + + now := time.Now() + nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&autoscalingContext, nodes, []*v1.DaemonSet{}, taints.TaintConfig{}, now) + assert.NoError(t, err) + + options := config.AutoscalingOptions{ + EstimatorName: estimator.BinpackingEstimatorName, + MaxCoresTotal: config.DefaultMaxClusterCores, + MaxMemoryTotal: config.DefaultMaxClusterMemory * units.GiB, + MinCoresTotal: 0, + MinMemoryTotal: 0, + NodeAutoprovisioningEnabled: autoprovisioning, + MaxAutoprovisionedNodeGroupCount: 10, + } + estimatorBuilder, _ := estimator.NewEstimatorBuilder( + estimator.BinpackingEstimatorName, + estimator.NewThresholdBasedEstimationLimiter(nil), + estimator.NewDecreasingPodOrderer(), + nil, + ) + + clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, autoscalingContext.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(options.NodeGroupDefaults)) + clusterState.UpdateNodes(nodes, nodeInfos, now) + + orchestrator := &provReqOrchestrator{ + client: client, + provisioningClasses: []provisioningClass{checkcapacity.New(client), besteffortatomic.New(client)}, + } + orchestrator.Initialize(&autoscalingContext, processors, clusterState, estimatorBuilder, taints.TaintConfig{}) + return orchestrator, nodeInfos +}