diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go b/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go index a1ca632b27..68401add20 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go @@ -111,7 +111,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c if p == arrayCore.PhaseStart && nextPhase != arrayCore.PhaseStart { // if transitioning from PhaseStart to another phase then cache lookups have completed externalResources, err = arrayCore.InitializeExternalResources(ctx, tCtx, pluginState.State, - func(tCtx core.TaskExecutionContext, childIndex int) string { + func(tCtx core.TaskExecutionContext, executionIndex, originalIndex int) string { // subTaskIDs for the the aws_batch are generated based on the job ID, therefore // to initialize we default to an empty string which will be updated later. return "" diff --git a/flyteplugins/go/tasks/plugins/array/core/metadata.go b/flyteplugins/go/tasks/plugins/array/core/metadata.go index ed3bc52a3e..88016b0bd5 100644 --- a/flyteplugins/go/tasks/plugins/array/core/metadata.go +++ b/flyteplugins/go/tasks/plugins/array/core/metadata.go @@ -13,7 +13,7 @@ import ( // initial state of the subtask. This involves labeling all cached subtasks as successful with a // cache hit and initializing others to undefined state. func InitializeExternalResources(ctx context.Context, tCtx core.TaskExecutionContext, state *State, - generateSubTaskID func(core.TaskExecutionContext, int) string) ([]*core.ExternalResource, error) { + generateSubTaskID func(core.TaskExecutionContext, int, int) string) ([]*core.ExternalResource, error) { externalResources := make([]*core.ExternalResource, state.GetOriginalArraySize()) taskTemplate, err := tCtx.TaskReader().Read(ctx) @@ -51,7 +51,7 @@ func InitializeExternalResources(ctx context.Context, tCtx core.TaskExecutionCon cachedSubTaskCount++ } - subTaskID := generateSubTaskID(tCtx, childIndex) + subTaskID := generateSubTaskID(tCtx, childIndex, i) externalResources[i] = &core.ExternalResource{ ExternalID: subTaskID, CacheStatus: cacheStatus, diff --git a/flyteplugins/go/tasks/plugins/array/core/metadata_test.go b/flyteplugins/go/tasks/plugins/array/core/metadata_test.go index c10ef9d134..19aef9998d 100644 --- a/flyteplugins/go/tasks/plugins/array/core/metadata_test.go +++ b/flyteplugins/go/tasks/plugins/array/core/metadata_test.go @@ -48,7 +48,7 @@ func TestInitializeExternalResources(t *testing.T) { } externalResources, err := InitializeExternalResources(ctx, tCtx, &state, - func(_ core.TaskExecutionContext, i int) string { + func(_ core.TaskExecutionContext, i, j int) string { return "" }, ) diff --git a/flyteplugins/go/tasks/plugins/array/k8s/executor.go b/flyteplugins/go/tasks/plugins/array/k8s/executor.go index 172d0b65fd..50a17a0693 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/executor.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/executor.go @@ -94,8 +94,8 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c // job configuration has then been validated and all of the metadata necessary to report subtask // status (ie. cache hit / etc) is available. externalResources, err = arrayCore.InitializeExternalResources(ctx, tCtx, nextState, - func(tCtx core.TaskExecutionContext, childIndex int) string { - subTaskExecutionID := NewSubTaskExecutionID(tCtx.TaskExecutionMetadata().GetTaskExecutionID(), childIndex, 0) + func(tCtx core.TaskExecutionContext, executionIndex, originalIndex int) string { + subTaskExecutionID := NewSubTaskExecutionID(tCtx.TaskExecutionMetadata().GetTaskExecutionID(), executionIndex, originalIndex, 0) return subTaskExecutionID.GetGeneratedName() }, ) diff --git a/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go b/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go index 5b606eadd7..821d4764af 100644 --- a/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go +++ b/flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go @@ -73,7 +73,7 @@ func (s SubTaskExecutionContext) PluginStateReader() pluginsCore.PluginStateRead func NewSubTaskExecutionContext(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, taskTemplate *core.TaskTemplate, executionIndex, originalIndex int, retryAttempt uint64, systemFailures uint64) (SubTaskExecutionContext, error) { - subTaskExecutionMetadata, err := NewSubTaskExecutionMetadata(tCtx.TaskExecutionMetadata(), taskTemplate, executionIndex, retryAttempt, systemFailures) + subTaskExecutionMetadata, err := NewSubTaskExecutionMetadata(tCtx.TaskExecutionMetadata(), taskTemplate, executionIndex, originalIndex, retryAttempt, systemFailures) if err != nil { return SubTaskExecutionContext{}, err } @@ -146,6 +146,7 @@ func (s SubTaskReader) Read(ctx context.Context) (*core.TaskTemplate, error) { type SubTaskExecutionID struct { pluginsCore.TaskExecutionID executionIndex int + originalIndex int parentName string subtaskRetryAttempt uint64 taskRetryAttempt uint32 @@ -170,11 +171,15 @@ func (s SubTaskExecutionID) GetLogSuffix() string { // Append the retry attempt and executionIndex so that log names coincide with pod names per // https://github.com/flyteorg/flyteplugins/pull/186#discussion_r666569825. To maintain // backwards compatibility we append the subtaskRetryAttempt if it is not 0. + + // To ensure UI indicies match, we append the originalIndex rather than the executionIndex. + // This does mean that Pod names and log links may differ when the original and execution + // indicies differ (ex. cache hits). if s.subtaskRetryAttempt == 0 { - return fmt.Sprintf(" #%d-%d", s.taskRetryAttempt, s.executionIndex) + return fmt.Sprintf(" #%d-%d", s.taskRetryAttempt, s.originalIndex) } - return fmt.Sprintf(" #%d-%d-%d", s.taskRetryAttempt, s.executionIndex, s.subtaskRetryAttempt) + return fmt.Sprintf(" #%d-%d-%d", s.taskRetryAttempt, s.originalIndex, s.subtaskRetryAttempt) } var logTemplateRegexes = struct { @@ -210,10 +215,11 @@ func (s SubTaskExecutionID) TemplateVarsByScheme() *tasklog.TemplateVarsByScheme } // NewSubtaskExecutionID constructs a SubTaskExecutionID using the provided parameters -func NewSubTaskExecutionID(taskExecutionID pluginsCore.TaskExecutionID, executionIndex int, retryAttempt uint64) SubTaskExecutionID { +func NewSubTaskExecutionID(taskExecutionID pluginsCore.TaskExecutionID, executionIndex, originalIndex int, retryAttempt uint64) SubTaskExecutionID { return SubTaskExecutionID{ taskExecutionID, executionIndex, + originalIndex, taskExecutionID.GetGeneratedName(), retryAttempt, taskExecutionID.GetID().RetryAttempt, @@ -251,7 +257,7 @@ func (s SubTaskExecutionMetadata) IsInterruptible() bool { // NewSubtaskExecutionMetadata constructs a SubTaskExecutionMetadata using the provided parameters func NewSubTaskExecutionMetadata(taskExecutionMetadata pluginsCore.TaskExecutionMetadata, taskTemplate *core.TaskTemplate, - executionIndex int, retryAttempt uint64, systemFailures uint64) (SubTaskExecutionMetadata, error) { + executionIndex, originalIndex int, retryAttempt uint64, systemFailures uint64) (SubTaskExecutionMetadata, error) { var err error secretsMap := make(map[string]string) @@ -267,7 +273,7 @@ func NewSubTaskExecutionMetadata(taskExecutionMetadata pluginsCore.TaskExecution } } - subTaskExecutionID := NewSubTaskExecutionID(taskExecutionMetadata.GetTaskExecutionID(), executionIndex, retryAttempt) + subTaskExecutionID := NewSubTaskExecutionID(taskExecutionMetadata.GetTaskExecutionID(), executionIndex, originalIndex, retryAttempt) interruptible := taskExecutionMetadata.IsInterruptible() && uint32(systemFailures) < taskExecutionMetadata.GetInterruptibleFailureThreshold() return SubTaskExecutionMetadata{ taskExecutionMetadata,