Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Implement DRA support in Cluster Autoscaler #7350

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4992123
DRA: extract interacting with the scheduler framework out of Predicat…
towca Sep 26, 2024
2d55ff2
DRA: introduce internal NodeInfo/PodInfo with DRA objects attached
towca Sep 27, 2024
7c1f8d5
DRA: migrate all of CA to use the new internal NodeInfo/PodInfo
towca Sep 27, 2024
dfd0234
DRA: remove AddNodeWithPods from ClusterSnapshot, replace uses with A…
towca Sep 30, 2024
fafb78a
DRA: add Initialize to ClusterSnapshot, remove AddNodes
towca Sep 30, 2024
c249f46
DRA: remove redundant IsPVCUsedByPods from ClusterSnapshot
towca Sep 30, 2024
f876a51
DRA: remove AddNode from ClusterSnapshot
towca Sep 30, 2024
bb87555
DRA: refactor utils related to NodeInfos
towca Sep 30, 2024
fad6868
DRA: propagate schedulerframework handle and DRA feature flag to Clus…
towca Sep 30, 2024
26e4787
DRA: Implement a Snapshot of DRA objects, its Provider, and utils
towca Sep 26, 2024
9e32e07
DRA: grab a snapshot of DRA objects and plumb to ClusterSnapshot.Init…
towca Sep 30, 2024
006685c
DRA: propagate DRA objects through NodeInfos in node_info utils
towca Sep 30, 2024
c5edd3b
DRA: rename ClusterSnapshot methods to better reflect their purpose
towca Oct 1, 2024
bdef0a7
DRA: extend ClusterSnapshot.SchedulePod, propagate scheduling state f…
towca Oct 1, 2024
0e055c4
DRA: plumb the DRA snapshot into scheduler framework through ClusterS…
towca Oct 1, 2024
0a11e9c
DRA: implement calculating utilization for DRA resources
towca Oct 1, 2024
ef9d420
DRA: integrate BasicClusterSnapshot with the DRA snapshot
towca Oct 1, 2024
38fb034
DRA: add integration tests
towca Sep 26, 2024
7e70b41
DRA: handle expendable pods using DRA
towca Oct 3, 2024
3544bb4
DRA: handle duplicating unschedulable pods using DRA
towca Oct 4, 2024
2e7eeea
DRA TMP: vendor in the required scheduler framework channges
towca Oct 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions cluster-autoscaler/dynamicresources/listers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicresources
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's not too unbearably long, dynamicresourceallocation might be a clearer name for this package.


import (
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/apimachinery/pkg/labels"
resourceapilisters "k8s.io/client-go/listers/resource/v1alpha3"
)

// These listers are introduced for Provider dependency injection.

// providerClaimLister is a subset of ResourceClaimLister needed by the Provider.
type providerClaimLister interface {
List() ([]*resourceapi.ResourceClaim, error)
}

// providerClaimLister is a subset of ResourceSliceLister needed by the Provider.
type providerSliceLister interface {
List() ([]*resourceapi.ResourceSlice, error)
}

// providerClassLister is a subset of DeviceClassLister needed by the Provider.
type providerClassLister interface {
List() ([]*resourceapi.DeviceClass, error)
}

// resourceClaimApiLister implements providerClaimLister using a real ResourceClaimLister listing from the API.
type resourceClaimApiLister struct {
apiLister resourceapilisters.ResourceClaimLister
}

// List lists all ResourceClaims.
func (l *resourceClaimApiLister) List() ([]*resourceapi.ResourceClaim, error) {
return l.apiLister.List(labels.Everything())
}

// resourceSliceApiLister implements providerSliceLister using a real ResourceSliceLister listing from the API.
type resourceSliceApiLister struct {
apiLister resourceapilisters.ResourceSliceLister
}

// List lists all ResourceSlices.
func (l *resourceSliceApiLister) List() (ret []*resourceapi.ResourceSlice, err error) {
return l.apiLister.List(labels.Everything())
}

// deviceClassApiLister implements providerClassLister using a real DeviceClassLister listing from the API.
type deviceClassApiLister struct {
apiLister resourceapilisters.DeviceClassLister
}

// List lists all DeviceClasses.
func (l *deviceClassApiLister) List() (ret []*resourceapi.DeviceClass, err error) {
return l.apiLister.List(labels.Everything())
}
89 changes: 89 additions & 0 deletions cluster-autoscaler/dynamicresources/provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicresources

import (
resourceapi "k8s.io/api/resource/v1alpha3"
"k8s.io/client-go/informers"
)

// Provider provides DRA-related objects.
type Provider struct {
resourceClaims providerClaimLister
resourceSlices providerSliceLister
deviceClasses providerClassLister
}

// NewProviderFromInformers returns a new Provider which uses InformerFactory listers to list the DRA resources.
func NewProviderFromInformers(informerFactory informers.SharedInformerFactory) *Provider {
claims := &resourceClaimApiLister{apiLister: informerFactory.Resource().V1alpha3().ResourceClaims().Lister()}
slices := &resourceSliceApiLister{apiLister: informerFactory.Resource().V1alpha3().ResourceSlices().Lister()}
devices := &deviceClassApiLister{apiLister: informerFactory.Resource().V1alpha3().DeviceClasses().Lister()}
return NewProvider(claims, slices, devices)
}

// NewProvider returns a new Provider which uses the provided listers to list the DRA resources.
func NewProvider(claims providerClaimLister, slices providerSliceLister, classes providerClassLister) *Provider {
return &Provider{
resourceClaims: claims,
resourceSlices: slices,
deviceClasses: classes,
}
}

