From 507a5b5e2f6d57c91d965a8b248f98f957e5cc84 Mon Sep 17 00:00:00 2001 From: LY-today <724102053@qq.com> Date: Fri, 20 Dec 2024 17:02:57 +0800 Subject: [PATCH 1/5] feat: add noderesourcefitplus and scarceresourceavoidance scheduler plugins Signed-off-by: LY-today <724102053@qq.com> --- cmd/koord-scheduler/main.go | 18 +- pkg/scheduler/apis/config/register.go | 2 + pkg/scheduler/apis/config/types.go | 23 ++ pkg/scheduler/apis/config/v1beta3/register.go | 2 + pkg/scheduler/apis/config/v1beta3/types.go | 23 ++ .../config/v1beta3/zz_generated.conversion.go | 92 ++++++ .../config/v1beta3/zz_generated.deepcopy.go | 78 +++++ .../apis/config/zz_generated.deepcopy.go | 78 +++++ .../node_resources_fit_plus.go | 171 +++++++++++ .../node_resources_fit_plus_test.go | 287 ++++++++++++++++++ .../scarce_resource_avoidance.go | 175 +++++++++++ .../scarce_resource_avoidance_test.go | 250 +++++++++++++++ 12 files changed, 1192 insertions(+), 7 deletions(-) create mode 100644 pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go create mode 100644 pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go create mode 100644 pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go create mode 100644 pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance_test.go diff --git a/cmd/koord-scheduler/main.go b/cmd/koord-scheduler/main.go index 214509e41..bc77d53d5 100644 --- a/cmd/koord-scheduler/main.go +++ b/cmd/koord-scheduler/main.go @@ -31,7 +31,9 @@ import ( "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/elasticquota" "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/loadaware" "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/nodenumaresource" + noderesourcesfitplus "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/noderesourcefitplus" "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/reservation" + "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/scarceresourceavoidance" // Ensure metric package is initialized _ "k8s.io/component-base/metrics/prometheus/clientgo" @@ -40,13 +42,15 @@ import ( ) var koordinatorPlugins = map[string]frameworkruntime.PluginFactory{ - loadaware.Name: loadaware.New, - nodenumaresource.Name: nodenumaresource.New, - reservation.Name: reservation.New, - coscheduling.Name: coscheduling.New, - deviceshare.Name: deviceshare.New, - elasticquota.Name: elasticquota.New, - defaultprebind.Name: defaultprebind.New, + loadaware.Name: loadaware.New, + nodenumaresource.Name: nodenumaresource.New, + reservation.Name: reservation.New, + coscheduling.Name: coscheduling.New, + deviceshare.Name: deviceshare.New, + elasticquota.Name: elasticquota.New, + defaultprebind.Name: defaultprebind.New, + noderesourcesfitplus.Name: noderesourcesfitplus.New, + scarceresourceavoidance.Name: scarceresourceavoidance.New, } func flatten(plugins map[string]frameworkruntime.PluginFactory) []app.Option { diff --git a/pkg/scheduler/apis/config/register.go b/pkg/scheduler/apis/config/register.go index 312a9f6a4..6e165a0ab 100644 --- a/pkg/scheduler/apis/config/register.go +++ b/pkg/scheduler/apis/config/register.go @@ -40,6 +40,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ElasticQuotaArgs{}, &CoschedulingArgs{}, &DeviceShareArgs{}, + &NodeResourcesFitPlusArgs{}, + &ScarceResourceAvoidanceArgs{}, ) return nil } diff --git a/pkg/scheduler/apis/config/types.go b/pkg/scheduler/apis/config/types.go index 39ae2ca3b..e09fd0db2 100644 --- a/pkg/scheduler/apis/config/types.go +++ 
b/pkg/scheduler/apis/config/types.go @@ -19,9 +19,11 @@ package config import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/scheduler/apis/config" schedconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" "github.com/koordinator-sh/koordinator/apis/extension" + v1 "k8s.io/api/core/v1" ) // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -237,3 +239,24 @@ type DeviceShareArgs struct { // DisableDeviceNUMATopologyAlignment indicates device don't need to align with other resources' numa topology DisableDeviceNUMATopologyAlignment bool } + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ScarceResourceAvoidanceArgs defines the parameters for ScarceResourceAvoidance plugin. +type ScarceResourceAvoidanceArgs struct { + metav1.TypeMeta + Resources []v1.ResourceName +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// NodeResourcesFitPlusArgs defines the parameters for NodeResourcesFitPlus plugin. +type NodeResourcesFitPlusArgs struct { + metav1.TypeMeta + Resources map[v1.ResourceName]ResourcesType +} + +type ResourcesType struct { + Type config.ScoringStrategyType + Weight int64 +} diff --git a/pkg/scheduler/apis/config/v1beta3/register.go b/pkg/scheduler/apis/config/v1beta3/register.go index 54ebd3936..cb821860a 100644 --- a/pkg/scheduler/apis/config/v1beta3/register.go +++ b/pkg/scheduler/apis/config/v1beta3/register.go @@ -40,6 +40,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ElasticQuotaArgs{}, &CoschedulingArgs{}, &DeviceShareArgs{}, + &NodeResourcesFitPlusArgs{}, + &ScarceResourceAvoidanceArgs{}, ) return nil } diff --git a/pkg/scheduler/apis/config/v1beta3/types.go b/pkg/scheduler/apis/config/v1beta3/types.go index 36a872864..74803030a 100644 --- a/pkg/scheduler/apis/config/v1beta3/types.go +++ b/pkg/scheduler/apis/config/v1beta3/types.go @@ -22,6 +22,8 @@ import ( schedconfigv1beta3 "k8s.io/kube-scheduler/config/v1beta3" "github.com/koordinator-sh/koordinator/apis/extension" + v1 "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/scheduler/apis/config" ) // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -232,3 +234,24 @@ type DeviceShareArgs struct { // DisableDeviceNUMATopologyAlignment indicates device don't need to align with other resources' numa topology DisableDeviceNUMATopologyAlignment bool `json:"disableDeviceNUMATopologyAlignment,omitempty"` } + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ScarceResourceAvoidanceArgs defines the parameters for ScarceResourceAvoidance plugin. +type ScarceResourceAvoidanceArgs struct { + metav1.TypeMeta + Resources []v1.ResourceName `json:"resources,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// NodeResourcesFitPlusArgs defines the parameters for NodeResourcesFitPlus plugin. 
+type NodeResourcesFitPlusArgs struct { + metav1.TypeMeta + Resources map[v1.ResourceName]ResourcesType `json:"resources"` +} + +type ResourcesType struct { + Type config.ScoringStrategyType `json:"type"` + Weight int64 `json:"weight"` +} diff --git a/pkg/scheduler/apis/config/v1beta3/zz_generated.conversion.go b/pkg/scheduler/apis/config/v1beta3/zz_generated.conversion.go index 5b9b56c37..f0d35f078 100644 --- a/pkg/scheduler/apis/config/v1beta3/zz_generated.conversion.go +++ b/pkg/scheduler/apis/config/v1beta3/zz_generated.conversion.go @@ -96,6 +96,16 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*NodeResourcesFitPlusArgs)(nil), (*config.NodeResourcesFitPlusArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta3_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs(a.(*NodeResourcesFitPlusArgs), b.(*config.NodeResourcesFitPlusArgs), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.NodeResourcesFitPlusArgs)(nil), (*NodeResourcesFitPlusArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_NodeResourcesFitPlusArgs_To_v1beta3_NodeResourcesFitPlusArgs(a.(*config.NodeResourcesFitPlusArgs), b.(*NodeResourcesFitPlusArgs), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*ReservationArgs)(nil), (*config.ReservationArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta3_ReservationArgs_To_config_ReservationArgs(a.(*ReservationArgs), b.(*config.ReservationArgs), scope) }); err != nil { @@ -106,6 +116,26 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*ResourcesType)(nil), (*config.ResourcesType)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta3_ResourcesType_To_config_ResourcesType(a.(*ResourcesType), b.(*config.ResourcesType), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.ResourcesType)(nil), (*ResourcesType)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_ResourcesType_To_v1beta3_ResourcesType(a.(*config.ResourcesType), b.(*ResourcesType), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*ScarceResourceAvoidanceArgs)(nil), (*config.ScarceResourceAvoidanceArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta3_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs(a.(*ScarceResourceAvoidanceArgs), b.(*config.ScarceResourceAvoidanceArgs), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.ScarceResourceAvoidanceArgs)(nil), (*ScarceResourceAvoidanceArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_ScarceResourceAvoidanceArgs_To_v1beta3_ScarceResourceAvoidanceArgs(a.(*config.ScarceResourceAvoidanceArgs), b.(*ScarceResourceAvoidanceArgs), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*ScoringStrategy)(nil), (*config.ScoringStrategy)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1beta3_ScoringStrategy_To_config_ScoringStrategy(a.(*ScoringStrategy), b.(*config.ScoringStrategy), scope) }); err != nil { @@ -355,6 +385,26 @@ func Convert_config_NodeNUMAResourceArgs_To_v1beta3_NodeNUMAResourceArgs(in *con return 
autoConvert_config_NodeNUMAResourceArgs_To_v1beta3_NodeNUMAResourceArgs(in, out, s) } +func autoConvert_v1beta3_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs(in *NodeResourcesFitPlusArgs, out *config.NodeResourcesFitPlusArgs, s conversion.Scope) error { + out.Resources = *(*map[corev1.ResourceName]config.ResourcesType)(unsafe.Pointer(&in.Resources)) + return nil +} + +// Convert_v1beta3_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs is an autogenerated conversion function. +func Convert_v1beta3_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs(in *NodeResourcesFitPlusArgs, out *config.NodeResourcesFitPlusArgs, s conversion.Scope) error { + return autoConvert_v1beta3_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs(in, out, s) +} + +func autoConvert_config_NodeResourcesFitPlusArgs_To_v1beta3_NodeResourcesFitPlusArgs(in *config.NodeResourcesFitPlusArgs, out *NodeResourcesFitPlusArgs, s conversion.Scope) error { + out.Resources = *(*map[corev1.ResourceName]ResourcesType)(unsafe.Pointer(&in.Resources)) + return nil +} + +// Convert_config_NodeResourcesFitPlusArgs_To_v1beta3_NodeResourcesFitPlusArgs is an autogenerated conversion function. +func Convert_config_NodeResourcesFitPlusArgs_To_v1beta3_NodeResourcesFitPlusArgs(in *config.NodeResourcesFitPlusArgs, out *NodeResourcesFitPlusArgs, s conversion.Scope) error { + return autoConvert_config_NodeResourcesFitPlusArgs_To_v1beta3_NodeResourcesFitPlusArgs(in, out, s) +} + func autoConvert_v1beta3_ReservationArgs_To_config_ReservationArgs(in *ReservationArgs, out *config.ReservationArgs, s conversion.Scope) error { if err := v1.Convert_Pointer_bool_To_bool(&in.EnablePreemption, &out.EnablePreemption, s); err != nil { return err @@ -391,6 +441,48 @@ func Convert_config_ReservationArgs_To_v1beta3_ReservationArgs(in *config.Reserv return autoConvert_config_ReservationArgs_To_v1beta3_ReservationArgs(in, out, s) } +func autoConvert_v1beta3_ResourcesType_To_config_ResourcesType(in *ResourcesType, out *config.ResourcesType, s conversion.Scope) error { + out.Type = apisconfig.ScoringStrategyType(in.Type) + out.Weight = in.Weight + return nil +} + +// Convert_v1beta3_ResourcesType_To_config_ResourcesType is an autogenerated conversion function. +func Convert_v1beta3_ResourcesType_To_config_ResourcesType(in *ResourcesType, out *config.ResourcesType, s conversion.Scope) error { + return autoConvert_v1beta3_ResourcesType_To_config_ResourcesType(in, out, s) +} + +func autoConvert_config_ResourcesType_To_v1beta3_ResourcesType(in *config.ResourcesType, out *ResourcesType, s conversion.Scope) error { + out.Type = apisconfig.ScoringStrategyType(in.Type) + out.Weight = in.Weight + return nil +} + +// Convert_config_ResourcesType_To_v1beta3_ResourcesType is an autogenerated conversion function. +func Convert_config_ResourcesType_To_v1beta3_ResourcesType(in *config.ResourcesType, out *ResourcesType, s conversion.Scope) error { + return autoConvert_config_ResourcesType_To_v1beta3_ResourcesType(in, out, s) +} + +func autoConvert_v1beta3_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs(in *ScarceResourceAvoidanceArgs, out *config.ScarceResourceAvoidanceArgs, s conversion.Scope) error { + out.Resources = *(*[]corev1.ResourceName)(unsafe.Pointer(&in.Resources)) + return nil +} + +// Convert_v1beta3_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs is an autogenerated conversion function. 
+func Convert_v1beta3_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs(in *ScarceResourceAvoidanceArgs, out *config.ScarceResourceAvoidanceArgs, s conversion.Scope) error { + return autoConvert_v1beta3_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs(in, out, s) +} + +func autoConvert_config_ScarceResourceAvoidanceArgs_To_v1beta3_ScarceResourceAvoidanceArgs(in *config.ScarceResourceAvoidanceArgs, out *ScarceResourceAvoidanceArgs, s conversion.Scope) error { + out.Resources = *(*[]corev1.ResourceName)(unsafe.Pointer(&in.Resources)) + return nil +} + +// Convert_config_ScarceResourceAvoidanceArgs_To_v1beta3_ScarceResourceAvoidanceArgs is an autogenerated conversion function. +func Convert_config_ScarceResourceAvoidanceArgs_To_v1beta3_ScarceResourceAvoidanceArgs(in *config.ScarceResourceAvoidanceArgs, out *ScarceResourceAvoidanceArgs, s conversion.Scope) error { + return autoConvert_config_ScarceResourceAvoidanceArgs_To_v1beta3_ScarceResourceAvoidanceArgs(in, out, s) +} + func autoConvert_v1beta3_ScoringStrategy_To_config_ScoringStrategy(in *ScoringStrategy, out *config.ScoringStrategy, s conversion.Scope) error { out.Type = config.ScoringStrategyType(in.Type) out.Resources = *(*[]apisconfig.ResourceSpec)(unsafe.Pointer(&in.Resources)) diff --git a/pkg/scheduler/apis/config/v1beta3/zz_generated.deepcopy.go b/pkg/scheduler/apis/config/v1beta3/zz_generated.deepcopy.go index ae708e93a..61b213ca3 100644 --- a/pkg/scheduler/apis/config/v1beta3/zz_generated.deepcopy.go +++ b/pkg/scheduler/apis/config/v1beta3/zz_generated.deepcopy.go @@ -313,6 +313,38 @@ func (in *NodeNUMAResourceArgs) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeResourcesFitPlusArgs) DeepCopyInto(out *NodeResourcesFitPlusArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make(map[corev1.ResourceName]ResourcesType, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeResourcesFitPlusArgs. +func (in *NodeResourcesFitPlusArgs) DeepCopy() *NodeResourcesFitPlusArgs { + if in == nil { + return nil + } + out := new(NodeResourcesFitPlusArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NodeResourcesFitPlusArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ReservationArgs) DeepCopyInto(out *ReservationArgs) { *out = *in @@ -353,6 +385,52 @@ func (in *ReservationArgs) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ResourcesType) DeepCopyInto(out *ResourcesType) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourcesType. +func (in *ResourcesType) DeepCopy() *ResourcesType { + if in == nil { + return nil + } + out := new(ResourcesType) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ScarceResourceAvoidanceArgs) DeepCopyInto(out *ScarceResourceAvoidanceArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make([]corev1.ResourceName, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScarceResourceAvoidanceArgs. +func (in *ScarceResourceAvoidanceArgs) DeepCopy() *ScarceResourceAvoidanceArgs { + if in == nil { + return nil + } + out := new(ScarceResourceAvoidanceArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ScarceResourceAvoidanceArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ScoringStrategy) DeepCopyInto(out *ScoringStrategy) { *out = *in diff --git a/pkg/scheduler/apis/config/zz_generated.deepcopy.go b/pkg/scheduler/apis/config/zz_generated.deepcopy.go index 97dac7428..fd010dfb2 100644 --- a/pkg/scheduler/apis/config/zz_generated.deepcopy.go +++ b/pkg/scheduler/apis/config/zz_generated.deepcopy.go @@ -257,6 +257,38 @@ func (in *NodeNUMAResourceArgs) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeResourcesFitPlusArgs) DeepCopyInto(out *NodeResourcesFitPlusArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make(map[v1.ResourceName]ResourcesType, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeResourcesFitPlusArgs. +func (in *NodeResourcesFitPlusArgs) DeepCopy() *NodeResourcesFitPlusArgs { + if in == nil { + return nil + } + out := new(NodeResourcesFitPlusArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NodeResourcesFitPlusArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ReservationArgs) DeepCopyInto(out *ReservationArgs) { *out = *in @@ -282,6 +314,52 @@ func (in *ReservationArgs) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ResourcesType) DeepCopyInto(out *ResourcesType) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourcesType. +func (in *ResourcesType) DeepCopy() *ResourcesType { + if in == nil { + return nil + } + out := new(ResourcesType) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ScarceResourceAvoidanceArgs) DeepCopyInto(out *ScarceResourceAvoidanceArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make([]v1.ResourceName, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScarceResourceAvoidanceArgs. +func (in *ScarceResourceAvoidanceArgs) DeepCopy() *ScarceResourceAvoidanceArgs { + if in == nil { + return nil + } + out := new(ScarceResourceAvoidanceArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ScarceResourceAvoidanceArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ScoringStrategy) DeepCopyInto(out *ScoringStrategy) { *out = *in diff --git a/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go new file mode 100644 index 000000000..ad8837c8d --- /dev/null +++ b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go @@ -0,0 +1,171 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package noderesourcesfitplus + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/api/v1/resource" + k8sConfig "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/framework" + plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" + + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" +) + +const ( + // Name is plugin name + Name = "NodeResourcesFitPlus" +) + +var ( + _ framework.ScorePlugin = &Plugin{} +) + +type Plugin struct { + handle framework.Handle + args *config.NodeResourcesFitPlusArgs +} + +func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) { + + sampleArgs2, ok := args.(*config.NodeResourcesFitPlusArgs) + + if !ok { + return nil, fmt.Errorf("want args to be of type NodeResourcesFitPlusArgs, got %T", args) + } + + return &Plugin{ + handle: handle, + args: sampleArgs2, + }, nil +} + +func (s *Plugin) Name() string { + return Name +} + +func (s *Plugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { + + nodeInfo, err := s.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) + } + + var nodeScore int64 + var weightSum int64 + + podRequest, _ := fitsRequest(computePodResourceRequest(p).Resource, nodeInfo) + + for _, requestSourceName := range podRequest { + v, ok := s.args.Resources[requestSourceName] + if !ok { + continue + } + fit, err := noderesources.NewFit( + &k8sConfig.NodeResourcesFitArgs{ + ScoringStrategy: &k8sConfig.ScoringStrategy{ + Type: v.Type, // MostAllocated or LeastAllocated + Resources: []k8sConfig.ResourceSpec{ + {Name: string(requestSourceName), Weight: 1}, + }, + }, + }, s.handle, plfeature.Features{}) + + if err != nil { + return 0, framework.NewStatus(framework.Error, err.Error()) + } + + resourceScore, status := fit.(framework.ScorePlugin).Score(ctx, state, p, nodeName) + if !status.IsSuccess() { + return 0, status + } + + nodeScore += resourceScore * v.Weight + weightSum += v.Weight + } + + if weightSum == 0 { + return framework.MaxNodeScore, framework.NewStatus(framework.Success, "") + } + scores := nodeScore / weightSum + + return scores, framework.NewStatus(framework.Success, "") +} + +func (p *Plugin) ScoreExtensions() framework.ScoreExtensions { + return nil +} + +type preFilterState struct { + framework.Resource +} + +func computePodResourceRequest(pod *v1.Pod) *preFilterState { + // pod hasn't scheduled yet so we don't need to worry about InPlacePodVerticalScalingEnabled + reqs := resource.PodRequests(pod, resource.PodResourcesOptions{}) + result := &preFilterState{} + result.SetMaxResource(reqs) + return result +} + +func fitsRequest(podRequest framework.Resource, nodeInfo *framework.NodeInfo) ([]v1.ResourceName, []v1.ResourceName) { + var podRequestResource []v1.ResourceName + var nodeRequestResource []v1.ResourceName + + if podRequest.MilliCPU > 0 { + podRequestResource = append(podRequestResource, v1.ResourceCPU) + } + + if nodeInfo.Allocatable.MilliCPU > 0 { + nodeRequestResource = append(nodeRequestResource, v1.ResourceCPU) + } + + if podRequest.Memory > 0 { + podRequestResource = append(podRequestResource, v1.ResourceMemory) + } + + if nodeInfo.Allocatable.Memory > 0 { + 
nodeRequestResource = append(nodeRequestResource, v1.ResourceMemory) + } + + if podRequest.EphemeralStorage > 0 { + podRequestResource = append(podRequestResource, v1.ResourceEphemeralStorage) + } + + if nodeInfo.Allocatable.EphemeralStorage > 0 { + nodeRequestResource = append(nodeRequestResource, v1.ResourceEphemeralStorage) + } + + for rName, rQuant := range podRequest.ScalarResources { + if rQuant > 0 { + podRequestResource = append(podRequestResource, rName) + } + } + + for rName, rQuant := range nodeInfo.Allocatable.ScalarResources { + if rQuant > 0 { + nodeRequestResource = append(nodeRequestResource, rName) + } + } + + return podRequestResource, nodeRequestResource +} diff --git a/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go new file mode 100644 index 000000000..0ee87ee2b --- /dev/null +++ b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go @@ -0,0 +1,287 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package noderesourcesfitplus + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apiruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + kubefake "k8s.io/client-go/kubernetes/fake" + k8sConfig "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" + frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" + + koordinatorclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned" + koordfake "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake" + koordinatorinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions" + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/v1beta3" + "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext" +) + +var _ framework.SharedLister = &testSharedLister{} + +type testSharedLister struct { + nodes []*corev1.Node + nodeInfos []*framework.NodeInfo + nodeInfoMap map[string]*framework.NodeInfo +} + +func newTestSharedLister(pods []*corev1.Pod, nodes []*corev1.Node) *testSharedLister { + nodeInfoMap := make(map[string]*framework.NodeInfo) + nodeInfos := make([]*framework.NodeInfo, 0) + for _, pod := range pods { + nodeName := pod.Spec.NodeName + if _, ok := nodeInfoMap[nodeName]; !ok { + nodeInfoMap[nodeName] = framework.NewNodeInfo() + } + nodeInfoMap[nodeName].AddPod(pod) + } + for _, node := range nodes { + if _, ok := nodeInfoMap[node.Name]; !ok { + nodeInfoMap[node.Name] = framework.NewNodeInfo() + } + 
nodeInfoMap[node.Name].SetNode(node) + } + + for _, v := range nodeInfoMap { + nodeInfos = append(nodeInfos, v) + } + + return &testSharedLister{ + nodes: nodes, + nodeInfos: nodeInfos, + nodeInfoMap: nodeInfoMap, + } +} + +type PredicateClientSetAndHandle struct { + frameworkext.ExtendedHandle + koordinatorClientSet koordinatorclientset.Interface + koordInformerFactory koordinatorinformers.SharedInformerFactory +} + +func NodeResourcesPluginFactoryProxy(factoryFn frameworkruntime.PluginFactory, plugin *framework.Plugin) frameworkruntime.PluginFactory { + return func(args apiruntime.Object, handle framework.Handle) (framework.Plugin, error) { + koordClient := koordfake.NewSimpleClientset() + koordInformerFactory := koordinatorinformers.NewSharedInformerFactory(koordClient, 0) + extenderFactory, err := frameworkext.NewFrameworkExtenderFactory( + frameworkext.WithKoordinatorClientSet(koordClient), + frameworkext.WithKoordinatorSharedInformerFactory(koordInformerFactory)) + if err != nil { + return nil, err + } + extender := extenderFactory.NewFrameworkExtender(handle.(framework.Framework)) + *plugin, err = factoryFn(args, &PredicateClientSetAndHandle{ + ExtendedHandle: extender, + koordinatorClientSet: koordClient, + koordInformerFactory: koordInformerFactory, + }) + return *plugin, err + } +} + +func TestPlugin_Score(t *testing.T) { + + var v1beta2args v1beta3.NodeResourcesFitPlusArgs + v1beta2args.Resources = map[v1.ResourceName]v1beta3.ResourcesType{ + "nvidia.com/gpu": {Type: k8sConfig.MostAllocated, Weight: 2}, + "cpu": {Type: k8sConfig.LeastAllocated, Weight: 1}, + "memory": {Type: k8sConfig.LeastAllocated, Weight: 1}, + } + + var nodeResourcesFitPlusArgs config.NodeResourcesFitPlusArgs + err := v1beta3.Convert_v1beta3_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs(&v1beta2args, &nodeResourcesFitPlusArgs, nil) + assert.NoError(t, err) + + var ptplugin framework.Plugin + proxyNew := NodeResourcesPluginFactoryProxy(New, &ptplugin) + + cs := kubefake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(cs, 0) + + nodes := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + "nvidia.com/gpu": resource.MustParse("8"), + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + "nvidia.com/gpu": resource.MustParse("8"), + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode2", + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + "nvidia.com/gpu": resource.MustParse("8"), + "xx.xx/xx": resource.MustParse("8"), + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + "nvidia.com/gpu": resource.MustParse("8"), + "xx.xx/xx": resource.MustParse("8"), + }, + }, + }, + } + + pods := []*corev1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-pod-0", + }, + Spec: corev1.PodSpec{ + NodeName: "testNode1", + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + 
"nvidia.com/gpu": resource.MustParse("4"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + "nvidia.com/gpu": resource.MustParse("4"), + }, + }, + }, + }, + }, + }, + } + registeredPlugins := []schedulertesting.RegisterPluginFunc{ + schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + schedulertesting.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + } + + snapshot := newTestSharedLister(pods, nodes) + fh, err := schedulertesting.NewFramework(context.TODO(), registeredPlugins, "koord-scheduler", + frameworkruntime.WithClientSet(cs), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithSnapshotSharedLister(snapshot), + ) + assert.Nil(t, err) + + p, err := proxyNew(&nodeResourcesFitPlusArgs, fh) + p.Name() + assert.NotNil(t, p) + assert.Nil(t, err) + plug := p.(*Plugin) + h := plug.handle.(*PredicateClientSetAndHandle) + + informerFactory.Start(context.TODO().Done()) + informerFactory.WaitForCacheSync(context.TODO().Done()) + + h.koordInformerFactory.Start(context.TODO().Done()) + h.koordInformerFactory.WaitForCacheSync(context.TODO().Done()) + + cycleState := framework.NewCycleState() + + nodeInfo, err := snapshot.Get("testNode1") + assert.NoError(t, err) + assert.NotNil(t, nodeInfo) + nodeInfo, err = snapshot.Get("testNode2") + assert.NoError(t, err) + assert.NotNil(t, nodeInfo) + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-pod-1", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + "nvidia.com/gpu": resource.MustParse("2"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + "nvidia.com/gpu": resource.MustParse("2"), + }, + }, + }, + }, + }, + } + scoreNode1, _ := p.(*Plugin).Score(context.TODO(), cycleState, pod, "testNode1") + scoreNode2, _ := p.(*Plugin).Score(context.TODO(), cycleState, pod, "testNode2") + if scoreNode1 <= scoreNode2 { + t.Fatal("scoreNode1 must <= scoreNode2") + } +} + +func (f *testSharedLister) StorageInfos() framework.StorageInfoLister { + return f +} + +func (f *testSharedLister) IsPVCUsedByPods(key string) bool { + return false +} + +func (f *testSharedLister) NodeInfos() framework.NodeInfoLister { + return f +} + +func (f *testSharedLister) List() ([]*framework.NodeInfo, error) { + return f.nodeInfos, nil +} + +func (f *testSharedLister) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) { + return nil, nil +} + +func (f *testSharedLister) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) { + return nil, nil +} + +func (f *testSharedLister) Get(nodeName string) (*framework.NodeInfo, error) { + return f.nodeInfoMap[nodeName], nil +} diff --git a/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go b/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go new file mode 100644 index 000000000..487cd7221 --- /dev/null +++ b/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go @@ -0,0 +1,175 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scarceresourceavoidance + +import ( + "context" + "fmt" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/api/v1/resource" + "k8s.io/kubernetes/pkg/scheduler/framework" + + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" +) + +const ( + // Name is plugin name + Name = "ScarceResourceAvoidance" +) + +var ( + _ framework.ScorePlugin = &Plugin{} +) + +type Plugin struct { + handle framework.Handle + args *config.ScarceResourceAvoidanceArgs +} + +func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) { + + sampleArgs2, ok := args.(*config.ScarceResourceAvoidanceArgs) + + if !ok { + return nil, fmt.Errorf("want args to be of type ScarceResourceAvoidanceArgs, got %T", args) + } + + return &Plugin{ + handle: handle, + args: sampleArgs2, + }, nil +} + +func (s *Plugin) Name() string { + return Name +} + +func (s *Plugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { + nodeInfo, err := s.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) + } + + podRequest := computePodResourceRequest(p) + podRequestResource, nodeAllocatableResource := fitsRequest(podRequest.Resource, nodeInfo) + diffNames := difference(nodeAllocatableResource, podRequestResource) + intersectNames := intersection(diffNames, s.args.Resources) + + if len(diffNames) == 0 || len(intersectNames) == 0 { + return framework.MaxNodeScore, framework.NewStatus(framework.Success, "") + } + scores := resourceTypesScore(int64(len(intersectNames)), int64(len(diffNames))) + + return scores, framework.NewStatus(framework.Success, "") +} + +func intersection(slice1, slice2 []v1.ResourceName) []v1.ResourceName { + m := make(map[v1.ResourceName]struct{}) + result := []v1.ResourceName{} + + for _, v := range slice2 { + m[v] = struct{}{} + } + + for _, v := range slice1 { + if _, found := m[v]; found { + result = append(result, v) + } + } + + return result +} + +func difference(slice1, slice2 []v1.ResourceName) []v1.ResourceName { + var result []v1.ResourceName + m := make(map[v1.ResourceName]struct{}) + for _, v := range slice2 { + m[v] = struct{}{} + } + + for _, v := range slice1 { + if _, found := m[v]; !found { + result = append(result, v) + } + } + + return result +} + +func (p *Plugin) ScoreExtensions() framework.ScoreExtensions { + return nil +} + +type preFilterState struct { + framework.Resource +} + +func computePodResourceRequest(pod *v1.Pod) *preFilterState { + // pod hasn't scheduled yet so we don't need to worry about InPlacePodVerticalScalingEnabled + reqs := resource.PodRequests(pod, resource.PodResourcesOptions{}) + result := &preFilterState{} + result.SetMaxResource(reqs) + return result +} + +func fitsRequest(podRequest framework.Resource, nodeInfo *framework.NodeInfo) ([]v1.ResourceName, []v1.ResourceName) { + var podRequestResource []v1.ResourceName + var nodeRequestResource []v1.ResourceName + + if podRequest.MilliCPU > 0 { + podRequestResource = 
append(podRequestResource, v1.ResourceCPU) + } + + if nodeInfo.Allocatable.MilliCPU > 0 { + nodeRequestResource = append(nodeRequestResource, v1.ResourceCPU) + } + + if podRequest.Memory > 0 { + podRequestResource = append(podRequestResource, v1.ResourceMemory) + } + + if nodeInfo.Allocatable.Memory > 0 { + nodeRequestResource = append(nodeRequestResource, v1.ResourceMemory) + } + + if podRequest.EphemeralStorage > 0 { + podRequestResource = append(podRequestResource, v1.ResourceEphemeralStorage) + } + + if nodeInfo.Allocatable.EphemeralStorage > 0 { + nodeRequestResource = append(nodeRequestResource, v1.ResourceEphemeralStorage) + } + + for rName, rQuant := range podRequest.ScalarResources { + if rQuant > 0 { + podRequestResource = append(podRequestResource, rName) + } + } + + for rName, rQuant := range nodeInfo.Allocatable.ScalarResources { + if rQuant > 0 { + nodeRequestResource = append(nodeRequestResource, rName) + } + } + + return podRequestResource, nodeRequestResource +} +func resourceTypesScore(requestsSourcesNum, allocatablesSourcesNum int64) int64 { + return (allocatablesSourcesNum - requestsSourcesNum) * framework.MaxNodeScore / allocatablesSourcesNum +} diff --git a/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance_test.go b/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance_test.go new file mode 100644 index 000000000..e5b2f9822 --- /dev/null +++ b/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance_test.go @@ -0,0 +1,250 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scarceresourceavoidance + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apiruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + kubefake "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" + frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" + + koordinatorclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned" + koordfake "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake" + koordinatorinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions" + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/v1beta3" + "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext" +) + +var _ framework.SharedLister = &testSharedLister{} + +type testSharedLister struct { + nodes []*corev1.Node + nodeInfos []*framework.NodeInfo + nodeInfoMap map[string]*framework.NodeInfo +} + +func newTestSharedLister(pods []*corev1.Pod, nodes []*corev1.Node) *testSharedLister { + nodeInfoMap := make(map[string]*framework.NodeInfo) + nodeInfos := make([]*framework.NodeInfo, 0) + for _, pod := range pods { + nodeName := pod.Spec.NodeName + if _, ok := nodeInfoMap[nodeName]; !ok { + nodeInfoMap[nodeName] = framework.NewNodeInfo() + } + nodeInfoMap[nodeName].AddPod(pod) + } + for _, node := range nodes { + if _, ok := nodeInfoMap[node.Name]; !ok { + nodeInfoMap[node.Name] = framework.NewNodeInfo() + } + nodeInfoMap[node.Name].SetNode(node) + } + + for _, v := range nodeInfoMap { + nodeInfos = append(nodeInfos, v) + } + + return &testSharedLister{ + nodes: nodes, + nodeInfos: nodeInfos, + nodeInfoMap: nodeInfoMap, + } +} + +type PredicateClientSetAndHandle struct { + frameworkext.ExtendedHandle + koordinatorClientSet koordinatorclientset.Interface + koordInformerFactory koordinatorinformers.SharedInformerFactory +} + +func NodeResourcesPluginFactoryProxy(factoryFn frameworkruntime.PluginFactory, plugin *framework.Plugin) frameworkruntime.PluginFactory { + return func(args apiruntime.Object, handle framework.Handle) (framework.Plugin, error) { + koordClient := koordfake.NewSimpleClientset() + koordInformerFactory := koordinatorinformers.NewSharedInformerFactory(koordClient, 0) + extenderFactory, err := frameworkext.NewFrameworkExtenderFactory( + frameworkext.WithKoordinatorClientSet(koordClient), + frameworkext.WithKoordinatorSharedInformerFactory(koordInformerFactory)) + if err != nil { + return nil, err + } + extender := extenderFactory.NewFrameworkExtender(handle.(framework.Framework)) + *plugin, err = factoryFn(args, &PredicateClientSetAndHandle{ + ExtendedHandle: extender, + koordinatorClientSet: koordClient, + koordInformerFactory: koordInformerFactory, + }) + return *plugin, err + } +} + +func TestPlugin_Score(t *testing.T) { + + var v1beta2args v1beta3.ScarceResourceAvoidanceArgs + v1beta2args.Resources = []v1.ResourceName{"nvidia.com/gpu"} + + var scarceResourceAvoidanceArgs config.ScarceResourceAvoidanceArgs + err := 
v1beta3.Convert_v1beta3_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs(&v1beta2args, &scarceResourceAvoidanceArgs, nil) + assert.NoError(t, err) + + var ptplugin framework.Plugin + proxyNew := NodeResourcesPluginFactoryProxy(New, &ptplugin) + + cs := kubefake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(cs, 0) + + nodes := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode1", + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + "nvidia.com/gpu": resource.MustParse("8"), + "xx.xx/xx": resource.MustParse("8"), + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + "nvidia.com/gpu": resource.MustParse("8"), + "xx.xx/xx": resource.MustParse("8"), + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "testNode2", + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + }, + Capacity: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + }, + }, + }, + } + + registeredPlugins := []schedulertesting.RegisterPluginFunc{ + schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + schedulertesting.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + } + + snapshot := newTestSharedLister(nil, nodes) + fh, err := schedulertesting.NewFramework(context.TODO(), registeredPlugins, "koord-scheduler", + frameworkruntime.WithClientSet(cs), + frameworkruntime.WithInformerFactory(informerFactory), + frameworkruntime.WithSnapshotSharedLister(snapshot), + ) + assert.Nil(t, err) + + p, err := proxyNew(&scarceResourceAvoidanceArgs, fh) + p.Name() + assert.NotNil(t, p) + assert.Nil(t, err) + plug := p.(*Plugin) + h := plug.handle.(*PredicateClientSetAndHandle) + + informerFactory.Start(context.TODO().Done()) + informerFactory.WaitForCacheSync(context.TODO().Done()) + + h.koordInformerFactory.Start(context.TODO().Done()) + h.koordInformerFactory.WaitForCacheSync(context.TODO().Done()) + + cycleState := framework.NewCycleState() + + nodeInfo, err := snapshot.Get("testNode1") + assert.NoError(t, err) + assert.NotNil(t, nodeInfo) + nodeInfo, err = snapshot.Get("testNode2") + assert.NoError(t, err) + assert.NotNil(t, nodeInfo) + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-pod-1", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + }, + }, + } + scoreNode1, _ := p.(*Plugin).Score(context.TODO(), cycleState, pod, "testNode1") + scoreNode2, _ := p.(*Plugin).Score(context.TODO(), cycleState, pod, "testNode2") + if scoreNode1 >= scoreNode2 { + t.Fatal("expected scoreNode1 to be less than scoreNode2") + } +} + +func (f *testSharedLister) StorageInfos() framework.StorageInfoLister { + return f +} + +func (f *testSharedLister) IsPVCUsedByPods(key string) bool { + return false +} + +func (f *testSharedLister) NodeInfos() framework.NodeInfoLister 
{ + return f +} + +func (f *testSharedLister) List() ([]*framework.NodeInfo, error) { + return f.nodeInfos, nil +} + +func (f *testSharedLister) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) { + return nil, nil +} + +func (f *testSharedLister) HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) { + return nil, nil +} + +func (f *testSharedLister) Get(nodeName string) (*framework.NodeInfo, error) { + return f.nodeInfoMap[nodeName], nil +} From 1b77ba5756c90b7cc114b5909ecde22ae478da7c Mon Sep 17 00:00:00 2001 From: LY-today <724102053@qq.com> Date: Fri, 3 Jan 2025 15:00:21 +0800 Subject: [PATCH 2/5] feat: add preScore Signed-off-by: LY-today <724102053@qq.com> --- .../node_resources_fit_plus.go | 39 ++++++++++++++++-- .../node_resources_fit_plus_test.go | 6 +++ .../scarce_resource_avoidance.go | 40 ++++++++++++++++--- .../scarce_resource_avoidance_test.go | 6 +++ 4 files changed, 82 insertions(+), 9 deletions(-) diff --git a/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go index ad8837c8d..5651375f5 100644 --- a/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go +++ b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go @@ -34,6 +34,8 @@ import ( const ( // Name is plugin name Name = "NodeResourcesFitPlus" + + preScoreStateKey = "PreScore" + Name ) var ( @@ -63,6 +65,11 @@ func (s *Plugin) Name() string { return Name } +func (s *Plugin) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status { + cycleState.Write(preScoreStateKey, computePodResourceRequest(pod)) + return nil +} + func (s *Plugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { nodeInfo, err := s.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) @@ -73,7 +80,12 @@ func (s *Plugin) Score(ctx context.Context, state *framework.CycleState, p *v1.P var nodeScore int64 var weightSum int64 - podRequest, _ := fitsRequest(computePodResourceRequest(p).Resource, nodeInfo) + scoreState, err := getPreScoreState(state) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("get State node %q from PreScore: %v", nodeName, err)) + } + + podRequest, _ := fitsRequest(scoreState.Resource, nodeInfo) for _, requestSourceName := range podRequest { v, ok := s.args.Resources[requestSourceName] @@ -115,14 +127,19 @@ func (p *Plugin) ScoreExtensions() framework.ScoreExtensions { return nil } -type preFilterState struct { +type preScoreState struct { framework.Resource } -func computePodResourceRequest(pod *v1.Pod) *preFilterState { +// Clone the prefilter state. +func (s *preScoreState) Clone() framework.StateData { + return s +} + +func computePodResourceRequest(pod *v1.Pod) *preScoreState { // pod hasn't scheduled yet so we don't need to worry about InPlacePodVerticalScalingEnabled reqs := resource.PodRequests(pod, resource.PodResourcesOptions{}) - result := &preFilterState{} + result := &preScoreState{} result.SetMaxResource(reqs) return result } @@ -169,3 +186,17 @@ func fitsRequest(podRequest framework.Resource, nodeInfo *framework.NodeInfo) ([ return podRequestResource, nodeRequestResource } + +func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error) { + c, err := cycleState.Read(preScoreStateKey) + if err != nil { + // preFilterState doesn't exist, likely PreFilter wasn't invoked. 
+ return nil, fmt.Errorf("error reading %q from cycleState: %w", preScoreStateKey, err) + } + + s, ok := c.(*preScoreState) + if !ok { + return nil, fmt.Errorf("%+v convert to NodeResourcesFitPlus.preScoreState error", c) + } + return s, nil +} diff --git a/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go index 0ee87ee2b..17e344893 100644 --- a/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go +++ b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go @@ -251,6 +251,12 @@ func TestPlugin_Score(t *testing.T) { }, }, } + + status := p.(framework.PreScorePlugin).PreScore(context.TODO(), cycleState, pod, nodes) + if status != nil { + t.Fatal("PreScore run err") + } + scoreNode1, _ := p.(*Plugin).Score(context.TODO(), cycleState, pod, "testNode1") scoreNode2, _ := p.(*Plugin).Score(context.TODO(), cycleState, pod, "testNode2") if scoreNode1 <= scoreNode2 { diff --git a/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go b/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go index 487cd7221..f1d2fa437 100644 --- a/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go +++ b/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go @@ -31,6 +31,8 @@ import ( const ( // Name is plugin name Name = "ScarceResourceAvoidance" + + preScoreStateKey = "PreScore" + Name ) var ( @@ -60,14 +62,22 @@ func (s *Plugin) Name() string { return Name } +func (s *Plugin) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status { + cycleState.Write(preScoreStateKey, computePodResourceRequest(pod)) + return nil +} + func (s *Plugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { nodeInfo, err := s.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) if err != nil { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) } - podRequest := computePodResourceRequest(p) - podRequestResource, nodeAllocatableResource := fitsRequest(podRequest.Resource, nodeInfo) + scoreState, err := getPreScoreState(state) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("get State node %q from PreScore: %v", nodeName, err)) + } + podRequestResource, nodeAllocatableResource := fitsRequest(scoreState.Resource, nodeInfo) diffNames := difference(nodeAllocatableResource, podRequestResource) intersectNames := intersection(diffNames, s.args.Resources) @@ -116,14 +126,19 @@ func (p *Plugin) ScoreExtensions() framework.ScoreExtensions { return nil } -type preFilterState struct { +type preScoreState struct { framework.Resource } -func computePodResourceRequest(pod *v1.Pod) *preFilterState { +// Clone the prefilter state. 
+func (s *preScoreState) Clone() framework.StateData { + return s +} + +func computePodResourceRequest(pod *v1.Pod) *preScoreState { // pod hasn't scheduled yet so we don't need to worry about InPlacePodVerticalScalingEnabled reqs := resource.PodRequests(pod, resource.PodResourcesOptions{}) - result := &preFilterState{} + result := &preScoreState{} result.SetMaxResource(reqs) return result } @@ -170,6 +185,21 @@ func fitsRequest(podRequest framework.Resource, nodeInfo *framework.NodeInfo) ([ return podRequestResource, nodeRequestResource } + func resourceTypesScore(requestsSourcesNum, allocatablesSourcesNum int64) int64 { return (allocatablesSourcesNum - requestsSourcesNum) * framework.MaxNodeScore / allocatablesSourcesNum } + +func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error) { + c, err := cycleState.Read(preScoreStateKey) + if err != nil { + // preFilterState doesn't exist, likely PreFilter wasn't invoked. + return nil, fmt.Errorf("error reading %q from cycleState: %w", preScoreStateKey, err) + } + + s, ok := c.(*preScoreState) + if !ok { + return nil, fmt.Errorf("%+v convert to NodeResourcesFit.preFilterState error", c) + } + return s, nil +} diff --git a/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance_test.go b/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance_test.go index e5b2f9822..f53baa72b 100644 --- a/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance_test.go +++ b/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance_test.go @@ -214,6 +214,12 @@ func TestPlugin_Score(t *testing.T) { }, }, } + + status := p.(framework.PreScorePlugin).PreScore(context.TODO(), cycleState, pod, nodes) + if status != nil { + t.Fatal("PreScore run err") + } + scoreNode1, _ := p.(*Plugin).Score(context.TODO(), cycleState, pod, "testNode1") scoreNode2, _ := p.(*Plugin).Score(context.TODO(), cycleState, pod, "testNode2") if scoreNode1 >= scoreNode2 { From 8438a3d300405d6551550c8d7345f92d8bf55c43 Mon Sep 17 00:00:00 2001 From: LY-today <724102053@qq.com> Date: Mon, 13 Jan 2025 14:23:09 +0800 Subject: [PATCH 3/5] feat: update noderesourcefitplus Signed-off-by: LY-today <724102053@qq.com> --- pkg/scheduler/apis/config/v1/types.go | 23 ++ .../apis/config/v1/zz_generated.conversion.go | 92 ++++++++ .../apis/config/v1/zz_generated.deepcopy.go | 78 +++++++ .../node_resource_fit_plus_utils.go | 203 ++++++++++++++++++ .../node_resources_fit_plus.go | 103 ++------- .../node_resources_fit_plus_test.go | 2 +- .../scarce_resource_avoidance.go | 45 +--- 7 files changed, 422 insertions(+), 124 deletions(-) create mode 100644 pkg/scheduler/plugins/noderesourcefitplus/node_resource_fit_plus_utils.go diff --git a/pkg/scheduler/apis/config/v1/types.go b/pkg/scheduler/apis/config/v1/types.go index 28978fbd8..b5f1c2b1c 100644 --- a/pkg/scheduler/apis/config/v1/types.go +++ b/pkg/scheduler/apis/config/v1/types.go @@ -18,8 +18,10 @@ package v1 import ( corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" schedconfigv1 "k8s.io/kube-scheduler/config/v1" + "k8s.io/kubernetes/pkg/scheduler/apis/config" "github.com/koordinator-sh/koordinator/apis/extension" ) @@ -232,3 +234,24 @@ type DeviceShareArgs struct { // DisableDeviceNUMATopologyAlignment indicates device don't need to align with other resources' numa topology DisableDeviceNUMATopologyAlignment bool `json:"disableDeviceNUMATopologyAlignment,omitempty"` } + +// 
+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// ScarceResourceAvoidanceArgs defines the parameters for ScarceResourceAvoidance plugin. +type ScarceResourceAvoidanceArgs struct { + metav1.TypeMeta + Resources []v1.ResourceName `json:"resources,omitempty"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// NodeResourcesFitPlusArgs defines the parameters for NodeResourcesFitPlus plugin. +type NodeResourcesFitPlusArgs struct { + metav1.TypeMeta + Resources map[v1.ResourceName]ResourcesType `json:"resources"` +} + +type ResourcesType struct { + Type config.ScoringStrategyType `json:"type"` + Weight int64 `json:"weight"` +} diff --git a/pkg/scheduler/apis/config/v1/zz_generated.conversion.go b/pkg/scheduler/apis/config/v1/zz_generated.conversion.go index d42180401..799ade707 100644 --- a/pkg/scheduler/apis/config/v1/zz_generated.conversion.go +++ b/pkg/scheduler/apis/config/v1/zz_generated.conversion.go @@ -101,6 +101,16 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*NodeResourcesFitPlusArgs)(nil), (*config.NodeResourcesFitPlusArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs(a.(*NodeResourcesFitPlusArgs), b.(*config.NodeResourcesFitPlusArgs), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.NodeResourcesFitPlusArgs)(nil), (*NodeResourcesFitPlusArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_NodeResourcesFitPlusArgs_To_v1_NodeResourcesFitPlusArgs(a.(*config.NodeResourcesFitPlusArgs), b.(*NodeResourcesFitPlusArgs), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*ReservationArgs)(nil), (*config.ReservationArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1_ReservationArgs_To_config_ReservationArgs(a.(*ReservationArgs), b.(*config.ReservationArgs), scope) }); err != nil { @@ -111,6 +121,26 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*ResourcesType)(nil), (*config.ResourcesType)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1_ResourcesType_To_config_ResourcesType(a.(*ResourcesType), b.(*config.ResourcesType), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.ResourcesType)(nil), (*ResourcesType)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_ResourcesType_To_v1_ResourcesType(a.(*config.ResourcesType), b.(*ResourcesType), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*ScarceResourceAvoidanceArgs)(nil), (*config.ScarceResourceAvoidanceArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs(a.(*ScarceResourceAvoidanceArgs), b.(*config.ScarceResourceAvoidanceArgs), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.ScarceResourceAvoidanceArgs)(nil), (*ScarceResourceAvoidanceArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_ScarceResourceAvoidanceArgs_To_v1_ScarceResourceAvoidanceArgs(a.(*config.ScarceResourceAvoidanceArgs), b.(*ScarceResourceAvoidanceArgs), scope) + }); err != nil { + return err + } if err := 
s.AddGeneratedConversionFunc((*ScoringStrategy)(nil), (*config.ScoringStrategy)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1_ScoringStrategy_To_config_ScoringStrategy(a.(*ScoringStrategy), b.(*config.ScoringStrategy), scope) }); err != nil { @@ -360,6 +390,26 @@ func Convert_config_NodeNUMAResourceArgs_To_v1_NodeNUMAResourceArgs(in *config.N return autoConvert_config_NodeNUMAResourceArgs_To_v1_NodeNUMAResourceArgs(in, out, s) } +func autoConvert_v1_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs(in *NodeResourcesFitPlusArgs, out *config.NodeResourcesFitPlusArgs, s conversion.Scope) error { + out.Resources = *(*map[corev1.ResourceName]config.ResourcesType)(unsafe.Pointer(&in.Resources)) + return nil +} + +// Convert_v1_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs is an autogenerated conversion function. +func Convert_v1_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs(in *NodeResourcesFitPlusArgs, out *config.NodeResourcesFitPlusArgs, s conversion.Scope) error { + return autoConvert_v1_NodeResourcesFitPlusArgs_To_config_NodeResourcesFitPlusArgs(in, out, s) +} + +func autoConvert_config_NodeResourcesFitPlusArgs_To_v1_NodeResourcesFitPlusArgs(in *config.NodeResourcesFitPlusArgs, out *NodeResourcesFitPlusArgs, s conversion.Scope) error { + out.Resources = *(*map[corev1.ResourceName]ResourcesType)(unsafe.Pointer(&in.Resources)) + return nil +} + +// Convert_config_NodeResourcesFitPlusArgs_To_v1_NodeResourcesFitPlusArgs is an autogenerated conversion function. +func Convert_config_NodeResourcesFitPlusArgs_To_v1_NodeResourcesFitPlusArgs(in *config.NodeResourcesFitPlusArgs, out *NodeResourcesFitPlusArgs, s conversion.Scope) error { + return autoConvert_config_NodeResourcesFitPlusArgs_To_v1_NodeResourcesFitPlusArgs(in, out, s) +} + func autoConvert_v1_ReservationArgs_To_config_ReservationArgs(in *ReservationArgs, out *config.ReservationArgs, s conversion.Scope) error { if err := metav1.Convert_Pointer_bool_To_bool(&in.EnablePreemption, &out.EnablePreemption, s); err != nil { return err @@ -396,6 +446,48 @@ func Convert_config_ReservationArgs_To_v1_ReservationArgs(in *config.Reservation return autoConvert_config_ReservationArgs_To_v1_ReservationArgs(in, out, s) } +func autoConvert_v1_ResourcesType_To_config_ResourcesType(in *ResourcesType, out *config.ResourcesType, s conversion.Scope) error { + out.Type = apisconfig.ScoringStrategyType(in.Type) + out.Weight = in.Weight + return nil +} + +// Convert_v1_ResourcesType_To_config_ResourcesType is an autogenerated conversion function. +func Convert_v1_ResourcesType_To_config_ResourcesType(in *ResourcesType, out *config.ResourcesType, s conversion.Scope) error { + return autoConvert_v1_ResourcesType_To_config_ResourcesType(in, out, s) +} + +func autoConvert_config_ResourcesType_To_v1_ResourcesType(in *config.ResourcesType, out *ResourcesType, s conversion.Scope) error { + out.Type = apisconfig.ScoringStrategyType(in.Type) + out.Weight = in.Weight + return nil +} + +// Convert_config_ResourcesType_To_v1_ResourcesType is an autogenerated conversion function. 
+func Convert_config_ResourcesType_To_v1_ResourcesType(in *config.ResourcesType, out *ResourcesType, s conversion.Scope) error { + return autoConvert_config_ResourcesType_To_v1_ResourcesType(in, out, s) +} + +func autoConvert_v1_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs(in *ScarceResourceAvoidanceArgs, out *config.ScarceResourceAvoidanceArgs, s conversion.Scope) error { + out.Resources = *(*[]corev1.ResourceName)(unsafe.Pointer(&in.Resources)) + return nil +} + +// Convert_v1_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs is an autogenerated conversion function. +func Convert_v1_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs(in *ScarceResourceAvoidanceArgs, out *config.ScarceResourceAvoidanceArgs, s conversion.Scope) error { + return autoConvert_v1_ScarceResourceAvoidanceArgs_To_config_ScarceResourceAvoidanceArgs(in, out, s) +} + +func autoConvert_config_ScarceResourceAvoidanceArgs_To_v1_ScarceResourceAvoidanceArgs(in *config.ScarceResourceAvoidanceArgs, out *ScarceResourceAvoidanceArgs, s conversion.Scope) error { + out.Resources = *(*[]corev1.ResourceName)(unsafe.Pointer(&in.Resources)) + return nil +} + +// Convert_config_ScarceResourceAvoidanceArgs_To_v1_ScarceResourceAvoidanceArgs is an autogenerated conversion function. +func Convert_config_ScarceResourceAvoidanceArgs_To_v1_ScarceResourceAvoidanceArgs(in *config.ScarceResourceAvoidanceArgs, out *ScarceResourceAvoidanceArgs, s conversion.Scope) error { + return autoConvert_config_ScarceResourceAvoidanceArgs_To_v1_ScarceResourceAvoidanceArgs(in, out, s) +} + func autoConvert_v1_ScoringStrategy_To_config_ScoringStrategy(in *ScoringStrategy, out *config.ScoringStrategy, s conversion.Scope) error { out.Type = config.ScoringStrategyType(in.Type) out.Resources = *(*[]apisconfig.ResourceSpec)(unsafe.Pointer(&in.Resources)) diff --git a/pkg/scheduler/apis/config/v1/zz_generated.deepcopy.go b/pkg/scheduler/apis/config/v1/zz_generated.deepcopy.go index 4ea7d24fb..0991ee336 100644 --- a/pkg/scheduler/apis/config/v1/zz_generated.deepcopy.go +++ b/pkg/scheduler/apis/config/v1/zz_generated.deepcopy.go @@ -313,6 +313,38 @@ func (in *NodeNUMAResourceArgs) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NodeResourcesFitPlusArgs) DeepCopyInto(out *NodeResourcesFitPlusArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make(map[corev1.ResourceName]ResourcesType, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NodeResourcesFitPlusArgs. +func (in *NodeResourcesFitPlusArgs) DeepCopy() *NodeResourcesFitPlusArgs { + if in == nil { + return nil + } + out := new(NodeResourcesFitPlusArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *NodeResourcesFitPlusArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *ReservationArgs) DeepCopyInto(out *ReservationArgs) { *out = *in @@ -353,6 +385,52 @@ func (in *ReservationArgs) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ResourcesType) DeepCopyInto(out *ResourcesType) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourcesType. +func (in *ResourcesType) DeepCopy() *ResourcesType { + if in == nil { + return nil + } + out := new(ResourcesType) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ScarceResourceAvoidanceArgs) DeepCopyInto(out *ScarceResourceAvoidanceArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.Resources != nil { + in, out := &in.Resources, &out.Resources + *out = make([]corev1.ResourceName, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ScarceResourceAvoidanceArgs. +func (in *ScarceResourceAvoidanceArgs) DeepCopy() *ScarceResourceAvoidanceArgs { + if in == nil { + return nil + } + out := new(ScarceResourceAvoidanceArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ScarceResourceAvoidanceArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ScoringStrategy) DeepCopyInto(out *ScoringStrategy) { *out = *in diff --git a/pkg/scheduler/plugins/noderesourcefitplus/node_resource_fit_plus_utils.go b/pkg/scheduler/plugins/noderesourcefitplus/node_resource_fit_plus_utils.go new file mode 100644 index 000000000..900499889 --- /dev/null +++ b/pkg/scheduler/plugins/noderesourcefitplus/node_resource_fit_plus_utils.go @@ -0,0 +1,203 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package noderesourcesfitplus + +import ( + v1 "k8s.io/api/core/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/api/v1/resource" + k8sConfig "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/framework" + schedutil "k8s.io/kubernetes/pkg/scheduler/util" + + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" +) + +type ResourceAllocationPriority struct { + scorer func(nodeName string, args *config.NodeResourcesFitPlusArgs, requestedMap, allocatableMap map[v1.ResourceName]int64) int64 +} + +func mostRequestedScore(requested, capacity int64) int64 { + if capacity == 0 { + return 0 + } + if requested > capacity { + requested = capacity + } + + return requested * framework.MaxNodeScore / capacity +} + +func leastRequestedScore(requested, capacity int64) int64 { + if capacity == 0 { + return 0 + } + if requested > capacity { + return 0 + } + + return ((capacity - requested) * framework.MaxNodeScore) / capacity +} + +func resourceScorer(nodeName string, args *config.NodeResourcesFitPlusArgs, requestedMap, allocatableMap map[v1.ResourceName]int64) int64 { + var nodeScore int64 + var weightSum int64 + + for resourceName, requested := range requestedMap { + if _, ok := args.Resources[resourceName]; !ok { + continue + } + resourceArgs := args.Resources[resourceName] + + var resourceScore int64 + + switch resourceArgs.Type { + case k8sConfig.MostAllocated: + resourceScore = mostRequestedScore(requested, allocatableMap[resourceName]) + case k8sConfig.LeastAllocated: + resourceScore = leastRequestedScore(requested, allocatableMap[resourceName]) + } + nodeScore += resourceScore * resourceArgs.Weight + weightSum += resourceArgs.Weight + + } + if weightSum == 0 { + return framework.MaxNodeScore + } + + i := nodeScore / weightSum + + return i +} + +func (r *ResourceAllocationPriority) getResourceScore(args *config.NodeResourcesFitPlusArgs, podRequestNames []v1.ResourceName, pod *v1.Pod, nodeInfo *framework.NodeInfo, nodeName string) int64 { + requested := make(resourceToValueMap, len(podRequestNames)) + allocatable := make(resourceToValueMap, len(podRequestNames)) + for _, resourceName := range podRequestNames { + allocatable[resourceName], requested[resourceName] = calculateResourceAllocatableRequest(nodeInfo, pod, resourceName) + } + + var score int64 + score = r.scorer(nodeName, args, requested, allocatable) + + return score +} + +func computePodResourceRequest(pod *v1.Pod) *preScoreState { + // pod hasn't scheduled yet so we don't need to worry about InPlacePodVerticalScalingEnabled + reqs := resource.PodRequests(pod, resource.PodResourcesOptions{}) + result := &preScoreState{} + result.SetMaxResource(reqs) + result.ResourceName = fitsPodRequestName(result.Resource) + return result +} + +// resourceToValueMap contains resource name and score. 
+type resourceToValueMap map[v1.ResourceName]int64 + +// calculateResourceAllocatableRequest returns resources Allocatable and Requested values +func calculateResourceAllocatableRequest(nodeInfo *framework.NodeInfo, pod *v1.Pod, resource v1.ResourceName) (int64, int64) { + podRequest := calculatePodResourceRequest(pod, resource) + switch resource { + case v1.ResourceCPU: + return nodeInfo.Allocatable.MilliCPU, nodeInfo.NonZeroRequested.MilliCPU + podRequest + case v1.ResourceMemory: + return nodeInfo.Allocatable.Memory, nodeInfo.NonZeroRequested.Memory + podRequest + + case v1.ResourceEphemeralStorage: + return nodeInfo.Allocatable.EphemeralStorage, nodeInfo.Requested.EphemeralStorage + podRequest + default: + if schedutil.IsScalarResourceName(resource) { + return nodeInfo.Allocatable.ScalarResources[resource], nodeInfo.Requested.ScalarResources[resource] + podRequest + } + } + if klog.V(10).Enabled() { + klog.Infof("requested resource %v not considered for node score calculation", + resource, + ) + } + return 0, 0 +} + +// calculatePodResourceRequest returns the total non-zero requests. If Overhead is defined for the pod and the +// PodOverhead feature is enabled, the Overhead is added to the result. +// podResourceRequest = max(sum(podSpec.Containers), podSpec.InitContainers) + overHead +func calculatePodResourceRequest(pod *v1.Pod, resource v1.ResourceName) int64 { + var podRequest int64 + for i := range pod.Spec.Containers { + container := &pod.Spec.Containers[i] + value := GetNonzeroRequestForResource(resource, &container.Resources.Requests) + podRequest += value + } + + for i := range pod.Spec.InitContainers { + initContainer := &pod.Spec.InitContainers[i] + value := GetNonzeroRequestForResource(resource, &initContainer.Resources.Requests) + if podRequest < value { + podRequest = value + } + } + + // If Overhead is being utilized, add to the total requests for the pod + if pod.Spec.Overhead != nil && utilfeature.DefaultFeatureGate.Enabled("PodOverhead") { + if quantity, found := pod.Spec.Overhead[resource]; found { + podRequest += quantity.Value() + } + } + + return podRequest +} + +// GetNonzeroRequestForResource returns the default resource request if none is found or +// what is provided on the request. +func GetNonzeroRequestForResource(resource v1.ResourceName, requests *v1.ResourceList) int64 { + switch resource { + case v1.ResourceCPU: + // Override if un-set, but not if explicitly set to zero + if _, found := (*requests)[v1.ResourceCPU]; !found { + return schedutil.DefaultMilliCPURequest + } + return requests.Cpu().MilliValue() + case v1.ResourceMemory: + // Override if un-set, but not if explicitly set to zero + if _, found := (*requests)[v1.ResourceMemory]; !found { + return schedutil.DefaultMemoryRequest + } + return requests.Memory().Value() + case v1.ResourceEphemeralStorage: + // if the local storage capacity isolation feature gate is disabled, pods request 0 disk. 
+ if !utilfeature.DefaultFeatureGate.Enabled("LocalStorageCapacityIsolation") { + return 0 + } + + quantity, found := (*requests)[v1.ResourceEphemeralStorage] + if !found { + return 0 + } + return quantity.Value() + default: + if schedutil.IsScalarResourceName(resource) { + quantity, found := (*requests)[resource] + if !found { + return 0 + } + return quantity.Value() + } + } + return 0 +} diff --git a/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go index 5651375f5..fb57d0a63 100644 --- a/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go +++ b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go @@ -22,11 +22,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/kubernetes/pkg/api/v1/resource" - k8sConfig "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" - plfeature "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" - "k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources" "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" ) @@ -48,16 +44,15 @@ type Plugin struct { } func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) { - - sampleArgs2, ok := args.(*config.NodeResourcesFitPlusArgs) + nodeResourcesFitPlusArgs, ok := args.(*config.NodeResourcesFitPlusArgs) if !ok { - return nil, fmt.Errorf("want args to be of type NodeResourcesArgs, got %T", args) + return nil, fmt.Errorf("want args to be of type NodeResourcesArgs, got %T", nodeResourcesFitPlusArgs) } return &Plugin{ handle: handle, - args: sampleArgs2, + args: nodeResourcesFitPlusArgs, }, nil } @@ -65,60 +60,36 @@ func (s *Plugin) Name() string { return Name } +type preScoreState struct { + framework.Resource + ResourceName []v1.ResourceName +} + +// Clone the prefilter state. 
+func (s *preScoreState) Clone() framework.StateData { + return s +} + func (s *Plugin) PreScore(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodes []*v1.Node) *framework.Status { cycleState.Write(preScoreStateKey, computePodResourceRequest(pod)) return nil } func (s *Plugin) Score(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) (int64, *framework.Status) { - nodeInfo, err := s.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) if err != nil { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) } - var nodeScore int64 - var weightSum int64 + r := ResourceAllocationPriority{ + scorer: resourceScorer, + } scoreState, err := getPreScoreState(state) if err != nil { return 0, framework.NewStatus(framework.Error, fmt.Sprintf("get State node %q from PreScore: %v", nodeName, err)) } - - podRequest, _ := fitsRequest(scoreState.Resource, nodeInfo) - - for _, requestSourceName := range podRequest { - v, ok := s.args.Resources[requestSourceName] - if !ok { - continue - } - fit, err := noderesources.NewFit( - &k8sConfig.NodeResourcesFitArgs{ - ScoringStrategy: &k8sConfig.ScoringStrategy{ - Type: v.Type, // MostAllocated or LeastAllocated - Resources: []k8sConfig.ResourceSpec{ - {Name: string(requestSourceName), Weight: 1}, - }, - }, - }, s.handle, plfeature.Features{}) - - if err != nil { - return 0, framework.NewStatus(framework.Error, err.Error()) - } - - resourceScore, status := fit.(framework.ScorePlugin).Score(ctx, state, p, nodeName) - if !status.IsSuccess() { - return 0, framework.NewStatus(framework.Error, err.Error()) - } - - nodeScore += resourceScore * v.Weight - weightSum += v.Weight - } - - if weightSum == 0 { - return framework.MaxNodeScore, framework.NewStatus(framework.Success, "") - } - scores := nodeScore / weightSum + scores := r.getResourceScore(s.args, scoreState.ResourceName, p, nodeInfo, nodeName) return scores, framework.NewStatus(framework.Success, "") } @@ -127,64 +98,28 @@ func (p *Plugin) ScoreExtensions() framework.ScoreExtensions { return nil } -type preScoreState struct { - framework.Resource -} - -// Clone the prefilter state. 
-func (s *preScoreState) Clone() framework.StateData { - return s -} - -func computePodResourceRequest(pod *v1.Pod) *preScoreState { - // pod hasn't scheduled yet so we don't need to worry about InPlacePodVerticalScalingEnabled - reqs := resource.PodRequests(pod, resource.PodResourcesOptions{}) - result := &preScoreState{} - result.SetMaxResource(reqs) - return result -} - -func fitsRequest(podRequest framework.Resource, nodeInfo *framework.NodeInfo) ([]v1.ResourceName, []v1.ResourceName) { +func fitsPodRequestName(podRequest framework.Resource) []v1.ResourceName { var podRequestResource []v1.ResourceName - var nodeRequestResource []v1.ResourceName if podRequest.MilliCPU > 0 { podRequestResource = append(podRequestResource, v1.ResourceCPU) } - if nodeInfo.Allocatable.MilliCPU > 0 { - nodeRequestResource = append(nodeRequestResource, v1.ResourceCPU) - } - if podRequest.Memory > 0 { podRequestResource = append(podRequestResource, v1.ResourceMemory) } - if nodeInfo.Allocatable.Memory > 0 { - nodeRequestResource = append(nodeRequestResource, v1.ResourceMemory) - } - if podRequest.EphemeralStorage > 0 { podRequestResource = append(podRequestResource, v1.ResourceEphemeralStorage) } - if nodeInfo.Allocatable.EphemeralStorage > 0 { - nodeRequestResource = append(nodeRequestResource, v1.ResourceEphemeralStorage) - } - for rName, rQuant := range podRequest.ScalarResources { if rQuant > 0 { podRequestResource = append(podRequestResource, rName) } } - for rName, rQuant := range nodeInfo.Allocatable.ScalarResources { - if rQuant > 0 { - nodeRequestResource = append(nodeRequestResource, rName) - } - } - - return podRequestResource, nodeRequestResource + return podRequestResource } func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error) { diff --git a/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go index 17e344893..119b70916 100644 --- a/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go +++ b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus_test.go @@ -260,7 +260,7 @@ func TestPlugin_Score(t *testing.T) { scoreNode1, _ := p.(*Plugin).Score(context.TODO(), cycleState, pod, "testNode1") scoreNode2, _ := p.(*Plugin).Score(context.TODO(), cycleState, pod, "testNode2") if scoreNode1 <= scoreNode2 { - t.Fatal("scoreNode1 must <= scoreNode2") + t.Fatal("scoreNode1 must more than scoreNode2") } } diff --git a/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go b/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go index f1d2fa437..6d4bc445f 100644 --- a/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go +++ b/pkg/scheduler/plugins/scarceresourceavoidance/scarce_resource_avoidance.go @@ -22,6 +22,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" + quotav1 "k8s.io/apiserver/pkg/quota/v1" "k8s.io/kubernetes/pkg/api/v1/resource" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -45,16 +46,15 @@ type Plugin struct { } func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) { - - sampleArgs2, ok := args.(*config.ScarceResourceAvoidanceArgs) + scarceResourceAvoidanceArgs, ok := args.(*config.ScarceResourceAvoidanceArgs) if !ok { - return nil, fmt.Errorf("want args to be of type ResourceTypesArgs, got %T", args) + return nil, fmt.Errorf("want args to be of type ResourceTypesArgs, got %T", scarceResourceAvoidanceArgs) } return &Plugin{ handle: 
handle, - args: sampleArgs2, + args: scarceResourceAvoidanceArgs, }, nil } @@ -78,8 +78,8 @@ func (s *Plugin) Score(ctx context.Context, state *framework.CycleState, p *v1.P return 0, framework.NewStatus(framework.Error, fmt.Sprintf("get State node %q from PreScore: %v", nodeName, err)) } podRequestResource, nodeAllocatableResource := fitsRequest(scoreState.Resource, nodeInfo) - diffNames := difference(nodeAllocatableResource, podRequestResource) - intersectNames := intersection(diffNames, s.args.Resources) + diffNames := quotav1.Difference(nodeAllocatableResource, podRequestResource) + intersectNames := quotav1.Intersection(diffNames, s.args.Resources) if len(diffNames) == 0 || len(intersectNames) == 0 { return framework.MaxNodeScore, framework.NewStatus(framework.Success, "") @@ -89,39 +89,6 @@ func (s *Plugin) Score(ctx context.Context, state *framework.CycleState, p *v1.P return scores, framework.NewStatus(framework.Success, "") } -func intersection(slice1, slice2 []v1.ResourceName) []v1.ResourceName { - m := make(map[v1.ResourceName]struct{}) - result := []v1.ResourceName{} - - for _, v := range slice2 { - m[v] = struct{}{} - } - - for _, v := range slice1 { - if _, found := m[v]; found { - result = append(result, v) - } - } - - return result -} - -func difference(slice1, slice2 []v1.ResourceName) []v1.ResourceName { - var result []v1.ResourceName - m := make(map[v1.ResourceName]struct{}) - for _, v := range slice2 { - m[v] = struct{}{} - } - - for _, v := range slice1 { - if _, found := m[v]; !found { - result = append(result, v) - } - } - - return result -} - func (p *Plugin) ScoreExtensions() framework.ScoreExtensions { return nil } From ac9e7cb672da3f4047a582d0e32b8eb9edc19f3e Mon Sep 17 00:00:00 2001 From: LY-today <724102053@qq.com> Date: Mon, 13 Jan 2025 14:26:40 +0800 Subject: [PATCH 4/5] feat: update header Signed-off-by: LY-today <724102053@qq.com> --- .../plugins/noderesourcefitplus/node_resource_fit_plus_utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/plugins/noderesourcefitplus/node_resource_fit_plus_utils.go b/pkg/scheduler/plugins/noderesourcefitplus/node_resource_fit_plus_utils.go index 900499889..e340d049b 100644 --- a/pkg/scheduler/plugins/noderesourcefitplus/node_resource_fit_plus_utils.go +++ b/pkg/scheduler/plugins/noderesourcefitplus/node_resource_fit_plus_utils.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The Kubernetes Authors. +Copyright 2022 The Koordinator Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
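Note on the scoring math added in this series (a standalone sketch, not part of the patch): NodeResourcesFitPlus now computes scores locally instead of instantiating the upstream NodeResourcesFit plugin once per resource. For every resource the pod requests and that appears in the plugin args, mostRequestedScore or leastRequestedScore maps requested/allocatable onto 0..framework.MaxNodeScore according to the configured strategy, and the per-resource scores are combined as a weighted average (falling back to MaxNodeScore when none of the configured resources is requested). ScarceResourceAvoidance scores a node with resourceTypesScore over the counts of requested versus allocatable resource types, so nodes exposing scarce resources the pod does not ask for rank lower. The program below re-states these formulas with made-up capacities and requests purely to show the arithmetic; it is not code from the patch.

package main

import "fmt"

// maxNodeScore stands in for framework.MaxNodeScore (100).
const maxNodeScore = int64(100)

// mostRequestedScore and leastRequestedScore mirror the helpers in
// node_resource_fit_plus_utils.go: they map requested/capacity onto 0..maxNodeScore.
func mostRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 {
		return 0
	}
	if requested > capacity {
		requested = capacity
	}
	return requested * maxNodeScore / capacity
}

func leastRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 {
		return 0
	}
	if requested > capacity {
		return 0
	}
	return (capacity - requested) * maxNodeScore / capacity
}

// resourceTypesScore mirrors the ScarceResourceAvoidance formula: the more
// resource types a node offers beyond what the pod requests, the lower the score.
func resourceTypesScore(requestedTypes, allocatableTypes int64) int64 {
	return (allocatableTypes - requestedTypes) * maxNodeScore / allocatableTypes
}

func main() {
	// Illustrative node: 32 cores / 128Gi allocatable, 8 cores / 32Gi already
	// requested, and the incoming pod asks for 4 cores and 16Gi.
	cpuScore := leastRequestedScore(8000+4000, 32000)           // cpu configured as LeastAllocated
	memScore := mostRequestedScore((32+16)<<30, int64(128)<<30) // memory configured as MostAllocated

	// NodeResourcesFitPlus combines per-resource scores by configured weight.
	cpuWeight, memWeight := int64(1), int64(2)
	nodeScore := (cpuScore*cpuWeight + memScore*memWeight) / (cpuWeight + memWeight)
	fmt.Println("NodeResourcesFitPlus score:", nodeScore) // 45

	// ScarceResourceAvoidance: the pod requests 2 of the node's 3 resource types
	// (cpu and memory but not, say, a device resource), so the node is penalized.
	fmt.Println("ScarceResourceAvoidance score:", resourceTypesScore(2, 3)) // 33
}
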
From a3f12facb98f4edde327d7a896679585e19d1284 Mon Sep 17 00:00:00 2001 From: LY-today <724102053@qq.com> Date: Mon, 13 Jan 2025 15:03:37 +0800 Subject: [PATCH 5/5] feat: update log Signed-off-by: LY-today <724102053@qq.com> --- .../plugins/noderesourcefitplus/node_resources_fit_plus.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go index fb57d0a63..29a23f5f8 100644 --- a/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go +++ b/pkg/scheduler/plugins/noderesourcefitplus/node_resources_fit_plus.go @@ -87,7 +87,7 @@ func (s *Plugin) Score(ctx context.Context, state *framework.CycleState, p *v1.P scoreState, err := getPreScoreState(state) if err != nil { - return 0, framework.NewStatus(framework.Error, fmt.Sprintf("get State node %q from PreScore: %v", nodeName, err)) + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("get state node %q from PreScore: %v", nodeName, err)) } scores := r.getResourceScore(s.args, scoreState.ResourceName, p, nodeInfo, nodeName)
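
For completeness, a sketch of how the two new argument types are populated. In a real deployment they are supplied under pluginConfig in the scheduler configuration and decoded through the conversion functions registered above; the snippet below constructs them directly in Go, much as a test would. The resource names, weights, and strategy choices are illustrative only.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	k8sconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"

	"github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
)

func main() {
	// NodeResourcesFitPlus: a scoring strategy (MostAllocated / LeastAllocated)
	// and a weight per resource name.
	fitPlusArgs := &config.NodeResourcesFitPlusArgs{
		Resources: map[corev1.ResourceName]config.ResourcesType{
			corev1.ResourceCPU:    {Type: k8sconfig.LeastAllocated, Weight: 1},
			corev1.ResourceMemory: {Type: k8sconfig.LeastAllocated, Weight: 1},
			// Illustrative device resource packed with MostAllocated.
			"nvidia.com/gpu": {Type: k8sconfig.MostAllocated, Weight: 2},
		},
	}

	// ScarceResourceAvoidance: resource types that pods not requesting them
	// should be steered away from.
	avoidanceArgs := &config.ScarceResourceAvoidanceArgs{
		Resources: []corev1.ResourceName{"nvidia.com/gpu"},
	}

	fmt.Printf("%+v\n%+v\n", fitPlusArgs, avoidanceArgs)
}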