Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Allow controlling in which task phases log links are shown #4726

Merged
merged 35 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
0ba3b4a
Add ShowWhilePending arg to TaskLog flyteidl message
fg91 Jan 14, 2024
02d1067
Allow showing specific logs already during queued phase
fg91 Jan 14, 2024
666b209
Use core.PhaseInfoQueuedWithTaskInfo instead of core.PhaseInfoQueued …
fg91 Jan 14, 2024
9324f10
Bump phase version in pytorch plugin
fg91 Jan 14, 2024
52c1a61
Fix nil containerId in pending phase
fg91 Jan 14, 2024
de4de1a
Undo changes from rebase in ray.go
fg91 Jan 14, 2024
ab93bc5
Regenerate protos
Mar 6, 2024
703548a
Fix after rebasing
fg91 Mar 6, 2024
1873c3d
Add HideOnceFinished option to TaskLog proto message
Mar 6, 2024
8122e76
Hide certain logs once finished
fg91 Mar 8, 2024
2c742a5
Move log link filtering (by phase) from propeller to admin
fg91 Mar 18, 2024
039d922
Move bumping of plugin state phase version into function
fg91 Apr 6, 2024
c73e59f
Move helper function which bumps phase version to k8s plugin package
fg91 Apr 6, 2024
0bb5fc2
Consistently bump phase version when reason changes in pod, pytorch, …
fg91 Apr 6, 2024
79d51f5
Make controlling lifetime of log links work with dask plugin
fg91 Apr 7, 2024
0c8bdc3
Make controlling lifetime of log links work with ray plugin
fg91 Apr 7, 2024
d4102ac
Make controlling lifetime of log links work with spark plugin
fg91 Apr 7, 2024
66db326
Don't return pluginsCore.PhaseInfoUndefined but already known phaseIn…
fg91 Apr 7, 2024
720f053
Remove now obsolete logic to check whether dask job is queued
fg91 Apr 7, 2024
24c8a80
Adapt docstring explaining why we treat queued and init phase the sam…
fg91 Apr 7, 2024
382ed0a
Make propeller tests pass
fg91 Apr 7, 2024
105b766
Make pluginmachinery/flytek8s tests pass
fg91 Apr 7, 2024
383c043
Fix dask, pytorch, tensorflow, and mpi tests
fg91 Apr 8, 2024
0aa3d81
Make log link filtering by phase work for map tasks
fg91 Apr 8, 2024
459c70d
Add tests for filtering log links when updating task execution
fg91 Apr 11, 2024
e494723
Show All user logs while queueing phase as before
fg91 Apr 17, 2024
5dc8525
Fix spark tests
fg91 Apr 17, 2024
edf30fc
Fix after rebase
fg91 May 23, 2024
2e54f69
Fix flyteidl go.mod
fg91 May 23, 2024
dfc44b4
Fix mpi test
fg91 May 23, 2024
f8c68c5
Add tests for PR #4726 (#5200)
fg91 Jun 5, 2024
7bfc8bc
Update flyteplugins/go/tasks/logs/logging_utils.go
fg91 Jun 5, 2024
683a6a5
Update go.mod after flyteidl make generate
fg91 Jun 16, 2024
f557e37
Restrict numpy version in single binary e2e tests
fg91 Jun 16, 2024
3bfba55
Merge branch 'master' into fg91/feat/log-links-show-while-pending
eapolinario Jun 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion flyteadmin/pkg/repositories/transformers/task_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,31 @@ func mergeMetadata(existing, latest *event.TaskExecutionMetadata) *event.TaskExe
return existing
}

func filterExternalResourceLogsByPhase(externalResources []*event.ExternalResourceInfo, phase core.TaskExecution_Phase) {
for _, externalResource := range externalResources {
externalResource.Logs = filterLogsByPhase(externalResource.Logs, phase)
}
}

func filterLogsByPhase(logs []*core.TaskLog, phase core.TaskExecution_Phase) []*core.TaskLog {
filteredLogs := make([]*core.TaskLog, 0, len(logs))

for _, l := range logs {
if common.IsTaskExecutionTerminal(phase) && l.HideOnceFinished {
continue
}
// Some plugins like e.g. Dask, Ray start with or very quickly transition to core.TaskExecution_INITIALIZING
// once the CR has been created even though the underlying pods are still pending. We thus treat queued and
// initializing the same here.
if (phase == core.TaskExecution_QUEUED || phase == core.TaskExecution_INITIALIZING) && !l.ShowWhilePending {
continue
}
filteredLogs = append(filteredLogs, l)

}
return filteredLogs
}

func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionEventRequest, taskExecutionModel *models.TaskExecution,
inlineEventDataPolicy interfaces.InlineEventDataPolicy, storageClient *storage.DataStore) error {
err := handleTaskExecutionInputs(ctx, taskExecutionModel, request, storageClient)
Expand All @@ -384,6 +409,7 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
return errors.NewFlyteAdminErrorf(codes.Internal,
"failed to unmarshal task execution closure with error: %+v", err)
}
isPhaseChange := taskExecutionModel.Phase != request.Event.Phase.String()
existingTaskPhase := taskExecutionModel.Phase
taskExecutionModel.Phase = request.Event.Phase.String()
taskExecutionModel.PhaseVersion = request.Event.PhaseVersion
Expand All @@ -393,7 +419,11 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
reportedAt = request.Event.OccurredAt
}
taskExecutionClosure.UpdatedAt = reportedAt
taskExecutionClosure.Logs = mergeLogs(taskExecutionClosure.Logs, request.Event.Logs)

Copy link
Member Author

@fg91 fg91 May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On a high level, the contract between flyteplugins/flytepropeller and flyteadmin changes the following way:

Before:

  • If a plugin wants a certain task log link to be shown from a certain phase onwards (almost exclusively the Running phase, only exception being the spark plugin), the plugin needs to send this log link in that phase.
    • Subsequently, the task log doesn't have to be sent again since flyteadmin aggregates the log links.

After:

  • If a plugin wants a certain task log link to be shown from a certain phase onwards, the plugin needs to send this log link at least in that phase.
    • If task links are sent in an earlier phase, they will be ignored by admin
    • Subsequently, the task log still doesn't have to be sent again since flyteadmin still aggregates the log links.
    • Plugins can mark log links to be deleted once the execution finished.

Copy link
Member Author

@fg91 fg91 May 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did briefly consider whether a plugin should be able to send a log link which will e.g. be shown only from Running onwards only a single time already in a prior phase, e.g. Pending. This would mean that flyteadmin would have to track the still inactive log link and then make it active once the running phase is reached.

I personally think this is an overkill though since currently all plugins always send all links. I feel the contract "If you want a log link to be shown from a specific phase onwards, you need to send the log link at least in this phase" is reasonable.

mergedLogs := mergeLogs(taskExecutionClosure.Logs, request.Event.Logs)
filteredLogs := filterLogsByPhase(mergedLogs, request.Event.Phase)
taskExecutionClosure.Logs = filteredLogs

if len(request.Event.Reasons) > 0 {
for _, reason := range request.Event.Reasons {
taskExecutionClosure.Reasons = append(
Expand Down Expand Up @@ -437,6 +467,11 @@ func UpdateTaskExecutionModel(ctx context.Context, request *admin.TaskExecutionE
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to merge task event custom_info with error: %v", err)
}
taskExecutionClosure.Metadata = mergeMetadata(taskExecutionClosure.Metadata, request.Event.Metadata)

if isPhaseChange && taskExecutionClosure.Metadata != nil && len(taskExecutionClosure.Metadata.ExternalResources) > 0 {
filterExternalResourceLogsByPhase(taskExecutionClosure.Metadata.ExternalResources, request.Event.Phase)
}

Copy link
Member Author

@fg91 fg91 Apr 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is required for map tasks which treat log links differently for performance/scalability reasons.

Master:

map_master

This branch:

map_branch

Note that the task execution view with the log links only becomes available in the running phase for map tasks, meaning that there is no use case for showing log links already in the pending phase. However, log links can be removed once finished.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: array node map tasks replaced map tasks in flytekit 1.12.0 and this distinction is no longer true, i.e. array node map tasks log links no longer use the separate map tasks log links.

if request.Event.EventVersion > taskExecutionClosure.EventVersion {
taskExecutionClosure.EventVersion = request.Event.EventVersion
}
Expand Down
296 changes: 296 additions & 0 deletions flyteadmin/pkg/repositories/transformers/task_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,183 @@ func TestUpdateTaskExecutionModelRunningToFailed(t *testing.T) {

}

func TestUpdateTaskExecutionModelFilterLogLinks(t *testing.T) {
existingClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_QUEUED,
StartedAt: taskEventOccurredAtProto,
CreatedAt: taskEventOccurredAtProto,
UpdatedAt: taskEventOccurredAtProto,
Logs: []*core.TaskLog{},
Reason: "task submitted to k8s",
Reasons: []*admin.Reason{
{
OccurredAt: taskEventOccurredAtProto,
Message: "task submitted to k8s",
},
},
}

closureBytes, err := proto.Marshal(existingClosure)
assert.Nil(t, err)

existingTaskExecution := models.TaskExecution{
TaskExecutionKey: models.TaskExecutionKey{
TaskKey: models.TaskKey{
Project: sampleTaskID.Project,
Domain: sampleTaskID.Domain,
Name: sampleTaskID.Name,
Version: sampleTaskID.Version,
},
NodeExecutionKey: models.NodeExecutionKey{
NodeID: sampleNodeExecID.NodeId,
ExecutionKey: models.ExecutionKey{
Project: sampleNodeExecID.ExecutionId.Project,
Domain: sampleNodeExecID.ExecutionId.Domain,
Name: sampleNodeExecID.ExecutionId.Name,
},
},
RetryAttempt: &retryAttemptValue,
},
Phase: "TaskExecutionPhase_TASK_PHASE_QUEUED",
InputURI: "input uri",
Closure: closureBytes,
StartedAt: &taskEventOccurredAt,
TaskExecutionCreatedAt: &taskEventOccurredAt,
TaskExecutionUpdatedAt: &taskEventOccurredAt,
}

occuredAt := taskEventOccurredAt.Add(time.Minute)
occuredAtProto, err := ptypes.TimestampProto(occuredAt)
assert.Nil(t, err)

updatedEventRequest := &admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
TaskId: sampleTaskID,
ParentNodeExecutionId: sampleNodeExecID,
Phase: core.TaskExecution_QUEUED,
OccurredAt: occuredAtProto,
Logs: []*core.TaskLog{
{
Uri: "uri-show-pending",
ShowWhilePending: true,
},
{
Uri: "uri-default",
},
},
Reason: "task update",
},
}

err = UpdateTaskExecutionModel(context.TODO(), updatedEventRequest, &existingTaskExecution,
interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient())
assert.Nil(t, err)

updatedClosure := &admin.TaskExecutionClosure{}
err = proto.Unmarshal(existingTaskExecution.Closure, updatedClosure)
assert.Nil(t, err)

assert.Equal(t, updatedClosure.Logs, []*core.TaskLog{
{
Uri: "uri-show-pending",
ShowWhilePending: true,
},
},
)

}

func TestUpdateTaskExecutionModelFilterLogLinksArray(t *testing.T) {
existingClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_RUNNING,
StartedAt: taskEventOccurredAtProto,
CreatedAt: taskEventOccurredAtProto,
UpdatedAt: taskEventOccurredAtProto,
Logs: []*core.TaskLog{},
Reason: "task started",
Reasons: []*admin.Reason{
{
OccurredAt: taskEventOccurredAtProto,
Message: "task started",
},
},
Metadata: &event.TaskExecutionMetadata{
ExternalResources: []*event.ExternalResourceInfo{
{
Logs: []*core.TaskLog{
{
Uri: "uri-default",
},
{
Uri: "uri-hide-finished",
HideOnceFinished: true,
},
},
},
},
},
}

closureBytes, err := proto.Marshal(existingClosure)
assert.Nil(t, err)

existingTaskExecution := models.TaskExecution{
TaskExecutionKey: models.TaskExecutionKey{
TaskKey: models.TaskKey{
Project: sampleTaskID.Project,
Domain: sampleTaskID.Domain,
Name: sampleTaskID.Name,
Version: sampleTaskID.Version,
},
NodeExecutionKey: models.NodeExecutionKey{
NodeID: sampleNodeExecID.NodeId,
ExecutionKey: models.ExecutionKey{
Project: sampleNodeExecID.ExecutionId.Project,
Domain: sampleNodeExecID.ExecutionId.Domain,
Name: sampleNodeExecID.ExecutionId.Name,
},
},
RetryAttempt: &retryAttemptValue,
},
Phase: "TaskExecutionPhase_TASK_PHASE_RUNNING",
InputURI: "input uri",
Closure: closureBytes,
StartedAt: &taskEventOccurredAt,
TaskExecutionCreatedAt: &taskEventOccurredAt,
TaskExecutionUpdatedAt: &taskEventOccurredAt,
}

occuredAt := taskEventOccurredAt.Add(time.Minute)
occuredAtProto, err := ptypes.TimestampProto(occuredAt)
assert.Nil(t, err)

failedEventRequest := &admin.TaskExecutionEventRequest{
Event: &event.TaskExecutionEvent{
TaskId: sampleTaskID,
ParentNodeExecutionId: sampleNodeExecID,
Phase: core.TaskExecution_FAILED,
OccurredAt: occuredAtProto,
Reason: "something went wrong",
},
}

err = UpdateTaskExecutionModel(context.TODO(), failedEventRequest, &existingTaskExecution,
interfaces.InlineEventDataPolicyStoreInline, commonMocks.GetMockStorageClient())
assert.Nil(t, err)

updatedClosure := &admin.TaskExecutionClosure{}
err = proto.Unmarshal(existingTaskExecution.Closure, updatedClosure)
assert.Nil(t, err)

assert.Equal(t, updatedClosure.Metadata.ExternalResources[0].Logs, []*core.TaskLog{
{
Uri: "uri-default",
},
},
)

}

func TestUpdateTaskExecutionModelSingleEvents(t *testing.T) {
existingClosure := &admin.TaskExecutionClosure{
Phase: core.TaskExecution_RUNNING,
Expand Down Expand Up @@ -1208,6 +1385,125 @@ func TestMergeLogs(t *testing.T) {
}
}

func TestFilterLogsByPhase(t *testing.T) {
type testCase struct {
existing []*core.TaskLog
expected []*core.TaskLog
phase core.TaskExecution_Phase
name string
}

testCases := []testCase{
{
existing: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
{
Uri: "hide-finished-uri",
ShowWhilePending: false,
HideOnceFinished: true,
},
},
expected: []*core.TaskLog{
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
},
phase: core.TaskExecution_QUEUED,
name: "Filtered logs in QUEUED phase",
},
{
existing: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
{
Uri: "hide-finished-uri",
ShowWhilePending: false,
HideOnceFinished: true,
},
},
expected: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
{
Uri: "hide-finished-uri",
ShowWhilePending: false,
HideOnceFinished: true,
},
},
phase: core.TaskExecution_RUNNING,
name: "Filtered logs in RUNNING phase",
},
{
existing: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
{
Uri: "hide-finished-uri",
ShowWhilePending: false,
HideOnceFinished: true,
},
},
expected: []*core.TaskLog{
{
Uri: "default-uri",
ShowWhilePending: false,
HideOnceFinished: false,
},
{
Uri: "show-pending-uri",
ShowWhilePending: true,
HideOnceFinished: false,
},
},
phase: core.TaskExecution_SUCCEEDED,
name: "Filtered logs in terminated phase",
},
}
for _, filterTestCase := range testCases {
filteredLogs := filterLogsByPhase(filterTestCase.existing, filterTestCase.phase)

assert.Equal(t, len(filterTestCase.expected), len(filteredLogs), fmt.Sprintf("%s failed", filterTestCase.name))
for idx, expectedLog := range filterTestCase.expected {
assert.True(t, proto.Equal(expectedLog, filteredLogs[idx]), fmt.Sprintf("%s failed", filterTestCase.name))
}
}
}

func TestMergeCustoms(t *testing.T) {
t.Run("nothing to do", func(t *testing.T) {
custom, err := mergeCustom(nil, nil)
Expand Down
6 changes: 6 additions & 0 deletions flyteidl/clients/go/assets/admin.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading