Skip to content

Commit

Permalink
feat: property-based scheduling: add support for property-based scheduling in the scheduler member cluster watcher (Azure#714)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelawyu authored Mar 21, 2024
1 parent 22d4e5f commit 747406e
Show file tree
Hide file tree
Showing 2 changed files with 252 additions and 4 deletions.
46 changes: 42 additions & 4 deletions pkg/scheduler/watchers/membercluster/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"reflect"
"time"

"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -55,15 +56,17 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
//
// It may happen for 2 reasons:
//
// a) the cluster setting, specifically its labels, has changed; and/or
// a) the cluster's setup (e.g., its labels) or status (e.g., resource/non-resource properties),
// has changed; and/or
// b) an unexpected development which originally leads the scheduler to disregard the cluster
// (e.g., agents not joining, network partition, etc.) has been resolved.
//
// 2. a cluster, originally eligible for resource placement, becomes ineligible for some reason.
//
// Similarly, it may happen for 2 reasons:
//
// a) the cluster setting, specifically its labels, has changed; and/or
// a) the cluster's setup (e.g., its labels) or status (e.g., resource/non-resource properties),
// has changed; and/or
// b) an unexpected development (e.g., agents failing, network partition, etc.) has occurred.
// c) the cluster, which may or may not have resources placed on it, has left the fleet (deleting).
//
Expand Down Expand Up @@ -190,20 +193,55 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
return false
}

// Capture label changes.
//
clusterKObj := klog.KObj(newCluster)
// The cluster is being deleted.
if oldCluster.GetDeletionTimestamp().IsZero() && !newCluster.GetDeletionTimestamp().IsZero() {
klog.V(2).InfoS("A member cluster is leaving the fleet", "memberCluster", clusterKObj)
return true
}

// Capture label changes.
//
// Note that the controller runs only when label changes happen on joined clusters.
if !reflect.DeepEqual(oldCluster.Labels, newCluster.Labels) {
klog.V(2).InfoS("A member cluster label change has been detected", "memberCluster", clusterKObj)
return true
}

// Capture non-resource property changes.
//
// Observation time refreshes is not considered as a change.
oldProperties := oldCluster.Status.Properties
newProperties := newCluster.Status.Properties
if len(oldProperties) != len(newProperties) {
return true
}
for oldK, oldV := range oldProperties {
newV, ok := newProperties[oldK]
if !ok || oldV.Value != newV.Value {
return true
}
}

// Capture resource usage changes.
oldCapacity := oldCluster.Status.ResourceUsage.Capacity
newCapacity := newCluster.Status.ResourceUsage.Capacity
if !equality.Semantic.DeepEqual(oldCapacity, newCapacity) {
return true
}

oldAllocatable := oldCluster.Status.ResourceUsage.Allocatable
newAllocatable := newCluster.Status.ResourceUsage.Allocatable
if !equality.Semantic.DeepEqual(oldAllocatable, newAllocatable) {
return true
}

oldAvailable := oldCluster.Status.ResourceUsage.Available
newAvailable := newCluster.Status.ResourceUsage.Available
if !equality.Semantic.DeepEqual(oldAvailable, newAvailable) {
return true
}

// Check the resource placement eligibility for the old and new cluster object.
oldEligible, _ := r.ClusterEligibilityChecker.IsEligible(oldCluster)
newEligible, _ := r.ClusterEligibilityChecker.IsEligible(newCluster)
Expand Down
210 changes: 210 additions & 0 deletions pkg/scheduler/watchers/membercluster/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

Expand All @@ -32,6 +34,10 @@ const (
dummyReason = "dummyReason"
dummyLabel = "dummy-label"
dummyLabelValue = "dummy-label-value"

dummyNonResourcePropertyName = "dummy-non-resource-property"
dummyNonResourcePropertyValue1 = "0"
dummyNonResourcePropertyValue2 = "1"
)

var (
Expand Down Expand Up @@ -150,6 +156,210 @@ var _ = Describe("scheduler member cluster source controller", Serial, Ordered,
})
})

