Skip to content

Commit

Permalink
Feature/add pod pending timeout config (#4590)
Browse files Browse the repository at this point in the history
* add PodPendingTimeout config to fail pods stuck in pending

Signed-off-by: Paul Dittamo <[email protected]>

* update unit tests LastTransitionTime for pod pending timeout

Signed-off-by: Paul Dittamo <[email protected]>

* set default PodPendingTimeout to 0 so the check is disabled by default

Signed-off-by: Paul Dittamo <[email protected]>

* maintain pod condition info during pod pending timeout

Signed-off-by: Paul Dittamo <[email protected]>

* Remove un-needed config set for unit test

Signed-off-by: Paul Dittamo <[email protected]>

* cleanup

Signed-off-by: Paul Dittamo <[email protected]>

* update unit test

Signed-off-by: Paul Dittamo <[email protected]>

* don't export helper function

Signed-off-by: Paul Dittamo <[email protected]>

* remove always nil error return param

Signed-off-by: Paul Dittamo <[email protected]>

* cleanup

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
Co-authored-by: Dan Rammer <[email protected]>
  • Loading branch information
pvditt and hamersaw authored Dec 13, 2023
1 parent 78efbf8 commit a5ea1c8
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ var (
ImagePullBackoffGracePeriod: config2.Duration{
Duration: time.Minute * 3,
},
PodPendingTimeout: config2.Duration{
Duration: 0,
},
GpuDeviceNodeLabel: "k8s.amazonaws.com/accelerator",
GpuPartitionSizeNodeLabel: "k8s.amazonaws.com/gpu-partition-size",
GpuResourceName: ResourceNvidiaGPU,
Expand Down Expand Up @@ -149,6 +152,11 @@ type K8sPluginConfig struct {
// one, and the corresponding task marked as failed
ImagePullBackoffGracePeriod config2.Duration `json:"image-pull-backoff-grace-period" pflag:"-,Time to wait for transient ImagePullBackoff errors to be resolved."`

// Time to wait while pod is in pending phase. If the pod is stuck in
// pending phase past this timeout, it will be inferred to be a permanent
// issue, and the corresponding task marked as failed
PodPendingTimeout config2.Duration `json:"pod-pending-timeout" pflag:"-,Time to wait while pod is stuck in pending."`

// The node label that specifies the attached GPU device.
GpuDeviceNodeLabel string `json:"gpu-device-node-label" pflag:"-,The node label that specifies the attached GPU device."`

Expand Down
55 changes: 37 additions & 18 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,14 +572,38 @@ func BuildIdentityPod() *v1.Pod {
// and hence input gates. We should not allow bad requests that Request for large number of resource through.
// In the case it makes through, we will fail after timeout
// DemystifyPending inspects the status of a pod stuck in the Pending phase and
// maps it to a plugin phase. It layers a configurable pod-pending timeout on
// top of the condition-based analysis done by demystifyPendingHelper, so that
// a pod that never leaves Pending is eventually failed instead of retried
// forever. The error return is always nil today but is kept for interface
// stability with callers.
func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) {
	// Derive a candidate phase plus the last condition transition time.
	phaseInfo, lastTransition := demystifyPendingHelper(status)

	// Terminal outcomes decided by the helper are final — no timeout logic.
	if phaseInfo.Phase().IsTerminal() {
		return phaseInfo, nil
	}

	// A configured pod-pending timeout (zero disables the check) converts a
	// pod stuck in Pending into a retryable failure with cleanup.
	if timeout := config.GetK8sPluginConfig().PodPendingTimeout.Duration; timeout > 0 && time.Since(lastTransition) >= timeout {
		taskInfo := &pluginsCore.TaskInfo{
			OccurredAt: &lastTransition,
		}
		return pluginsCore.PhaseInfoRetryableFailureWithCleanup("PodPendingTimeout", phaseInfo.Reason(), taskInfo), nil
	}

	// Any concrete (non-undefined) phase produced by the helper wins.
	if phaseInfo.Phase() != pluginsCore.PhaseUndefined {
		return phaseInfo, nil
	}

	// Nothing informative in the conditions yet: report the pod as queued.
	return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil
}

func demystifyPendingHelper(status v1.PodStatus) (pluginsCore.PhaseInfo, time.Time) {
// Search over the difference conditions in the status object. Note that the 'Pending' this function is
// demystifying is the 'phase' of the pod status. This is different than the PodReady condition type also used below
phaseInfo := pluginsCore.PhaseInfoUndefined
t := time.Now()
for _, c := range status.Conditions {
t = c.LastTransitionTime.Time
switch c.Type {
case v1.PodScheduled:
if c.Status == v1.ConditionFalse {
// Waiting to be scheduled. This usually refers to inability to acquire resources.
return pluginsCore.PhaseInfoQueued(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), nil
return pluginsCore.PhaseInfoQueued(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), t
}

case v1.PodReasonUnschedulable:
Expand All @@ -592,7 +616,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) {
// reason: Unschedulable
// status: "False"
// type: PodScheduled
return pluginsCore.PhaseInfoQueued(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), nil
return pluginsCore.PhaseInfoQueued(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("%s:%s", c.Reason, c.Message)), t

case v1.PodReady:
if c.Status == v1.ConditionFalse {
Expand Down Expand Up @@ -637,7 +661,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) {
// ErrImagePull -> Transitionary phase to ImagePullBackOff
// ContainerCreating -> Image is being downloaded
// PodInitializing -> Init containers are running
return pluginsCore.PhaseInfoInitializing(c.LastTransitionTime.Time, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &c.LastTransitionTime.Time}), nil
return pluginsCore.PhaseInfoInitializing(t, pluginsCore.DefaultPhaseVersion, fmt.Sprintf("[%s]: %s", finalReason, finalMessage), &pluginsCore.TaskInfo{OccurredAt: &t}), t

case "CreateContainerError":
// This may consist of:
Expand All @@ -659,77 +683,72 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) {
// synced, and therefore, only provides an
// approximation of the elapsed time since the last
// transition.
t := c.LastTransitionTime.Time

gracePeriod := config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration
if time.Since(t) >= gracePeriod {
return pluginsCore.PhaseInfoFailure(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
OccurredAt: &t,
}), nil
}), t
}
return pluginsCore.PhaseInfoInitializing(
t,
pluginsCore.DefaultPhaseVersion,
fmt.Sprintf("[%s]: %s", finalReason, finalMessage),
&pluginsCore.TaskInfo{OccurredAt: &t},
), nil
), t

case "CreateContainerConfigError":
t := c.LastTransitionTime.Time
gracePeriod := config.GetK8sPluginConfig().CreateContainerConfigErrorGracePeriod.Duration
if time.Since(t) >= gracePeriod {
return pluginsCore.PhaseInfoFailure(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
OccurredAt: &t,
}), nil
}), t
}
return pluginsCore.PhaseInfoInitializing(
t,
pluginsCore.DefaultPhaseVersion,
fmt.Sprintf("[%s]: %s", finalReason, finalMessage),
&pluginsCore.TaskInfo{OccurredAt: &t},
), nil
), t

