From 54aa165a22633a5e5ff5e814db67c3e9994753fa Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 27 Nov 2024 23:36:49 +0800 Subject: [PATCH 01/12] add tests from Yi-Cheng Signed-off-by: Future-Outlier --- .../transformers/node_execution.go | 1 + .../transformers/node_execution_test.go | 5 + .../pkg/controller/nodes/task/handler.go | 96 +++++++++++++++---- 3 files changed, 83 insertions(+), 19 deletions(-) diff --git a/flyteadmin/pkg/repositories/transformers/node_execution.go b/flyteadmin/pkg/repositories/transformers/node_execution.go index 107e9efb70..1f17f0f131 100644 --- a/flyteadmin/pkg/repositories/transformers/node_execution.go +++ b/flyteadmin/pkg/repositories/transformers/node_execution.go @@ -42,6 +42,7 @@ func addNodeRunningState(request *admin.NodeExecutionEventRequest, nodeExecution "failed to marshal occurredAt into a timestamp proto with error: %v", err) } closure.StartedAt = startedAtProto + closure.DeckUri = request.GetEvent().GetDeckUri() return nil } diff --git a/flyteadmin/pkg/repositories/transformers/node_execution_test.go b/flyteadmin/pkg/repositories/transformers/node_execution_test.go index e37d312612..11dfd7256e 100644 --- a/flyteadmin/pkg/repositories/transformers/node_execution_test.go +++ b/flyteadmin/pkg/repositories/transformers/node_execution_test.go @@ -51,6 +51,7 @@ var childExecutionID = &core.WorkflowExecutionIdentifier{ const dynamicWorkflowClosureRef = "s3://bucket/admin/metadata/workflow" const testInputURI = "fake://bucket/inputs.pb" +const DeckURI = "fake://bucket/deck.html" var testInputs = &core.LiteralMap{ Literals: map[string]*core.Literal{ @@ -65,6 +66,7 @@ func TestAddRunningState(t *testing.T) { Event: &event.NodeExecutionEvent{ Phase: core.NodeExecution_RUNNING, OccurredAt: startedAtProto, + DeckUri: DeckURI, }, } nodeExecutionModel := models.NodeExecution{} @@ -73,6 +75,7 @@ func TestAddRunningState(t *testing.T) { assert.Nil(t, err) assert.Equal(t, startedAt, *nodeExecutionModel.StartedAt) assert.True(t, proto.Equal(startedAtProto, closure.GetStartedAt())) + assert.Equal(t, DeckURI, closure.GetDeckUri()) } func TestAddTerminalState_OutputURI(t *testing.T) { @@ -84,6 +87,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) { OutputUri: outputURI, }, OccurredAt: occurredAtProto, + DeckUri: DeckURI, }, } startedAt := occurredAt.Add(-time.Minute) @@ -99,6 +103,7 @@ func TestAddTerminalState_OutputURI(t *testing.T) { assert.Nil(t, err) assert.EqualValues(t, outputURI, closure.GetOutputUri()) assert.Equal(t, time.Minute, nodeExecutionModel.Duration) + assert.Equal(t, DeckURI, closure.GetDeckUri()) } func TestAddTerminalState_OutputData(t *testing.T) { diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 000d6bd7e7..f291189f9f 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -71,10 +71,43 @@ func getPluginMetricKey(pluginID, taskType string) string { return taskType + "_" + pluginID } -func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) { +func (p *pluginRequestedTransition) AddDeckURI(ctx context.Context, tCtx *taskExecutionContext) { + var deckURI *storage.DataReference + deckURIValue := tCtx.ow.GetDeckPath() + deckURI = &deckURIValue + + if p.execInfo.OutputInfo == nil { + p.execInfo.OutputInfo = &handler.OutputInfo{} + } + + p.execInfo.OutputInfo.DeckURI = deckURI +} + +// RemoveNonexistentDeckURI removes the deck URI from the plugin execution info if the URI does not exist in remote storage. +func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context, tCtx *taskExecutionContext) error { + reader := tCtx.ow.GetReader() + if reader == nil && p.execInfo.OutputInfo != nil { + p.execInfo.OutputInfo.DeckURI = nil + return nil + } + + exists, err := reader.DeckExists(ctx) + if err != nil { + logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) + return regErrors.Wrapf(err, "failed to check existence of deck file") + } + + if !exists && p.execInfo.OutputInfo != nil { + p.execInfo.OutputInfo.DeckURI = nil + } + + return nil +} + +func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, entry catalog.Entry) { p.ttype = handler.TransitionTypeEphemeral p.pInfo = pluginCore.PhaseInfoSuccess(nil) - p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()}) + p.ObserveSuccess(outputPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()}) } func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) { @@ -144,10 +177,13 @@ func (p *pluginRequestedTransition) FinalTaskEvent(input ToTaskExecutionEventInp return ToTaskExecutionEvent(input) } -func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) { - p.execInfo.OutputInfo = &handler.OutputInfo{ - OutputURI: outputPath, - DeckURI: deckPath, +func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, taskMetadata *event.TaskNodeMetadata) { + if p.execInfo.OutputInfo == nil { + p.execInfo.OutputInfo = &handler.OutputInfo{ + OutputURI: outputPath, + } + } else { + p.execInfo.OutputInfo.OutputURI = outputPath } p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{ @@ -171,7 +207,7 @@ func (p *pluginRequestedTransition) FinalTransition(ctx context.Context) (handle } logger.Debugf(ctx, "Task still running") - return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(nil)), nil + return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(&p.execInfo)), nil } // The plugin interface available especially for testing. @@ -464,8 +500,19 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta } } + // Regardless of the observed phase, we always add the DeckUri to support real-time deck functionality. + // The deck should be accessible even if the task is still running or has failed. + // It's possible that the deck URI may not exist in remote storage yet or will never exist. + // So, it is console's responsibility to handle the case when the deck URI actually does not exist. + pluginTrns.AddDeckURI(ctx, tCtx) + switch pluginTrns.pInfo.Phase() { case pluginCore.PhaseSuccess: + // This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess). + err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx) + if err != nil { + return pluginTrns, err + } // ------------------------------------- // TODO: @kumare create Issue# Remove the code after we use closures to handle dynamic nodes // This code only exists to support Dynamic tasks. Eventually dynamic tasks will use closure nodes to execute @@ -501,25 +548,36 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), }) } else { - var deckURI *storage.DataReference - if tCtx.ow.GetReader() != nil { - exists, err := tCtx.ow.GetReader().DeckExists(ctx) - if err != nil { - logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) - return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file") - } else if exists { - deckURIValue := tCtx.ow.GetDeckPath() - deckURI = &deckURIValue - } - } - pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI, + pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), &event.TaskNodeMetadata{ CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), }) + + ////var deckURI *storage.DataReference + //if tCtx.ow.GetReader() != nil { + // exists, err := tCtx.ow.GetReader().DeckExists(ctx) + // if err != nil { + // logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) + // return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file") + // } else if exists { + // deckURIValue := tCtx.ow.GetDeckPath() + // deckURI = &deckURIValue + // } + //} + //pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), + // &event.TaskNodeMetadata{ + // CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), + // }) } case pluginCore.PhaseRetryableFailure: fallthrough case pluginCore.PhasePermanentFailure: + // This is to prevent the console from potentially checking the deck URI that does not exist if in final + // phase(PhaseFailure). + err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx) + if err != nil { + return pluginTrns, err + } pluginTrns.ObservedFailure( &event.TaskNodeMetadata{ CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), From 9ed6b6e45b2d645be99ceb412defb9424d466cfa Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 27 Nov 2024 23:40:17 +0800 Subject: [PATCH 02/12] helped by Kevin and Yi-Cheng Signed-off-by: Future-Outlier Co-authored-by: Yi Cheng Co-authored-by: pingsutw From 4b4f6bd95a5b4ca3cd55db4a605df651d07ccacb Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 27 Nov 2024 23:47:44 +0800 Subject: [PATCH 03/12] lint Signed-off-by: Future-Outlier --- .../pkg/controller/nodes/task/handler.go | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index f291189f9f..10b6bd029a 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -552,22 +552,6 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta &event.TaskNodeMetadata{ CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), }) - - ////var deckURI *storage.DataReference - //if tCtx.ow.GetReader() != nil { - // exists, err := tCtx.ow.GetReader().DeckExists(ctx) - // if err != nil { - // logger.Errorf(ctx, "Failed to check deck file existence. Error: %v", err) - // return pluginTrns, regErrors.Wrapf(err, "failed to check existence of deck file") - // } else if exists { - // deckURIValue := tCtx.ow.GetDeckPath() - // deckURI = &deckURIValue - // } - //} - //pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), - // &event.TaskNodeMetadata{ - // CheckpointUri: tCtx.ow.GetCheckpointPrefix().String(), - // }) } case pluginCore.PhaseRetryableFailure: fallthrough From dd774cbe560b265bba25d82aaa80cdaecba31a8f Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Thu, 28 Nov 2024 21:58:07 +0800 Subject: [PATCH 04/12] nit Signed-off-by: Future-Outlier --- flytepropeller/pkg/controller/nodes/task/handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 10b6bd029a..d0f6d9ddb5 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -71,7 +71,7 @@ func getPluginMetricKey(pluginID, taskType string) string { return taskType + "_" + pluginID } -func (p *pluginRequestedTransition) AddDeckURI(ctx context.Context, tCtx *taskExecutionContext) { +func (p *pluginRequestedTransition) AddDeckURI(tCtx *taskExecutionContext) { var deckURI *storage.DataReference deckURIValue := tCtx.ow.GetDeckPath() deckURI = &deckURIValue @@ -504,7 +504,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta // The deck should be accessible even if the task is still running or has failed. // It's possible that the deck URI may not exist in remote storage yet or will never exist. // So, it is console's responsibility to handle the case when the deck URI actually does not exist. - pluginTrns.AddDeckURI(ctx, tCtx) + pluginTrns.AddDeckURI(tCtx) switch pluginTrns.pInfo.Phase() { case pluginCore.PhaseSuccess: From 0bb8e919592a5a886096b45addb8bffb714da4d9 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Fri, 13 Dec 2024 23:18:32 +0800 Subject: [PATCH 05/12] add comments Signed-off-by: Future-Outlier --- flytepropeller/pkg/controller/nodes/task/handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index d0f6d9ddb5..6e792a5698 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -207,6 +207,7 @@ func (p *pluginRequestedTransition) FinalTransition(ctx context.Context) (handle } logger.Debugf(ctx, "Task still running") + // Here will send the deck uri to flyteadmin return handler.DoTransition(p.ttype, handler.PhaseInfoRunning(&p.execInfo)), nil } From 25fea2956a0108c9b1937c6a4f5bb7c1210898e1 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Tue, 17 Dec 2024 15:19:17 +0800 Subject: [PATCH 06/12] add comments and better solution for backward compativle Signed-off-by: Future-Outlier --- .../pkg/controller/nodes/task/handler.go | 59 ++++++++++++++++--- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 6e792a5698..ef24535b3c 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "runtime/debug" + "strings" "time" regErrors "github.com/pkg/errors" @@ -40,6 +41,7 @@ import ( ) const pluginContextKey = contextutils.Key("plugin") +const FLYTE_ENABLE_DECK = string("FLYTE_ENABLE_DECK") type metrics struct { pluginPanics labeled.Counter @@ -83,8 +85,7 @@ func (p *pluginRequestedTransition) AddDeckURI(tCtx *taskExecutionContext) { p.execInfo.OutputInfo.DeckURI = deckURI } -// RemoveNonexistentDeckURI removes the deck URI from the plugin execution info if the URI does not exist in remote storage. -func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context, tCtx *taskExecutionContext) error { +func (p *pluginRequestedTransition) AddDeckURIIfDeckExists(ctx context.Context, tCtx *taskExecutionContext) error { reader := tCtx.ow.GetReader() if reader == nil && p.execInfo.OutputInfo != nil { p.execInfo.OutputInfo.DeckURI = nil @@ -97,8 +98,13 @@ func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context return regErrors.Wrapf(err, "failed to check existence of deck file") } - if !exists && p.execInfo.OutputInfo != nil { - p.execInfo.OutputInfo.DeckURI = nil + if p.execInfo.OutputInfo == nil { + p.execInfo.OutputInfo = &handler.OutputInfo{} + } + + if exists { + deckURIValue := tCtx.ow.GetDeckPath() + p.execInfo.OutputInfo.DeckURI = &deckURIValue } return nil @@ -417,6 +423,21 @@ func (t Handler) fetchPluginTaskMetrics(pluginID, taskType string) (*taskMetrics return t.taskMetricsMap[metricNameKey], nil } +func IsDeckEnabled(ctx context.Context, tCtx *taskExecutionContext) (bool, error) { + template, err := tCtx.tr.Read(ctx) + if err != nil { + return false, regErrors.Wrapf(err, "failed to read task template") + } + + templateConfig := template.GetConfig() + if templateConfig == nil { + return false, nil + } + + deckEnabled := strings.ToLower(templateConfig[FLYTE_ENABLE_DECK]) + return deckEnabled == "1" || deckEnabled == "t" || deckEnabled == "true", nil +} + func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *taskExecutionContext, ts handler.TaskNodeState) (*pluginRequestedTransition, error) { pluginTrns := &pluginRequestedTransition{} @@ -505,12 +526,34 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta // The deck should be accessible even if the task is still running or has failed. // It's possible that the deck URI may not exist in remote storage yet or will never exist. // So, it is console's responsibility to handle the case when the deck URI actually does not exist. - pluginTrns.AddDeckURI(tCtx) + deckEnabled, err := IsDeckEnabled(ctx, tCtx) + if err != nil { + return nil, err + } + if deckEnabled { + pluginTrns.AddDeckURI(tCtx) + } + // Handle backward compatibility for Flyte deck display behavior. + // + // Before (legacy behavior): + // - Deck URI was only shown if the deck file existed in the terminal state. + // - We relied on a HEAD request to check if the deck file exists, then added the URI to the event. + // + // After (new behavior): + // - If `FLYTE_ENABLE_DECK = true` is set in the task template config (requires Flytekit > 1.14.0), + // we display the deck URI from the beginning rather than waiting until the terminal state. + // + // For backward compatibility with older Flytekit versions (which don't support `FLYTE_ENABLE_DECK`), + // we still need to check deck file existence in the terminal state. This ensures that when the deck + // isn't enabled via config or doesn't exist, we only show the URI in terminal states if the deck file + // is actually present. switch pluginTrns.pInfo.Phase() { case pluginCore.PhaseSuccess: // This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess). - err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx) + if !deckEnabled { + err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx) + } if err != nil { return pluginTrns, err } @@ -559,7 +602,9 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta case pluginCore.PhasePermanentFailure: // This is to prevent the console from potentially checking the deck URI that does not exist if in final // phase(PhaseFailure). - err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx) + if !deckEnabled { + err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx) + } if err != nil { return pluginTrns, err } From 4e24e914c06d127c3559f59b03616f1a85743a24 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Tue, 17 Dec 2024 15:28:16 +0800 Subject: [PATCH 07/12] better comments Signed-off-by: Future-Outlier --- flytepropeller/pkg/controller/nodes/task/handler.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index ef24535b3c..d2d538549d 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -550,7 +550,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta // is actually present. switch pluginTrns.pInfo.Phase() { case pluginCore.PhaseSuccess: - // This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess). + // This is for backward compatibility with older Flytekit versions. if !deckEnabled { err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx) } @@ -600,8 +600,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta case pluginCore.PhaseRetryableFailure: fallthrough case pluginCore.PhasePermanentFailure: - // This is to prevent the console from potentially checking the deck URI that does not exist if in final - // phase(PhaseFailure). + // This is for backward compatibility with older Flytekit versions. if !deckEnabled { err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx) } From 8d1d0e43d79e2172ec49ee48df7b30949429c045 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 18 Dec 2024 23:39:30 +0800 Subject: [PATCH 08/12] DeckStatus Signed-off-by: Future-Outlier --- .../pkg/controller/nodes/task/handler.go | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index d2d538549d..3764a340ff 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "runtime/debug" + "strconv" "strings" "time" @@ -43,6 +44,14 @@ import ( const pluginContextKey = contextutils.Key("plugin") const FLYTE_ENABLE_DECK = string("FLYTE_ENABLE_DECK") +type DeckStatus int + +const ( + DeckUnknown DeckStatus = iota + DeckEnabled + DeckDisabled +) + type metrics struct { pluginPanics labeled.Counter unsupportedTaskType labeled.Counter @@ -423,19 +432,32 @@ func (t Handler) fetchPluginTaskMetrics(pluginID, taskType string) (*taskMetrics return t.taskMetricsMap[metricNameKey], nil } -func IsDeckEnabled(ctx context.Context, tCtx *taskExecutionContext) (bool, error) { +func IsDeckEnabled(ctx context.Context, tCtx *taskExecutionContext) (DeckStatus, error) { template, err := tCtx.tr.Read(ctx) if err != nil { - return false, regErrors.Wrapf(err, "failed to read task template") + return DeckUnknown, regErrors.Wrapf(err, "failed to read task template") } templateConfig := template.GetConfig() if templateConfig == nil { - return false, nil + return DeckUnknown, nil + } + + rawValue, ok := templateConfig[FLYTE_ENABLE_DECK] + if !ok { + return DeckUnknown, nil } - deckEnabled := strings.ToLower(templateConfig[FLYTE_ENABLE_DECK]) - return deckEnabled == "1" || deckEnabled == "t" || deckEnabled == "true", nil + rawValue = strings.ToLower(rawValue) + deckEnabled, err := strconv.ParseBool(rawValue) + if err != nil { + return DeckUnknown, nil + } + + if deckEnabled { + return DeckEnabled, nil + } + return DeckDisabled, nil } func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *taskExecutionContext, ts handler.TaskNodeState) (*pluginRequestedTransition, error) { @@ -526,11 +548,11 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta // The deck should be accessible even if the task is still running or has failed. // It's possible that the deck URI may not exist in remote storage yet or will never exist. // So, it is console's responsibility to handle the case when the deck URI actually does not exist. - deckEnabled, err := IsDeckEnabled(ctx, tCtx) + deckStatus, err := IsDeckEnabled(ctx, tCtx) if err != nil { return nil, err } - if deckEnabled { + if deckStatus == DeckEnabled { pluginTrns.AddDeckURI(tCtx) } @@ -551,7 +573,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta switch pluginTrns.pInfo.Phase() { case pluginCore.PhaseSuccess: // This is for backward compatibility with older Flytekit versions. - if !deckEnabled { + if deckStatus == DeckUnknown { err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx) } if err != nil { @@ -601,7 +623,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta fallthrough case pluginCore.PhasePermanentFailure: // This is for backward compatibility with older Flytekit versions. - if !deckEnabled { + if deckStatus == DeckUnknown { err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx) } if err != nil { From 31853bbe2cef1945a05626221c6d901f745c1e2d Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 18 Dec 2024 23:43:21 +0800 Subject: [PATCH 09/12] rename GetDeckStatus Signed-off-by: Future-Outlier --- flytepropeller/pkg/controller/nodes/task/handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 3764a340ff..9ea70bf486 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -432,7 +432,7 @@ func (t Handler) fetchPluginTaskMetrics(pluginID, taskType string) (*taskMetrics return t.taskMetricsMap[metricNameKey], nil } -func IsDeckEnabled(ctx context.Context, tCtx *taskExecutionContext) (DeckStatus, error) { +func GetDeckStatus(ctx context.Context, tCtx *taskExecutionContext) (DeckStatus, error) { template, err := tCtx.tr.Read(ctx) if err != nil { return DeckUnknown, regErrors.Wrapf(err, "failed to read task template") @@ -548,7 +548,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta // The deck should be accessible even if the task is still running or has failed. // It's possible that the deck URI may not exist in remote storage yet or will never exist. // So, it is console's responsibility to handle the case when the deck URI actually does not exist. - deckStatus, err := IsDeckEnabled(ctx, tCtx) + deckStatus, err := GetDeckStatus(ctx, tCtx) if err != nil { return nil, err } From 40680430c871d4bd3d39690d834e73b8b55256f4 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 18 Dec 2024 23:47:44 +0800 Subject: [PATCH 10/12] comments Signed-off-by: Future-Outlier --- flytepropeller/pkg/controller/nodes/task/handler.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 9ea70bf486..f24abbcb6a 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -433,6 +433,10 @@ func (t Handler) fetchPluginTaskMetrics(pluginID, taskType string) (*taskMetrics } func GetDeckStatus(ctx context.Context, tCtx *taskExecutionContext) (DeckStatus, error) { + // FLYTE_ENABLE_DECK is used when flytekit > 1.14.0 + // For backward compatibility, + // we will return DeckUnknow and call a HEAD request to check if the deck file exists in the terminal state. + template, err := tCtx.tr.Read(ctx) if err != nil { return DeckUnknown, regErrors.Wrapf(err, "failed to read task template") From 65b6efedd9c4986e4dc1ae8a97a2ee58ab333d3f Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Thu, 2 Jan 2025 15:18:28 +0800 Subject: [PATCH 11/12] lint Signed-off-by: Future-Outlier --- .../pkg/controller/nodes/task/handler.go | 23 ++++--------------- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index f24abbcb6a..98c605d5df 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -4,8 +4,6 @@ import ( "context" "fmt" "runtime/debug" - "strconv" - "strings" "time" regErrors "github.com/pkg/errors" @@ -435,32 +433,21 @@ func (t Handler) fetchPluginTaskMetrics(pluginID, taskType string) (*taskMetrics func GetDeckStatus(ctx context.Context, tCtx *taskExecutionContext) (DeckStatus, error) { // FLYTE_ENABLE_DECK is used when flytekit > 1.14.0 // For backward compatibility, - // we will return DeckUnknow and call a HEAD request to check if the deck file exists in the terminal state. + // we will return DeckUnknown and call a HEAD request to check if the deck file exists in the terminal state. template, err := tCtx.tr.Read(ctx) if err != nil { return DeckUnknown, regErrors.Wrapf(err, "failed to read task template") } - templateConfig := template.GetConfig() - if templateConfig == nil { + metadata := template.GetMetadata() + if metadata == nil { return DeckUnknown, nil } - - rawValue, ok := templateConfig[FLYTE_ENABLE_DECK] - if !ok { - return DeckUnknown, nil - } - - rawValue = strings.ToLower(rawValue) - deckEnabled, err := strconv.ParseBool(rawValue) - if err != nil { - return DeckUnknown, nil - } - - if deckEnabled { + if metadata.GetGeneratesDeck() { return DeckEnabled, nil } + return DeckDisabled, nil } From 137579f6b11d612e0e1bfc6e8fd9c724ea30dbfb Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Thu, 9 Jan 2025 12:27:44 +0800 Subject: [PATCH 12/12] fix Signed-off-by: Future-Outlier --- .../pkg/controller/nodes/task/handler.go | 40 +++++++------------ 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 98c605d5df..d4fc654356 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -40,14 +40,12 @@ import ( ) const pluginContextKey = contextutils.Key("plugin") -const FLYTE_ENABLE_DECK = string("FLYTE_ENABLE_DECK") type DeckStatus int const ( DeckUnknown DeckStatus = iota DeckEnabled - DeckDisabled ) type metrics struct { @@ -431,24 +429,29 @@ func (t Handler) fetchPluginTaskMetrics(pluginID, taskType string) (*taskMetrics } func GetDeckStatus(ctx context.Context, tCtx *taskExecutionContext) (DeckStatus, error) { - // FLYTE_ENABLE_DECK is used when flytekit > 1.14.0 - // For backward compatibility, - // we will return DeckUnknown and call a HEAD request to check if the deck file exists in the terminal state. + // GetDeckStatus determines whether a task generates a deck based on its execution context. + // + // This function ensures backward compatibility with older Flytekit versions using the following logic: + // 1. For Flytekit > 1.14.3, the task template's metadata includes the `generates_deck` flag: + // - If `generates_deck` is set to true, it indicates that the task generates a deck, and DeckEnabled is returned. + // 2. If `generates_deck` is set to false or is not set (likely from older Flytekit versions): + // - DeckUnknown is returned as a placeholder status. + // - In terminal states, a HEAD request can be made to check if the deck file exists. + // + // In future implementations, a `DeckDisabled` status could be introduced for better performance optimization: + // - This would eliminate the need for a HEAD request in the final phase. + // - However, the tradeoff is that a new field would need to be added to FlyteIDL to support this behavior. template, err := tCtx.tr.Read(ctx) if err != nil { return DeckUnknown, regErrors.Wrapf(err, "failed to read task template") } - metadata := template.GetMetadata() - if metadata == nil { - return DeckUnknown, nil - } - if metadata.GetGeneratesDeck() { + if template.GetMetadata().GetGeneratesDeck() { return DeckEnabled, nil } - return DeckDisabled, nil + return DeckUnknown, nil } func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *taskExecutionContext, ts handler.TaskNodeState) (*pluginRequestedTransition, error) { @@ -547,23 +550,8 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta pluginTrns.AddDeckURI(tCtx) } - // Handle backward compatibility for Flyte deck display behavior. - // - // Before (legacy behavior): - // - Deck URI was only shown if the deck file existed in the terminal state. - // - We relied on a HEAD request to check if the deck file exists, then added the URI to the event. - // - // After (new behavior): - // - If `FLYTE_ENABLE_DECK = true` is set in the task template config (requires Flytekit > 1.14.0), - // we display the deck URI from the beginning rather than waiting until the terminal state. - // - // For backward compatibility with older Flytekit versions (which don't support `FLYTE_ENABLE_DECK`), - // we still need to check deck file existence in the terminal state. This ensures that when the deck - // isn't enabled via config or doesn't exist, we only show the URI in terminal states if the deck file - // is actually present. switch pluginTrns.pInfo.Phase() { case pluginCore.PhaseSuccess: - // This is for backward compatibility with older Flytekit versions. if deckStatus == DeckUnknown { err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx) }