Skip to content

Commit

Permalink
refactoring proposal
Browse files Browse the repository at this point in the history
Signed-off-by: yuteng <[email protected]>
  • Loading branch information
0yukali0 committed Oct 12, 2024
1 parent 7515dd8 commit bd74f60
Show file tree
Hide file tree
Showing 20 changed files with 228 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
package config

import (
schedulerConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/batchscheduler"
"time"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -207,8 +206,6 @@ type K8sPluginConfig struct {

// SendObjectEvents indicates whether to send k8s object events in TaskExecutionEvent updates (similar to kubectl get events).
SendObjectEvents bool `json:"send-object-events" pflag:",If true, will send k8s object events in TaskExecutionEvent updates."`

BatchScheduler schedulerConfig.Config `json:"batchScheduler,omitempty"`
}

// FlyteCoPilotConfig specifies configuration for the Flyte CoPilot system. FlyteCoPilot, allows running flytekit-less containers
Expand Down
10 changes: 10 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package k8s

import (
"context"

"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
pluginsCore "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/flyteorg/flyte/flytestdlib/storage"
Expand Down Expand Up @@ -186,3 +188,11 @@ func MaybeUpdatePhaseVersionFromPluginContext(phaseInfo *pluginsCore.PhaseInfo,
MaybeUpdatePhaseVersion(phaseInfo, &pluginState)
return nil
}

type YunikornScheduablePlugin interface {
MutateResourceForYunikorn(ctx context.Context, object client.Object, taskTmpl *core.TaskTemplate) (client.Object, error)
}

type KueueScheduablePlugin interface {
MutateResourceForKueue(ctx context.Context, object client.Object, taskTmpl *core.TaskTemplate) (client.Object, error)
}
24 changes: 13 additions & 11 deletions flyteplugins/go/tasks/plugins/k8s/batchscheduler/config.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package batchscheduler

type Config struct {
Scheduler string `json:"scheduler,omitempty" pflag:", Specify batch scheduler to"`
Parameters string `json:"parameters,omitempty" pflag:", Specify static parameters"`
Scheduler string `json:"scheduler,omitempty" pflag:", Specify batch scheduler to"`
Default SchedulingConfig `json:"default,omitempty" pflag:", Specify default scheduling config which batch scheduler adopts"`
NameSpace map[string]SchedulingConfig `json:"Namespace,omitempty" pflag:"-, Specify namespace scheduling config"`
Domain map[string]SchedulingConfig `json:"Domain,omitempty" pflag:"-, Specify domain scheduling config"`
}

func NewConfig() Config {
return Config{
Scheduler: "",
Parameters: "",
}
type SchedulingConfig struct {
KueueConfig `json:"Kueue,omitempty" pflag:", Specify Kueue scheduling scheduling config"`
YunikornConfig `json:"Yunikorn,omitempty" pflag:", Yunikorn scheduling config"`
}

func (b *Config) GetScheduler() string {
return b.Scheduler
type KueueConfig struct {
PriorityClassName string `json:"Priority,omitempty" pflag:", Kueue Prioty class"`
Queue string `json:"Queue,omitempty" pflag:", Specify batch scheduler to"`
}

func (b *Config) GetParameters() string {
return b.Parameters
type YunikornConfig struct {
Parameters string `json:"parameters,omitempty" pflag:", Specify gangscheduling policy"`
Queue string `json:"queue,omitempty" pflag:", Specify leaf queue to submit to"`
}
15 changes: 0 additions & 15 deletions flyteplugins/go/tasks/plugins/k8s/batchscheduler/config_test.go

This file was deleted.

16 changes: 16 additions & 0 deletions flyteplugins/go/tasks/plugins/k8s/batchscheduler/kueue/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package kueue

import (
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/batchscheduler/utils"
)

const (
QueueName = "kueue.x-k8s.io/queue-name"
PriorityClassName = "kueue.x-k8s.io/priority-class"
)

func UpdateKueueLabels(labels map[string]string, app *rayv1.RayJob) {
utils.UpdateLabels(labels, &app.ObjectMeta)

Check warning on line 15 in flyteplugins/go/tasks/plugins/k8s/batchscheduler/kueue/helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/batchscheduler/kueue/helper.go#L14-L15

Added lines #L14 - L15 were not covered by tests
}
23 changes: 0 additions & 23 deletions flyteplugins/go/tasks/plugins/k8s/batchscheduler/plugins.go

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

30 changes: 30 additions & 0 deletions flyteplugins/go/tasks/plugins/k8s/batchscheduler/utils/helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package utils

import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func UpdateLabels(wanted map[string]string, objectMeta *metav1.ObjectMeta) {
for key, value := range wanted {
if _, exist := objectMeta.Labels[key]; !exist {
objectMeta.Labels[key] = value

Check warning on line 11 in flyteplugins/go/tasks/plugins/k8s/batchscheduler/utils/helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/batchscheduler/utils/helper.go#L8-L11

Added lines #L8 - L11 were not covered by tests
}
}
}

func UpdateAnnotations(wanted map[string]string, objectMeta *metav1.ObjectMeta) {
for key, value := range wanted {
if _, exist := objectMeta.Annotations[key]; !exist {
objectMeta.Annotations[key] = value

Check warning on line 19 in flyteplugins/go/tasks/plugins/k8s/batchscheduler/utils/helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/batchscheduler/utils/helper.go#L16-L19

Added lines #L16 - L19 were not covered by tests
}
}
}

func UpdatePodTemplateAnnotatations(wanted map[string]string, pod *v1.PodTemplateSpec) {
UpdateAnnotations(wanted, &pod.ObjectMeta)

Check warning on line 25 in flyteplugins/go/tasks/plugins/k8s/batchscheduler/utils/helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/batchscheduler/utils/helper.go#L24-L25

Added lines #L24 - L25 were not covered by tests
}

func UpdatePodTemplateLabels(wanted map[string]string, pod *v1.PodTemplateSpec) {
UpdateLabels(wanted, &pod.ObjectMeta)

Check warning on line 29 in flyteplugins/go/tasks/plugins/k8s/batchscheduler/utils/helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/batchscheduler/utils/helper.go#L28-L29

Added lines #L28 - L29 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,25 @@ package yunikorn

import (
"encoding/json"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flyteorg/flyte/flyteplugins/go/tasks/plugins/k8s/batchscheduler/utils"
)

const (
Yunikorn = "yunikorn"
AppID = "yunikorn.apache.org/app-id"
Queue = "yunikorn.apache.org/queue"
TaskGroupNameKey = "yunikorn.apache.org/task-group-name"
TaskGroupsKey = "yunikorn.apache.org/task-groups"
TaskGroupParameters = "yunikorn.apache.org/schedulingPolicyParameters"
)

func MutateRayJob(parameters string, app *rayv1.RayJob) error {
func MutateRayJob(app *rayv1.RayJob) error {
appID := GenerateTaskGroupAppID()
rayjobSpec := &app.Spec
appSpec := rayjobSpec.RayClusterSpec
Expand Down Expand Up @@ -60,11 +73,30 @@ func MutateRayJob(parameters string, app *rayv1.RayJob) error {
return err

Check warning on line 73 in flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go#L70-L73

Added lines #L70 - L73 were not covered by tests
}
meta.Annotations[TaskGroupsKey] = string(info[:])
meta.Annotations[TaskGroupParameters] = parameters
meta.Annotations[AppID] = appID
return nil

Check warning on line 77 in flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go#L75-L77

Added lines #L75 - L77 were not covered by tests
}

func UpdateGangSchedulingParameters(parameters string, objectMeta *metav1.ObjectMeta) {
if len(parameters) == 0 {
return

Check warning on line 82 in flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go#L80-L82

Added lines #L80 - L82 were not covered by tests
}
utils.UpdateAnnotations(
map[string]string{TaskGroupParameters: parameters},
objectMeta,
)

Check warning on line 87 in flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go#L84-L87

Added lines #L84 - L87 were not covered by tests
}

func UpdateAnnotations(labels map[string]string, app *rayv1.RayJob) {
appSpec := app.Spec.RayClusterSpec
headSpec := appSpec.HeadGroupSpec
utils.UpdatePodTemplateAnnotatations(labels, &headSpec.Template)
for index := range appSpec.WorkerGroupSpecs {
worker := appSpec.WorkerGroupSpecs[index]
utils.UpdatePodTemplateAnnotatations(labels, &worker.Template)

Check warning on line 96 in flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go#L90-L96

Added lines #L90 - L96 were not covered by tests
}
}

func Allocation(containers []v1.Container) v1.ResourceList {
totalResources := v1.ResourceList{}
for _, c := range containers {
Expand All @@ -81,19 +113,19 @@ func Allocation(containers []v1.Container) v1.ResourceList {
return totalResources

Check warning on line 113 in flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go#L113

Added line #L113 was not covered by tests
}

func Add(a v1.ResourceList, b v1.ResourceList) v1.ResourceList {
result := a
for name, value := range a {
sum := &value
if value2, ok := b[name]; ok {
func Add(left v1.ResourceList, right v1.ResourceList) v1.ResourceList {
result := left
for name, value := range left {
sum := value
if value2, ok := right[name]; ok {
sum.Add(value2)
result[name] = *sum
result[name] = sum
} else {
result[name] = value

Check warning on line 124 in flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go#L116-L124

Added lines #L116 - L124 were not covered by tests
}
}
for name, value := range b {
if _, ok := a[name]; !ok {
for name, value := range right {
if _, ok := left[name]; !ok {
result[name] = value

Check warning on line 129 in flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/k8s/batchscheduler/yunikorn/helper.go#L127-L129

Added lines #L127 - L129 were not covered by tests
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@ import (
"testing"

"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
)

func TestMarshal(t *testing.T) {
res := v1.ResourceList{
v1.ResourceCPU: resource.MustParse("500m"),
v1.ResourceMemory: resource.MustParse("512Mi"),
}
t1 := TaskGroup{
Name: "tg1",
MinMember: int32(1),
Expand Down Expand Up @@ -43,4 +49,4 @@ func TestMarshal(t *testing.T) {
assert.Nil(t, err)
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,9 @@ func TestGenerateTaskGroupName(t *testing.T) {

func TestGenerateTaskGroupAppID(t *testing.T) {
t.Run("Generate ray app ID", func(t *testing.T) {
for _, tt := range tests {
got := GenerateTaskGroupAppID()
if len(got) <= 0 {
t.Error("Ray app ID is empty")
}
got := GenerateTaskGroupAppID()
if len(got) <= 0 {
t.Error("Ray app ID is empty")
}
})
}
}
Loading

0 comments on commit bd74f60

Please sign in to comment.