Update annotations of pods belonging to a Ray cluster to adopt Yunikorn gang scheduling #5594

Draft. Wants to merge 30 commits into base: master

@0yukali0 0yukali0 commented Jul 26, 2024

Tracking issue

Related to #5575

Why are the changes needed?

This change makes it possible to assign the pods of a Ray cluster to different gang scheduling task groups based on their role and worker group number.
Currently, all pods in a Ray cluster share the same gang scheduling task group through the podTemplate.
Because they have the same pod spec, they also share the same gang scheduling group name when Yunikorn is adopted.

What changes were proposed in this pull request?

To enable gang scheduling with Yunikorn, this PR updates the annotations of pods belonging to a Ray cluster based on their role and worker group number.
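
For illustration only (this is not the code in this PR), the per-role annotations could look roughly like the sketch below. The three annotation keys are the ones documented for Yunikorn gang scheduling. The `head` group name, the helper names `build_task_groups` and `annotations_for`, and the use of one shared resource request for every group are assumptions made for the example.

```python
import json

# Annotation keys documented by Yunikorn for gang scheduling.
TASK_GROUP_NAME = "yunikorn.apache.org/task-group-name"
TASK_GROUPS = "yunikorn.apache.org/task-groups"
POLICY_PARAMS = "yunikorn.apache.org/schedulingPolicyParameters"


def build_task_groups(worker_groups, resources):
    """Return one task group for the head pod and one per worker group.

    worker_groups: list of (group_name, replicas) pairs.
    resources: minResource mapping reused for every group (an assumption;
    real head and worker pods may request different resources).
    """
    groups = [{"name": "head", "minMember": 1, "minResource": resources}]
    for name, replicas in worker_groups:
        groups.append({"name": name, "minMember": replicas, "minResource": resources})
    return groups


def annotations_for(group_name, task_groups, parameters):
    """Build the annotations for a pod that belongs to task group `group_name`."""
    return {
        TASK_GROUP_NAME: group_name,
        TASK_GROUPS: json.dumps(task_groups),
        POLICY_PARAMS: parameters,
    }


# Values taken from the Ray example and plugin config in this description.
groups = build_task_groups(
    [("ray-group", 1), ("ray-group2", 1)],
    {"cpu": "2", "memory": "2Gi"},
)
params = "placeholderTimeoutInSeconds=10 gangSchedulingStyle=Soft"
head = annotations_for("head", groups, params)
worker = annotations_for("ray-group", groups, params)
```

With this shape, the head pod and each worker group carry a different task group name while sharing the same task group definitions, which is the distinction the current single podTemplate cannot express.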

How was this patch tested?

Ran make test_unit and make lint in flyteplugins.

Setup process

  1. Apply a config YAML that includes the scheduler name and parameters:
tasks:
  task-plugins:
    enabled-plugins:
      - container
      - sidecar
      - K8S-ARRAY
      - ray
    default-for-task-types:
      - container: container
      - container_array: K8S-ARRAY
      - ray: ray
    
plugins:
  ray:
    batchScheduler:
      scheduler: yunikorn
      parameters: "placeholderTimeoutInSeconds=10 gangSchedulingStyle=Soft"
  2. Set up the Flyte dev environment
  3. Install Yunikorn with the following values.yaml:
embedAdmissionController: false
enableSchedulerPlugin: false
enableWebService: false

helm install yunikorn yunikorn/yunikorn -f values.yaml -n yunikorn --create-namespace
  4. Set up the Flyte Ray plugin
  5. Submit the Ray example:

import typing

import ray
from flytekit import ImageSpec, Resources, task, workflow
from flytekitplugins.ray import HeadNodeConfig, RayJobConfig, WorkerNodeConfig

custom_image = ImageSpec(
    registry="localhost:30000",
    packages=["flytekitplugins-ray"],
    apt_packages=["wget"],
)

@ray.remote
def f(x):
    return x * x

ray_config = RayJobConfig(
    head_node_config=HeadNodeConfig(ray_start_params={"log-color": "True"}),
    worker_node_config=[
        WorkerNodeConfig(group_name="ray-group", replicas=1),
        WorkerNodeConfig(group_name="ray-group2", replicas=1),
    ],
    runtime_env={"pip": ["numpy", "pandas"]},  # or runtime_env="./requirements.txt"
    enable_autoscaling=False,
    shutdown_after_job_finishes=True,
    ttl_seconds_after_finished=3600,
)

@task(
    task_config=ray_config,
    requests=Resources(mem="2Gi", cpu="2"),
    container_image=custom_image,
)
def ray_task(n: int) -> typing.List[int]:
    futures = [f.remote(i) for i in range(n)]
    return ray.get(futures)

@workflow
def ray_workflow(n: int) -> typing.List[int]:
    return ray_task(n=n)

if __name__ == "__main__":
    print(ray_workflow(n=10))

Screenshots

[Screenshots: create, yunikornlog, results]

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

Yunikorn gang scheduling doc


welcome bot commented Jul 26, 2024

Thank you for opening this pull request! 🙌

These tips will help get your PR across the finish line:

  • Most of the repos have a PR template; if not, fill it out to the best of your knowledge.
  • Sign off your commits (Reference: DCO Guide).


codecov bot commented Jul 26, 2024

Codecov Report

Attention: Patch coverage is 10.00000% with 144 lines in your changes missing coverage. Please review.

Project coverage is 34.44%. Comparing base (bed761c) to head (bd74f60).
Report is 68 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...asks/plugins/k8s/batchscheduler/yunikorn/helper.go | 0.00% | 88 Missing ⚠️ |
| flyteplugins/go/tasks/plugins/k8s/ray/ray.go | 2.85% | 34 Missing ⚠️ |
| ...o/tasks/plugins/k8s/batchscheduler/utils/helper.go | 0.00% | 12 Missing ⚠️ |
| ...er/pkg/controller/nodes/task/k8s/plugin_manager.go | 0.00% | 4 Missing and 2 partials ⚠️ |
| ...tasks/pluginmachinery/workqueue/mocks/processor.go | 0.00% | 2 Missing ⚠️ |
| ...o/tasks/plugins/k8s/batchscheduler/kueue/helper.go | 0.00% | 2 Missing ⚠️ |
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5594      +/-   ##
==========================================
- Coverage   36.17%   34.44%   -1.73%     
==========================================
  Files        1302     1143     -159     
  Lines      109492   102898    -6594     
==========================================
- Hits        39606    35446    -4160     
+ Misses      65748    63776    -1972     
+ Partials     4138     3676     -462     
| Flag | Coverage Δ |
|---|---|
| unittests-datacatalog | 51.37% <ø> (ø) |
| unittests-flyteadmin | 55.60% <ø> (+0.27%) ⬆️ |
| unittests-flytecopilot | 12.17% <ø> (ø) |
| unittests-flytectl | ? |
| unittests-flyteidl | 7.17% <ø> (+0.08%) ⬆️ |
| unittests-flyteplugins | 52.90% <10.38%> (-0.42%) ⬇️ |
| unittests-flytepropeller | 42.01% <0.00%> (+0.27%) ⬆️ |
| unittests-flytestdlib | 55.29% <ø> (-0.07%) ⬇️ |

@0yukali0 0yukali0 marked this pull request as draft July 27, 2024 05:43
@0yukali0 0yukali0 marked this pull request as ready for review July 29, 2024 21:48
@0yukali0 0yukali0 marked this pull request as draft July 30, 2024 14:04
@0yukali0 0yukali0 marked this pull request as ready for review July 30, 2024 15:28
@0yukali0 0yukali0 marked this pull request as draft August 15, 2024 12:57
@0yukali0
Author

Hi @pingsutw, thanks for the review.
I plan to improve this approach over the next few days.

Signed-off-by: yuteng <[email protected]>
@davidmirror-ops
Contributor

@0yukali0 thank you, this is great. Let us know if you need help taking this to the finish line. This is important for several Flyte users

@0yukali0
Author

0yukali0 commented Sep 1, 2024

Thanks @davidmirror-ops, I will.
Please feel free to mark any areas where I can improve this commit.
BTW, there's some exciting news regarding Spark.
The Spark operator has supported gang scheduling with Yunikorn since last week. Therefore, a future Spark plugin in Flyte can focus on generating Yunikorn application IDs and updating gang scheduling parameters when a new application is submitted.

The commit in spark operator

pingsutw and others added 4 commits September 8, 2024 02:09
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: yuteng <[email protected]>
"k8s.io/apimachinery/pkg/api/resource"
)

func MutateRayJob(parameters string, app *rayv1.RayJob) error {
Contributor

Let's add some comments explaining what this does.

@@ -0,0 +1,16 @@
package scheduler
Contributor

Let's add some module-level comments that will help in the generated docs.

)

type SchedulerManager interface {
    // Mutate is responsible for mutating the object to be scheduled.
Contributor

Let's explain that the Mutate method mutates the object in place.

"sigs.k8s.io/controller-runtime/pkg/client"
)

type SchedulerManager interface {
Contributor

How about docs that say something to this effect:

Suggested change
- type SchedulerManager interface {
+ // SchedulerManager is an interface that allows plugging in custom schedulers
+ // such as Yunikorn, Kueue, or any other scheduler that manages pod scheduling
+ // with advanced features like gang scheduling, preemption, and priority.
+ type SchedulerManager interface {

@@ -0,0 +1,23 @@
package batchscheduler
Contributor

Suggested change
- package batchscheduler
+ // Flyte often deals with complex, distributed workloads that may require advanced scheduling features. Native Kubernetes scheduling may not suffice for scenarios that involve job-level scheduling policies like gang scheduling or job preemption. The SchedulerManager interface provides a flexible mechanism for integrating such schedulers into Flyte’s plugin system by allowing mutation of the scheduling-related properties of Kubernetes objects.
+ package batchscheduler

Contributor

Also, how about we call this scheduler?

type SchedulerManager interface {
    // Mutate is responsible for mutating the object to be scheduled.
    // It will add the necessary annotations, labels, etc. to the object.
    Mutate(ctx context.Context, object client.Object) error
Contributor

Suggested change
- Mutate(ctx context.Context, object client.Object) error
+ // Mutate allows the custom scheduler to modify the given Kubernetes object (e.g., a Pod or
+ // a custom resource representing a job) before it is submitted to the Kubernetes API.
+ // This may include adding annotations, labels, or modifying fields related to scheduling.
+ // The object is typically a Kubernetes Pod or a custom resource.
+ // It returns an error if the mutation fails.
+ Mutate(ctx context.Context, object client.Object) error


func (y *YunikornSchedulerManager) Mutate(ctx context.Context, object client.Object) error {
    switch object.(type) {
    case *rayv1.RayJob:
Contributor

This is weird, isn't it?
We have a common interface, but a switch case for each object type?
In this case, we should let this override live next to the Ray job plugin and not with Yunikorn. This will break pretty quickly when things update.

Why have a common interface?

@@ -203,6 +205,12 @@ func (e *PluginManager) launchResource(ctx context.Context, tCtx pluginsCore.Tas

key := backoff.ComposeResourceKey(o)

err = e.schedulerMgr.Mutate(ctx, o)
Contributor

I see, this is why... So here is my suggestion: how about we handle the scheduler-related changes directly in the plugin itself?

For example:
MutateForGangScheduling

Also, what about preemption etc. if we add that in the future?

@kumare3
Contributor

kumare3 commented Sep 22, 2024

I love this PR, @0yukali0, but there is probably a better way to introduce this interface. I do not love the idea of adding cases in a separate place from the actual Ray job plugin. My preference is to keep all that code under one module, which also avoids imports.

Also, can you share an example of how preemption and priority work with Yunikorn?

@fg91
Member

fg91 commented Oct 4, 2024

Adding support for a gang scheduler is a great effort, thank you 🙏

I personally am not a huge fan of the folder/module structure proposed in this PR because for each k8s plugin, we'd need to create another module under scheduler/yunikorn.

Is my understanding correct that we "only" have to add yunikorn annotations to the respective pod template specs?

If yes, would it be possible to have one central helper function defined that can be imported in the plugins and, if configured in the respective plugin's config, is called in the BuildResource function to append the annotations to the pod template spec?
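
To make the idea concrete, here is a minimal sketch of such a helper. It is written in Python only to match the example earlier in this thread; the real helper would live in the Go plugins. The function name `add_gang_annotations`, the dict-shaped pod template and config, and the choice to also set `schedulerName` are all assumptions for illustration, while the annotation keys are the ones documented by Yunikorn.

```python
import copy
import json


def add_gang_annotations(pod_template, scheduler_cfg, group_name, task_groups):
    """Return a copy of pod_template with Yunikorn gang scheduling annotations.

    Hypothetical helper: it returns the template unchanged unless the
    plugin config selects yunikorn as the batch scheduler.
    """
    if scheduler_cfg.get("scheduler") != "yunikorn":
        return pod_template
    out = copy.deepcopy(pod_template)  # never mutate the caller's template
    annotations = out.setdefault("metadata", {}).setdefault("annotations", {})
    annotations["yunikorn.apache.org/task-group-name"] = group_name
    annotations["yunikorn.apache.org/task-groups"] = json.dumps(task_groups)
    if scheduler_cfg.get("parameters"):
        annotations["yunikorn.apache.org/schedulingPolicyParameters"] = scheduler_cfg["parameters"]
    # Assumption: the pod must name the scheduler explicitly, since the
    # setup in this PR disables the Yunikorn admission controller.
    out.setdefault("spec", {})["schedulerName"] = "yunikorn"
    return out


# A plugin would call this once per pod template while building the resource.
template = {"metadata": {"annotations": {"team": "ml"}}, "spec": {}}
cfg = {
    "scheduler": "yunikorn",
    "parameters": "placeholderTimeoutInSeconds=10 gangSchedulingStyle=Soft",
}
patched = add_gang_annotations(template, cfg, "head", [{"name": "head", "minMember": 1}])
```

Each plugin would then only decide which task group name goes on which pod template, and the scheduler-specific keys would stay in one place.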

Signed-off-by: yuteng <[email protected]>
5 participants