diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go index 4c32d290f8..4e777ee154 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/config/config.go @@ -49,6 +49,9 @@ var ( CreateContainerErrorGracePeriod: config2.Duration{ Duration: time.Minute * 3, }, + CreateContainerConfigErrorGracePeriod: config2.Duration{ + Duration: time.Minute * 0, + }, ImagePullBackoffGracePeriod: config2.Duration{ Duration: time.Minute * 3, }, @@ -136,6 +139,11 @@ type K8sPluginConfig struct { // one, and the corresponding task marked as failed CreateContainerErrorGracePeriod config2.Duration `json:"create-container-error-grace-period" pflag:"-,Time to wait for transient CreateContainerError errors to be resolved."` + // Time to wait for transient CreateContainerConfigError errors to be resolved. If the + // error persists past this grace period, it will be inferred to be a permanent error. + // The pod will be deleted, and the corresponding task marked as failed. + CreateContainerConfigErrorGracePeriod config2.Duration `json:"create-container-config-error-grace-period" pflag:"-,Time to wait for transient CreateContainerConfigError errors to be resolved."` + // Time to wait for transient ImagePullBackoff errors to be resolved. If the // error persists past this grace period, it will be inferred to be a permanent // one, and the corresponding task marked as failed diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go index c326a3ddd1..d8cc4dcc7f 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -660,8 +660,9 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { // approximation of the elapsed time since the last // transition. t := c.LastTransitionTime.Time - if time.Since(t) >= config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration { - return pluginsCore.PhaseInfoFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{ + gracePeriod := config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration + if time.Since(t) >= gracePeriod { + return pluginsCore.PhaseInfoFailure(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ OccurredAt: &t, }), nil } @@ -672,7 +673,22 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { &pluginsCore.TaskInfo{OccurredAt: &t}, ), nil - case "CreateContainerConfigError", "InvalidImageName": + case "CreateContainerConfigError": + t := c.LastTransitionTime.Time + gracePeriod := config.GetK8sPluginConfig().CreateContainerConfigErrorGracePeriod.Duration + if time.Since(t) >= gracePeriod { + return pluginsCore.PhaseInfoFailure(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ + OccurredAt: &t, + }), nil + } + return pluginsCore.PhaseInfoInitializing( + t, + pluginsCore.DefaultPhaseVersion, + fmt.Sprintf("[%s]: %s", finalReason, finalMessage), + &pluginsCore.TaskInfo{OccurredAt: &t}, + ), nil + + case "InvalidImageName": t := c.LastTransitionTime.Time return pluginsCore.PhaseInfoFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{ OccurredAt: &t, @@ -680,8 +696,9 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { case "ImagePullBackOff": t := c.LastTransitionTime.Time - if time.Since(t) >= config.GetK8sPluginConfig().ImagePullBackoffGracePeriod.Duration { - return pluginsCore.PhaseInfoRetryableFailureWithCleanup(finalReason, finalMessage, &pluginsCore.TaskInfo{ + gracePeriod := config.GetK8sPluginConfig().ImagePullBackoffGracePeriod.Duration + if time.Since(t) >= gracePeriod { + return pluginsCore.PhaseInfoRetryableFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{ OccurredAt: &t, }), nil } @@ -715,6 +732,10 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) { return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil } +func GetMessageAfterGracePeriod(message string, gracePeriod time.Duration) string { + return fmt.Sprintf("Grace period [%s] exceeded|%s", gracePeriod, message) +} + func DemystifySuccess(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCore.PhaseInfo, error) { for _, status := range append( append(status.InitContainerStatuses, status.ContainerStatuses...), status.EphemeralContainerStatuses...) { diff --git a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go index ec2f8f89da..a98bfe6b4f 100644 --- a/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go @@ -1175,6 +1175,9 @@ func TestDemystifyPending(t *testing.T) { CreateContainerErrorGracePeriod: config1.Duration{ Duration: time.Minute * 3, }, + CreateContainerConfigErrorGracePeriod: config1.Duration{ + Duration: time.Minute * 4, + }, ImagePullBackoffGracePeriod: config1.Duration{ Duration: time.Minute * 3, }, @@ -1398,19 +1401,40 @@ func TestDemystifyPending(t *testing.T) { assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase()) }) - t.Run("CreateContainerConfigError", func(t *testing.T) { - s.ContainerStatuses = []v1.ContainerStatus{ + t.Run("CreateContainerConfigErrorWithinGracePeriod", func(t *testing.T) { + s2 := *s.DeepCopy() + s2.Conditions[0].LastTransitionTime = metav1.Now() + s2.ContainerStatuses = []v1.ContainerStatus{ { Ready: false, State: v1.ContainerState{ Waiting: &v1.ContainerStateWaiting{ Reason: "CreateContainerConfigError", - Message: "this an error", + Message: "this is a transient error", }, }, }, } - taskStatus, err := DemystifyPending(s) + taskStatus, err := DemystifyPending(s2) + assert.NoError(t, err) + assert.Equal(t, pluginsCore.PhaseInitializing, taskStatus.Phase()) + }) + + t.Run("CreateContainerConfigErrorOutsideGracePeriod", func(t *testing.T) { + s2 := *s.DeepCopy() + s2.Conditions[0].LastTransitionTime.Time = metav1.Now().Add(-config.GetK8sPluginConfig().CreateContainerConfigErrorGracePeriod.Duration) + s2.ContainerStatuses = []v1.ContainerStatus{ + { + Ready: false, + State: v1.ContainerState{ + Waiting: &v1.ContainerStateWaiting{ + Reason: "CreateContainerConfigError", + Message: "this a permanent error", + }, + }, + }, + } + taskStatus, err := DemystifyPending(s2) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhasePermanentFailure, taskStatus.Phase()) }) @@ -1583,7 +1607,7 @@ func TestDemystifyPending_testcases(t *testing.T) { errCode string message string }{ - {"ImagePullBackOff", "imagepull-failurepod.json", false, "ContainersNotReady|ImagePullBackOff", "containers with unready status: [fdf98e4ed2b524dc3bf7-get-flyte-id-task-0]|Back-off pulling image \"image\""}, + {"ImagePullBackOff", "imagepull-failurepod.json", false, "ContainersNotReady|ImagePullBackOff", "Grace period [3m0s] exceeded|containers with unready status: [fdf98e4ed2b524dc3bf7-get-flyte-id-task-0]|Back-off pulling image \"image\""}, } for _, tt := range tests {