Skip to content

Commit

Permalink
Move log link filtering (by phase) from propeller to admin
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Graetz <[email protected]>
  • Loading branch information
fg91 committed Mar 18, 2024
1 parent 66c6b29 commit f41d673
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 35 deletions.
22 changes: 21 additions & 1 deletion flyteadmin/pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,22 @@ func mergeMetadata(existing, latest *event.TaskExecutionMetadata) *event.TaskExe
return existing
}

func filterLogsByPhase(logs []*core.TaskLog, phase core.TaskExecution_Phase) []*core.TaskLog {
filteredLogs := make([]*core.TaskLog, 0, len(logs))

for _, l := range logs {
if common.IsTaskExecutionTerminal(phase) && l.HideOnceFinished {
continue

Check warning on line 380 in flyteadmin/pkg/repositories/transformers/task_execution.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/transformers/task_execution.go#L380

Added line #L380 was not covered by tests
}
if phase == core.TaskExecution_QUEUED && !l.ShowWhilePending {
continue

Check warning on line 383 in flyteadmin/pkg/repositories/transformers/task_execution.go

View check run for this annotation

Codecov / codecov/patch

flyteadmin/pkg/repositories/transformers/task_execution.go#L383

Added line #L383 was not covered by tests
}
filteredLogs = append(filteredLogs, l)

}
return filteredLogs
}

func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionEventRequest, taskExecutionModel *models.TaskExecution,
inlineEventDataPolicy interfaces.InlineEventDataPolicy, storageClient *storage.DataStore) error {
err := handleTaskExecutionInputs(ctx, taskExecutionModel, request, storageClient)
Expand All @@ -393,7 +409,11 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
reportedAt = request.Event.OccurredAt
}
taskExecutionClosure.UpdatedAt = reportedAt
taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs)

mergedLogs := mergeLogs(taskExecutionClosure.Logs, request.Event.Logs)
filteredLogs := filterLogsByPhase(mergedLogs, request.Event.Phase)
taskExecutionClosure.Logs = filteredLogs

if len(request.Event.Reasons) > 0 {
for _, reason := range request.Event.Reasons {
taskExecutionClosure.Reasons = append(
Expand Down
34 changes: 0 additions & 34 deletions flyteplugins/go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,6 @@ func PhaseInfoQueued(t time.Time, version uint32, reason string) PhaseInfo {
}

func PhaseInfoQueuedWithTaskInfo(t time.Time, version uint32, reason string, info *TaskInfo) PhaseInfo {
if info != nil && info.Logs != nil {
logs := info.Logs
// Delete the logs for which ShowWhilePending is not true
info.Logs = make([]*core.TaskLog, 0, len(logs))
for _, l := range logs {
if l.ShowWhilePending {
info.Logs = append(info.Logs, l)
}
}
}
pi := phaseInfo(PhaseQueued, version, nil, info, false)
pi.reason = reason
return pi
Expand All @@ -249,7 +239,6 @@ func PhaseInfoInitializing(t time.Time, version uint32, reason string, info *Tas
}

func phaseInfoFailed(p Phase, err *core.ExecutionError, info *TaskInfo, cleanupOnFailure bool) PhaseInfo {
HideLogsOnceFinished(info)
if err == nil {
err = &core.ExecutionError{
Code: "Unknown",
Expand All @@ -267,30 +256,7 @@ func PhaseInfoRunning(version uint32, info *TaskInfo) PhaseInfo {
return phaseInfo(PhaseRunning, version, nil, info, false)
}

func HideLogsOnceFinished(info *TaskInfo) {
if info != nil && info.Logs != nil {
logs := info.Logs
// Delete the logs for which hideOnceFinished is true
info.Logs = make([]*core.TaskLog, 0, len(logs))
for _, l := range logs {
if !l.HideOnceFinished {
info.Logs = append(info.Logs, l)
}
}
}
}

func PhaseInfoSuccess(info *TaskInfo) PhaseInfo {
if info != nil && info.Logs != nil {
logs := info.Logs
// Delete the logs for which hideOnceFinished is true
info.Logs = make([]*core.TaskLog, 0, len(logs))
for _, l := range logs {
if !l.HideOnceFinished {
info.Logs = append(info.Logs, l)
}
}
}
return phaseInfo(PhaseSuccess, DefaultPhaseVersion, nil, info, false)
}

Expand Down

0 comments on commit f41d673

Please sign in to comment.