add ActiveDeadlineSeconds and BackoffLimit features (#963)
* add ActiveDeadlineSeconds and BackoffLimit features

* fix goimport and unassign variable in test

* fix test and delete the package added by dep ensure

* fix goimports

* fix ActiveDeadlineSeconds unit test

* add logger for test

* add logger for test

* add logger for test

* add BackoffForOnFailure test

* fix test

* fix test

* fix unit test
ChanYiLin authored and k8s-ci-robot committed Mar 26, 2019
1 parent 4689a23 commit aa322c7
Showing 10 changed files with 545 additions and 23 deletions.
11 changes: 11 additions & 0 deletions pkg/apis/tensorflow/v1beta2/types.go
@@ -42,6 +42,17 @@ type TFJob struct {

// TFJobSpec is a desired state description of the TFJob.
type TFJobSpec struct {
// Specifies the duration in seconds, relative to the startTime, that the job may be active
// before the system tries to terminate it; the value must be a positive integer.
// This applies only to pods with restartPolicy == OnFailure or Always.
// +optional
ActiveDeadlineSeconds *int64 `json:"activeDeadlineSeconds,omitempty"`

// Optional number of retries before marking this job failed.
// Defaults to 6
// +optional
BackoffLimit *int32 `json:"backoffLimit,omitempty"`

// CleanPodPolicy defines the policy to kill pods after TFJob is
// succeeded.
// Default to Running.
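For orientation, here is a minimal sketch (not part of the diff) of constructing a TFJobSpec that uses the two new fields; the int64Ptr/int32Ptr helpers are invented for the example and the TFReplicaSpecs are omitted for brevity:

// Sketch only: a TFJobSpec that opts into the new fields.
// int64Ptr and int32Ptr are illustrative helpers, not part of this commit.
func int64Ptr(v int64) *int64 { return &v }
func int32Ptr(v int32) *int32 { return &v }

func exampleSpec() tfv1beta2.TFJobSpec {
	return tfv1beta2.TFJobSpec{
		// Fail the job if it stays active longer than one hour.
		ActiveDeadlineSeconds: int64Ptr(3600),
		// Allow at most three retries before marking the job failed (default is 6).
		BackoffLimit: int32Ptr(3),
		// TFReplicaSpecs omitted for brevity; a real spec must define them.
	}
}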
10 changes: 10 additions & 0 deletions pkg/apis/tensorflow/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default.
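The generated diff is not rendered above; for new pointer fields like these, deepcopy-gen typically emits copy logic of roughly this shape (a sketch of the expected pattern, not the exact generated code):

if in.ActiveDeadlineSeconds != nil {
	in, out := &in.ActiveDeadlineSeconds, &out.ActiveDeadlineSeconds
	*out = new(int64)
	**out = **in
}
if in.BackoffLimit != nil {
	in, out := &in.BackoffLimit, &out.BackoffLimit
	*out = new(int32)
	**out = **in
}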

7 changes: 5 additions & 2 deletions pkg/common/util/v1beta2/testutil/pod.go
@@ -64,15 +64,18 @@ func NewPodList(count int32, status v1.PodPhase, tfJob *tfv1beta2.TFJob, typ str
return pods
}

func SetPodsStatuses(podIndexer cache.Indexer, tfJob *tfv1beta2.TFJob, typ string, pendingPods, activePods, succeededPods, failedPods int32, t *testing.T) {
func SetPodsStatuses(podIndexer cache.Indexer, tfJob *tfv1beta2.TFJob, typ string, pendingPods, activePods, succeededPods, failedPods int32, restartCounts []int32, t *testing.T) {
var index int32
for _, pod := range NewPodList(pendingPods, v1.PodPending, tfJob, typ, index, t) {
if err := podIndexer.Add(pod); err != nil {
t.Errorf("%s: unexpected error when adding pod %v", tfJob.Name, err)
}
}
index += pendingPods
for _, pod := range NewPodList(activePods, v1.PodRunning, tfJob, typ, index, t) {
for i, pod := range NewPodList(activePods, v1.PodRunning, tfJob, typ, index, t) {
if restartCounts != nil {
pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: restartCounts[i]}}
}
if err := podIndexer.Add(pod); err != nil {
t.Errorf("%s: unexpected error when adding pod %v", tfJob.Name, err)
}
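A sketch of how a test might exercise the new restartCounts parameter; podIndexer and tfJob are assumed to be set up as in the existing controller tests, and the values are arbitrary:

// Two active workers, each recorded with one container restart. The slice must
// cover the active pods; passing nil keeps the previous behavior of leaving
// ContainerStatuses unset.
testutil.SetPodsStatuses(podIndexer, tfJob, testutil.LabelWorker,
	0 /*pending*/, 2 /*active*/, 0 /*succeeded*/, 0 /*failed*/, []int32{1, 1}, t)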
32 changes: 32 additions & 0 deletions pkg/common/util/v1beta2/testutil/tfjob.go
@@ -50,6 +50,38 @@ func NewTFJobWithCleanupJobDelay(chief, worker, ps int, ttl *int32) *tfv1beta2.T
return tfJob
}

func NewTFJobWithActiveDeadlineSeconds(chief, worker, ps int, ads *int64) *tfv1beta2.TFJob {
if chief == 1 {
tfJob := NewTFJobWithChief(worker, ps)
tfJob.Spec.ActiveDeadlineSeconds = ads
policy := common.CleanPodPolicyAll
tfJob.Spec.CleanPodPolicy = &policy
return tfJob
}
tfJob := NewTFJob(worker, ps)
tfJob.Spec.ActiveDeadlineSeconds = ads
policy := common.CleanPodPolicyAll
tfJob.Spec.CleanPodPolicy = &policy
return tfJob
}

func NewTFJobWithBackoffLimit(chief, worker, ps int, backoffLimit *int32) *tfv1beta2.TFJob {
if chief == 1 {
tfJob := NewTFJobWithChief(worker, ps)
tfJob.Spec.BackoffLimit = backoffLimit
tfJob.Spec.TFReplicaSpecs["Worker"].RestartPolicy = "OnFailure"
policy := common.CleanPodPolicyAll
tfJob.Spec.CleanPodPolicy = &policy
return tfJob
}
tfJob := NewTFJob(worker, ps)
tfJob.Spec.BackoffLimit = backoffLimit
tfJob.Spec.TFReplicaSpecs["Worker"].RestartPolicy = "OnFailure"
policy := common.CleanPodPolicyAll
tfJob.Spec.CleanPodPolicy = &policy
return tfJob
}

func NewTFJobWithChief(worker, ps int) *tfv1beta2.TFJob {
tfJob := NewTFJob(worker, ps)
tfJob.Spec.TFReplicaSpecs[tfv1beta2.TFReplicaTypeChief] = &common.ReplicaSpec{
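Illustrative calls to the two new helpers (argument values are arbitrary):

// A TFJob with a chief, 3 workers and no PS that may be active for at most 100s.
ads := int64(100)
tfJobWithDeadline := testutil.NewTFJobWithActiveDeadlineSeconds(1, 3, 0, &ads)

// A TFJob with 3 workers and 1 PS whose workers restart OnFailure and which is
// allowed at most 2 restarts before it is marked failed.
backoff := int32(2)
tfJobWithBackoff := testutil.NewTFJobWithBackoffLimit(0, 3, 1, &backoff)
_, _ = tfJobWithDeadline, tfJobWithBackoff // used by the test body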
129 changes: 120 additions & 9 deletions pkg/controller.v1beta2/tensorflow/controller.go
@@ -17,6 +17,7 @@ package tensorflow

import (
"fmt"
"strings"
"time"

kubebatchclient "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
@@ -31,6 +32,7 @@ import (
"k8s.io/client-go/tools/cache"

"github.com/kubeflow/tf-operator/cmd/tf-operator.v1beta2/app/options"
common "github.com/kubeflow/tf-operator/pkg/apis/common/v1beta2"
tfv1beta2 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1beta2"
tfjobclientset "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned"
tfjobscheme "github.com/kubeflow/tf-operator/pkg/client/clientset/versioned/scheme"
@@ -39,6 +41,7 @@ import (
tfjoblisters "github.com/kubeflow/tf-operator/pkg/client/listers/tensorflow/v1beta2"
"github.com/kubeflow/tf-operator/pkg/common/jobcontroller"
tflogger "github.com/kubeflow/tf-operator/pkg/logger"
"github.com/kubeflow/tf-operator/pkg/util/k8sutil"
"k8s.io/apimachinery/pkg/runtime/schema"
)

@@ -325,17 +328,14 @@ func (tc *TFController) syncTFJob(key string) (bool, error) {
return true, err
}

func getTotalReplicas(tfjob *tfv1beta2.TFJob) int32 {
tfjobReplicas := int32(0)
for _, r := range tfjob.Spec.TFReplicaSpecs {
tfjobReplicas += *r.Replicas
}
return tfjobReplicas
}

// reconcileTFJobs checks and updates replicas for each given TFReplicaSpec.
// It will requeue the tfjob in case of an error while creating/deleting pods/services.
func (tc *TFController) reconcileTFJobs(tfjob *tfv1beta2.TFJob) error {
tfjobKey, err := KeyFunc(tfjob)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for tfjob object %#v: %v", tfjob, err))
return err
}
logger := tflogger.LoggerForJob(tfjob)
logger.Infof("Reconcile TFJobs %s", tfjob.Name)

@@ -353,8 +353,46 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1beta2.TFJob) error {
return err
}

// Retrieve the previous number of retries.
previousRetry := tc.WorkQueue.NumRequeues(tfjobKey)

activePods := k8sutil.FilterActivePods(pods)
active := int32(len(activePods))
_, failed := getSucceededAndFailedCount(pods)
totalReplicas := getTotalReplicas(tfjob)
prevReplicasFailedNum := getTotalFailedReplicas(tfjob)

tfJobExceedsLimit := false
var failureMessage string
var exceedsBackoffLimit bool = false
var pastBackoffLimit bool = false

if tfjob.Spec.BackoffLimit != nil {
jobHasNewFailure := failed > prevReplicasFailedNum
// New failures happen when the status does not yet reflect them and the number of active
// pods differs from the total replicas; otherwise the previous controller loop failed to
// update the status, so any failure picked up here is not a new one.
exceedsBackoffLimit = jobHasNewFailure && (active != totalReplicas) &&
(int32(previousRetry)+1 > *tfjob.Spec.BackoffLimit)

pastBackoffLimit, err = tc.pastBackoffLimit(tfjob, pods)
if err != nil {
return err
}
}

if exceedsBackoffLimit || pastBackoffLimit {
// Check whether the number of pod restarts exceeds the backoff limit (restartPolicy
// OnFailure only), or whether the number of failed pods increased since the last sync.
tfJobExceedsLimit = true
failureMessage = fmt.Sprintf("TFJob %s has failed because it has reached the specified backoff limit", tfjob.Name)
} else if tc.pastActiveDeadline(tfjob) {
failureMessage = fmt.Sprintf("TFJob %s has failed because it was active longer than specified deadline", tfjob.Name)
tfJobExceedsLimit = true
}

// If the TFJob is terminated, delete all pods and services.
if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) {
if isSucceeded(tfjob.Status) || isFailed(tfjob.Status) || tfJobExceedsLimit {
if err := tc.deletePodsAndServices(tfjob, pods); err != nil {
return err
}
@@ -374,6 +412,19 @@ func (tc *TFController) reconcileTFJobs(tfjob *tfv1beta2.TFJob) error {
}
}

if tfJobExceedsLimit {
tc.Recorder.Event(tfjob, v1.EventTypeNormal, tfJobFailedReason, failureMessage)
if tfjob.Status.CompletionTime == nil {
now := metav1.Now()
tfjob.Status.CompletionTime = &now
}
err := updateTFJobConditions(tfjob, common.JobFailed, tfJobFailedReason, failureMessage)
if err != nil {
tflogger.LoggerForJob(tfjob).Infof("Append tfjob condition error: %v", err)
return err
}
}

// At this point the pods may have been deleted, so if the job succeeded, we need to manually set the replica status.
// If any replicas are still Active, set their status to succeeded.
if isSucceeded(tfjob.Status) {
@@ -432,6 +483,66 @@ func (tc *TFController) satisfiedExpectations(tfjob *tfv1beta2.TFJob) bool {
return satisfied
}

// pastBackoffLimit checks whether the sum of container restartCounts exceeds BackoffLimit.
// This check applies only to pods with restartPolicy == OnFailure or Always.
func (tc *TFController) pastBackoffLimit(tfjob *tfv1beta2.TFJob, pods []*v1.Pod) (bool, error) {
if tfjob.Spec.BackoffLimit == nil {
return false, nil
}
logger := tflogger.LoggerForJob(tfjob)
result := int32(0)
for rtype, spec := range tfjob.Spec.TFReplicaSpecs {
if spec.RestartPolicy != common.RestartPolicyOnFailure && spec.RestartPolicy != common.RestartPolicyAlways {
logger.Warnf("The restart policy of replica %v of the job %v is not OnFailure or Always. Not counted in backoff limit.", rtype, tfjob.Name)
continue
}
// Convert TFReplicaType to lower string.
rt := strings.ToLower(string(rtype))
pods, err := tc.FilterPodsForReplicaType(pods, rt)
if err != nil {
return false, err
}
for i := range pods {
po := pods[i]
if po.Status.Phase != v1.PodRunning {
continue
}
for j := range po.Status.InitContainerStatuses {
stat := po.Status.InitContainerStatuses[j]
result += stat.RestartCount
}
for j := range po.Status.ContainerStatuses {
stat := po.Status.ContainerStatuses[j]
result += stat.RestartCount
}
}
}

if *tfjob.Spec.BackoffLimit == 0 {
return result > 0, nil
}
return result >= *tfjob.Spec.BackoffLimit, nil
}

// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
func (tc *TFController) pastActiveDeadline(tfjob *tfv1beta2.TFJob) bool {
if tfjob.Spec.ActiveDeadlineSeconds == nil || tfjob.Status.StartTime == nil {
return false
}
now := metav1.Now()
start := tfjob.Status.StartTime.Time
duration := now.Time.Sub(start)
allowedDuration := time.Duration(*tfjob.Spec.ActiveDeadlineSeconds) * time.Second
return duration >= allowedDuration
}

// getSucceededAndFailedCount returns the number of succeeded and failed pods for a job.
func getSucceededAndFailedCount(pods []*v1.Pod) (succeeded, failed int32) {
succeeded = int32(k8sutil.FilterPods(pods, v1.PodSucceeded))
failed = int32(k8sutil.FilterPods(pods, v1.PodFailed))
return
}

func (tc *TFController) GetJobFromInformerCache(namespace, name string) (metav1.Object, error) {
return tc.getTFJobFromName(namespace, name)
}
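To make the two termination paths concrete, a small worked example that mirrors the checks above without invoking the controller (all numbers are arbitrary):

// pastBackoffLimit: with BackoffLimit == 0, a single container restart is enough.
backoffLimit := int32(0)
totalRestartCount := int32(1)        // sum of RestartCount over running pods
pastBackoff := totalRestartCount > 0 // true, via the special-cased zero limit

// pastActiveDeadline: a job that started 120s ago with ActiveDeadlineSeconds = 100
// has been active longer than allowed.
elapsed := 120 * time.Second
allowed := time.Duration(100) * time.Second
pastDeadline := elapsed >= allowed // true

// Either condition sets tfJobExceedsLimit, which deletes the pods and services
// and appends a JobFailed condition to the TFJob status.
_, _, _ = backoffLimit, pastBackoff, pastDeadline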
4 changes: 2 additions & 2 deletions pkg/controller.v1beta2/tensorflow/controller_test.go
@@ -259,8 +259,8 @@ func TestNormalPath(t *testing.T) {
}

podIndexer := kubeInformerFactory.Core().V1().Pods().Informer().GetIndexer()
testutil.SetPodsStatuses(podIndexer, tfJob, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, t)
testutil.SetPodsStatuses(podIndexer, tfJob, testutil.LabelPS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, t)
testutil.SetPodsStatuses(podIndexer, tfJob, testutil.LabelWorker, tc.pendingWorkerPods, tc.activeWorkerPods, tc.succeededWorkerPods, tc.failedWorkerPods, nil, t)
testutil.SetPodsStatuses(podIndexer, tfJob, testutil.LabelPS, tc.pendingPSPods, tc.activePSPods, tc.succeededPSPods, tc.failedPSPods, nil, t)

serviceIndexer := kubeInformerFactory.Core().V1().Services().Informer().GetIndexer()
testutil.SetServices(serviceIndexer, tfJob, testutil.LabelWorker, tc.activeWorkerServices, t)
45 changes: 45 additions & 0 deletions pkg/controller.v1beta2/tensorflow/job.go
@@ -106,8 +106,37 @@ func (tc *TFController) updateTFJob(old, cur interface{}) {
if err != nil {
return
}
curTFJob, err := tfJobFromUnstructured(cur)
if err != nil {
return
}

// KeyFunc never returns an error here.
key, err := KeyFunc(curTFJob)
if err != nil {
return
}

log.Infof("Updating tfjob: %s", oldTFJob.Name)
tc.enqueueTFJob(cur)

// Check whether a new resync needs to be scheduled for ActiveDeadlineSeconds.
if curTFJob.Status.StartTime != nil {
curTFJobADS := curTFJob.Spec.ActiveDeadlineSeconds
if curTFJobADS == nil {
return
}
oldTFJobADS := oldTFJob.Spec.ActiveDeadlineSeconds
if oldTFJobADS == nil || *oldTFJobADS != *curTFJobADS {
now := metav1.Now()
start := curTFJob.Status.StartTime.Time
passed := now.Time.Sub(start)
total := time.Duration(*curTFJobADS) * time.Second
// AddAfter will handle total < passed
tc.WorkQueue.AddAfter(key, total-passed)
log.Infof("job ActiveDeadlineSeconds updated, will rsync after %d seconds", total-passed)
}
}
}

func (tc *TFController) deletePodsAndServices(tfJob *tfv1beta2.TFJob, pods []*v1.Pod) error {
@@ -164,3 +193,19 @@ func (tc *TFController) cleanupTFJob(tfJob *tfv1beta2.TFJob) error {
func (tc *TFController) deleteTFJob(tfJob *tfv1beta2.TFJob) error {
return tc.tfJobClientSet.KubeflowV1beta2().TFJobs(tfJob.Namespace).Delete(tfJob.Name, &metav1.DeleteOptions{})
}

func getTotalReplicas(tfjob *tfv1beta2.TFJob) int32 {
tfjobReplicas := int32(0)
for _, r := range tfjob.Spec.TFReplicaSpecs {
tfjobReplicas += *r.Replicas
}
return tfjobReplicas
}

func getTotalFailedReplicas(tfjob *tfv1beta2.TFJob) int32 {
totalFailedReplicas := int32(0)
for rtype := range tfjob.Status.ReplicaStatuses {
totalFailedReplicas += tfjob.Status.ReplicaStatuses[rtype].Failed
}
return totalFailedReplicas
}
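For the ActiveDeadlineSeconds resync scheduled in updateTFJob above, the delay arithmetic works out as in this sketch (tc and key as in that handler; times are arbitrary):

// A job that started 30s ago and whose ActiveDeadlineSeconds is raised to 120
// gets re-queued roughly 90s from now, when pastActiveDeadline starts returning true.
start := metav1.NewTime(time.Now().Add(-30 * time.Second))
passed := metav1.Now().Time.Sub(start.Time) // ~30s
total := time.Duration(120) * time.Second   // the new ActiveDeadlineSeconds
tc.WorkQueue.AddAfter(key, total-passed)    // fires in ~90s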