Skip to content

Commit

Permalink
Merge remote-tracking branch 'flyteplugins/bug/maptask-log-index' int…
Browse files Browse the repository at this point in the history
…o move-to-monorepo/flyteplugins/bug/maptask-log-index
  • Loading branch information
eapolinario committed Oct 3, 2023
2 parents d9586b0 + 1d91eaa commit 950452f
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 12 deletions.
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/array/awsbatch/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
if p == arrayCore.PhaseStart && nextPhase != arrayCore.PhaseStart {
// if transitioning from PhaseStart to another phase then cache lookups have completed
externalResources, err = arrayCore.InitializeExternalResources(ctx, tCtx, pluginState.State,
func(tCtx core.TaskExecutionContext, childIndex int) string {
func(tCtx core.TaskExecutionContext, executionIndex, originalIndex int) string {
// subTaskIDs for the the aws_batch are generated based on the job ID, therefore
// to initialize we default to an empty string which will be updated later.
return ""
Expand Down
4 changes: 2 additions & 2 deletions flyteplugins/go/tasks/plugins/array/core/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// initial state of the subtask. This involves labeling all cached subtasks as successful with a
// cache hit and initializing others to undefined state.
func InitializeExternalResources(ctx context.Context, tCtx core.TaskExecutionContext, state *State,
generateSubTaskID func(core.TaskExecutionContext, int) string) ([]*core.ExternalResource, error) {
generateSubTaskID func(core.TaskExecutionContext, int, int) string) ([]*core.ExternalResource, error) {
externalResources := make([]*core.ExternalResource, state.GetOriginalArraySize())

taskTemplate, err := tCtx.TaskReader().Read(ctx)
Expand Down Expand Up @@ -51,7 +51,7 @@ func InitializeExternalResources(ctx context.Context, tCtx core.TaskExecutionCon
cachedSubTaskCount++
}

subTaskID := generateSubTaskID(tCtx, childIndex)
subTaskID := generateSubTaskID(tCtx, childIndex, i)
externalResources[i] = &core.ExternalResource{
ExternalID: subTaskID,
CacheStatus: cacheStatus,
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/array/core/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestInitializeExternalResources(t *testing.T) {
}

externalResources, err := InitializeExternalResources(ctx, tCtx, &state,
func(_ core.TaskExecutionContext, i int) string {
func(_ core.TaskExecutionContext, i, j int) string {
return ""
},
)
Expand Down
4 changes: 2 additions & 2 deletions flyteplugins/go/tasks/plugins/array/k8s/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
// job configuration has then been validated and all of the metadata necessary to report subtask
// status (ie. cache hit / etc) is available.
externalResources, err = arrayCore.InitializeExternalResources(ctx, tCtx, nextState,
func(tCtx core.TaskExecutionContext, childIndex int) string {
subTaskExecutionID := NewSubTaskExecutionID(tCtx.TaskExecutionMetadata().GetTaskExecutionID(), childIndex, 0)
func(tCtx core.TaskExecutionContext, executionIndex, originalIndex int) string {
subTaskExecutionID := NewSubTaskExecutionID(tCtx.TaskExecutionMetadata().GetTaskExecutionID(), executionIndex, originalIndex, 0)
return subTaskExecutionID.GetGeneratedName()
},
)
Expand Down
18 changes: 12 additions & 6 deletions flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s SubTaskExecutionContext) PluginStateReader() pluginsCore.PluginStateRead
func NewSubTaskExecutionContext(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, taskTemplate *core.TaskTemplate,
executionIndex, originalIndex int, retryAttempt uint64, systemFailures uint64) (SubTaskExecutionContext, error) {

subTaskExecutionMetadata, err := NewSubTaskExecutionMetadata(tCtx.TaskExecutionMetadata(), taskTemplate, executionIndex, retryAttempt, systemFailures)
subTaskExecutionMetadata, err := NewSubTaskExecutionMetadata(tCtx.TaskExecutionMetadata(), taskTemplate, executionIndex, originalIndex, retryAttempt, systemFailures)
if err != nil {
return SubTaskExecutionContext{}, err
}
Expand Down Expand Up @@ -146,6 +146,7 @@ func (s SubTaskReader) Read(ctx context.Context) (*core.TaskTemplate, error) {
type SubTaskExecutionID struct {
pluginsCore.TaskExecutionID
executionIndex int
originalIndex int
parentName string
subtaskRetryAttempt uint64
taskRetryAttempt uint32
Expand All @@ -170,11 +171,15 @@ func (s SubTaskExecutionID) GetLogSuffix() string {
// Append the retry attempt and executionIndex so that log names coincide with pod names per
// https://github.com/flyteorg/flyteplugins/pull/186#discussion_r666569825. To maintain
// backwards compatibility we append the subtaskRetryAttempt if it is not 0.

// To ensure UI indicies match, we append the originalIndex rather than the executionIndex.

Check failure on line 175 in flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go

View workflow job for this annotation

GitHub Actions / Check for spelling errors

indicies ==> indices
// This does mean that Pod names and log links may differ when the original and execution
// indicies differ (ex. cache hits).

Check failure on line 177 in flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go

View workflow job for this annotation

GitHub Actions / Check for spelling errors

indicies ==> indices
if s.subtaskRetryAttempt == 0 {
return fmt.Sprintf(" #%d-%d", s.taskRetryAttempt, s.executionIndex)
return fmt.Sprintf(" #%d-%d", s.taskRetryAttempt, s.originalIndex)
}

return fmt.Sprintf(" #%d-%d-%d", s.taskRetryAttempt, s.executionIndex, s.subtaskRetryAttempt)
return fmt.Sprintf(" #%d-%d-%d", s.taskRetryAttempt, s.originalIndex, s.subtaskRetryAttempt)

Check warning on line 182 in flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go

View check run for this annotation

Codecov / codecov/patch

flyteplugins/go/tasks/plugins/array/k8s/subtask_exec_context.go#L182

Added line #L182 was not covered by tests
}

var logTemplateRegexes = struct {
Expand Down Expand Up @@ -210,10 +215,11 @@ func (s SubTaskExecutionID) TemplateVarsByScheme() *tasklog.TemplateVarsByScheme
}

// NewSubtaskExecutionID constructs a SubTaskExecutionID using the provided parameters
func NewSubTaskExecutionID(taskExecutionID pluginsCore.TaskExecutionID, executionIndex int, retryAttempt uint64) SubTaskExecutionID {
func NewSubTaskExecutionID(taskExecutionID pluginsCore.TaskExecutionID, executionIndex, originalIndex int, retryAttempt uint64) SubTaskExecutionID {
return SubTaskExecutionID{
taskExecutionID,
executionIndex,
originalIndex,
taskExecutionID.GetGeneratedName(),
retryAttempt,
taskExecutionID.GetID().RetryAttempt,
Expand Down Expand Up @@ -251,7 +257,7 @@ func (s SubTaskExecutionMetadata) IsInterruptible() bool {

// NewSubtaskExecutionMetadata constructs a SubTaskExecutionMetadata using the provided parameters
func NewSubTaskExecutionMetadata(taskExecutionMetadata pluginsCore.TaskExecutionMetadata, taskTemplate *core.TaskTemplate,
executionIndex int, retryAttempt uint64, systemFailures uint64) (SubTaskExecutionMetadata, error) {
executionIndex, originalIndex int, retryAttempt uint64, systemFailures uint64) (SubTaskExecutionMetadata, error) {

var err error
secretsMap := make(map[string]string)
Expand All @@ -267,7 +273,7 @@ func NewSubTaskExecutionMetadata(taskExecutionMetadata pluginsCore.TaskExecution
}
}

subTaskExecutionID := NewSubTaskExecutionID(taskExecutionMetadata.GetTaskExecutionID(), executionIndex, retryAttempt)
subTaskExecutionID := NewSubTaskExecutionID(taskExecutionMetadata.GetTaskExecutionID(), executionIndex, originalIndex, retryAttempt)
interruptible := taskExecutionMetadata.IsInterruptible() && uint32(systemFailures) < taskExecutionMetadata.GetInterruptibleFailureThreshold()
return SubTaskExecutionMetadata{
taskExecutionMetadata,
Expand Down

0 comments on commit 950452f

Please sign in to comment.