case "InvalidImageName":
t := c.LastTransitionTime.Time
return pluginsCore.PhaseInfoFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{
OccurredAt: &t,
}), nil
}), t

case "ImagePullBackOff":
t := c.LastTransitionTime.Time
gracePeriod := config.GetK8sPluginConfig().ImagePullBackoffGracePeriod.Duration
if time.Since(t) >= gracePeriod {
return pluginsCore.PhaseInfoRetryableFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
OccurredAt: &t,
}), nil
}), t
}

return pluginsCore.PhaseInfoInitializing(
t,
pluginsCore.DefaultPhaseVersion,
fmt.Sprintf("[%s]: %s", finalReason, finalMessage),
&pluginsCore.TaskInfo{OccurredAt: &t},
), nil
), t

default:
// Since we are not checking for all error states, we may end up perpetually
// in the queued state returned at the bottom of this function, until the Pod is reaped
// by K8s and we get elusive 'pod not found' errors
// So be default if the container is not waiting with the PodInitializing/ContainerCreating
// reasons, then we will assume a failure reason, and fail instantly
t := c.LastTransitionTime.Time
return pluginsCore.PhaseInfoSystemRetryableFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{
OccurredAt: &t,
}), nil
}), t
}

}
}
}
}
}
}

return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil
return phaseInfo, t
}

func GetMessageAfterGracePeriod(message string, gracePeriod time.Duration) string {
Expand Down
35 changes: 35 additions & 0 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,9 @@ func TestDemystifyPending(t *testing.T) {
ImagePullBackoffGracePeriod: config1.Duration{
Duration: time.Minute * 3,
},
PodPendingTimeout: config1.Duration{
Duration: 0,
},
}))

t.Run("PodNotScheduled", func(t *testing.T) {
Expand Down Expand Up @@ -1478,6 +1481,38 @@ func TestDemystifyPending(t *testing.T) {
})
}

// TestDemystifyPendingTimeout verifies that a pod stuck in the Pending phase
// for longer than the configured PodPendingTimeout is reported as a retryable
// failure with the "PodPendingTimeout" error code.
func TestDemystifyPendingTimeout(t *testing.T) {
	assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{
		CreateContainerErrorGracePeriod: config1.Duration{
			Duration: time.Minute * 3,
		},
		ImagePullBackoffGracePeriod: config1.Duration{
			Duration: time.Minute * 3,
		},
		// Use an explicit duration: a bare literal (e.g. `10`) would be
		// interpreted as 10 nanoseconds, which any test run trivially
		// exceeds and so would not exercise the backdating below.
		PodPendingTimeout: config1.Duration{
			Duration: time.Minute * 10,
		},
	}))

	s := v1.PodStatus{
		Phase: v1.PodPending,
		Conditions: []v1.PodCondition{
			{
				Type:   v1.PodScheduled,
				Status: v1.ConditionFalse,
			},
		},
	}
	// Backdate the condition transition so the pod appears to have been
	// pending for the full timeout window, triggering the failure path.
	s.Conditions[0].LastTransitionTime.Time = metav1.Now().Add(-config.GetK8sPluginConfig().PodPendingTimeout.Duration)

	t.Run("PodPendingExceedsTimeout", func(t *testing.T) {
		taskStatus, err := DemystifyPending(s)
		assert.NoError(t, err)
		assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase())
		assert.Equal(t, "PodPendingTimeout", taskStatus.Err().Code)
	})
}

func TestDemystifySuccess(t *testing.T) {
t.Run("OOMKilled", func(t *testing.T) {
phaseInfo, err := DemystifySuccess(v1.PodStatus{
Expand Down

0 comments on commit a5ea1c8

Please sign in to comment.