Skip to content

Commit

Permalink
Feature/add cleanup non recoverable pod statuses (#4607)
Browse files Browse the repository at this point in the history
* clean up non-recoverable pods

Signed-off-by: Paul Dittamo <[email protected]>

* use non-exported phaseInfoFailed function to appease linter

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
Co-authored-by: Dan Rammer <[email protected]>
  • Loading branch information
pvditt and hamersaw authored Dec 19, 2023
1 parent b562536 commit 6c6656c
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 9 deletions.
14 changes: 9 additions & 5 deletions flyteplugins/go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,31 +261,35 @@ func PhaseInfoSuccess(info *TaskInfo) PhaseInfo {
}

// PhaseInfoSystemFailure builds a PhaseInfo in PhasePermanentFailure whose
// ExecutionError carries the supplied code and reason and is tagged with
// core.ExecutionError_SYSTEM.
//
// The trailing false handed to phaseInfoFailed is presumably the
// cleanup-on-failure flag (PhaseInfoSystemFailureWithCleanup passes true in
// the same position) — confirm against phaseInfoFailed.
func PhaseInfoSystemFailure(code, reason string, info *TaskInfo) PhaseInfo {
	return phaseInfoFailed(PhasePermanentFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_SYSTEM}, info, false)
}

// PhaseInfoSystemFailureWithCleanup builds a PhaseInfo in
// PhasePermanentFailure whose ExecutionError carries the supplied code and
// reason and is tagged with core.ExecutionError_SYSTEM, matching
// PhaseInfoSystemFailure.
//
// It passes true as the last argument of phaseInfoFailed; presumably that is
// the cleanup-on-failure flag — confirm against phaseInfoFailed.
func PhaseInfoSystemFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo {
	// Kind must be SYSTEM here: this is the system-failure constructor, not the
	// user-failure one.
	return phaseInfoFailed(PhasePermanentFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_SYSTEM}, info, true)
}

// PhaseInfoFailure builds a PhaseInfo in PhasePermanentFailure whose
// ExecutionError carries the supplied code and reason and is tagged with
// core.ExecutionError_USER.
//
// The trailing false handed to phaseInfoFailed is presumably the
// cleanup-on-failure flag (PhaseInfoFailureWithCleanup passes true in the same
// position) — confirm against phaseInfoFailed.
func PhaseInfoFailure(code, reason string, info *TaskInfo) PhaseInfo {
	return phaseInfoFailed(PhasePermanentFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info, false)
}

// PhaseInfoFailureWithCleanup returns a PhasePermanentFailure PhaseInfo for a
// user-kind error (core.ExecutionError_USER) described by code and reason.
// It calls phaseInfoFailed with true as the final argument.
func PhaseInfoFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo {
	execErr := &core.ExecutionError{
		Code:    code,
		Message: reason,
		Kind:    core.ExecutionError_USER,
	}
	return phaseInfoFailed(PhasePermanentFailure, execErr, info, true)
}

// PhaseInfoRetryableFailure builds a PhaseInfo in PhaseRetryableFailure whose
// ExecutionError carries the supplied code and reason and is tagged with
// core.ExecutionError_USER.
//
// The trailing false handed to phaseInfoFailed is presumably the
// cleanup-on-failure flag (PhaseInfoRetryableFailureWithCleanup passes true in
// the same position) — confirm against phaseInfoFailed.
func PhaseInfoRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo {
	return phaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info, false)
}

// PhaseInfoRetryableFailureWithCleanup returns a PhaseRetryableFailure
// PhaseInfo for a user-kind error (core.ExecutionError_USER) described by code
// and reason. It calls phaseInfoFailed with true as the final argument.
func PhaseInfoRetryableFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo {
	execErr := &core.ExecutionError{
		Code:    code,
		Message: reason,
		Kind:    core.ExecutionError_USER,
	}
	return phaseInfoFailed(PhaseRetryableFailure, execErr, info, true)
}

// PhaseInfoSystemRetryableFailure builds a PhaseInfo in PhaseRetryableFailure
// whose ExecutionError carries the supplied code and reason and is tagged with
// core.ExecutionError_SYSTEM.
//
// The trailing false handed to phaseInfoFailed is presumably the
// cleanup-on-failure flag (PhaseInfoSystemRetryableFailureWithCleanup passes
// true in the same position) — confirm against phaseInfoFailed.
func PhaseInfoSystemRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo {
	return phaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_SYSTEM}, info, false)
}

// PhaseInfoSystemRetryableFailureWithCleanup returns a PhaseRetryableFailure
// PhaseInfo for a system-kind error (core.ExecutionError_SYSTEM) described by
// code and reason. It calls phaseInfoFailed with true as the final argument.
func PhaseInfoSystemRetryableFailureWithCleanup(code, reason string, info *TaskInfo) PhaseInfo {
	execErr := &core.ExecutionError{
		Code:    code,
		Message: reason,
		Kind:    core.ExecutionError_SYSTEM,
	}
	return phaseInfoFailed(PhaseRetryableFailure, execErr, info, true)
}

// Creates a new PhaseInfo with phase set to PhaseWaitingForCache
Expand Down
8 changes: 4 additions & 4 deletions flyteplugins/go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ func demystifyPendingHelper(status v1.PodStatus) (pluginsCore.PhaseInfo, time.Ti

gracePeriod := config.GetK8sPluginConfig().CreateContainerErrorGracePeriod.Duration
if time.Since(t) >= gracePeriod {
return pluginsCore.PhaseInfoFailure(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
OccurredAt: &t,
}), t
}
Expand All @@ -700,7 +700,7 @@ func demystifyPendingHelper(status v1.PodStatus) (pluginsCore.PhaseInfo, time.Ti
case "CreateContainerConfigError":
gracePeriod := config.GetK8sPluginConfig().CreateContainerConfigErrorGracePeriod.Duration
if time.Since(t) >= gracePeriod {
return pluginsCore.PhaseInfoFailure(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, GetMessageAfterGracePeriod(finalMessage, gracePeriod), &pluginsCore.TaskInfo{
OccurredAt: &t,
}), t
}
Expand All @@ -712,7 +712,7 @@ func demystifyPendingHelper(status v1.PodStatus) (pluginsCore.PhaseInfo, time.Ti
), t

case "InvalidImageName":
return pluginsCore.PhaseInfoFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{
return pluginsCore.PhaseInfoFailureWithCleanup(finalReason, finalMessage, &pluginsCore.TaskInfo{
OccurredAt: &t,
}), t

Expand All @@ -737,7 +737,7 @@ func demystifyPendingHelper(status v1.PodStatus) (pluginsCore.PhaseInfo, time.Ti
// by K8s and we get elusive 'pod not found' errors
// So by default if the container is not waiting with the PodInitializing/ContainerCreating
// reasons, then we will assume a failure reason, and fail instantly
return pluginsCore.PhaseInfoSystemRetryableFailure(finalReason, finalMessage, &pluginsCore.TaskInfo{
return pluginsCore.PhaseInfoSystemRetryableFailureWithCleanup(finalReason, finalMessage, &pluginsCore.TaskInfo{
OccurredAt: &t,
}), t
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1351,6 +1351,7 @@ func TestDemystifyPending(t *testing.T) {
taskStatus, err := DemystifyPending(s2)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase())
assert.True(t, taskStatus.CleanupOnFailure())
})

t.Run("InvalidImageName", func(t *testing.T) {
Expand All @@ -1368,6 +1369,7 @@ func TestDemystifyPending(t *testing.T) {
taskStatus, err := DemystifyPending(s)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhasePermanentFailure, taskStatus.Phase())
assert.True(t, taskStatus.CleanupOnFailure())
})

t.Run("RegistryUnavailable", func(t *testing.T) {
Expand All @@ -1385,6 +1387,7 @@ func TestDemystifyPending(t *testing.T) {
taskStatus, err := DemystifyPending(s)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase())
assert.True(t, taskStatus.CleanupOnFailure())
})

t.Run("RandomError", func(t *testing.T) {
Expand All @@ -1402,6 +1405,7 @@ func TestDemystifyPending(t *testing.T) {
taskStatus, err := DemystifyPending(s)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase())
assert.True(t, taskStatus.CleanupOnFailure())
})

t.Run("CreateContainerConfigErrorWithinGracePeriod", func(t *testing.T) {
Expand Down Expand Up @@ -1440,6 +1444,7 @@ func TestDemystifyPending(t *testing.T) {
taskStatus, err := DemystifyPending(s2)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhasePermanentFailure, taskStatus.Phase())
assert.True(t, taskStatus.CleanupOnFailure())
})

t.Run("CreateContainerErrorWithinGracePeriod", func(t *testing.T) {
Expand Down Expand Up @@ -1478,6 +1483,7 @@ func TestDemystifyPending(t *testing.T) {
taskStatus, err := DemystifyPending(s2)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhasePermanentFailure, taskStatus.Phase())
assert.True(t, taskStatus.CleanupOnFailure())
})
}

Expand Down Expand Up @@ -1510,6 +1516,7 @@ func TestDemystifyPendingTimeout(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, taskStatus.Phase())
assert.Equal(t, "PodPendingTimeout", taskStatus.Err().Code)
assert.True(t, taskStatus.CleanupOnFailure())
})
}

Expand Down

0 comments on commit 6c6656c

Please sign in to comment.