Updating annotations of pods belonging to a Ray cluster in order to adopt Yunikorn gang scheduling #5594

Draft · wants to merge 30 commits into master (changes from 29 commits shown)
@@ -6,6 +6,7 @@
package config

import (
"time"

v1 "k8s.io/api/core/v1"
schedulerConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/batchscheduler"
@@ -206,6 +207,8 @@ type K8sPluginConfig struct {

// SendObjectEvents indicates whether to send k8s object events in TaskExecutionEvent updates (similar to kubectl get events).
SendObjectEvents bool `json:"send-object-events" pflag:",If true, will send k8s object events in TaskExecutionEvent updates."`

// BatchScheduler configures the external batch scheduler integration (e.g. Yunikorn).
BatchScheduler schedulerConfig.Config `json:"batchScheduler,omitempty"`
}

// FlyteCoPilotConfig specifies configuration for the Flyte CoPilot system. FlyteCoPilot allows running flytekit-less containers
1 change: 0 additions & 1 deletion flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go
@@ -2,7 +2,6 @@ package k8s

import (
"context"

"sigs.k8s.io/controller-runtime/pkg/client"

pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"

Some generated files are not rendered by default.

21 changes: 21 additions & 0 deletions flyteplugins/go/tasks/plugins/k8s/batchscheduler/config.go
@@ -0,0 +1,21 @@
package batchscheduler

type Config struct {
Scheduler string `json:"scheduler,omitempty" pflag:",Specify the batch scheduler to use (e.g. yunikorn)."`
Parameters string `json:"parameters,omitempty" pflag:",Specify static scheduling parameters passed to the batch scheduler."`
}

func NewConfig() Config {
return Config{
Scheduler: "",
Parameters: "",
}
}

func (b *Config) GetScheduler() string {
return b.Scheduler
}

func (b *Config) GetParameters() string {
return b.Parameters
}
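For context, once this section is wired into K8sPluginConfig, enabling Yunikorn from propeller's configuration file could look roughly like the following. This is an illustrative sketch: the nesting and key names are inferred from the json tags above, and the `gangSchedulingStyle=Hard` parameter value is an assumption borrowed from Yunikorn's documented scheduling-policy parameters, not taken from this PR.

```yaml
plugins:
  k8s:
    batchScheduler:
      scheduler: yunikorn
      parameters: "gangSchedulingStyle=Hard"
```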
15 changes: 15 additions & 0 deletions flyteplugins/go/tasks/plugins/k8s/batchscheduler/config_test.go
@@ -0,0 +1,15 @@
package batchscheduler

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewConfig(t *testing.T) {
t.Run("New scheduler plugin config", func(t *testing.T) {
config := NewConfig()
assert.Equal(t, "", config.GetScheduler())
assert.Equal(t, "", config.GetParameters())
})
}
23 changes: 23 additions & 0 deletions flyteplugins/go/tasks/plugins/k8s/batchscheduler/plugins.go
@@ -0,0 +1,23 @@
package batchscheduler
Contributor:
Suggested change:
-package batchscheduler
+// Flyte often deals with complex, distributed workloads that may require advanced scheduling features. Native Kubernetes scheduling may not suffice for scenarios that involve job-level scheduling policies like gang scheduling or job preemption. The SchedulerManager interface provides a flexible mechanism for integrating such schedulers into Flyte’s plugin system by allowing mutation of the scheduling-related properties of Kubernetes objects.
+package batchscheduler

Contributor:
Also, how about we call this package `scheduler`?


import (
"context"

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/batchscheduler/scheduler"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/batchscheduler/scheduler/yunikorn"
)

type SchedulerManager interface {
Contributor:
How about docs that say something to this effect?

Suggested change:
-type SchedulerManager interface {
+// SchedulerManager is an interface that allows plugging in custom schedulers
+// such as Yunikorn, Kueue, or any other scheduler that manages pod scheduling
+// with advanced features like gang scheduling, preemption, and priority.
+type SchedulerManager interface {

// Mutate is responsible for mutating the object to be scheduled.
Contributor:
Let's explain that the Mutate method mutates the object in place.

// It will add the necessary annotations, labels, etc. to the object.
Mutate(ctx context.Context, object client.Object) error
Contributor:
Suggested change:
-Mutate(ctx context.Context, object client.Object) error
+// Mutate allows the custom scheduler to modify the given Kubernetes object (e.g., a Pod or
+// a custom resource representing a job) before it is submitted to the Kubernetes API.
+// This may include adding annotations, labels, or modifying fields related to scheduling.
+// The object is typically a Kubernetes Pod or a custom resource.
+// It returns an error if the mutation fails.
+Mutate(ctx context.Context, object client.Object) error

}

func NewSchedulerManager(cfg *Config) SchedulerManager {
switch cfg.GetScheduler() {
case yunikorn.Yunikorn:
return yunikorn.NewSchedulerManager(cfg)
default:
return scheduler.NewNoopSchedulerManager()
}
}
@@ -0,0 +1,16 @@
package scheduler
Contributor:

Let's add some module-level comments that will help in generated docs.


import (
"context"

"sigs.k8s.io/controller-runtime/pkg/client"
)

// NoopSchedulerManager is the default SchedulerManager; its Mutate is a no-op,
// leaving objects to the default Kubernetes scheduler.
type NoopSchedulerManager struct{}

func NewNoopSchedulerManager() *NoopSchedulerManager {
return &NoopSchedulerManager{}
}

func (p *NoopSchedulerManager) Mutate(ctx context.Context, object client.Object) error {
return nil
}
@@ -0,0 +1,101 @@
package yunikorn

import (
"encoding/json"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

// MutateRayJob mutates a RayJob in place so Yunikorn can gang-schedule it:
// it points every pod template at the Yunikorn scheduler and annotates the
// pods with their task-group names, the serialized task groups, the static
// scheduling parameters, and a generated application ID.
func MutateRayJob(parameters string, app *rayv1.RayJob) error {
Contributor:

Let's add some comments explaining what this does.

appID := GenerateTaskGroupAppID()
rayjobSpec := &app.Spec
appSpec := rayjobSpec.RayClusterSpec
// index 0 is reserved for the head group's task group, filled in below
TaskGroups := make([]TaskGroup, 1)
for index := range appSpec.WorkerGroupSpecs {
worker := &appSpec.WorkerGroupSpecs[index]
worker.Template.Spec.SchedulerName = Yunikorn
meta := &worker.Template.ObjectMeta
if meta.Annotations == nil {
meta.Annotations = make(map[string]string)
}
spec := worker.Template.Spec
name := GenerateTaskGroupName(false, index)
TaskGroups = append(TaskGroups, TaskGroup{
Name: name,
MinMember: *worker.Replicas,
Labels: meta.Labels,
Annotations: meta.Annotations,
MinResource: Allocation(spec.Containers),
NodeSelector: spec.NodeSelector,
Affinity: spec.Affinity,
TopologySpreadConstraints: spec.TopologySpreadConstraints,
})
meta.Annotations[TaskGroupNameKey] = name
meta.Annotations[AppID] = appID
}
headSpec := &appSpec.HeadGroupSpec
headSpec.Template.Spec.SchedulerName = Yunikorn
meta := &headSpec.Template.ObjectMeta
if meta.Annotations == nil {
meta.Annotations = make(map[string]string)
}
spec := headSpec.Template.Spec
headName := GenerateTaskGroupName(true, 0)
res := Allocation(spec.Containers)
// reserve headroom for the autoscaler sidecar; guard against a nil pointer
if appSpec.EnableInTreeAutoscaling != nil && *appSpec.EnableInTreeAutoscaling {
res2 := v1.ResourceList{
v1.ResourceCPU: resource.MustParse("500m"),
v1.ResourceMemory: resource.MustParse("512Mi"),
}
res = Add(res, res2)
}
TaskGroups[0] = TaskGroup{
Name: headName,
MinMember: 1,
Labels: meta.Labels,
Annotations: meta.Annotations,
MinResource: res,
NodeSelector: spec.NodeSelector,
Affinity: spec.Affinity,
TopologySpreadConstraints: spec.TopologySpreadConstraints,
}
meta.Annotations[TaskGroupNameKey] = headName
info, err := json.Marshal(TaskGroups)
if err != nil {
return err
}
meta.Annotations[TaskGroupsKey] = string(info)
meta.Annotations[TaskGroupParameters] = parameters
meta.Annotations[AppID] = appID
return nil
}
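For illustration, after MutateRayJob runs, the head pod template carries gang-scheduling metadata roughly like the sketch below. The concrete annotation keys come from constants (TaskGroupNameKey, TaskGroupsKey, TaskGroupParameters, AppID) defined elsewhere in this PR; the `yunikorn.apache.org/*` names and the parameter value shown are assumptions based on Yunikorn's documented annotations, not code from this change.

```yaml
metadata:
  annotations:
    yunikorn.apache.org/task-group-name: task-group-head
    yunikorn.apache.org/task-groups: '[{"name":"task-group-head","minMember":1},{"name":"task-group-worker-0","minMember":2}]'
    yunikorn.apache.org/schedulingPolicyParameters: "gangSchedulingStyle=Hard"
    yunikorn.apache.org/app-id: task-group-<generated-uuid>
spec:
  schedulerName: yunikorn
```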

func Allocation(containers []v1.Container) v1.ResourceList {
totalResources := v1.ResourceList{}
for _, c := range containers {
for name, q := range c.Resources.Limits {
if _, exists := totalResources[name]; !exists {
totalResources[name] = q.DeepCopy()
continue
}
total := totalResources[name]
total.Add(q)
totalResources[name] = total
}
}
return totalResources
}

// Add returns a new ResourceList holding the element-wise sum of a and b,
// leaving both inputs unmodified (the previous version aliased and mutated a).
func Add(a v1.ResourceList, b v1.ResourceList) v1.ResourceList {
result := v1.ResourceList{}
for name, value := range a {
result[name] = value.DeepCopy()
}
for name, value := range b {
sum, ok := result[name]
if !ok {
result[name] = value.DeepCopy()
continue
}
sum.Add(value)
result[name] = sum
}
return result
}
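To make the resource arithmetic concrete without pulling in k8s.io/apimachinery, here is a dependency-free sketch of the same summation semantics, using int64 milli-units as a stand-in for resource.Quantity; `allocation` and `add` are hypothetical mirrors of Allocation and Add above.

```go
package main

import "fmt"

// allocation sums per-resource limits across containers, mirroring Allocation.
func allocation(containers []map[string]int64) map[string]int64 {
	total := map[string]int64{}
	for _, c := range containers {
		for name, q := range c {
			total[name] += q
		}
	}
	return total
}

// add returns a fresh map holding the element-wise sum of a and b, leaving
// both inputs unmodified.
func add(a, b map[string]int64) map[string]int64 {
	result := map[string]int64{}
	for name, v := range a {
		result[name] = v
	}
	for name, v := range b {
		result[name] += v
	}
	return result
}

func main() {
	containers := []map[string]int64{
		{"cpu": 1000, "memory": 256},
		{"cpu": 500, "memory": 256},
	}
	res := allocation(containers)
	// the head group reserves extra headroom for the autoscaler, as in MutateRayJob
	res = add(res, map[string]int64{"cpu": 500, "memory": 512})
	fmt.Println(res["cpu"], res["memory"]) // prints "2000 1024"
}
```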
@@ -0,0 +1,24 @@
package yunikorn

import (
"encoding/json"

v1 "k8s.io/api/core/v1"
)

// TaskGroup describes one gang-scheduled group of pods. The JSON field names
// below follow the camelCase keys Yunikorn expects in its task-groups
// annotation (without tags, Go would emit the exported names verbatim).
type TaskGroup struct {
Name string `json:"name"`
MinMember int32 `json:"minMember"`
Labels map[string]string `json:"labels,omitempty"`
Annotations map[string]string `json:"annotations,omitempty"`
MinResource v1.ResourceList `json:"minResource,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
Tolerations []v1.Toleration `json:"tolerations,omitempty"`
Affinity *v1.Affinity `json:"affinity,omitempty"`
TopologySpreadConstraints []v1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty"`
}

// Marshal serializes the task groups into the JSON form stored in the
// task-groups annotation.
func Marshal(taskGroups []TaskGroup) ([]byte, error) {
return json.Marshal(taskGroups)
}
@@ -0,0 +1,46 @@
package yunikorn

import (
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

// res was referenced but not defined in this snippet; a minimal fixture is
// declared here so the tests compile.
var res = v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("512Mi"),
}

func TestMarshal(t *testing.T) {
t1 := TaskGroup{
Name: "tg1",
MinMember: int32(1),
Labels: map[string]string{"attr": "value"},
Annotations: map[string]string{"attr": "value"},
MinResource: res,
NodeSelector: map[string]string{"node": "gpunode"},
Tolerations: nil,
Affinity: nil,
TopologySpreadConstraints: nil,
}
t2 := TaskGroup{
Name: "tg2",
MinMember: int32(1),
Labels: map[string]string{"attr": "value"},
Annotations: map[string]string{"attr": "value"},
MinResource: res,
NodeSelector: map[string]string{"node": "gpunode"},
Tolerations: nil,
Affinity: nil,
TopologySpreadConstraints: nil,
}
var tests = []struct {
input []TaskGroup
}{
{input: nil},
{input: []TaskGroup{}},
{input: []TaskGroup{t1}},
{input: []TaskGroup{t1, t2}},
}
t.Run("Serialize task groups", func(t *testing.T) {
for _, tt := range tests {
_, err := Marshal(tt.input)
assert.Nil(t, err)
}
})
}
@@ -0,0 +1,23 @@
package yunikorn

import (
"fmt"

"github.com/google/uuid"
)

const (
TaskGroupGenericName = "task-group"
)

func GenerateTaskGroupName(master bool, index int) string {
if master {
return fmt.Sprintf("%s-%s", TaskGroupGenericName, "head")
}
return fmt.Sprintf("%s-%s-%d", TaskGroupGenericName, "worker", index)
}

func GenerateTaskGroupAppID() string {
uid := uuid.New().String()
return fmt.Sprintf("%s-%s", TaskGroupGenericName, uid)
}
@@ -0,0 +1,53 @@
package yunikorn

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestGenerateTaskGroupName(t *testing.T) {
type inputFormat struct {
isMaster bool
index int
}
var tests = []struct {
input inputFormat
expect string
}{
{
input: inputFormat{isMaster: true, index: 0},
expect: fmt.Sprintf("%s-%s", TaskGroupGenericName, "head"),
},
{
input: inputFormat{isMaster: true, index: 1},
expect: fmt.Sprintf("%s-%s", TaskGroupGenericName, "head"),
},
{
input: inputFormat{isMaster: false, index: 0},
expect: fmt.Sprintf("%s-%s-%d", TaskGroupGenericName, "worker", 0),
},
{
input: inputFormat{isMaster: false, index: 1},
expect: fmt.Sprintf("%s-%s-%d", TaskGroupGenericName, "worker", 1),
},
}
t.Run("Generate ray task group name", func(t *testing.T) {
for _, tt := range tests {
got := GenerateTaskGroupName(tt.input.isMaster, tt.input.index)
assert.Equal(t, tt.expect, got)
}
})
}

func TestGenerateTaskGroupAppID(t *testing.T) {
t.Run("Generate ray app ID", func(t *testing.T) {
got := GenerateTaskGroupAppID()
if len(got) == 0 {
t.Error("Ray app ID is empty")
}
})
}