// Verifies that setting and then changing a non-resource property on a ready
// member cluster's status both re-trigger the scheduler watcher (case 1a).
Context("ready cluster has a non-resource property change", func() {
	BeforeAll(func() {
		// Sanity check: no key should be queued before any change is made.
		Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")

		// Fetch the latest copy of the member cluster object.
		cluster := &clusterv1beta1.MemberCluster{}
		Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, cluster)).To(Succeed(), "Failed to get member cluster")

		// Report a non-resource property in the cluster status.
		cluster.Status.Properties = map[clusterv1beta1.PropertyName]clusterv1beta1.PropertyValue{
			dummyNonResourcePropertyName: {
				Value:           dummyNonResourcePropertyValue1,
				ObservationTime: metav1.NewTime(time.Now()),
			},
		}
		Expect(hubClient.Status().Update(ctx, cluster)).Should(Succeed(), "Failed to update member cluster non-resource properties")
	})

	It("should enqueue CRPs (case 1a)", func() {
		Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
		Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
	})

	It("can empty the key collector", func() {
		keyCollector.Reset()
		Eventually(noKeyEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Workqueue is not empty")
		Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")
	})

	It("can update the property", func() {
		// Fetch the latest copy of the member cluster object.
		cluster := &clusterv1beta1.MemberCluster{}
		Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, cluster)).To(Succeed(), "Failed to get member cluster")

		// Flip the property to a different value; the watcher should fire again.
		cluster.Status.Properties = map[clusterv1beta1.PropertyName]clusterv1beta1.PropertyValue{
			dummyNonResourcePropertyName: {
				Value:           dummyNonResourcePropertyValue2,
				ObservationTime: metav1.NewTime(time.Now()),
			},
		}
		Expect(hubClient.Status().Update(ctx, cluster)).Should(Succeed(), "Failed to update member cluster non-resource properties")
	})

	It("should enqueue CRPs (case 1a)", func() {
		Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
		Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
	})

	AfterAll(func() {
		keyCollector.Reset()
	})
})

// Verifies that changes to a ready member cluster's total capacity
// (status.resourceUsage.capacity) re-trigger the scheduler watcher (case 1a).
//
// Fix: the comments and Gomega failure messages below were copy-pasted from
// the non-resource property case and described the wrong field.
Context("ready cluster has a total capacity change", func() {
	BeforeAll(func() {
		// Sanity check: no key should be queued before any change is made.
		Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")

		// Retrieve the cluster.
		memberCluster := &clusterv1beta1.MemberCluster{}
		Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, memberCluster)).To(Succeed(), "Failed to get member cluster")

		// Update the total capacity.
		memberCluster.Status.ResourceUsage.Capacity = corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("1000m"),
			corev1.ResourceMemory: resource.MustParse("1Gi"),
		}
		Expect(hubClient.Status().Update(ctx, memberCluster)).Should(Succeed(), "Failed to update member cluster total capacity")
	})

	It("should enqueue CRPs (case 1a)", func() {
		Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
		Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
	})

	It("can empty the key collector", func() {
		keyCollector.Reset()
		Eventually(noKeyEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Workqueue is not empty")
		Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")
	})

	It("can update the total capacity", func() {
		// Retrieve the cluster.
		memberCluster := &clusterv1beta1.MemberCluster{}
		Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, memberCluster)).To(Succeed(), "Failed to get member cluster")

		// Change the total capacity to different values; the watcher should fire again.
		memberCluster.Status.ResourceUsage.Capacity = corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("2000m"),
			corev1.ResourceMemory: resource.MustParse("2Gi"),
		}
		Expect(hubClient.Status().Update(ctx, memberCluster)).Should(Succeed(), "Failed to update member cluster total capacity")
	})

	It("should enqueue CRPs (case 1a)", func() {
		Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
		Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
	})

	AfterAll(func() {
		keyCollector.Reset()
	})
})

