Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moving from flyteplugins - updated maptask log links to use original index #4134

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/array/awsbatch/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
if p == arrayCore.PhaseStart && nextPhase != arrayCore.PhaseStart {
// if transitioning from PhaseStart to another phase then cache lookups have completed
externalResources, err = arrayCore.InitializeExternalResources(ctx, tCtx, pluginState.State,
func(tCtx core.TaskExecutionContext, childIndex int) string {
func(tCtx core.TaskExecutionContext, executionIndex, originalIndex int) string {
// subTaskIDs for the the aws_batch are generated based on the job ID, therefore
// to initialize we default to an empty string which will be updated later.
return ""
Expand Down
4 changes: 2 additions & 2 deletions flyteplugins/go/tasks/plugins/array/core/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// initial state of the subtask. This involves labeling all cached subtasks as successful with a
// cache hit and initializing others to undefined state.
func InitializeExternalResources(ctx context.Context, tCtx core.TaskExecutionContext, state *State,
generateSubTaskID func(core.TaskExecutionContext, int) string) ([]*core.ExternalResource, error) {
generateSubTaskID func(core.TaskExecutionContext, int, int) string) ([]*core.ExternalResource, error) {
externalResources := make([]*core.ExternalResource, state.GetOriginalArraySize())

taskTemplate, err := tCtx.TaskReader().Read(ctx)
Expand Down Expand Up @@ -51,7 +51,7 @@ func InitializeExternalResources(ctx context.Context, tCtx core.TaskExecutionCon
cachedSubTaskCount++
}

subTaskID := generateSubTaskID(tCtx, childIndex)
subTaskID := generateSubTaskID(tCtx, childIndex, i)
externalResources[i] = &core.ExternalResource{
ExternalID: subTaskID,
CacheStatus: cacheStatus,
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/array/core/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestInitializeExternalResources(t *testing.T) {
}

externalResources, err := InitializeExternalResources(ctx, tCtx, &state,
func(_ core.TaskExecutionContext, i int) string {
func(_ core.TaskExecutionContext, i, j int) string {
return ""
},
)
Expand Down
4 changes: 2 additions & 2 deletions flyteplugins/go/tasks/plugins/array/k8s/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
// job configuration has then been validated and all of the metadata necessary to report subtask
// status (ie. cache hit / etc) is available.
externalResources, err = arrayCore.InitializeExternalResources(ctx, tCtx, nextState,
func(tCtx core.TaskExecutionContext, childIndex int) string {
subTaskExecutionID := NewSubTaskExecutionID(tCtx.TaskExecutionMetadata().GetTaskExecutionID(), childIndex, 0)
func(tCtx core.TaskExecutionContext, executionIndex, originalIndex int) string {
subTaskExecutionID := NewSubTaskExecutionID(tCtx.TaskExecutionMetadata().GetTaskExecutionID(), executionIndex, originalIndex, 0)
return subTaskExecutionID.GetGeneratedName()
},
)
Expand Down
18 changes: 12 additions & 6 deletions flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@
func NewSubTaskExecutionContext(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, taskTemplate *core.TaskTemplate,
executionIndex, originalIndex int, retryAttempt uint64, systemFailures uint64) (SubTaskExecutionContext, error) {

subTaskExecutionMetadata, err := NewSubTaskExecutionMetadata(tCtx.TaskExecutionMetadata(), taskTemplate, executionIndex, retryAttempt, systemFailures)
subTaskExecutionMetadata, err := NewSubTaskExecutionMetadata(tCtx.TaskExecutionMetadata(), taskTemplate, executionIndex, originalIndex, retryAttempt, systemFailures)
if err != nil {
return SubTaskExecutionContext{}, err
}
Expand Down Expand Up @@ -146,6 +146,7 @@
type SubTaskExecutionID struct {
pluginsCore.TaskExecutionID
executionIndex int
originalIndex int
parentName string
subtaskRetryAttempt uint64
taskRetryAttempt uint32
Expand All @@ -170,11 +171,15 @@
// Append the retry attempt and executionIndex so that log names coincide with pod names per
// https://github.com/flyteorg/flyteplugins/pull/186#discussion_r666569825. To maintain
// backwards compatibility we append the subtaskRetryAttempt if it is not 0.

// To ensure UI indicies match, we append the originalIndex rather than the executionIndex.

Check failure on line 175 in flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go

View workflow job for this annotation

GitHub Actions / Check for spelling errors

indicies ==> indices
// This does mean that Pod names and log links may differ when the original and execution
// indicies differ (ex. cache hits).

Check failure on line 177 in flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go

View workflow job for this annotation

GitHub Actions / Check for spelling errors

indicies ==> indices
if s.subtaskRetryAttempt == 0 {
return fmt.Sprintf(" #%d-%d", s.taskRetryAttempt, s.executionIndex)
return fmt.Sprintf(" #%d-%d", s.taskRetryAttempt, s.originalIndex)
}

return fmt.Sprintf(" #%d-%d-%d", s.taskRetryAttempt, s.executionIndex, s.subtaskRetryAttempt)
return fmt.Sprintf(" #%d-%d-%d", s.taskRetryAttempt, s.originalIndex, s.subtaskRetryAttempt)

Check warning on line 182 in flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go#L182

Added line #L182 was not covered by tests
}

var logTemplateRegexes = struct {
Expand Down Expand Up @@ -210,10 +215,11 @@
}

// NewSubtaskExecutionID constructs a SubTaskExecutionID using the provided parameters
func NewSubTaskExecutionID(taskExecutionID pluginsCore.TaskExecutionID, executionIndex int, retryAttempt uint64) SubTaskExecutionID {
func NewSubTaskExecutionID(taskExecutionID pluginsCore.TaskExecutionID, executionIndex, originalIndex int, retryAttempt uint64) SubTaskExecutionID {
return SubTaskExecutionID{
taskExecutionID,
executionIndex,
originalIndex,
taskExecutionID.GetGeneratedName(),
retryAttempt,
taskExecutionID.GetID().RetryAttempt,
Expand Down Expand Up @@ -251,7 +257,7 @@

// NewSubtaskExecutionMetadata constructs a SubTaskExecutionMetadata using the provided parameters
func NewSubTaskExecutionMetadata(taskExecutionMetadata pluginsCore.TaskExecutionMetadata, taskTemplate *core.TaskTemplate,
executionIndex int, retryAttempt uint64, systemFailures uint64) (SubTaskExecutionMetadata, error) {
executionIndex, originalIndex int, retryAttempt uint64, systemFailures uint64) (SubTaskExecutionMetadata, error) {

var err error
secretsMap := make(map[string]string)
Expand All @@ -267,7 +273,7 @@
}
}

subTaskExecutionID := NewSubTaskExecutionID(taskExecutionMetadata.GetTaskExecutionID(), executionIndex, retryAttempt)
subTaskExecutionID := NewSubTaskExecutionID(taskExecutionMetadata.GetTaskExecutionID(), executionIndex, originalIndex, retryAttempt)
interruptible := taskExecutionMetadata.IsInterruptible() && int32(systemFailures) < taskExecutionMetadata.GetInterruptibleFailureThreshold()
return SubTaskExecutionMetadata{
taskExecutionMetadata,
Expand Down
Loading