Skip to content

Commit

Permalink
Handle v1alpha2 version of queue in kube-batch
Browse files Browse the repository at this point in the history
  • Loading branch information
thandayuthapani committed Jul 4, 2019
1 parent f4e9918 commit 10b96b8
Show file tree
Hide file tree
Showing 8 changed files with 292 additions and 28 deletions.
39 changes: 39 additions & 0 deletions deployment/kube-batch/templates/scheduling_v1alpha2_queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: queues.scheduling.sigs.dev
spec:
group: scheduling.sigs.dev
names:
kind: Queue
plural: queues
scope: Cluster
validation:
openAPIV3Schema:
properties:
apiVersion:
type: string
kind:
type: string
metadata:
type: object
spec:
properties:
weight:
format: int32
type: integer
type: object
status:
properties:
unknown:
format: int32
type: integer
pending:
format: int32
type: integer
running:
format: int32
type: integer
type: object
type: object
version: v1alpha2
2 changes: 1 addition & 1 deletion hack/run-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ function kube-batch-up {
kubectl create -f deployment/kube-batch/templates/scheduling_v1alpha1_queue.yaml
kubectl create -f deployment/kube-batch/templates/scheduling_v1alpha1_podgroup.yaml
kubectl create -f deployment/kube-batch/templates/scheduling_v1alpha2_podgroup.yaml
kubectl create -f deployment/kube-batch/templates/scheduling_v1alpha2_queue.yaml
kubectl create -f deployment/kube-batch/templates/default.yaml

# start kube-batch
Expand All @@ -87,4 +88,3 @@ kube-batch-up

cd ${ROOT_DIR}
go test ./test/e2e -v -timeout 30m

2 changes: 1 addition & 1 deletion pkg/scheduler/actions/allocate/allocate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestAllocate(t *testing.T) {
}

for _, q := range test.queues {
schedulerCache.AddQueue(q)
schedulerCache.AddQueuev1alpha1(q)
}

trueValue := true
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/preempt/preempt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func TestPreempt(t *testing.T) {
}

for _, q := range test.queues {
schedulerCache.AddQueue(q)
schedulerCache.AddQueuev1alpha1(q)
}

trueValue := true
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/reclaim/reclaim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestReclaim(t *testing.T) {
}

for _, q := range test.queues {
schedulerCache.AddQueue(q)
schedulerCache.AddQueuev1alpha1(q)
}

trueValue := true
Expand Down
51 changes: 48 additions & 3 deletions pkg/scheduler/api/queue_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,56 @@ limitations under the License.
package api

import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

const (
//QueueVersionV1Alpha1 represents PodGroupVersion of V1Alpha1
QueueVersionV1Alpha1 string = "v1alpha1"

arbcorev1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
//QueueVersionV1Alpha2 represents PodGroupVersion of V1Alpha2
QueueVersionV1Alpha2 string = "v1alpha2"
)

// Queue is a queue of PodGroup.
type Queue struct {
metav1.TypeMeta `json:",inline"`
// Standard object's metadata.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#metadata
// +optional
metav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`

// Specification of the desired behavior of the queue.
// More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#spec-and-status
// +optional
Spec QueueSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`

// The status of queue.
// +optional
Status QueueStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`

//Version is used to retrieve information about queue version
Version string
}

// QueueStatus represents the status of Queue.
type QueueStatus struct {
// The number of 'Unknonw' PodGroup in this queue.
Unknown int32 `json:"unknown,omitempty" protobuf:"bytes,1,opt,name=unknown"`
// The number of 'Pending' PodGroup in this queue.
Pending int32 `json:"pending,omitempty" protobuf:"bytes,2,opt,name=pending"`
// The number of 'Running' PodGroup in this queue.
Running int32 `json:"running,omitempty" protobuf:"bytes,3,opt,name=running"`
}

// QueueSpec represents the template of Queue.
type QueueSpec struct {
Weight int32 `json:"weight,omitempty" protobuf:"bytes,1,opt,name=weight"`
Capability v1.ResourceList `json:"capability,omitempty" protobuf:"bytes,2,opt,name=capability"`
}

// QueueID is UID type, serves as unique ID for each queue
type QueueID types.UID

Expand All @@ -32,11 +77,11 @@ type QueueInfo struct {

Weight int32

Queue *arbcorev1.Queue
Queue *Queue
}

// NewQueueInfo creates new queueInfo object
func NewQueueInfo(queue *arbcorev1.Queue) *QueueInfo {
func NewQueueInfo(queue *Queue) *QueueInfo {
return &QueueInfo{
UID: QueueID(queue.Name),
Name: queue.Name,
Expand Down
29 changes: 20 additions & 9 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ type SchedulerCache struct {
nsInformer infov1.NamespaceInformer
podGroupInformerv1alpha1 kbinfov1.PodGroupInformer
podGroupInformerv1alpha2 kbinfov2.PodGroupInformer
queueInformer kbinfov1.QueueInformer
queueInformerv1alpha1 kbinfov1.QueueInformer
queueInformerv1alpha2 kbinfov2.QueueInformer
pvInformer infov1.PersistentVolumeInformer
pvcInformer infov1.PersistentVolumeClaimInformer
scInformer storagev1.StorageClassInformer
Expand Down Expand Up @@ -331,12 +332,20 @@ func newSchedulerCache(config *rest.Config, schedulerName string, defaultQueue s
DeleteFunc: sc.DeletePodGroupAlpha2,
})

// create informer for Queue information
sc.queueInformer = kbinformer.Scheduling().V1alpha1().Queues()
sc.queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddQueue,
UpdateFunc: sc.UpdateQueue,
DeleteFunc: sc.DeleteQueue,
// create informer for Queue(v1alpha1) information
sc.queueInformerv1alpha1 = kbinformer.Scheduling().V1alpha1().Queues()
sc.queueInformerv1alpha1.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddQueuev1alpha1,
UpdateFunc: sc.UpdateQueuev1alpha1,
DeleteFunc: sc.DeleteQueuev1alpha1,
})

// create informer for Queue(v1alpha2) information
sc.queueInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().Queues()
sc.queueInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: sc.AddQueuev1alpha2,
UpdateFunc: sc.UpdateQueuev1alpha2,
DeleteFunc: sc.DeleteQueuev1alpha2,
})

return sc
Expand All @@ -352,7 +361,8 @@ func (sc *SchedulerCache) Run(stopCh <-chan struct{}) {
go sc.pvInformer.Informer().Run(stopCh)
go sc.pvcInformer.Informer().Run(stopCh)
go sc.scInformer.Informer().Run(stopCh)
go sc.queueInformer.Informer().Run(stopCh)
go sc.queueInformerv1alpha1.Informer().Run(stopCh)
go sc.queueInformerv1alpha2.Informer().Run(stopCh)

if options.ServerOpts.EnablePriorityClass {
go sc.pcInformer.Informer().Run(stopCh)
Expand All @@ -379,7 +389,8 @@ func (sc *SchedulerCache) WaitForCacheSync(stopCh <-chan struct{}) bool {
sc.pvInformer.Informer().HasSynced,
sc.pvcInformer.Informer().HasSynced,
sc.scInformer.Informer().HasSynced,
sc.queueInformer.Informer().HasSynced,
sc.queueInformerv1alpha1.Informer().HasSynced,
sc.queueInformerv1alpha2.Informer().HasSynced,
}
if options.ServerOpts.EnablePriorityClass {
informerSynced = append(informerSynced, sc.pcInformer.Informer().HasSynced)
Expand Down
Loading

0 comments on commit 10b96b8

Please sign in to comment.