// Verifies that changes to a ready member cluster's allocatable capacity
// (status.resourceUsage.allocatable) re-trigger the scheduler watcher (case 1a).
//
// Fix: the comments and Gomega failure messages below were copy-pasted from
// the non-resource property case and described the wrong field.
Context("ready cluster has an allocatable capacity change", func() {
	BeforeAll(func() {
		// Sanity check: no key should be queued before any change is made.
		Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")

		// Retrieve the cluster.
		memberCluster := &clusterv1beta1.MemberCluster{}
		Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, memberCluster)).To(Succeed(), "Failed to get member cluster")

		// Update the allocatable capacity.
		memberCluster.Status.ResourceUsage.Allocatable = corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("1000m"),
			corev1.ResourceMemory: resource.MustParse("1Gi"),
		}
		Expect(hubClient.Status().Update(ctx, memberCluster)).Should(Succeed(), "Failed to update member cluster allocatable capacity")
	})

	It("should enqueue CRPs (case 1a)", func() {
		Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
		Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
	})

	It("can empty the key collector", func() {
		keyCollector.Reset()
		Eventually(noKeyEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Workqueue is not empty")
		Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")
	})

	It("can update the allocatable capacity", func() {
		// Retrieve the cluster.
		memberCluster := &clusterv1beta1.MemberCluster{}
		Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, memberCluster)).To(Succeed(), "Failed to get member cluster")

		// Change the allocatable capacity to different values; the watcher should fire again.
		memberCluster.Status.ResourceUsage.Allocatable = corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("2000m"),
			corev1.ResourceMemory: resource.MustParse("2Gi"),
		}
		Expect(hubClient.Status().Update(ctx, memberCluster)).Should(Succeed(), "Failed to update member cluster allocatable capacity")
	})

	It("should enqueue CRPs (case 1a)", func() {
		Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
		Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
	})

	AfterAll(func() {
		keyCollector.Reset()
	})
})

// Verifies that changes to a ready member cluster's available capacity
// (status.resourceUsage.available) re-trigger the scheduler watcher (case 1a).
//
// Fix: the comments and Gomega failure messages below were copy-pasted from
// the non-resource property case and described the wrong field.
Context("ready cluster has an available capacity change", func() {
	BeforeAll(func() {
		// Sanity check: no key should be queued before any change is made.
		Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")

		// Retrieve the cluster.
		memberCluster := &clusterv1beta1.MemberCluster{}
		Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, memberCluster)).To(Succeed(), "Failed to get member cluster")

		// Update the available capacity.
		memberCluster.Status.ResourceUsage.Available = corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("1000m"),
			corev1.ResourceMemory: resource.MustParse("1Gi"),
		}
		Expect(hubClient.Status().Update(ctx, memberCluster)).Should(Succeed(), "Failed to update member cluster available capacity")
	})

	It("should enqueue CRPs (case 1a)", func() {
		Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
		Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
	})

	It("can empty the key collector", func() {
		keyCollector.Reset()
		Eventually(noKeyEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Workqueue is not empty")
		Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")
	})

	It("can update the available capacity", func() {
		// Retrieve the cluster.
		memberCluster := &clusterv1beta1.MemberCluster{}
		Expect(hubClient.Get(ctx, types.NamespacedName{Name: clusterName1}, memberCluster)).To(Succeed(), "Failed to get member cluster")

		// Change the available capacity to different values; the watcher should fire again.
		memberCluster.Status.ResourceUsage.Available = corev1.ResourceList{
			corev1.ResourceCPU:    resource.MustParse("2000m"),
			corev1.ResourceMemory: resource.MustParse("2Gi"),
		}
		Expect(hubClient.Status().Update(ctx, memberCluster)).Should(Succeed(), "Failed to update member cluster available capacity")
	})

	It("should enqueue CRPs (case 1a)", func() {
		Eventually(qualifiedKeysEnqueuedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Keys are not enqueued as expected")
		Consistently(qualifiedKeysEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Keys are not enqueued as expected")
	})

	AfterAll(func() {
		keyCollector.Reset()
	})
})

Context("ready cluster is out of sync", func() {
BeforeAll(func() {
Consistently(noKeyEnqueuedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Workqueue is not empty")
Expand Down

0 comments on commit 747406e

Please sign in to comment.