// Snapshot returns a snapshot of all DRA resources at a ~single point in time.
func (p *Provider) Snapshot() (Snapshot, error) {
claims, err := p.resourceClaims.List()
if err != nil {
return Snapshot{}, err
}
claimMap := make(map[resourceClaimRef]*resourceapi.ResourceClaim)
for _, claim := range claims {
claimMap[resourceClaimRef{Name: claim.Name, Namespace: claim.Namespace}] = claim
}

slices, err := p.resourceSlices.List()

if err != nil {
return Snapshot{}, err
}
slicesMap := make(map[string][]*resourceapi.ResourceSlice)
var nonNodeLocalSlices []*resourceapi.ResourceSlice
for _, slice := range slices {
if slice.Spec.NodeName == "" {
nonNodeLocalSlices = append(nonNodeLocalSlices, slice)
} else {
slicesMap[slice.Spec.NodeName] = append(slicesMap[slice.Spec.NodeName], slice)
}
}

classes, err := p.deviceClasses.List()
if err != nil {
return Snapshot{}, err
}
classMap := make(map[string]*resourceapi.DeviceClass)
for _, class := range classes {
classMap[class.Name] = class
}

return Snapshot{
resourceClaimsByRef: claimMap,
resourceSlicesByNodeName: slicesMap,
nonNodeLocalResourceSlices: nonNodeLocalSlices,
deviceClasses: classMap,
}, nil
}
23 changes: 23 additions & 0 deletions cluster-autoscaler/dynamicresources/provider_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicresources

import "testing"

func TestProviderSnapshot(t *testing.T) {
// TODO(DRA): Write.
}
123 changes: 123 additions & 0 deletions cluster-autoscaler/dynamicresources/resource_claim_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicresources

import (
"fmt"

apiv1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1alpha3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"
)

// ClaimOwningPod returns the name and UID of the Pod owner of the provided claim. If the claim isn't
// owned by a Pod, empty strings are returned.
func ClaimOwningPod(claim *resourceapi.ResourceClaim) (string, types.UID) {
for _, owner := range claim.OwnerReferences {
if ptr.Deref(owner.Controller, false) &&
owner.APIVersion == "v1" &&
owner.Kind == "Pod" {
return owner.Name, owner.UID
}
}
return "", ""
}

// ClaimAllocated returns whether the provided claim is allocated.
func ClaimAllocated(claim *resourceapi.ResourceClaim) bool {
return claim.Status.Allocation != nil
}

// ClaimInUse returns whether the provided claim is currently reserved for any consumer.
func ClaimInUse(claim *resourceapi.ResourceClaim) bool {
return len(claim.Status.ReservedFor) > 0
}

// ClaimReservedForPod returns whether the provided claim is currently reserved for the provided Pod.
func ClaimReservedForPod(claim *resourceapi.ResourceClaim, pod *apiv1.Pod) bool {
for _, consumerRef := range claim.Status.ReservedFor {
if claimConsumerReferenceMatchesPod(pod, consumerRef) {
return true
}
}
return false
}

// DeallocateClaimInPlace clears the allocation of the provided claim.
func DeallocateClaimInPlace(claim *resourceapi.ResourceClaim) {
claim.Status.Allocation = nil
}

// ClearPodReservationInPlace clears the reservation for the provided pod in the provided Claim. It is a no-op
// if the claim isn't reserved for the Pod.
func ClearPodReservationInPlace(claim *resourceapi.ResourceClaim, pod *apiv1.Pod) {
newReservedFor := make([]resourceapi.ResourceClaimConsumerReference, 0, len(claim.Status.ReservedFor))
for _, consumerRef := range claim.Status.ReservedFor {
if claimConsumerReferenceMatchesPod(pod, consumerRef) {
continue
}
newReservedFor = append(newReservedFor, consumerRef)
}
claim.Status.ReservedFor = newReservedFor
}

// AddPodReservationIfNeededInPlace adds a reservation for the provided pod to the provided Claim. It is a no-op
// if the claim is already reserved for the Pod. Error is returned if the claim already has the maximum number of
// reservations.
func AddPodReservationIfNeededInPlace(claim *resourceapi.ResourceClaim, pod *apiv1.Pod) error {
alreadyReservedForPod := false
for _, consumerRef := range claim.Status.ReservedFor {
if claimConsumerReferenceMatchesPod(pod, consumerRef) {
alreadyReservedForPod = true
break
}
}
if !alreadyReservedForPod {
if len(claim.Status.ReservedFor) >= resourceapi.ResourceClaimReservedForMaxSize {
return fmt.Errorf("claim already reserved for %d consumers, can't add more", len(claim.Status.ReservedFor))
}
claim.Status.ReservedFor = append(claim.Status.ReservedFor, podClaimConsumerReference(pod))
}
return nil
}

func claimConsumerReferenceMatchesPod(pod *apiv1.Pod, ref resourceapi.ResourceClaimConsumerReference) bool {
return ref.APIGroup == "" && ref.Resource == "pods" && ref.Name == pod.Name && ref.UID == pod.UID
}

func podClaimConsumerReference(pod *apiv1.Pod) resourceapi.ResourceClaimConsumerReference {
return resourceapi.ResourceClaimConsumerReference{
Name: pod.Name,
UID: pod.UID,
Resource: "pods",
APIGroup: "",
}
}

func podClaimOwnerReference(pod *apiv1.Pod) metav1.OwnerReference {
truePtr := true
return metav1.OwnerReference{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
UID: pod.UID,
BlockOwnerDeletion: &truePtr,
Controller: &truePtr,
}
}
19 changes: 19 additions & 0 deletions cluster-autoscaler/dynamicresources/resource_claim_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dynamicresources

// TODO(DRA): Write util tests.
Loading