Skip to content

Commit

Permalink
feat(propeller): Add new config to disable fallback to container task…
Browse files Browse the repository at this point in the history
… handler

Resolves: #5076
Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness committed Apr 4, 2024
1 parent f1c2231 commit 862f846
Show file tree
Hide file tree
Showing 7 changed files with 659 additions and 24 deletions.
2 changes: 2 additions & 0 deletions flyte-single-binary-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ tasks:
default-for-task-types:
- container: container
- container_array: K8S-ARRAY
- sidecar: sidecar
fallback-to-container-handler: true

plugins:
logs:
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ require (
github.com/go-redis/redis v6.15.7+incompatible
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.5.3
github.com/google/martian v2.1.0+incompatible
github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/imdario/mergo v0.3.13
github.com/jinzhu/copier v0.3.5
github.com/magiconair/properties v1.8.6
github.com/mitchellh/mapstructure v1.5.0
github.com/pkg/errors v0.9.1
Expand Down
11 changes: 8 additions & 3 deletions flytepropeller/pkg/controller/nodes/task/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@ import (
"github.com/flyteorg/flyte/flytestdlib/logger"
)

//go:generate pflags Config
//go:generate pflags Config --default-var=defaultConfig

const SectionKey = "tasks"

var (
defaultConfig = &Config{
TaskPlugins: TaskPluginConfig{EnabledPlugins: []string{}, DefaultForTaskTypes: map[string]string{}},
TaskPlugins: TaskPluginConfig{
EnabledPlugins: []string{},
DefaultForTaskTypes: map[string]string{},
FallbackToContainerHandler: true,
},
MaxPluginPhaseVersions: 100000,
BackOffConfig: BackOffConfig{
BaseSecond: 2,
Expand All @@ -39,7 +43,8 @@ type Config struct {
type TaskPluginConfig struct {
EnabledPlugins []string `json:"enabled-plugins" pflag:",Plugins enabled currently"`
// Maps task types to their plugin handler (by ID).
DefaultForTaskTypes map[string]string `json:"default-for-task-types" pflag:"-,"`
DefaultForTaskTypes map[string]string `json:"default-for-task-types" pflag:"-,"`
FallbackToContainerHandler bool `json:"fallback-to-container-handler" pflag:",Fallback to container handler if a task does not have a registered plugin handler. Defaults to true"`
}

type BackOffConfig struct {
Expand Down
3 changes: 3 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,9 @@ func (t Handler) ResolvePlugin(ctx context.Context, ttype string, executionConfi
logger.Debugf(ctx, "Plugin [%s] resolved for Handler type [%s]", p.GetID(), ttype)
return p, nil
}
if !t.cfg.TaskPlugins.FallbackToContainerHandler {
return nil, fmt.Errorf("no plugin defined for Handler type [%s] and fallback-to-container-handler is set to false", ttype)
}
if t.defaultPlugin != nil {
logger.Warnf(ctx, "No plugin found for Handler-type [%s], defaulting to [%s]", ttype, t.defaultPlugin.GetID())
return t.defaultPlugin, nil
Expand Down
76 changes: 64 additions & 12 deletions flytepropeller/pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"

"github.com/golang/protobuf/proto"
"github.com/google/martian/log"
"github.com/jinzhu/copier"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -282,31 +284,70 @@ func Test_task_ResolvePlugin(t *testing.T) {
plugins map[pluginCore.TaskType]pluginCore.Plugin
defaultPlugin pluginCore.Plugin
pluginsForType map[pluginCore.TaskType]map[pluginID]pluginCore.Plugin
cfg *config.Config
}
type args struct {
ttype string
executionConfig v1alpha1.ExecutionConfig
}
cfg := config.GetConfig()
cfgNoFallbackContainerHandler := &config.Config{}
if err := copier.CopyWithOption(cfgNoFallbackContainerHandler, cfg, copier.Option{DeepCopy: true}); err != nil {
log.Errorf("Failed to copy config")
return
}
cfgNoFallbackContainerHandler.TaskPlugins.FallbackToContainerHandler = false

tests := []struct {
name string
fields fields
args args
want string
wantErr bool
}{
{"no-plugins", fields{}, args{}, "", true},
{"default",
{
"no-plugins",
fields{
cfg: cfg,
},
args{},
"",
true,
},
{
"no-plugins-no-fallback-container-handler",
fields{
cfg: cfgNoFallbackContainerHandler,
},
args{},
"",
true,
},
{
"default",
fields{
defaultPlugin: defaultPlugin,
}, args{ttype: someID}, defaultID, false},
{"actual",
cfg: cfg,
},
args{ttype: someID},
defaultID,
false,
},
{
"actual",
fields{
plugins: map[pluginCore.TaskType]pluginCore.Plugin{
someID: somePlugin,
},
defaultPlugin: defaultPlugin,
}, args{ttype: someID}, someID, false},
{"override",
cfg: cfg,
},
args{ttype: someID},
someID,
false,
},
{
"override",
fields{
plugins: make(map[pluginCore.TaskType]pluginCore.Plugin),
defaultPlugin: defaultPlugin,
Expand All @@ -315,21 +356,30 @@ func Test_task_ResolvePlugin(t *testing.T) {
someID: somePlugin,
},
},
}, args{ttype: someID, executionConfig: v1alpha1.ExecutionConfig{
TaskPluginImpls: map[string]v1alpha1.TaskPluginOverride{
someID: {
PluginIDs: []string{someID},
MissingPluginBehavior: admin.PluginOverride_FAIL,
cfg: cfg,
},
args{
ttype: someID,
executionConfig: v1alpha1.ExecutionConfig{
TaskPluginImpls: map[string]v1alpha1.TaskPluginOverride{
someID: {
PluginIDs: []string{someID},
MissingPluginBehavior: admin.PluginOverride_FAIL,
},
},
},
}}, someID, false},
},
someID,
false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tk := Handler{
defaultPlugins: tt.fields.plugins,
defaultPlugin: tt.fields.defaultPlugin,
pluginsForType: tt.fields.pluginsForType,
cfg: tt.fields.cfg,
}
got, err := tk.ResolvePlugin(context.TODO(), tt.args.ttype, tt.args.executionConfig)
if (err != nil) != tt.wantErr {
Expand Down Expand Up @@ -889,6 +939,7 @@ func Test_task_Abort(t *testing.T) {
tk := Handler{
defaultPlugin: m,
resourceManager: noopRm,
cfg: config.GetConfig(),
}
nCtx := createNodeCtx(tt.args.ev)
if err := tk.Abort(context.TODO(), nCtx, "reason"); (err != nil) != tt.wantErr {
Expand Down Expand Up @@ -1051,6 +1102,7 @@ func Test_task_Abort_v1(t *testing.T) {
tk := Handler{
defaultPlugin: m,
resourceManager: noopRm,
cfg: config.GetConfig(),
}
nCtx := createNodeCtx(tt.args.ev)
if err := tk.Abort(context.TODO(), nCtx, "reason"); (err != nil) != tt.wantErr {
Expand Down
44 changes: 39 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ go 1.21
require (
github.com/flyteorg/flyte/datacatalog v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flyteadmin v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytepropeller v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytestdlib v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytepropeller v1.9.12
github.com/flyteorg/flyte/flytestdlib v1.9.12
github.com/flyteorg/flytectl v0.8.14
github.com/golang/glog v1.2.0
github.com/prometheus/client_golang v1.16.0
github.com/spf13/cobra v1.7.0
Expand All @@ -27,11 +28,16 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 // indirect
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.0 // indirect
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625 // indirect
github.com/Microsoft/go-winio v0.5.0 // indirect
github.com/NYTimes/gizmo v1.3.6 // indirect
github.com/Shopify/sarama v1.26.4 // indirect
github.com/apoorvam/goterminal v0.0.0-20180523175556-614d345c47e5 // indirect
github.com/asaskevich/govalidator v0.0.0-20200428143746-21a406dcc535 // indirect
github.com/avast/retry-go v3.0.0+incompatible // indirect
github.com/awalterschulze/gographviz v2.0.3+incompatible // indirect
github.com/aws/aws-sdk-go v1.44.2 // indirect
github.com/aws/aws-sdk-go-v2 v1.2.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.0.0 // indirect
Expand All @@ -50,21 +56,31 @@ require (
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.14.0 // indirect
github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2 v2.8.0 // indirect
github.com/cloudevents/sdk-go/v2 v2.15.2 // indirect
github.com/containerd/containerd v1.5.10 // indirect
github.com/coocood/freecache v1.1.1 // indirect
github.com/coreos/go-oidc/v3 v3.6.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/danieljoos/wincred v1.1.0 // indirect
github.com/dask/dask-kubernetes/v2023 v2023.0.0-20230626103304-abd02cd17b26 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dgraph-io/ristretto v0.0.3 // indirect
github.com/disiqueira/gotree v1.0.0 // indirect
github.com/docker/distribution v2.8.0+incompatible // indirect
github.com/docker/docker v20.10.7+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
github.com/enescakir/emoji v1.0.0 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/flyteorg/flyte/flyteidl v0.0.0-00010101000000-000000000000 // indirect
github.com/flyteorg/flyte/flyteidl v1.9.12 // indirect
github.com/flyteorg/flyte/flyteplugins v0.0.0-00010101000000-000000000000 // indirect
github.com/flyteorg/stow v0.3.10 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
Expand All @@ -76,9 +92,11 @@ require (
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-ozzo/ozzo-validation/v4 v4.3.0 // indirect
github.com/go-redis/redis v6.15.7+incompatible // indirect
github.com/go-test/deep v1.0.7 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/godbus/dbus/v5 v5.0.4 // indirect
github.com/gofrs/uuid v4.2.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
Expand All @@ -88,6 +106,8 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-github/v42 v42.0.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/uuid v1.6.0 // indirect
Expand All @@ -101,8 +121,10 @@ require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
github.com/gtank/cryptopasta v0.0.0-20170601214702-1f550f6f2f69 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/go-version v1.3.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hexops/gotextdiff v1.0.3 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
Expand All @@ -119,11 +141,14 @@ require (
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kardianos/osext v0.0.0-20190222173326-2bc1f35cddc0 // indirect
github.com/kataras/tablewriter v0.0.0-20180708051242-e063d29b7c23 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/klauspost/compress v1.9.8 // indirect
github.com/klauspost/compress v1.11.13 // indirect
github.com/kubeflow/common v0.4.3 // indirect
github.com/kubeflow/training-operator v1.5.0-rc.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/landoop/tableprinter v0.0.0-20180806200924-8bd8c2576d27 // indirect
github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect
github.com/lestrrat-go/blackmagic v1.0.2 // indirect
github.com/lestrrat-go/httpcc v1.0.1 // indirect
Expand All @@ -134,16 +159,21 @@ require (
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/mattn/go-runewidth v0.0.13 // indirect
github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect
github.com/mattn/goveralls v0.0.6 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/term v0.0.0-20221205130635-1aeaba878587 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/mouuff/go-rocket-update v1.5.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/nxadm/tail v1.4.11 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
github.com/ory/fosite v0.42.2 // indirect
github.com/ory/go-acc v0.2.6 // indirect
github.com/ory/go-convenience v0.1.0 // indirect
Expand All @@ -161,7 +191,9 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect
github.com/ray-project/kuberay/ray-operator v1.1.0-rc.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/robfig/cron/v3 v3.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sendgrid/rest v2.6.8+incompatible // indirect
github.com/sendgrid/sendgrid-go v3.10.0+incompatible // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
Expand All @@ -177,6 +209,8 @@ require (
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/sjson v1.2.5 // indirect
github.com/wI2L/jsondiff v0.5.0 // indirect
github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0 // indirect
github.com/zalando/go-keyring v0.1.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.46.1 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
Expand Down
Loading

0 comments on commit 862f846

Please sign in to comment.