Support Elastic JobSets #463

Open
kannon92 opened this issue Mar 21, 2024 · 25 comments

@kannon92
Contributor

What would you like to be added:

With Elastic Indexed Jobs, it is possible to change completions/parallelism to scale your Jobs up or down.

It would be nice to have something similar for JobSet.

Why is this needed:

Elastic jobs are an important use case for autoscaling and other scenarios.

Implementation:

At a quick glance at the API, this may already be possible: the replicas of a ReplicatedJob are not immutable, so I think someone could patch the replicas of a ReplicatedJob to scale down or up.
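
To make the idea of patching replicas concrete, here is a minimal sketch that builds a JSON Patch (RFC 6902) for the replicas field. The helper name `replicas_patch` is hypothetical, and the path assumes the field lives at `spec.replicatedJobs[i].replicas`. Whether the API server accepts such a patch depends on the JobSet validation webhook.

```python
import json


def replicas_patch(replicated_job_index: int, replicas: int) -> list:
    """Build a JSON Patch that sets spec.replicatedJobs[i].replicas.

    Hypothetical helper for illustration; it only constructs the patch
    document and does not talk to a cluster.
    """
    if replicated_job_index < 0:
        raise ValueError("replicated_job_index must be non-negative")
    if replicas < 0:
        raise ValueError("replicas must be non-negative")
    return [
        {
            "op": "replace",
            "path": f"/spec/replicatedJobs/{replicated_job_index}/replicas",
            "value": replicas,
        }
    ]


# Example: set the second ReplicatedJob (index 1) to 3 replicas.
patch = replicas_patch(replicated_job_index=1, replicas=3)
print(json.dumps(patch))
```

The printed document could then be passed to something like `kubectl patch jobset <name> --type=json -p '<patch>'`.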

But then I wonder what we should do with the existing replicated jobs?

And should we support ElasticIndexedJob with JobSet (so someone could patch the JobTemplate in a single replicated job)?

kannon92 changed the title from "[RFC]: Elastic JobSet" to "Discussion: Elastic JobSet" on Mar 21, 2024
@ahg-g
Contributor

ahg-g commented Mar 21, 2024

Yes, I think we should think about allowing autoscaling of the number of replicas in a replicatedJob! For example, the number of TPU slices (or, more generally, accelerator islands) supporting a large-scale training job could scale down in case of failures.

But then I wonder what we should do with the existing replicated jobs?

The jobs are indexed, so scaling down means removing the ones with the highest indexes.
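
A minimal sketch of that rule: given the current and desired replica counts, compute which child Jobs would be created or removed. The function names are hypothetical, and the `<jobset>-<replicatedJob>-<index>` naming scheme is an assumption made for illustration.

```python
def job_name(jobset: str, replicated_job: str, index: int) -> str:
    # Assumed child Job naming scheme: <jobset>-<replicatedJob>-<index>.
    return f"{jobset}-{replicated_job}-{index}"


def plan_scale(jobset: str, replicated_job: str, current: int, desired: int):
    """Return (to_create, to_delete) child Job names for a replica change.

    Scaling down removes the highest indexes; scaling up appends new
    indexes after the existing ones. Existing lower indexes are untouched.
    """
    if current < 0 or desired < 0:
        raise ValueError("replica counts must be non-negative")
    to_create = [job_name(jobset, replicated_job, i) for i in range(current, desired)]
    to_delete = [job_name(jobset, replicated_job, i) for i in range(desired, current)]
    return to_create, to_delete


# Scaling a 4-replica ReplicatedJob down to 2 removes indexes 2 and 3.
created, deleted = plan_scale("simple-no-ttl", "leader", current=4, desired=2)
print(created, deleted)
```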

And should we support ElasticIndexedJob with JobSet (so someone could patch the JobTemplate in a single replicated job)?

It is possible.

If the child jobs themselves should be elastic, then the operator could change the individual jobs directly. We could also allow changing that in bulk for all job replicas, but I need to hear a use case first.

@kannon92
Contributor Author

So @ahg-g, it sounds like this is already supported. Both the replicas in a ReplicatedJob and the JobTemplate are mutable.

Maybe we should consider a task for this to at least document that this is possible?

I think Kueue and other consumers would be interested in this, but I'm not sure what we need in this repo.

@ahg-g
Contributor

ahg-g commented Mar 21, 2024

We need to have tests for that to verify the behavior though.

@kannon92
Contributor Author

Well good thing I tried it haha.

I think this code is blocking us from doing this.

https://github.com/kubernetes-sigs/jobset/blob/main/api/jobset/v1alpha2/jobset_webhook.go#L172

I tried a simple example: I ran kubectl edit jobset and tried changing the replicas of a ReplicatedJob.

Got:

error: jobsets.jobset.x-k8s.io "simple-no-ttl" could not be patched: admission webhook "vjobset.kb.io" denied the request: spec.replicatedJobs: Invalid value: []v1alpha2.ReplicatedJob{v1alpha2.ReplicatedJob{Name:"leader", Template:v1.JobTemplateSpec{ObjectMeta:v1.ObjectMeta{Name:"", GenerateName:"", Namespace:"", SelfLink:"", UID:"", ResourceVersion:"", Generation:0, CreationTimestamp:time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC), DeletionTimestamp:<nil>, DeletionGracePeriodSeconds:(*int64)(nil), Labels:map[string]string(nil), Annotations:map[string]string(nil), OwnerReferences:[]v1.OwnerReference(nil), Finalizers:[]string(nil), ManagedFields:[]v1.ManagedFieldsEntry(nil)}, Spec:v1.JobSpec{Parallelism:(*int32)(0xc0001fe320), Completions:(*int32)(0xc0001fe324), ActiveDeadlineSeconds:(*int64)(nil), PodFailurePolicy:(*v1.PodFailurePolicy)(nil), BackoffLimit:(*int32)(0xc0001fe328), BackoffLimitPerIndex:(*int32)(nil), MaxFailedIndexes:(*int32)(nil), Selector:(*v1.LabelSelector)(nil), ManualSelector:(*bool)(nil), Template:v1.PodTemplateSpec{ObjectMeta:v1.ObjectMeta{Name:"", GenerateName:"", Namespace:"", SelfLink:"", UID:"", ResourceVersion:"", Generation:0, CreationTimestamp:time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC), DeletionTimestamp:<nil>, DeletionGracePeriodSeconds:(*int64)(nil), Labels:map[string]string(nil), Annotations:map[string]string(nil), OwnerReferences:[]v1.OwnerReference(nil), Finalizers:[]string(nil), ManagedFields:[]v1.ManagedFieldsEntry(nil)}, Spec:v1.PodSpec{Volumes:[]v1.Volume(nil), InitContainers:[]v1.Container(nil), Containers:[]v1.Container{v1.Container{Name:"leader", Image:"bash:latest", Command:[]string{"bash", "-xc", "sleep 10000\n"}, Args:[]string(nil), WorkingDir:"", Ports:[]v1.ContainerPort(nil), EnvFrom:[]v1.EnvFromSource(nil), Env:[]v1.EnvVar(nil), Resources:v1.ResourceRequirements{Limits:v1.ResourceList(nil), Requests:v1.ResourceList(nil), Claims:[]v1.ResourceClaim(nil)}, ResizePolicy:[]v1.ContainerResizePolicy(nil), 
RestartPolicy:(*v1.ContainerRestartPolicy)(nil), VolumeMounts:[]v1.VolumeMount(nil), VolumeDevices:[]v1.VolumeDevice(nil), LivenessProbe:(*v1.Probe)(nil), ReadinessProbe:(*v1.Probe)(nil), StartupProbe:(*v1.Probe)(nil), Lifecycle:(*v1.Lifecycle)(nil), TerminationMessagePath:"", TerminationMessagePolicy:"", ImagePullPolicy:"", SecurityContext:(*v1.SecurityContext)(nil), Stdin:false, StdinOnce:false, TTY:false}}, EphemeralContainers:[]v1.EphemeralContainer(nil), RestartPolicy:"OnFailure", TerminationGracePeriodSeconds:(*int64)(nil), ActiveDeadlineSeconds:(*int64)(nil), DNSPolicy:"", NodeSelector:map[string]string(nil), ServiceAccountName:"", DeprecatedServiceAccount:"", AutomountServiceAccountToken:(*bool)(nil), NodeName:"", HostNetwork:false, HostPID:false, HostIPC:false, ShareProcessNamespace:(*bool)(nil), SecurityContext:(*v1.PodSecurityContext)(nil), ImagePullSecrets:[]v1.LocalObjectReference(nil), Hostname:"", Subdomain:"", Affinity:(*v1.Affinity)(nil), SchedulerName:"", Tolerations:[]v1.Toleration(nil), HostAliases:[]v1.HostAlias(nil), PriorityClassName:"", Priority:(*int32)(nil), DNSConfig:(*v1.PodDNSConfig)(nil), ReadinessGates:[]v1.PodReadinessGate(nil), RuntimeClassName:(*string)(nil), EnableServiceLinks:(*bool)(nil), PreemptionPolicy:(*v1.PreemptionPolicy)(nil), Overhead:v1.ResourceList(nil), TopologySpreadConstraints:[]v1.TopologySpreadConstraint(nil), SetHostnameAsFQDN:(*bool)(nil), OS:(*v1.PodOS)(nil), HostUsers:(*bool)(nil), SchedulingGates:[]v1.PodSchedulingGate(nil), ResourceClaims:[]v1.PodResourceClaim(nil)}}, TTLSecondsAfterFinished:(*int32)(nil), CompletionMode:(*v1.CompletionMode)(0xc00099fa60), Suspend:(*bool)(nil), PodReplacementPolicy:(*v1.PodReplacementPolicy)(nil)}}, Replicas:4}, v1alpha2.ReplicatedJob{Name:"workers", Template:v1.JobTemplateSpec{ObjectMeta:v1.ObjectMeta{Name:"", GenerateName:"", Namespace:"", SelfLink:"", UID:"", ResourceVersion:"", Generation:0, CreationTimestamp:time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC), 
DeletionTimestamp:<nil>, DeletionGracePeriodSeconds:(*int64)(nil), Labels:map[string]string(nil), Annotations:map[string]string(nil), OwnerReferences:[]v1.OwnerReference(nil), Finalizers:[]string(nil), ManagedFields:[]v1.ManagedFieldsEntry(nil)}, Spec:v1.JobSpec{Parallelism:(*int32)(0xc0001fe32c), Completions:(*int32)(0xc0001fe330), ActiveDeadlineSeconds:(*int64)(nil), PodFailurePolicy:(*v1.PodFailurePolicy)(nil), BackoffLimit:(*int32)(0xc0001fe334), BackoffLimitPerIndex:(*int32)(nil), MaxFailedIndexes:(*int32)(nil), Selector:(*v1.LabelSelector)(nil), ManualSelector:(*bool)(nil), Template:v1.PodTemplateSpec{ObjectMeta:v1.ObjectMeta{Name:"", GenerateName:"", Namespace:"", SelfLink:"", UID:"", ResourceVersion:"", Generation:0, CreationTimestamp:time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC), DeletionTimestamp:<nil>, DeletionGracePeriodSeconds:(*int64)(nil), Labels:map[string]string(nil), Annotations:map[string]string(nil), OwnerReferences:[]v1.OwnerReference(nil), Finalizers:[]string(nil), ManagedFields:[]v1.ManagedFieldsEntry(nil)}, Spec:v1.PodSpec{Volumes:[]v1.Volume(nil), InitContainers:[]v1.Container(nil), Containers:[]v1.Container{v1.Container{Name:"worker", Image:"bash:latest", Command:[]string{"bash", "-xc", "sleep 100000\n"}, Args:[]string(nil), WorkingDir:"", Ports:[]v1.ContainerPort(nil), EnvFrom:[]v1.EnvFromSource(nil), Env:[]v1.EnvVar(nil), Resources:v1.ResourceRequirements{Limits:v1.ResourceList(nil), Requests:v1.ResourceList(nil), Claims:[]v1.ResourceClaim(nil)}, ResizePolicy:[]v1.ContainerResizePolicy(nil), RestartPolicy:(*v1.ContainerRestartPolicy)(nil), VolumeMounts:[]v1.VolumeMount(nil), VolumeDevices:[]v1.VolumeDevice(nil), LivenessProbe:(*v1.Probe)(nil), ReadinessProbe:(*v1.Probe)(nil), StartupProbe:(*v1.Probe)(nil), Lifecycle:(*v1.Lifecycle)(nil), TerminationMessagePath:"", TerminationMessagePolicy:"", ImagePullPolicy:"", SecurityContext:(*v1.SecurityContext)(nil), Stdin:false, StdinOnce:false, TTY:false}}, 
EphemeralContainers:[]v1.EphemeralContainer(nil), RestartPolicy:"OnFailure", TerminationGracePeriodSeconds:(*int64)(nil), ActiveDeadlineSeconds:(*int64)(nil), DNSPolicy:"", NodeSelector:map[string]string(nil), ServiceAccountName:"", DeprecatedServiceAccount:"", AutomountServiceAccountToken:(*bool)(nil), NodeName:"", HostNetwork:false, HostPID:false, HostIPC:false, ShareProcessNamespace:(*bool)(nil), SecurityContext:(*v1.PodSecurityContext)(nil), ImagePullSecrets:[]v1.LocalObjectReference(nil), Hostname:"", Subdomain:"", Affinity:(*v1.Affinity)(nil), SchedulerName:"", Tolerations:[]v1.Toleration(nil), HostAliases:[]v1.HostAlias(nil), PriorityClassName:"", Priority:(*int32)(nil), DNSConfig:(*v1.PodDNSConfig)(nil), ReadinessGates:[]v1.PodReadinessGate(nil), RuntimeClassName:(*string)(nil), EnableServiceLinks:(*bool)(nil), PreemptionPolicy:(*v1.PreemptionPolicy)(nil), Overhead:v1.ResourceList(nil), TopologySpreadConstraints:[]v1.TopologySpreadConstraint(nil), SetHostnameAsFQDN:(*bool)(nil), OS:(*v1.PodOS)(nil), HostUsers:(*bool)(nil), SchedulingGates:[]v1.PodSchedulingGate(nil), ResourceClaims:[]v1.PodResourceClaim(nil)}}, TTLSecondsAfterFinished:(*int32)(nil), CompletionMode:(*v1.CompletionMode)(0xc00099fa70), Suspend:(*bool)(nil), PodReplacementPolicy:(*v1.PodReplacementPolicy)(nil)}}, Replicas:1}}: field is immutable

@kannon92
Contributor Author

/retitle Support Elastic JobSets

k8s-ci-robot changed the title from "Discussion: Elastic JobSet" to "Support Elastic JobSets" on Mar 21, 2024
@kannon92
Contributor Author

I opened up #465 for discussion. We were treating the entire replicated job as immutable. It isn't clear to me what validation logic we want to have for a replicated job. We could allow changing only replicas (keeping name and JobTemplate immutable).
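
One way to picture the "only replicas may change" option is the sketch below. It is not the actual webhook code: the `ReplicatedJob` dataclass is a simplified stand-in for the real API type, the template is reduced to a plain dict, and the error strings are illustrative.

```python
from dataclasses import dataclass


@dataclass
class ReplicatedJob:
    # Simplified stand-in for the real API type (assumption).
    name: str
    template: dict
    replicas: int


def validate_update(old: list, new: list) -> list:
    """Return validation errors for an update to spec.replicatedJobs.

    Policy sketched here: name and template are immutable, the list of
    entries cannot grow or shrink, and only replicas may change.
    """
    if len(old) != len(new):
        return ["spec.replicatedJobs: entries cannot be added or removed"]
    errors = []
    for i, (o, n) in enumerate(zip(old, new)):
        path = f"spec.replicatedJobs[{i}]"
        if o.name != n.name:
            errors.append(f"{path}.name: field is immutable")
        if o.template != n.template:
            errors.append(f"{path}.template: field is immutable")
        if n.replicas < 0:
            errors.append(f"{path}.replicas: must be non-negative")
    return errors


old = [ReplicatedJob("workers", {"parallelism": 2}, replicas=1)]
ok = validate_update(old, [ReplicatedJob("workers", {"parallelism": 2}, replicas=3)])
bad = validate_update(old, [ReplicatedJob("workers", {"parallelism": 4}, replicas=3)])
print(ok, bad)
```

Comparing entries by position keeps the sketch short; a real check might match entries by name instead.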

@k8s-triage-robot

The Kubernetes project currently lacks enough contributors to adequately respond to all issues.

This bot triages un-triaged issues according to the following rules:

  • After 90d of inactivity, lifecycle/stale is applied
  • After 30d of inactivity since lifecycle/stale was applied, lifecycle/rotten is applied
  • After 30d of inactivity since lifecycle/rotten was applied, the issue is closed

You can:

  • Mark this issue as fresh with /remove-lifecycle stale
  • Close this issue with /close
  • Offer to help out with Issue Triage

Please send feedback to sig-contributor-experience at kubernetes/community.

/lifecycle stale

k8s-ci-robot added the lifecycle/stale label on Jun 24, 2024
@ahg-g
Contributor

ahg-g commented Jun 24, 2024

/remove-lifecycle stale

k8s-ci-robot removed the lifecycle/stale label on Jun 24, 2024
@kannon92
Contributor Author

@andreyvelich @tenzen-y Could you comment on what kind of behavior you'd expect from JobSet for Elastic PyTorch?

@danielvegamyhre
Contributor

@andreyvelich @tenzen-y Could you comment on what kind of behavior you'd expect from JobSet for Elastic PyTorch?

Following up on this: @andreyvelich @tenzen-y, can you please respond to Kevin's question when you have a moment? His PR #622 is close to being ready to merge, and I'd like to make sure we aren't locking ourselves into an implementation that may not play nicely with torch elastic training.

@andreyvelich

Sorry for the late reply. Sure, let me review #622.
In the end, we create an HPA that watches utilization and scales up the number of Pods: https://github.com/kubeflow/training-operator/blob/master/pkg/controller.v1/pytorch/hpa.go#L33.

@kannon92
Contributor Author

So for v2, would you use HPA to scale replicated jobs?

@tenzen-y
Member

So for v2, would you use HPA to scale replicated jobs?

Yes, that's right. To support Elastic PyTorch Training, we need to implement this feature.
However, the JobSet enhancement currently prioritized on the Kubeflow side is the Serial Job ExecutionPolicy.

@ahg-g
Contributor

ahg-g commented Oct 13, 2024

Did we have a KEP for this feature?

@ahg-g
Contributor

ahg-g commented Oct 13, 2024

The reason I am asking is because we didn't document the user stories and how this will work with PyTorch elastic training, so I am not sure if what we are implementing will address those stories.

@kannon92
Contributor Author

I didn't think to, because it seemed simple enough without any API changes.

@ahg-g
Contributor

ahg-g commented Oct 13, 2024

It is more about how we expect this to be used. It is not clear to me how elastic PyTorch will integrate with this, or how we expect other frameworks (JAX, for example) to behave in order to take advantage of this feature.

@kannon92
Contributor Author

I was following the lead of https://github.com/kubernetes/enhancements/tree/master/keps/sig-apps/3715-elastic-indexed-job#motivation. This is about allowing one to change the replicas of ReplicatedJobs.

@kannon92
Contributor Author

From what Yuki and Andrey said, I don’t think HPA will be supported right away for v2. I also thought of this feature due to dynamic job support in Kueue. But I don’t have an exact use case at the moment.

@ahg-g
Contributor

ahg-g commented Oct 14, 2024

Hard to build features without a clear and fully documented use case. I suggest we not move forward without clear and practical user stories.

@kannon92
Contributor Author

So what was the motivation behind Elastic Indexed Jobs? Have you found any use case with Elastic PyTorch Job or JAX?

@andreyvelich

From what Yuki and Andrey said, I don’t think HPA will be supported right away for v2.

Our goal is to re-use elastic functionality from the JobSet when we support it.

So what was the motivation behind Elastic Indexed Jobs? Have you found any use case with Elastic PyTorch Job or JAX?

It will be useful for frameworks that support elasticity; for example, you can configure torchrun --nnodes=1:4, which tolerates Node changes: https://pytorch.org/docs/stable/elastic/run.html#elastic-min-1-max-4-tolerates-up-to-3-membership-changes-or-failures.
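
To illustrate what a MIN:MAX range like `--nnodes=1:4` implies for a scaler, here is a small sketch. The helpers `parse_nnodes` and `target_nodes` are hypothetical and not part of torchrun; they only model the arithmetic of keeping the node count inside the elastic range.

```python
from typing import Optional, Tuple


def parse_nnodes(value: str) -> Tuple[int, int]:
    """Parse an nnodes value of the form 'N' or 'MIN:MAX' into (min, max)."""
    parts = value.split(":")
    if len(parts) == 1:
        lo = hi = int(parts[0])
    elif len(parts) == 2:
        lo, hi = int(parts[0]), int(parts[1])
    else:
        raise ValueError(f"invalid nnodes value: {value!r}")
    if not 0 < lo <= hi:
        raise ValueError(f"invalid nnodes range: {value!r}")
    return lo, hi


def target_nodes(healthy: int, nnodes: str) -> Optional[int]:
    """Return how many nodes to run on, or None if below the minimum."""
    lo, hi = parse_nnodes(nnodes)
    if healthy < lo:
        return None  # not enough nodes to keep the elastic job running
    return min(healthy, hi)


print(parse_nnodes("1:4"), target_nodes(3, "1:4"), target_nodes(6, "1:4"))
```

With a 1:4 range, the job can shrink from 4 nodes to 1 (three membership changes) before it falls below the minimum.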

Maybe @kuizhiqing @gaocegege @tenzen-y can share more elastic use-cases.

@kannon92
Contributor Author

Having slept on this, I think I agree with @ahg-g.

We have two similar ideas for elasticity: #482 and this issue. It would be worth having a customer with concrete ideas on how they would like this to work, rather than implementing it for its own sake.

We could use Elastic Indexed Jobs, or we could relax the restrictions on replicas. I think both are valuable, but we need clear user stories, or at least a customer ask where they are ready to pilot it.

@tenzen-y
Member

I would recommend postponing this to v0.8, since it's challenging for the Kubeflow side to provide concrete user stories given the current progress of the v2 baseline development.

After we implement the Kubeflow v2 baseline, we can draw up solid stories.

@danielvegamyhre
Contributor

danielvegamyhre commented Oct 14, 2024

Hard to build features without a clear and fully documented use case. I suggest we not move forward without clear and practical user stories.

@ahg-g you mentioned one potential use case here (support for scaling replicatedJobs to match the number of healthy TPU slices). However, I agree that when designing this feature we should consider framework-specific features with similar elasticity requirements, such as PyTorch elastic training. Handling changes to the underlying infra mid-training run would require both k8s layer handling and ML framework layer handling anyway.

It will be useful for frameworks that support elasticity; for example, you can configure torchrun --nnodes=1:4, which tolerates Node changes

@andreyvelich I think this PyTorch elastic use case would be best supported in JobSet by scaling the size of the indexed job (i.e., elastic indexed jobs), since we don't necessarily want to scale up by creating a new job replica with N pods (where N = .spec.parallelism), but rather want to create/delete individual pods to match the number of healthy nodes. The exact mechanism for triggering the scale up/down would need to be determined. It seems like we may need a new controller which watches for Node events and scales Jobs up/down accordingly?

For the use case of scaling the JobSet up/down to match the number of healthy TPU slices, this would require scaling the number of Jobs, since N new nodes are atomically provisioned (or deprovisioned) and have special scheduling requirements to be usable, which are implemented in JobSet at the Job level (i.e., exclusive job placement per slice). For this the concept of an elastic jobset (i.e., scaling number of replicas in a replicatedJob) becomes useful.
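
The difference between the two scaling modes can be sketched as follows. Both helpers are hypothetical and only compute target values; they assume one pod per node, that a slice is usable only when all of its nodes are healthy, and (to my understanding of elastic Indexed Jobs) that completions must be changed together with parallelism so that the two stay equal.

```python
def pod_level_target(healthy_nodes: int, max_pods: int) -> dict:
    """Elastic Indexed Job style: resize one Job pod by pod.

    One pod per healthy node, capped at max_pods. parallelism and
    completions are kept equal (assumption stated above).
    """
    size = max(0, min(healthy_nodes, max_pods))
    return {"parallelism": size, "completions": size}


def slice_level_target(healthy_per_slice: list, nodes_per_slice: int) -> dict:
    """Elastic JobSet style: resize by whole Job replicas.

    A slice counts only if all of its nodes are healthy, because each
    Job replica needs a complete slice to be schedulable.
    """
    usable = sum(1 for healthy in healthy_per_slice if healthy >= nodes_per_slice)
    return {"replicas": usable}


# 7 healthy nodes: pod-level scaling uses all 7 of them.
print(pod_level_target(healthy_nodes=7, max_pods=8))
# Three 4-node slices, one with a failed node: only 2 whole replicas fit.
print(slice_level_target(healthy_per_slice=[4, 3, 4], nodes_per_slice=4))
```

In the second case the three remaining healthy nodes of the degraded slice go unused, which is the atomicity constraint described above.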

In summary, it seems to me that supporting PyTorch elastic training and supporting scaling TPU slices up/down will have different requirements and implementation details, so I agree it would be good to do a KEP discussing these use cases (among other use cases where elasticity is required) to determine the proper API(s) and implementation strategy.
