From 25fea2956a0108c9b1937c6a4f5bb7c1210898e1 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Tue, 17 Dec 2024 15:19:17 +0800 Subject: [PATCH] add comments and better solution for backward compativle Signed-off-by: Future-Outlier --- .../pkg/controller/nodes/task/handler.go | 59 ++++++++++++++++--- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/flytepropeller/pkg/controller/nodes/task/handler.go b/flytepropeller/pkg/controller/nodes/task/handler.go index 6e792a5698..ef24535b3c 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler.go +++ b/flytepropeller/pkg/controller/nodes/task/handler.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "runtime/debug" + "strings" "time" regErrors "github.com/pkg/errors" @@ -40,6 +41,7 @@ import ( ) const pluginContextKey = contextutils.Key("plugin") +const FLYTE_ENABLE_DECK = string("FLYTE_ENABLE_DECK") type metrics struct { pluginPanics labeled.Counter @@ -83,8 +85,7 @@ func (p *pluginRequestedTransition) AddDeckURI(tCtx *taskExecutionContext) { p.execInfo.OutputInfo.DeckURI = deckURI } -// RemoveNonexistentDeckURI removes the deck URI from the plugin execution info if the URI does not exist in remote storage. -func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context, tCtx *taskExecutionContext) error { +func (p *pluginRequestedTransition) AddDeckURIIfDeckExists(ctx context.Context, tCtx *taskExecutionContext) error { reader := tCtx.ow.GetReader() if reader == nil && p.execInfo.OutputInfo != nil { p.execInfo.OutputInfo.DeckURI = nil @@ -97,8 +98,13 @@ func (p *pluginRequestedTransition) RemoveNonexistentDeckURI(ctx context.Context return regErrors.Wrapf(err, "failed to check existence of deck file") } - if !exists && p.execInfo.OutputInfo != nil { - p.execInfo.OutputInfo.DeckURI = nil + if p.execInfo.OutputInfo == nil { + p.execInfo.OutputInfo = &handler.OutputInfo{} + } + + if exists { + deckURIValue := tCtx.ow.GetDeckPath() + p.execInfo.OutputInfo.DeckURI = &deckURIValue } return nil @@ -417,6 +423,21 @@ func (t Handler) fetchPluginTaskMetrics(pluginID, taskType string) (*taskMetrics return t.taskMetricsMap[metricNameKey], nil } +func IsDeckEnabled(ctx context.Context, tCtx *taskExecutionContext) (bool, error) { + template, err := tCtx.tr.Read(ctx) + if err != nil { + return false, regErrors.Wrapf(err, "failed to read task template") + } + + templateConfig := template.GetConfig() + if templateConfig == nil { + return false, nil + } + + deckEnabled := strings.ToLower(templateConfig[FLYTE_ENABLE_DECK]) + return deckEnabled == "1" || deckEnabled == "t" || deckEnabled == "true", nil +} + func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *taskExecutionContext, ts handler.TaskNodeState) (*pluginRequestedTransition, error) { pluginTrns := &pluginRequestedTransition{} @@ -505,12 +526,34 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta // The deck should be accessible even if the task is still running or has failed. // It's possible that the deck URI may not exist in remote storage yet or will never exist. // So, it is console's responsibility to handle the case when the deck URI actually does not exist. - pluginTrns.AddDeckURI(tCtx) + deckEnabled, err := IsDeckEnabled(ctx, tCtx) + if err != nil { + return nil, err + } + if deckEnabled { + pluginTrns.AddDeckURI(tCtx) + } + // Handle backward compatibility for Flyte deck display behavior. + // + // Before (legacy behavior): + // - Deck URI was only shown if the deck file existed in the terminal state. + // - We relied on a HEAD request to check if the deck file exists, then added the URI to the event. + // + // After (new behavior): + // - If `FLYTE_ENABLE_DECK = true` is set in the task template config (requires Flytekit > 1.14.0), + // we display the deck URI from the beginning rather than waiting until the terminal state. + // + // For backward compatibility with older Flytekit versions (which don't support `FLYTE_ENABLE_DECK`), + // we still need to check deck file existence in the terminal state. This ensures that when the deck + // isn't enabled via config or doesn't exist, we only show the URI in terminal states if the deck file + // is actually present. switch pluginTrns.pInfo.Phase() { case pluginCore.PhaseSuccess: // This is to prevent the console from potentially checking the deck URI that does not exist if in final phase(PhaseSuccess). - err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx) + if !deckEnabled { + err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx) + } if err != nil { return pluginTrns, err } @@ -559,7 +602,9 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta case pluginCore.PhasePermanentFailure: // This is to prevent the console from potentially checking the deck URI that does not exist if in final // phase(PhaseFailure). - err = pluginTrns.RemoveNonexistentDeckURI(ctx, tCtx) + if !deckEnabled { + err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx) + } if err != nil { return pluginTrns, err }