Skip to content

Commit

Permalink
DeckStatus
Browse files Browse the repository at this point in the history
Signed-off-by: Future-Outlier <[email protected]>
  • Loading branch information
Future-Outlier committed Dec 18, 2024
1 parent 4e24e91 commit 8d1d0e4
Showing 1 changed file with 31 additions and 9 deletions.
40 changes: 31 additions & 9 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"runtime/debug"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -43,6 +44,14 @@ import (
const pluginContextKey = contextutils.Key("plugin")
const FLYTE_ENABLE_DECK = string("FLYTE_ENABLE_DECK")

type DeckStatus int

const (
DeckUnknown DeckStatus = iota
DeckEnabled
DeckDisabled
)

type metrics struct {
pluginPanics labeled.Counter
unsupportedTaskType labeled.Counter
Expand Down Expand Up @@ -423,19 +432,32 @@ func (t Handler) fetchPluginTaskMetrics(pluginID, taskType string) (*taskMetrics
return t.taskMetricsMap[metricNameKey], nil
}

func IsDeckEnabled(ctx context.Context, tCtx *taskExecutionContext) (bool, error) {
func IsDeckEnabled(ctx context.Context, tCtx *taskExecutionContext) (DeckStatus, error) {
template, err := tCtx.tr.Read(ctx)
if err != nil {
return false, regErrors.Wrapf(err, "failed to read task template")
return DeckUnknown, regErrors.Wrapf(err, "failed to read task template")
}

templateConfig := template.GetConfig()
if templateConfig == nil {
return false, nil
return DeckUnknown, nil
}

rawValue, ok := templateConfig[FLYTE_ENABLE_DECK]
if !ok {
return DeckUnknown, nil
}

deckEnabled := strings.ToLower(templateConfig[FLYTE_ENABLE_DECK])
return deckEnabled == "1" || deckEnabled == "t" || deckEnabled == "true", nil
rawValue = strings.ToLower(rawValue)
deckEnabled, err := strconv.ParseBool(rawValue)
if err != nil {
return DeckUnknown, nil
}

if deckEnabled {
return DeckEnabled, nil
}
return DeckDisabled, nil
}

func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *taskExecutionContext, ts handler.TaskNodeState) (*pluginRequestedTransition, error) {
Expand Down Expand Up @@ -526,11 +548,11 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
// The deck should be accessible even if the task is still running or has failed.
// It's possible that the deck URI may not exist in remote storage yet or will never exist.
// So, it is console's responsibility to handle the case when the deck URI actually does not exist.
deckEnabled, err := IsDeckEnabled(ctx, tCtx)
deckStatus, err := IsDeckEnabled(ctx, tCtx)
if err != nil {
return nil, err
}
if deckEnabled {
if deckStatus == DeckEnabled {
pluginTrns.AddDeckURI(tCtx)
}

Expand All @@ -551,7 +573,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
switch pluginTrns.pInfo.Phase() {
case pluginCore.PhaseSuccess:
// This is for backward compatibility with older Flytekit versions.
if !deckEnabled {
if deckStatus == DeckUnknown {
err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx)
}
if err != nil {
Expand Down Expand Up @@ -601,7 +623,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
fallthrough
case pluginCore.PhasePermanentFailure:
// This is for backward compatibility with older Flytekit versions.
if !deckEnabled {
if deckStatus == DeckUnknown {
err = pluginTrns.AddDeckURIIfDeckExists(ctx, tCtx)
}
if err != nil {
Expand Down

0 comments on commit 8d1d0e4

Please sign in to comment.