support successPolicy and failurePolicy on pytorchjob #1575

Closed
26 changes: 26 additions & 0 deletions docs/api/kubeflow.org_v1_generated.asciidoc
@@ -54,6 +54,8 @@ Package v1 contains API Schema definitions for the kubeflow.org v1 API group
| *`nProcPerNode`* __integer__ | Number of workers per node; supported values: [auto, cpu, gpu, int].
| *`maxRestarts`* __integer__ |
| *`metrics`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#metricspec-v2beta2-autoscaling[$$MetricSpec$$] array__ | Metrics contains the specifications which are used to calculate the desired replica count (the maximum replica count across all metrics will be used). The desired replica count is calculated with multiplying the ratio between the target value and the current value by the current number of pods. Ergo, metrics used must decrease as the pod count is increased, and vice-versa. See the individual metric source types for more information about how each type of metric must respond. If not set, the HPA will not be created.
| *`successPolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-pytorchsuccesspolicy[$$PyTorchSuccessPolicy$$]__ |
| *`failurePolicy`* __xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-pytorchfailurepolicy[$$PyTorchFailurePolicy$$]__ |
|===


@@ -195,6 +197,18 @@ MXJobSpec defines the desired state of MXJob



[id="{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-pytorchfailurepolicy"]
==== PyTorchFailurePolicy (string)



.Appears In:
****
- xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-elasticpolicy[$$ElasticPolicy$$]
****



[id="{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-pytorchjob"]
==== PyTorchJob

@@ -256,6 +270,18 @@ PyTorchJobSpec is a desired state description of the PyTorchJob.
|===


[id="{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-pytorchsuccesspolicy"]
==== PyTorchSuccessPolicy (string)



.Appears In:
****
- xref:{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-elasticpolicy[$$ElasticPolicy$$]
****



[id="{anchor_prefix}-github-com-kubeflow-training-operator-pkg-apis-kubeflow-org-v1-rdzvbackend"]
==== RDZVBackend (string)

6 changes: 6 additions & 0 deletions hack/python-sdk/swagger.json
@@ -10,6 +10,9 @@
"kubeflow.org.v1.ElasticPolicy": {
"type": "object",
"properties": {
"failurePolicy": {
"type": "string"
},
"maxReplicas": {
"description": "upper limit for the number of pods that can be set by the autoscaler; cannot be smaller than MinReplicas, defaults to null.",
"type": "integer",
@@ -61,6 +64,9 @@
"standalone": {
"description": "Start a local standalone rendezvous backend that is represented by a C10d TCP store on port 29400. Useful when launching single-node, multi-worker job. If specified --rdzv_backend, --rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly set values are ignored.",
"type": "boolean"
},
"successPolicy": {
"type": "string"
}
}
},
4 changes: 4 additions & 0 deletions manifests/base/crds/kubeflow.org_pytorchjobs.yaml
@@ -45,6 +45,8 @@ spec:
properties:
elasticPolicy:
properties:
failurePolicy:
type: string
maxReplicas:
description: upper limit for the number of pods that can be set
by the autoscaler; cannot be smaller than MinReplicas, defaults
@@ -583,6 +585,8 @@ spec:
--rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly
set values are ignored.
type: boolean
successPolicy:
type: string
type: object
pytorchReplicaSpecs:
additionalProperties:
12 changes: 12 additions & 0 deletions pkg/apis/kubeflow.org/v1/openapi_generated.go

Some generated files are not rendered by default.

8 changes: 8 additions & 0 deletions pkg/apis/kubeflow.org/v1/pytorch_defaults.go
@@ -49,6 +49,14 @@ func setElasticPolicy(pytorchJob *PyTorchJob) {
pytorchJob.Spec.ElasticPolicy.MaxReplicas = workerReplicas
pytorchJob.Spec.ElasticPolicy.MinReplicas = workerReplicas
}
if pytorchJob.Spec.ElasticPolicy.SuccessPolicy == nil {
policy := PyTorchSuccessPolicyDefault
pytorchJob.Spec.ElasticPolicy.SuccessPolicy = &policy
}
if pytorchJob.Spec.ElasticPolicy.FailurePolicy == nil {
policy := PyTorchFailurePolicyDefault
pytorchJob.Spec.ElasticPolicy.FailurePolicy = &policy
}
}
}

17 changes: 17 additions & 0 deletions pkg/apis/kubeflow.org/v1/pytorch_types.go
@@ -122,8 +122,25 @@ type ElasticPolicy struct {
// If not set, the HPA will not be created.
// +optional
Metrics []autoscalingv2beta2.MetricSpec `json:"metrics,omitempty"`

SuccessPolicy *PyTorchSuccessPolicy `json:"successPolicy,omitempty"`
FailurePolicy *PyTorchFailurePolicy `json:"failurePolicy,omitempty"`
}

type PyTorchSuccessPolicy string

const (
PyTorchSuccessPolicyDefault PyTorchSuccessPolicy = "" // the job succeeds as soon as worker 0 succeeds
PyTorchSuccessPolicyAllWorkers PyTorchSuccessPolicy = "AllWorkers" // the job succeeds only when all worker pods succeed
)

type PyTorchFailurePolicy string

const (
PyTorchFailurePolicyDefault PyTorchFailurePolicy = "" // the job fails as soon as any pod fails
PyTorchFailurePolicyByMinReplicas PyTorchFailurePolicy = "ByMinReplicas" // the job fails only when the number of running pods drops below MinReplicas
)

type RDZVConf struct {
Key string `json:"key,omitempty"`
Value string `json:"value,omitempty"`
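For context, a minimal sketch (not part of this diff) of how the new fields could be populated through the Go API; the replica counts are illustrative and MinReplicas/MaxReplicas are the existing ElasticPolicy fields:

package example

import (
    kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

// newElasticPolicy builds an ElasticPolicy that succeeds only when all workers
// succeed and fails only when fewer than MinReplicas workers remain running.
func newElasticPolicy() *kubeflowv1.ElasticPolicy {
    minReplicas := int32(2) // illustrative values
    maxReplicas := int32(4)
    successPolicy := kubeflowv1.PyTorchSuccessPolicyAllWorkers
    failurePolicy := kubeflowv1.PyTorchFailurePolicyByMinReplicas
    return &kubeflowv1.ElasticPolicy{
        MinReplicas:   &minReplicas,
        MaxReplicas:   &maxReplicas,
        SuccessPolicy: &successPolicy,
        FailurePolicy: &failurePolicy,
    }
}

Leaving both policies unset keeps the empty-string defaults, which preserve the current behavior: success once worker 0 succeeds, failure as soon as any pod fails.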
10 changes: 10 additions & 0 deletions pkg/apis/kubeflow.org/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

14 changes: 7 additions & 7 deletions pkg/client/clientset/versioned/fake/register.go

Some generated files are not rendered by default.

14 changes: 7 additions & 7 deletions pkg/client/clientset/versioned/scheme/register.go

Some generated files are not rendered by default.

12 changes: 12 additions & 0 deletions pkg/common/util/util.go
@@ -99,3 +99,15 @@ func DurationUntilExpireTime(runPolicy *commonv1.RunPolicy, jobStatus commonv1.J
return expireTime.Sub(currentTime), nil
}
}

// GetContainerExitCode returns the exit code of the named container in the given pod.
// If no terminated status is found for that container, the 0xbeef sentinel is returned.
func GetContainerExitCode(pod *corev1.Pod, name string) int32 {
var exitCode int32 = 0xbeef // sentinel: no terminated status found for the named container
for _, status := range pod.Status.ContainerStatuses {
state := status.State
if status.Name == name && state.Terminated != nil {
exitCode = state.Terminated.ExitCode
}
}
return exitCode
}
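A test-style sketch (not part of this diff) of the helper's behavior; the literal container name "pytorch" is assumed to match the default container name used by the controller:

package example

import (
    corev1 "k8s.io/api/core/v1"

    "github.com/kubeflow/training-operator/pkg/common/util"
)

func exitCodeExamples() (int32, int32) {
    // Still running: no terminated status, so the helper returns the 0xbeef sentinel.
    running := &corev1.Pod{
        Status: corev1.PodStatus{
            ContainerStatuses: []corev1.ContainerStatus{
                {Name: "pytorch", State: corev1.ContainerState{
                    Running: &corev1.ContainerStateRunning{},
                }},
            },
        },
    }
    // Terminated cleanly: the helper reports the real exit code, 0 here.
    finished := &corev1.Pod{
        Status: corev1.PodStatus{
            ContainerStatuses: []corev1.ContainerStatus{
                {Name: "pytorch", State: corev1.ContainerState{
                    Terminated: &corev1.ContainerStateTerminated{ExitCode: 0},
                }},
            },
        },
    }
    return util.GetContainerExitCode(running, "pytorch"), util.GetContainerExitCode(finished, "pytorch")
}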
46 changes: 46 additions & 0 deletions pkg/controller.v1/pytorch/label.go
@@ -0,0 +1,46 @@
package pytorch

import (
"fmt"
"strconv"

kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
corev1 "k8s.io/api/core/v1"
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
)

type PodTemplateLabelDecoratorFunc func(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, index string) error

func LabelDecoratorForVolcanoPreemptable(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, index string) error {
pytorchjob, ok := obj.(*kubeflowv1.PyTorchJob)
if !ok {
return fmt.Errorf("%+v is not a type of PyTorchJob", obj)
}
if len(podTemplateSpec.Labels) == 0 {
podTemplateSpec.Labels = make(map[string]string)
}
// elastic mode
if pytorchjob.Spec.ElasticPolicy != nil && pytorchjob.Spec.PyTorchReplicaSpecs[kubeflowv1.PyTorchJobReplicaTypeMaster] == nil {
// If there is no master, set volcano.sh/preemptable=false on worker 0 so that it cannot be preempted
rank, err := strconv.Atoi(index)
if err != nil {
return err
}
if rank == 0 {
podTemplateSpec.Labels[volcanov1beta1.PodPreemptable] = "false"
} else {
podTemplateSpec.Labels[volcanov1beta1.PodPreemptable] = "true"
}
}
return nil
}

func setPodLabel(obj interface{}, podTemplateSpec *corev1.PodTemplateSpec, rtype, index string, decorators ...PodTemplateLabelDecoratorFunc) error {

for _, decorator := range decorators {
if err := decorator(obj, podTemplateSpec, rtype, index); err != nil {
return err
}
}
return nil
}
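A short sketch (not part of this diff) of the decorator applied to an elastic job with no master spec; the literal label key is assumed to equal volcanov1beta1.PodPreemptable:

package example

import (
    kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
    "github.com/kubeflow/training-operator/pkg/controller.v1/pytorch"
    corev1 "k8s.io/api/core/v1"
)

func labelWorker0() (string, error) {
    // Elastic job without a master: worker 0 drives job status, so it is
    // marked non-preemptable while higher-ranked workers stay preemptable.
    job := &kubeflowv1.PyTorchJob{
        Spec: kubeflowv1.PyTorchJobSpec{
            ElasticPolicy: &kubeflowv1.ElasticPolicy{},
        },
    }
    tmpl := &corev1.PodTemplateSpec{}
    if err := pytorch.LabelDecoratorForVolcanoPreemptable(job, tmpl, "worker", "0"); err != nil {
        return "", err
    }
    return tmpl.Labels["volcano.sh/preemptable"], nil // "false" for index 0, "true" otherwise
}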
67 changes: 62 additions & 5 deletions pkg/controller.v1/pytorch/pytorchjob_controller.go
@@ -17,6 +17,7 @@ package pytorch
import (
"context"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
@@ -415,9 +416,16 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
}
} else {
if rtype == kubeflowv1.PyTorchJobReplicaTypeWorker {
// TODO(gaocegege): Support SuccessPolicy
if expected == 0 {
msg := fmt.Sprintf("PyTorchJob %s/%s successfully completed.",
worker0Completed, err := r.IsWorker0Completed(pytorchjob, replicas)
if err != nil {
logger.Warnf("failed to check whether worker 0 completed: %v", err)
return err
}
// Leave a succeeded condition for the following two cases:
// 1. The default success policy is used and worker 0 has completed.
// 2. The `AllWorkers` success policy is used and every worker has succeeded (expected == 0).
if expected == 0 || (worker0Completed && pytorchjob.Spec.ElasticPolicy != nil && *pytorchjob.Spec.ElasticPolicy.SuccessPolicy != kubeflowv1.PyTorchSuccessPolicyAllWorkers) {
msg := fmt.Sprintf("TFJob %s/%s successfully completed.",
pytorchjob.Namespace, pytorchjob.Name)
r.recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.JobSucceededReason, msg)
if jobStatus.CompletionTime == nil {
@@ -454,7 +462,7 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
return err
}
trainingoperatorcommon.RestartedJobsCounterInc(pytorchjob.Namespace, kubeflowv1.PytorchJobFrameworkName)
} else {
} else if pytorchjob.Spec.ElasticPolicy == nil || running < *pytorchjob.Spec.ElasticPolicy.MinReplicas || *pytorchjob.Spec.ElasticPolicy.FailurePolicy == kubeflowv1.PyTorchFailurePolicyDefault {
msg := fmt.Sprintf("PyTorchJob %s is failed because %d %s replica(s) failed.", pytorchjob.Name, failed, rtype)
r.Recorder.Event(pytorchjob, corev1.EventTypeNormal, commonutil.JobFailedReason, msg)
if pytorchjob.Status.CompletionTime == nil {
@@ -468,13 +476,59 @@ func (r *PyTorchJobReconciler) UpdateJobStatus(job interface{},
}
trainingoperatorcommon.FailedJobsCounterInc(pytorchjob.Namespace, kubeflowv1.PytorchJobFrameworkName)
}
// If the number of running pods is greater than or equal to MinReplicas, the job is still considered running.
}
}

return nil
}

// ContainsMasterSpec returns true if the pytorchjob contains master spec.
// IsWorker0Completed returns true if the pod of worker 0 succeeded and exited with code 0.
func (p *PyTorchJobReconciler) IsWorker0Completed(job *kubeflowv1.PyTorchJob,
replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) (bool, error) {
worker0Completed := false
_, ok := replicas[kubeflowv1.PyTorchJobReplicaTypeWorker]
if !ok {
return true, nil
}
podSlices, err := p.getPodSlices(job, replicas[kubeflowv1.PyTorchJobReplicaTypeWorker].Replicas, string(kubeflowv1.PyTorchJobReplicaTypeWorker))
if err != nil {
return false, err
}
for index, podSlice := range podSlices {
if len(podSlice) == 1 {
pod := podSlice[0]
exitCode := util.GetContainerExitCode(pod, kubeflowv1.PytorchJobDefaultContainerName)
if index == 0 && exitCode == 0 && pod.Status.Phase == corev1.PodSucceeded {
worker0Completed = true
}
}
}
return worker0Completed, nil
}

// getPodSlices returns a slice whose elements are the pods grouped by replica index.
// It gives the caller enough information to decide whether to scale resources up or down.
func (p *PyTorchJobReconciler) getPodSlices(job *kubeflowv1.PyTorchJob, replicasNum *int32, rtype string) ([][]*corev1.Pod, error) {
logger := commonutil.LoggerForReplica(job, strings.ToLower(rtype))

pods, err := p.GetPodsForJob(job)
if err != nil {
commonutil.LoggerForJob(job).Warnf("GetPodsForJob error %v", err)
return nil, err
}

// Get all pods for the replica type rtype.
pods, err = p.JobController.FilterPodsForReplicaType(pods, strings.ToLower(rtype))
if err != nil {
return nil, err
}

podSlices := p.GetPodSlices(pods, int(*replicasNum), logger)
return podSlices, nil
}

// ContainsMasterSpec returns true if the pytorchjob contains a master spec.
func ContainsMasterSpec(replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec) bool {
if _, ok := replicas[kubeflowv1.PyTorchJobReplicaTypeMaster]; ok {
return true
@@ -515,6 +569,9 @@ func (r *PyTorchJobReconciler) UpdateJobStatusInApiServer(job interface{}, jobSt

// SetClusterSpec sets the cluster spec and init container for the pod
func (r *PyTorchJobReconciler) SetClusterSpec(job interface{}, podTemplate *corev1.PodTemplateSpec, rtype, index string) error {
if err := setPodLabel(job, podTemplate, rtype, index, LabelDecoratorForVolcanoPreemptable); err != nil {
return err
}
if err := setPodEnv(job, podTemplate, rtype, index); err != nil {
return err
}
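The success and failure checks above are packed into long boolean expressions; a non-normative restatement of the same conditions (a sketch, not code in this PR, assuming SuccessPolicy and FailurePolicy have already been defaulted as in pytorch_defaults.go):

package example

import (
    kubeflowv1 "github.com/kubeflow/training-operator/pkg/apis/kubeflow.org/v1"
)

// workerJobSucceeded: every expected worker finished (expected == 0), or worker 0
// completed and the job is not using the AllWorkers success policy.
func workerJobSucceeded(expected int32, worker0Completed bool, elastic *kubeflowv1.ElasticPolicy) bool {
    if expected == 0 {
        return true
    }
    if elastic == nil || *elastic.SuccessPolicy == kubeflowv1.PyTorchSuccessPolicyAllWorkers {
        return false
    }
    return worker0Completed
}

// workerJobFailed: with no elastic policy or the default failure policy, any worker
// failure fails the job; with ByMinReplicas, the job fails only when fewer than
// MinReplicas workers are still running.
func workerJobFailed(running int32, elastic *kubeflowv1.ElasticPolicy) bool {
    if elastic == nil || *elastic.FailurePolicy == kubeflowv1.PyTorchFailurePolicyDefault {
        return true
    }
    return running < *elastic.MinReplicas
}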
2 changes: 2 additions & 0 deletions sdk/python/docs/KubeflowOrgV1ElasticPolicy.md
@@ -3,6 +3,7 @@
## Properties
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**failure_policy** | **str** | | [optional]
**max_replicas** | **int** | upper limit for the number of pods that can be set by the autoscaler; cannot be smaller than MinReplicas, defaults to null. | [optional]
**max_restarts** | **int** | | [optional]
**metrics** | [**list[K8sIoApiAutoscalingV2beta2MetricSpec]**](K8sIoApiAutoscalingV2beta2MetricSpec.md) | Metrics contains the specifications which are used to calculate the desired replica count (the maximum replica count across all metrics will be used). The desired replica count is calculated with multiplying the ratio between the target value and the current value by the current number of pods. Ergo, metrics used must decrease as the pod count is increased, and vice-versa. See the individual metric source types for more information about how each type of metric must respond. If not set, the HPA will not be created. | [optional]
@@ -14,6 +15,7 @@ Name | Type | Description | Notes
**rdzv_id** | **str** | | [optional]
**rdzv_port** | **int** | | [optional]
**standalone** | **bool** | Start a local standalone rendezvous backend that is represented by a C10d TCP store on port 29400. Useful when launching single-node, multi-worker job. If specified --rdzv_backend, --rdzv_endpoint, --rdzv_id are auto-assigned; any explicitly set values are ignored. | [optional]
**success_policy** | **str** | | [optional]

[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
