Skip to content

(WIP)support scheduler plugins #3612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package schedulerplugins

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"

"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"

"sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
clientscheme "k8s.io/client-go/kubernetes/scheme"
)

const (
SchedulerName string = "kube-scheduler"
KubeSchedulerPodGroupLabelKey string = "scheduling.x-k8s.io/pod-group"
)

type KubeScheduler struct {
cli client.Client
}

type KubeSchedulerFactory struct{}

func GetPluginName() string {
return SchedulerName
}

func (y *KubeScheduler) Name() string {
return GetPluginName()
}

func (y *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, rc *rayv1.RayCluster) error {
replica := int32(1)
for _, workerGroup := range rc.Spec.WorkerGroupSpecs {
if workerGroup.Replicas == nil {
continue
}
replica += *workerGroup.Replicas
}

podGroup := &v1alpha1.PodGroup{
ObjectMeta: metav1.ObjectMeta{},
Spec: v1alpha1.PodGroupSpec{
MinMember: replica,
MinResources: utils.CalculateMinResources(rc),
},
}

return y.cli.Create(ctx, podGroup)
}

// AddMetadataToPod adds essential labels and annotations to the Ray pods
// the yunikorn scheduler needs these labels and annotations in order to do the scheduling properly
func (y *KubeScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
// when gang scheduling is enabled, extra annotations need to be added to all pods
if y.isGangSchedulingEnabled(app) {
// the group name for the head and each of the worker group should be different
pod.Annotations[KubeSchedulerPodGroupLabelKey] = groupName
}
}

func (y *KubeScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
_, exist := app.Labels[utils.RayClusterGangSchedulingEnabled]
return exist
}

func (yf *KubeSchedulerFactory) New(ctx context.Context, c *rest.Config) (schedulerinterface.BatchScheduler, error) {
scheme := runtime.NewScheme()
_ = clientscheme.AddToScheme(scheme)
_ = v1alpha1.AddToScheme(scheme)
ccache, err := cache.New(c, cache.Options{
Scheme: scheme,
})
if err != nil {
return nil, err
}
go ccache.Start(ctx)
if !ccache.WaitForCacheSync(ctx) {
return nil, fmt.Errorf("failed to sync cache")
}
cli, err := client.New(c, client.Options{
Scheme: scheme,
Cache: &client.CacheOptions{
Reader: ccache,
},
})
if err != nil {
return nil, err
}
return &KubeScheduler{
cli: cli,
}, nil
}

func (yf *KubeSchedulerFactory) AddToScheme(sche *runtime.Scheme) {
// No extra scheme needs to be registered
_ = v1alpha1.AddToScheme(sche)
}

func (yf *KubeSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder {
return b
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

configapi "github.com/ray-project/kuberay/ray-operator/apis/config/v1alpha1"
schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
schedulerplugins "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/scheduler-plugins"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/volcano"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/yunikorn"
)
Expand Down Expand Up @@ -58,6 +59,8 @@ func getSchedulerFactory(rayConfigs configapi.Configuration) (schedulerinterface
factory = &volcano.VolcanoBatchSchedulerFactory{}
case yunikorn.GetPluginName():
factory = &yunikorn.YuniKornSchedulerFactory{}
case schedulerplugins.GetPluginName():
factory = &schedulerplugins.KubeSchedulerFactory{}
default:
return nil, fmt.Errorf("the scheduler is not supported, name=%s", rayConfigs.BatchScheduler)
}
Expand Down
8 changes: 5 additions & 3 deletions ray-operator/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ require (
go.uber.org/mock v0.5.2
go.uber.org/zap v1.27.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
k8s.io/api v0.33.0
k8s.io/api v0.33.1
k8s.io/apiextensions-apiserver v0.33.0
k8s.io/apimachinery v0.33.0
k8s.io/apimachinery v0.33.1
k8s.io/apiserver v0.33.0
k8s.io/client-go v0.33.0
k8s.io/client-go v0.33.1
k8s.io/code-generator v0.33.0
k8s.io/component-base v0.33.0
k8s.io/component-helpers v0.33.1
k8s.io/klog/v2 v2.130.1
k8s.io/utils v0.0.0-20241210054802-24370beab758
sigs.k8s.io/controller-runtime v0.20.4
sigs.k8s.io/scheduler-plugins v0.31.8
sigs.k8s.io/structured-merge-diff/v4 v4.6.0
sigs.k8s.io/yaml v1.4.0
volcano.sh/apis v1.11.0
Expand Down
16 changes: 10 additions & 6 deletions ray-operator/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading