From 1ec6dc65ec7cac0c3f10e96c111be4a9065f8953 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 2 Nov 2023 14:45:00 -0500 Subject: [PATCH 1/4] adding PluginIdentifier to TaskExecutionMetadata Signed-off-by: Daniel Rammer --- flytepropeller/pkg/controller/nodes/array/event_recorder.go | 1 + 1 file changed, 1 insertion(+) diff --git a/flytepropeller/pkg/controller/nodes/array/event_recorder.go b/flytepropeller/pkg/controller/nodes/array/event_recorder.go index 55cbbce89f..9846bd0f9c 100644 --- a/flytepropeller/pkg/controller/nodes/array/event_recorder.go +++ b/flytepropeller/pkg/controller/nodes/array/event_recorder.go @@ -120,6 +120,7 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte OccurredAt: occurredAt, Metadata: &event.TaskExecutionMetadata{ ExternalResources: e.externalResources, + PluginIdentifier: "k8s-array", }, TaskType: "k8s-array", EventVersion: 1, From bee84d6931fdce65e76c24d3bb04596b0ea16ed2 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 2 Nov 2023 15:36:17 -0500 Subject: [PATCH 2/4] sending input and output metadata - removing validating check on flyteadmin temporarily Signed-off-by: Daniel Rammer --- .../manager/impl/task_execution_manager.go | 4 +-- .../controller/nodes/array/event_recorder.go | 31 ++++++++++++++++++- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/flyteadmin/pkg/manager/impl/task_execution_manager.go b/flyteadmin/pkg/manager/impl/task_execution_manager.go index 82b872dbd4..2df0348905 100644 --- a/flyteadmin/pkg/manager/impl/task_execution_manager.go +++ b/flyteadmin/pkg/manager/impl/task_execution_manager.go @@ -295,9 +295,9 @@ func (m *TaskExecutionManager) ListTaskExecutions( func (m *TaskExecutionManager) GetTaskExecutionData( ctx context.Context, request admin.TaskExecutionGetDataRequest) (*admin.TaskExecutionGetDataResponse, error) { - if err := validation.ValidateTaskExecutionIdentifier(request.Id); err != nil { + /*if err := validation.ValidateTaskExecutionIdentifier(request.Id); err != nil { logger.Debugf(ctx, "Invalid identifier [%+v]: %v", request.Id, err) - } + }*/ ctx = getTaskExecutionContext(ctx, request.Id) taskExecution, err := m.GetTaskExecution(ctx, admin.TaskExecutionGetRequest{ Id: request.Id, diff --git a/flytepropeller/pkg/controller/nodes/array/event_recorder.go b/flytepropeller/pkg/controller/nodes/array/event_recorder.go index 9846bd0f9c..c59eb81bd7 100644 --- a/flytepropeller/pkg/controller/nodes/array/event_recorder.go +++ b/flytepropeller/pkg/controller/nodes/array/event_recorder.go @@ -120,12 +120,41 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte OccurredAt: occurredAt, Metadata: &event.TaskExecutionMetadata{ ExternalResources: e.externalResources, - PluginIdentifier: "k8s-array", + PluginIdentifier: "container", }, TaskType: "k8s-array", EventVersion: 1, } + // only attach input values if taskPhase is QUEUED meaning this the first evaluation + if taskPhase == idlcore.TaskExecution_QUEUED { + if eventConfig.RawOutputPolicy == config.RawOutputPolicyInline { + fmt.Printf("HAMERSAW - FIRST HERE\n") + // pass inputs by value + literalMap, err := nCtx.InputReader().Get(ctx) + if err != nil { + return err + } + + taskExecutionEvent.InputValue = &event.TaskExecutionEvent_InputData{ + InputData: literalMap, + } + } else { + fmt.Printf("HAMERSAW - THEN HERE\n") + // pass inputs by reference + taskExecutionEvent.InputValue = &event.TaskExecutionEvent_InputUri{ + InputUri: nCtx.InputReader().GetInputPath().String(), + } + } + } + + // only attach output uri if taskPhase is SUCCEEDED + if taskPhase == idlcore.TaskExecution_SUCCEEDED { + taskExecutionEvent.OutputResult = &event.TaskExecutionEvent_OutputUri{ + OutputUri: v1alpha1.GetOutputsFile(nCtx.NodeStatus().GetOutputDir()).String(), + } + } + // record TaskExecutionEvent return e.EventRecorder.RecordTaskEvent(ctx, taskExecutionEvent, eventConfig) } From 4003ad1da49d731e8c66f639392e758c348813ee Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Thu, 2 Nov 2023 16:02:38 -0500 Subject: [PATCH 3/4] readded task execution id verification Signed-off-by: Daniel Rammer --- flyteadmin/pkg/manager/impl/task_execution_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flyteadmin/pkg/manager/impl/task_execution_manager.go b/flyteadmin/pkg/manager/impl/task_execution_manager.go index 2df0348905..82b872dbd4 100644 --- a/flyteadmin/pkg/manager/impl/task_execution_manager.go +++ b/flyteadmin/pkg/manager/impl/task_execution_manager.go @@ -295,9 +295,9 @@ func (m *TaskExecutionManager) ListTaskExecutions( func (m *TaskExecutionManager) GetTaskExecutionData( ctx context.Context, request admin.TaskExecutionGetDataRequest) (*admin.TaskExecutionGetDataResponse, error) { - /*if err := validation.ValidateTaskExecutionIdentifier(request.Id); err != nil { + if err := validation.ValidateTaskExecutionIdentifier(request.Id); err != nil { logger.Debugf(ctx, "Invalid identifier [%+v]: %v", request.Id, err) - }*/ + } ctx = getTaskExecutionContext(ctx, request.Id) taskExecution, err := m.GetTaskExecution(ctx, admin.TaskExecutionGetRequest{ Id: request.Id, From a88a4df3ff39a16b27472c4296c6cb67999cc261 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Wed, 8 Nov 2023 11:22:01 -0600 Subject: [PATCH 4/4] removed dead code Signed-off-by: Daniel Rammer --- flytepropeller/pkg/controller/nodes/array/event_recorder.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/array/event_recorder.go b/flytepropeller/pkg/controller/nodes/array/event_recorder.go index c59eb81bd7..c4e3004011 100644 --- a/flytepropeller/pkg/controller/nodes/array/event_recorder.go +++ b/flytepropeller/pkg/controller/nodes/array/event_recorder.go @@ -129,7 +129,6 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte // only attach input values if taskPhase is QUEUED meaning this the first evaluation if taskPhase == idlcore.TaskExecution_QUEUED { if eventConfig.RawOutputPolicy == config.RawOutputPolicyInline { - fmt.Printf("HAMERSAW - FIRST HERE\n") // pass inputs by value literalMap, err := nCtx.InputReader().Get(ctx) if err != nil { @@ -140,7 +139,6 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte InputData: literalMap, } } else { - fmt.Printf("HAMERSAW - THEN HERE\n") // pass inputs by reference taskExecutionEvent.InputValue = &event.TaskExecutionEvent_InputUri{ InputUri: nCtx.InputReader().GetInputPath().String(),